This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new cfe840f943f Introduced retries to the TableauSensor (#52770)
cfe840f943f is described below
commit cfe840f943f305546e656bb8022b2da5d5bf224f
Author: Dominik <[email protected]>
AuthorDate: Tue Apr 7 00:36:18 2026 +0200
Introduced retries to the TableauSensor (#52770)
* Introduced retries to the TableauSensor to account for transient errors
* Renamed param retries_on_failure to max_status_retries to distinguish
it from Airflow's own retries
---
.../airflow/providers/tableau/sensors/tableau.py | 12 ++++++++
.../tests/unit/tableau/sensors/test_tableau.py | 33 ++++++++++++++++++++++
2 files changed, 45 insertions(+)
diff --git a/providers/tableau/src/airflow/providers/tableau/sensors/tableau.py b/providers/tableau/src/airflow/providers/tableau/sensors/tableau.py
index 4c07d996022..eb06ec992a0 100644
--- a/providers/tableau/src/airflow/providers/tableau/sensors/tableau.py
+++ b/providers/tableau/src/airflow/providers/tableau/sensors/tableau.py
@@ -40,6 +40,8 @@ class TableauJobStatusSensor(BaseSensorOperator):
:param site_id: The id of the site where the workbook belongs to.
:param tableau_conn_id: The :ref:`Tableau Connection id
<howto/connection:tableau>`
containing the credentials to authenticate to the Tableau Server.
+ :param max_status_retries: Maximum number of times to poke again after an
+ ERROR or CANCELED finish code, to account for transient errors.
"""
template_fields: Sequence[str] = ("job_id",)
@@ -50,17 +52,23 @@ class TableauJobStatusSensor(BaseSensorOperator):
job_id: str,
site_id: str | None = None,
tableau_conn_id: str = "tableau_default",
+ max_status_retries: int = 0,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.tableau_conn_id = tableau_conn_id
self.job_id = job_id
self.site_id = site_id
+ self.max_status_retries = max_status_retries
+ self._retry_attempts = 0
def poke(self, context: Context) -> bool:
"""
Pokes until the job has successfully finished.
+ When max_status_retries is set, the sensor will retry that many times
+ on TableauJobFinishCode.ERROR or TableauJobFinishCode.CANCELED before raising TableauJobFailedException.
+
:param context: The task context during execution.
:return: True if it succeeded and False if not.
"""
@@ -69,6 +77,10 @@ class TableauJobStatusSensor(BaseSensorOperator):
self.log.info("Current finishCode is %s (%s)", finish_code.name,
finish_code.value)
if finish_code in (TableauJobFinishCode.ERROR,
TableauJobFinishCode.CANCELED):
+ if self._retry_attempts < self.max_status_retries:
+ self.log.info("Retrying to get the job status")
+ self._retry_attempts += 1
+ return False
message = "The Tableau Refresh Workbook Job failed!"
raise TableauJobFailedException(message)
diff --git a/providers/tableau/tests/unit/tableau/sensors/test_tableau.py b/providers/tableau/tests/unit/tableau/sensors/test_tableau.py
index 89f4b6a5678..87214c05808 100644
--- a/providers/tableau/tests/unit/tableau/sensors/test_tableau.py
+++ b/providers/tableau/tests/unit/tableau/sensors/test_tableau.py
@@ -22,6 +22,7 @@ import pytest
from airflow.providers.common.compat.sdk import AirflowException
from airflow.providers.tableau.sensors.tableau import (
+ TableauJobFailedException,
TableauJobFinishCode,
TableauJobStatusSensor,
)
@@ -68,3 +69,35 @@ class TestTableauJobStatusSensor:
with pytest.raises(AirflowException):
sensor.poke({})
mock_tableau_hook.get_job_status.assert_called_once_with(job_id=sensor.job_id)
+
+ @patch("airflow.providers.tableau.sensors.tableau.TableauHook")
+ def test_poke_succeeds_on_last_try(self, mock_tableau_hook_class):
+ mock_tableau_hook = Mock()
+ mock_tableau_hook.get_job_status.side_effect = [
+ TableauJobFinishCode.ERROR,
+ TableauJobFinishCode.CANCELED,
+ TableauJobFinishCode.SUCCESS,
+ ]
+ mock_tableau_hook_class.return_value.__enter__.return_value =
mock_tableau_hook
+ sensor = TableauJobStatusSensor(**self.kwargs, max_status_retries=2)
+
+ assert not sensor.poke({})
+ assert not sensor.poke({})
+ assert sensor.poke({})
+ assert mock_tableau_hook.get_job_status.call_count == 3
+
+ @patch("airflow.providers.tableau.sensors.tableau.TableauHook")
+ def test_poke_failed_on_last_try(self, mock_tableau_hook_class):
+ mock_tableau_hook = Mock()
+ mock_tableau_hook.get_job_status.side_effect = [
+ TableauJobFinishCode.ERROR,
+ TableauJobFinishCode.CANCELED,
+ TableauJobFinishCode.ERROR,
+ ]
+ mock_tableau_hook_class.return_value.__enter__.return_value =
mock_tableau_hook
+ sensor = TableauJobStatusSensor(**self.kwargs, max_status_retries=2,
poke_interval=10.0)
+ assert not sensor.poke({})
+ assert not sensor.poke({})
+ with pytest.raises(TableauJobFailedException, match="The Tableau
Refresh Workbook Job failed!"):
+ sensor.poke({})
+ assert mock_tableau_hook.get_job_status.call_count == 3