This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f9119133f66 Fix TableauOperator tasks.run AttributeError (#66682)
f9119133f66 is described below
commit f9119133f66031dabb37421e22304ac7d0283db7
Author: deepinsight coder <[email protected]>
AuthorDate: Fri May 15 04:24:29 2026 -0700
Fix TableauOperator tasks.run AttributeError (#66682)
* Fix TableauOperator failing with AttributeError on tasks.run
* Use standard exception for empty Tableau task responses
* Drop changelog entry — 5.4.0 already released
The release manager regenerates providers/<provider>/docs/changelog.rst
from git log at release time, so per-PR changelog entries are not needed
and should not be appended to an already-released version section.
---
.../airflow/providers/tableau/operators/tableau.py | 15 +++-
.../tests/unit/tableau/operators/test_tableau.py | 86 +++++++++++++++++++++-
2 files changed, 96 insertions(+), 5 deletions(-)
diff --git a/providers/tableau/src/airflow/providers/tableau/operators/tableau.py b/providers/tableau/src/airflow/providers/tableau/operators/tableau.py
index b9b41738d2f..d176e3ea2ff 100644
--- a/providers/tableau/src/airflow/providers/tableau/operators/tableau.py
+++ b/providers/tableau/src/airflow/providers/tableau/operators/tableau.py
@@ -19,6 +19,8 @@ from __future__ import annotations
from collections.abc import Sequence
from typing import TYPE_CHECKING
+from tableauserverclient import JobItem
+
from airflow.providers.common.compat.sdk import AirflowException, BaseOperator
from airflow.providers.tableau.hooks.tableau import (
TableauHook,
@@ -115,9 +117,16 @@ class TableauOperator(BaseOperator):
resource_id = self._get_resource_id(tableau_hook)
- response = method(resource_id)
-
- job_id = response.id
+ if self.resource == "tasks" and self.method == "run":
+ task_item = resource.get_by_id(resource_id)
+ response_bytes = method(task_item)
+ job_items = JobItem.from_response(response_bytes, tableau_hook.server.namespace)
+ if not job_items:
+ raise ValueError("Tableau tasks.run returned no JobItem in response")
+ job_id = job_items[0].id
+ else:
+ response = method(resource_id)
+ job_id = response.id
if self.method == "refresh":
if self.blocking_refresh:
diff --git a/providers/tableau/tests/unit/tableau/operators/test_tableau.py b/providers/tableau/tests/unit/tableau/operators/test_tableau.py
index d55598be588..ff22d128c43 100644
--- a/providers/tableau/tests/unit/tableau/operators/test_tableau.py
+++ b/providers/tableau/tests/unit/tableau/operators/test_tableau.py
@@ -33,6 +33,7 @@ class TestTableauOperator:
def setup_method(self):
self.mocked_workbooks = []
self.mock_datasources = []
+ self.mock_tasks = []
for i in range(3):
mock_workbook = Mock()
@@ -45,6 +46,11 @@ class TestTableauOperator:
mock_datasource.name = f"ds_{i}"
self.mock_datasources.append(mock_datasource)
+ mock_task = Mock()
+ mock_task.id = f"task-id-{i}"
+ mock_task.name = f"task_{i}"
+ self.mock_tasks.append(mock_task)
+
self.kwargs = {
"site_id": "test_site",
"task_id": "task",
@@ -179,6 +185,82 @@ class TestTableauOperator:
with pytest.raises(AirflowException):
operator.execute({})
+ @patch("airflow.providers.tableau.operators.tableau.JobItem")
+ @patch("airflow.providers.tableau.operators.tableau.TableauHook")
+ def test_execute_tasks_run_with_id(self, mock_tableau_hook, mock_job_item):
+ """tasks.run must fetch a TaskItem via get_by_id and parse JobItem from response bytes."""
+ mock_tableau_hook.return_value.__enter__ = Mock(return_value=mock_tableau_hook)
+
+ mock_task_item = Mock()
+ mock_tableau_hook.server.tasks.get_by_id = Mock(return_value=mock_task_item)
+ mock_tableau_hook.server.tasks.run = Mock(return_value=b"<tsResponse/>")
+
+ mock_job = Mock()
+ mock_job.id = "job-123"
+ mock_job_item.from_response = Mock(return_value=[mock_job])
+
+ kwargs = {**self.kwargs, "method": "run", "match_with": "id"}
+ operator = TableauOperator(find="task-abc", resource="tasks", **kwargs)
+
+ job_id = operator.execute(context={})
+
+ mock_tableau_hook.server.tasks.get_by_id.assert_called_once_with("task-abc")
+ mock_tableau_hook.server.tasks.run.assert_called_once_with(mock_task_item)
+ mock_job_item.from_response.assert_called_once_with(
+ b"<tsResponse/>", mock_tableau_hook.server.namespace
+ )
+ assert job_id == "job-123"
+
+ @patch("airflow.providers.tableau.operators.tableau.JobItem")
+ @patch("airflow.providers.tableau.operators.tableau.TableauHook")
+ def test_execute_tasks_run_with_match_with_name(self, mock_tableau_hook, mock_job_item):
+ """tasks.run should resolve the task id before fetching the TaskItem to run."""
+ mock_tableau_hook.get_all = Mock(return_value=self.mock_tasks)
+ mock_tableau_hook.return_value.__enter__ = Mock(return_value=mock_tableau_hook)
+
+ mock_task_item = Mock()
+ mock_tableau_hook.server.tasks.get_by_id = Mock(return_value=mock_task_item)
+ mock_tableau_hook.server.tasks.run = Mock(return_value=b"<tsResponse/>")
+
+ mock_job = Mock()
+ mock_job.id = "job-456"
+ mock_job_item.from_response = Mock(return_value=[mock_job])
+
+ kwargs = {**self.kwargs, "method": "run"}
+ operator = TableauOperator(find="task_2", resource="tasks", **kwargs)
+
+ job_id = operator.execute(context={})
+
+ mock_tableau_hook.server.tasks.get_by_id.assert_called_once_with(self.mock_tasks[2].id)
+ mock_tableau_hook.server.tasks.run.assert_called_once_with(mock_task_item)
+ assert job_id == "job-456"
+
+ @patch("airflow.providers.tableau.operators.tableau.JobItem")
+ @patch("airflow.providers.tableau.operators.tableau.TableauHook")
+ def test_execute_tasks_run_empty_job_response_raises(self, mock_tableau_hook, mock_job_item):
+ """A tasks.run response without a JobItem should raise a provider exception."""
+ mock_tableau_hook.return_value.__enter__ = Mock(return_value=mock_tableau_hook)
+ mock_tableau_hook.server.tasks.get_by_id = Mock(return_value=Mock())
+ mock_tableau_hook.server.tasks.run = Mock(return_value=b"<tsResponse/>")
+ mock_job_item.from_response = Mock(return_value=[])
+
+ kwargs = {**self.kwargs, "method": "run", "match_with": "id"}
+ operator = TableauOperator(find="task-abc", resource="tasks", **kwargs)
+
+ with pytest.raises(ValueError, match="no JobItem"):
+ operator.execute(context={})
+
+ @patch("airflow.providers.tableau.operators.tableau.TableauHook")
+ def test_execute_missing_task(self, mock_tableau_hook):
+ """Test execute missing task."""
+ mock_tableau_hook.get_all = Mock(return_value=self.mock_tasks)
+ mock_tableau_hook.return_value.__enter__ = Mock(return_value=mock_tableau_hook)
+ kwargs = {**self.kwargs, "method": "run"}
+ operator = TableauOperator(find="test", resource="tasks", **kwargs)
+
+ with pytest.raises(AirflowException):
+ operator.execute({})
+
def test_execute_unavailable_resource(self):
"""
Test execute unavailable resource
@@ -193,5 +275,5 @@ class TestTableauOperator:
Test get resource id
"""
resource_id = "res_id"
- operator = TableauOperator(resource="task", find=resource_id, method="run", task_id="t", dag=None)
- assert operator._get_resource_id(resource_id) == resource_id
+ operator = TableauOperator(resource="tasks", find=resource_id, method="run", task_id="t", dag=None)
+ assert operator._get_resource_id(Mock()) == resource_id