This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1d5150eef00 Add incremental refresh support to `TableauOperator` (#67340)
1d5150eef00 is described below
commit 1d5150eef007c0ab8779ceab68e70dd361426b94
Author: Subham <[email protected]>
AuthorDate: Tue May 26 16:58:54 2026 +0530
Add incremental refresh support to `TableauOperator` (#67340)
* Add incremental refresh support to TableauOperator
* Add warning when incremental_refresh used with non-refresh method
* Fix incremental refresh version compatibility for tableauserverclient
* Raise AirflowOptionalProviderFeatureException on unsupported TS client version
* Address review: inspect TypeError message and chain exception
* Fix trailing whitespace in test_tableau.py
---
.../airflow/providers/tableau/operators/tableau.py | 31 ++-
.../tests/unit/tableau/operators/test_tableau.py | 212 ++++++++++++++++++++-
2 files changed, 241 insertions(+), 2 deletions(-)
diff --git
a/providers/tableau/src/airflow/providers/tableau/operators/tableau.py
b/providers/tableau/src/airflow/providers/tableau/operators/tableau.py
index d176e3ea2ff..9cb58657645 100644
--- a/providers/tableau/src/airflow/providers/tableau/operators/tableau.py
+++ b/providers/tableau/src/airflow/providers/tableau/operators/tableau.py
@@ -21,7 +21,11 @@ from typing import TYPE_CHECKING
from tableauserverclient import JobItem
-from airflow.providers.common.compat.sdk import AirflowException, BaseOperator
+from airflow.providers.common.compat.sdk import (
+ AirflowException,
+ AirflowOptionalProviderFeatureException,
+ BaseOperator,
+)
from airflow.providers.tableau.hooks.tableau import (
TableauHook,
TableauJobFailedException,
@@ -62,6 +66,8 @@ class TableauOperator(BaseOperator):
:param blocking_refresh: By default will be blocking means it will wait
until it has finished.
:param check_interval: time in seconds that the job should wait in
between each instance state checks until operation is completed
+ :param incremental_refresh: Whether to perform an incremental refresh
instead of a full refresh.
+ Only applies to datasource and workbook refresh operations. Defaults
to False (full refresh).
:param tableau_conn_id: The :ref:`Tableau Connection id
<howto/connection:tableau>`
containing the credentials to authenticate to the Tableau Server.
"""
@@ -81,6 +87,7 @@ class TableauOperator(BaseOperator):
site_id: str | None = None,
blocking_refresh: bool = True,
check_interval: float = 20,
+ incremental_refresh: bool = False,
tableau_conn_id: str = "tableau_default",
**kwargs,
) -> None:
@@ -92,6 +99,7 @@ class TableauOperator(BaseOperator):
self.check_interval = check_interval
self.site_id = site_id
self.blocking_refresh = blocking_refresh
+ self.incremental_refresh = incremental_refresh
self.tableau_conn_id = tableau_conn_id
def execute(self, context: Context) -> str:
@@ -111,6 +119,13 @@ class TableauOperator(BaseOperator):
error_message = f"Method not found! Available methods for
{self.resource}: {available_methods}"
raise AirflowException(error_message)
+ if self.incremental_refresh and self.method != "refresh":
+ self.log.warning(
+ "incremental_refresh parameter is set to True but method is
'%s'. "
+ "This parameter only applies to 'refresh' operations and will
be ignored.",
+ self.method,
+ )
+
with TableauHook(self.site_id, self.tableau_conn_id) as tableau_hook:
resource = getattr(tableau_hook.server, self.resource)
method = getattr(resource, self.method)
@@ -124,6 +139,20 @@ class TableauOperator(BaseOperator):
if not job_items:
raise ValueError("Tableau tasks.run returned no JobItem in
response")
job_id = job_items[0].id
+ elif self.method == "refresh":
+ if self.incremental_refresh:
+ try:
+ response = method(resource_id, incremental=True)
+ except TypeError as e:
+ if "incremental" in str(e):
+ raise AirflowOptionalProviderFeatureException(
+ "Incremental refresh requires
tableauserverclient>=0.35. "
+ "Please upgrade: pip install
'tableauserverclient>=0.35'"
+ ) from e
+ raise
+ else:
+ response = method(resource_id)
+ job_id = response.id
else:
response = method(resource_id)
job_id = response.id
diff --git a/providers/tableau/tests/unit/tableau/operators/test_tableau.py
b/providers/tableau/tests/unit/tableau/operators/test_tableau.py
index ff22d128c43..c2e9e5157fc 100644
--- a/providers/tableau/tests/unit/tableau/operators/test_tableau.py
+++ b/providers/tableau/tests/unit/tableau/operators/test_tableau.py
@@ -20,7 +20,7 @@ from unittest.mock import Mock, patch
import pytest
-from airflow.providers.common.compat.sdk import AirflowException
+from airflow.providers.common.compat.sdk import AirflowException,
AirflowOptionalProviderFeatureException
from airflow.providers.tableau.hooks.tableau import TableauJobFinishCode
from airflow.providers.tableau.operators.tableau import TableauOperator
@@ -277,3 +277,213 @@ class TestTableauOperator:
resource_id = "res_id"
operator = TableauOperator(resource="tasks", find=resource_id,
method="run", task_id="t", dag=None)
assert operator._get_resource_id(Mock()) == resource_id
+
+ @patch("airflow.providers.tableau.operators.tableau.TableauHook")
+ def test_execute_datasources_incremental_refresh(self, mock_tableau_hook):
+ """
+ Test execute datasources with incremental refresh
+ """
+ mock_tableau_hook.get_all = Mock(return_value=self.mock_datasources)
+ mock_tableau_hook.return_value.__enter__ =
Mock(return_value=mock_tableau_hook)
+ operator = TableauOperator(
+ blocking_refresh=False,
+ find="ds_2",
+ resource="datasources",
+ incremental_refresh=True,
+ **self.kwargs,
+ )
+
+ job_id = operator.execute(context={})
+
+
mock_tableau_hook.server.datasources.refresh.assert_called_once_with(2,
incremental=True)
+ assert mock_tableau_hook.server.datasources.refresh.return_value.id ==
job_id
+
+ @patch("airflow.providers.tableau.operators.tableau.TableauHook")
+ def test_execute_datasources_full_refresh(self, mock_tableau_hook):
+ """
+ Test execute datasources with full refresh (default behavior)
+ """
+ mock_tableau_hook.get_all = Mock(return_value=self.mock_datasources)
+ mock_tableau_hook.return_value.__enter__ =
Mock(return_value=mock_tableau_hook)
+ operator = TableauOperator(
+ blocking_refresh=False,
+ find="ds_2",
+ resource="datasources",
+ incremental_refresh=False,
+ **self.kwargs,
+ )
+
+ job_id = operator.execute(context={})
+
+ mock_tableau_hook.server.datasources.refresh.assert_called_once_with(2)
+ assert mock_tableau_hook.server.datasources.refresh.return_value.id ==
job_id
+
+ @patch("airflow.providers.tableau.operators.tableau.TableauHook")
+ def test_execute_workbooks_incremental_refresh(self, mock_tableau_hook):
+ """
+ Test execute workbooks with incremental refresh
+ """
+ mock_tableau_hook.get_all = Mock(return_value=self.mocked_workbooks)
+ mock_tableau_hook.return_value.__enter__ =
Mock(return_value=mock_tableau_hook)
+ operator = TableauOperator(
+ blocking_refresh=False,
+ find="wb_2",
+ resource="workbooks",
+ incremental_refresh=True,
+ **self.kwargs,
+ )
+
+ job_id = operator.execute(context={})
+
+ mock_tableau_hook.server.workbooks.refresh.assert_called_once_with(2,
incremental=True)
+ assert mock_tableau_hook.server.workbooks.refresh.return_value.id ==
job_id
+
+ @patch("airflow.providers.tableau.operators.tableau.TableauHook")
+ def test_execute_workbooks_full_refresh(self, mock_tableau_hook):
+ """
+ Test execute workbooks with full refresh (default behavior)
+ """
+ mock_tableau_hook.get_all = Mock(return_value=self.mocked_workbooks)
+ mock_tableau_hook.return_value.__enter__ =
Mock(return_value=mock_tableau_hook)
+ operator = TableauOperator(
+ blocking_refresh=False,
+ find="wb_2",
+ resource="workbooks",
+ incremental_refresh=False,
+ **self.kwargs,
+ )
+
+ job_id = operator.execute(context={})
+
+ mock_tableau_hook.server.workbooks.refresh.assert_called_once_with(2)
+ assert mock_tableau_hook.server.workbooks.refresh.return_value.id ==
job_id
+
+ @patch("airflow.providers.tableau.operators.tableau.TableauHook")
+ def test_execute_datasources_incremental_refresh_blocking(self,
mock_tableau_hook):
+ """
+ Test execute datasources with incremental refresh blocking
+ """
+ mock_signed_in = [False]
+
+ def mock_hook_enter():
+ mock_signed_in[0] = True
+ return mock_tableau_hook
+
+ def mock_hook_exit(exc_type, exc_val, exc_tb):
+ mock_signed_in[0] = False
+
+ def mock_wait_for_state(job_id, target_state, check_interval):
+ if not mock_signed_in[0]:
+ raise Exception("Not signed in")
+ return True
+
+ mock_tableau_hook.return_value.__enter__ =
Mock(side_effect=mock_hook_enter)
+ mock_tableau_hook.return_value.__exit__ =
Mock(side_effect=mock_hook_exit)
+ mock_tableau_hook.wait_for_state =
Mock(side_effect=mock_wait_for_state)
+ mock_tableau_hook.get_all = Mock(return_value=self.mock_datasources)
+
+ operator = TableauOperator(
+ find="ds_2",
+ resource="datasources",
+ incremental_refresh=True,
+ **self.kwargs,
+ )
+
+ job_id = operator.execute(context={})
+
+
mock_tableau_hook.server.datasources.refresh.assert_called_once_with(2,
incremental=True)
+ assert mock_tableau_hook.server.datasources.refresh.return_value.id ==
job_id
+ mock_tableau_hook.wait_for_state.assert_called_once_with(
+ job_id=job_id, check_interval=20,
target_state=TableauJobFinishCode.SUCCESS
+ )
+
+ @patch("airflow.providers.tableau.operators.tableau.TableauHook")
+ def test_incremental_refresh_warning_on_non_refresh_method(self,
mock_tableau_hook, caplog):
+ """
+ Test that a warning is logged when incremental_refresh is set but
method is not 'refresh'
+ """
+ mock_tableau_hook.return_value.__enter__ =
Mock(return_value=mock_tableau_hook)
+ mock_tableau_hook.get_all = Mock(return_value=self.mock_datasources)
+
+ operator = TableauOperator(
+ find="ds_2",
+ resource="datasources",
+ method="delete",
+ incremental_refresh=True,
+ dag=None,
+ task_id="test",
+ )
+
+ operator.execute(context={})
+
+ assert "incremental_refresh parameter is set to True but method is
'delete'" in caplog.text
+ assert "This parameter only applies to 'refresh' operations" in
caplog.text
+
+ @patch("airflow.providers.tableau.operators.tableau.TableauHook")
+ def test_incremental_refresh_unsupported_version_raises(self,
mock_tableau_hook):
+ """
+ Test that AirflowOptionalProviderFeatureException is raised when
incremental_refresh=True
+ but the installed tableauserverclient does not support the incremental
parameter.
+ """
+ mock_tableau_hook.get_all = Mock(return_value=self.mock_datasources)
+ mock_tableau_hook.return_value.__enter__ =
Mock(return_value=mock_tableau_hook)
+ # Simulate older tableauserverclient that doesn't accept incremental
kwarg
+ mock_tableau_hook.server.datasources.refresh.side_effect = TypeError(
+ "refresh() got an unexpected keyword argument 'incremental'"
+ )
+
+ operator = TableauOperator(
+ blocking_refresh=False,
+ find="ds_2",
+ resource="datasources",
+ incremental_refresh=True,
+ **self.kwargs,
+ )
+
+ with pytest.raises(AirflowOptionalProviderFeatureException,
match="tableauserverclient>=0.35"):
+ operator.execute(context={})
+
+ @patch("airflow.providers.tableau.operators.tableau.TableauHook")
+ def test_incremental_refresh_unrelated_type_error_is_raised(self,
mock_tableau_hook):
+ """
+ Test that an unrelated TypeError during refresh is re-raised.
+ """
+ mock_tableau_hook.get_all = Mock(return_value=self.mock_datasources)
+ mock_tableau_hook.return_value.__enter__ =
Mock(return_value=mock_tableau_hook)
+
+ # Simulate a different TypeError that does NOT contain the string
"incremental"
+ mock_tableau_hook.server.datasources.refresh.side_effect =
TypeError("Some other type error")
+
+ operator = TableauOperator(
+ blocking_refresh=False,
+ find="ds_2",
+ resource="datasources",
+ incremental_refresh=True,
+ **self.kwargs,
+ )
+
+ with pytest.raises(TypeError, match="Some other type error"):
+ operator.execute(context={})
+
+ @patch("airflow.providers.tableau.operators.tableau.TableauHook")
+ def test_full_refresh_works_on_older_versions(self, mock_tableau_hook):
+ """
+ Test that full refresh (incremental_refresh=False) works fine on older
+ tableauserverclient versions since the incremental kwarg is not passed.
+ """
+ mock_tableau_hook.get_all = Mock(return_value=self.mock_datasources)
+ mock_tableau_hook.return_value.__enter__ =
Mock(return_value=mock_tableau_hook)
+
+ operator = TableauOperator(
+ blocking_refresh=False,
+ find="ds_2",
+ resource="datasources",
+ incremental_refresh=False,
+ **self.kwargs,
+ )
+
+ job_id = operator.execute(context={})
+
+ # Verify that refresh was called WITHOUT the incremental parameter
+ mock_tableau_hook.server.datasources.refresh.assert_called_once_with(2)
+ assert job_id ==
mock_tableau_hook.server.datasources.refresh.return_value.id