This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new cfe840f943f Introduced retries to the TableauSensor (#52770)
cfe840f943f is described below

commit cfe840f943f305546e656bb8022b2da5d5bf224f
Author: Dominik <[email protected]>
AuthorDate: Tue Apr 7 00:36:18 2026 +0200

    Introduced retries to the TableauSensor (#52770)
    
    * Introduced retries to the TableauSensor to account for transient errors
    
    * Renamed param retries_on_failure to max_status_retries to distinguish
      it from Airflow's own task retries
---
 .../airflow/providers/tableau/sensors/tableau.py   | 12 ++++++++
 .../tests/unit/tableau/sensors/test_tableau.py     | 33 ++++++++++++++++++++++
 2 files changed, 45 insertions(+)

diff --git a/providers/tableau/src/airflow/providers/tableau/sensors/tableau.py 
b/providers/tableau/src/airflow/providers/tableau/sensors/tableau.py
index 4c07d996022..eb06ec992a0 100644
--- a/providers/tableau/src/airflow/providers/tableau/sensors/tableau.py
+++ b/providers/tableau/src/airflow/providers/tableau/sensors/tableau.py
@@ -40,6 +40,8 @@ class TableauJobStatusSensor(BaseSensorOperator):
     :param site_id: The id of the site where the workbook belongs to.
     :param tableau_conn_id: The :ref:`Tableau Connection id 
<howto/connection:tableau>`
         containing the credentials to authenticate to the Tableau Server.
+    :param max_status_retries: Maximum number of ERROR or CANCELED results to tolerate
+        (poking again each time) before the sensor fails. Defaults to 0 (no retries).
     """
 
     template_fields: Sequence[str] = ("job_id",)
@@ -50,17 +52,23 @@ class TableauJobStatusSensor(BaseSensorOperator):
         job_id: str,
         site_id: str | None = None,
         tableau_conn_id: str = "tableau_default",
+        max_status_retries: int = 0,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
         self.tableau_conn_id = tableau_conn_id
         self.job_id = job_id
         self.site_id = site_id
+        self.max_status_retries = max_status_retries
+        self._retry_attempts = 0
 
     def poke(self, context: Context) -> bool:
         """
         Pokes until the job has successfully finished.
 
+        When max_status_retries is greater than zero, the sensor returns False (up to that
+        many times) on TableauJobFinishCode.ERROR or CANCELED before raising an error.
+
         :param context: The task context during execution.
         :return: True if it succeeded and False if not.
         """
@@ -69,6 +77,10 @@ class TableauJobStatusSensor(BaseSensorOperator):
             self.log.info("Current finishCode is %s (%s)", finish_code.name, 
finish_code.value)
 
             if finish_code in (TableauJobFinishCode.ERROR, 
TableauJobFinishCode.CANCELED):
+                if self._retry_attempts < self.max_status_retries:
+                    self.log.info("Retrying to get the job status")
+                    self._retry_attempts += 1
+                    return False
                 message = "The Tableau Refresh Workbook Job failed!"
                 raise TableauJobFailedException(message)
 
diff --git a/providers/tableau/tests/unit/tableau/sensors/test_tableau.py 
b/providers/tableau/tests/unit/tableau/sensors/test_tableau.py
index 89f4b6a5678..87214c05808 100644
--- a/providers/tableau/tests/unit/tableau/sensors/test_tableau.py
+++ b/providers/tableau/tests/unit/tableau/sensors/test_tableau.py
@@ -22,6 +22,7 @@ import pytest
 
 from airflow.providers.common.compat.sdk import AirflowException
 from airflow.providers.tableau.sensors.tableau import (
+    TableauJobFailedException,
     TableauJobFinishCode,
     TableauJobStatusSensor,
 )
@@ -68,3 +69,35 @@ class TestTableauJobStatusSensor:
         with pytest.raises(AirflowException):
             sensor.poke({})
         
mock_tableau_hook.get_job_status.assert_called_once_with(job_id=sensor.job_id)
+
+    @patch("airflow.providers.tableau.sensors.tableau.TableauHook")
+    def test_poke_succeeds_on_last_try(self, mock_tableau_hook_class):
+        mock_tableau_hook = Mock()
+        mock_tableau_hook.get_job_status.side_effect = [
+            TableauJobFinishCode.ERROR,
+            TableauJobFinishCode.CANCELED,
+            TableauJobFinishCode.SUCCESS,
+        ]
+        mock_tableau_hook_class.return_value.__enter__.return_value = 
mock_tableau_hook
+        sensor = TableauJobStatusSensor(**self.kwargs, max_status_retries=2)
+
+        assert not sensor.poke({})
+        assert not sensor.poke({})
+        assert sensor.poke({})
+        assert mock_tableau_hook.get_job_status.call_count == 3
+
+    @patch("airflow.providers.tableau.sensors.tableau.TableauHook")
+    def test_poke_failed_on_last_try(self, mock_tableau_hook_class):
+        mock_tableau_hook = Mock()
+        mock_tableau_hook.get_job_status.side_effect = [
+            TableauJobFinishCode.ERROR,
+            TableauJobFinishCode.CANCELED,
+            TableauJobFinishCode.ERROR,
+        ]
+        mock_tableau_hook_class.return_value.__enter__.return_value = 
mock_tableau_hook
+        sensor = TableauJobStatusSensor(**self.kwargs, max_status_retries=2, 
poke_interval=10.0)
+        assert not sensor.poke({})
+        assert not sensor.poke({})
+        with pytest.raises(TableauJobFailedException, match="The Tableau 
Refresh Workbook Job failed!"):
+            sensor.poke({})
+        assert mock_tableau_hook.get_job_status.call_count == 3

Reply via email to