This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f9119133f66 Fix TableauOperator tasks.run AttributeError (#66682)
f9119133f66 is described below

commit f9119133f66031dabb37421e22304ac7d0283db7
Author: deepinsight coder <[email protected]>
AuthorDate: Fri May 15 04:24:29 2026 -0700

    Fix TableauOperator tasks.run AttributeError (#66682)
    
    * Fix TableauOperator failing with AttributeError on tasks.run
    
    * Use standard exception for empty Tableau task responses
    
    * Drop changelog entry — 5.4.0 already released
    
    The release manager regenerates providers/<provider>/docs/changelog.rst
    from git log at release time, so per-PR changelog entries are not needed
    and should not be appended to an already-released version section.
---
 .../airflow/providers/tableau/operators/tableau.py | 15 +++-
 .../tests/unit/tableau/operators/test_tableau.py   | 86 +++++++++++++++++++++-
 2 files changed, 96 insertions(+), 5 deletions(-)

diff --git a/providers/tableau/src/airflow/providers/tableau/operators/tableau.py b/providers/tableau/src/airflow/providers/tableau/operators/tableau.py
index b9b41738d2f..d176e3ea2ff 100644
--- a/providers/tableau/src/airflow/providers/tableau/operators/tableau.py
+++ b/providers/tableau/src/airflow/providers/tableau/operators/tableau.py
@@ -19,6 +19,8 @@ from __future__ import annotations
 from collections.abc import Sequence
 from typing import TYPE_CHECKING
 
+from tableauserverclient import JobItem
+
 from airflow.providers.common.compat.sdk import AirflowException, BaseOperator
 from airflow.providers.tableau.hooks.tableau import (
     TableauHook,
@@ -115,9 +117,16 @@ class TableauOperator(BaseOperator):
 
             resource_id = self._get_resource_id(tableau_hook)
 
-            response = method(resource_id)
-
-            job_id = response.id
+            if self.resource == "tasks" and self.method == "run":
+                task_item = resource.get_by_id(resource_id)
+                response_bytes = method(task_item)
+                job_items = JobItem.from_response(response_bytes, tableau_hook.server.namespace)
+                if not job_items:
+                    raise ValueError("Tableau tasks.run returned no JobItem in response")
+                job_id = job_items[0].id
+            else:
+                response = method(resource_id)
+                job_id = response.id
 
             if self.method == "refresh":
                 if self.blocking_refresh:
diff --git a/providers/tableau/tests/unit/tableau/operators/test_tableau.py b/providers/tableau/tests/unit/tableau/operators/test_tableau.py
index d55598be588..ff22d128c43 100644
--- a/providers/tableau/tests/unit/tableau/operators/test_tableau.py
+++ b/providers/tableau/tests/unit/tableau/operators/test_tableau.py
@@ -33,6 +33,7 @@ class TestTableauOperator:
     def setup_method(self):
         self.mocked_workbooks = []
         self.mock_datasources = []
+        self.mock_tasks = []
 
         for i in range(3):
             mock_workbook = Mock()
@@ -45,6 +46,11 @@ class TestTableauOperator:
             mock_datasource.name = f"ds_{i}"
             self.mock_datasources.append(mock_datasource)
 
+            mock_task = Mock()
+            mock_task.id = f"task-id-{i}"
+            mock_task.name = f"task_{i}"
+            self.mock_tasks.append(mock_task)
+
         self.kwargs = {
             "site_id": "test_site",
             "task_id": "task",
@@ -179,6 +185,82 @@ class TestTableauOperator:
         with pytest.raises(AirflowException):
             operator.execute({})
 
+    @patch("airflow.providers.tableau.operators.tableau.JobItem")
+    @patch("airflow.providers.tableau.operators.tableau.TableauHook")
+    def test_execute_tasks_run_with_id(self, mock_tableau_hook, mock_job_item):
+        """tasks.run must fetch a TaskItem via get_by_id and parse JobItem from response bytes."""
+        mock_tableau_hook.return_value.__enter__ = Mock(return_value=mock_tableau_hook)
+
+        mock_task_item = Mock()
+        mock_tableau_hook.server.tasks.get_by_id = Mock(return_value=mock_task_item)
+        mock_tableau_hook.server.tasks.run = Mock(return_value=b"<tsResponse/>")
+
+        mock_job = Mock()
+        mock_job.id = "job-123"
+        mock_job_item.from_response = Mock(return_value=[mock_job])
+
+        kwargs = {**self.kwargs, "method": "run", "match_with": "id"}
+        operator = TableauOperator(find="task-abc", resource="tasks", **kwargs)
+
+        job_id = operator.execute(context={})
+
+        mock_tableau_hook.server.tasks.get_by_id.assert_called_once_with("task-abc")
+        mock_tableau_hook.server.tasks.run.assert_called_once_with(mock_task_item)
+        mock_job_item.from_response.assert_called_once_with(
+            b"<tsResponse/>", mock_tableau_hook.server.namespace
+        )
+        assert job_id == "job-123"
+
+    @patch("airflow.providers.tableau.operators.tableau.JobItem")
+    @patch("airflow.providers.tableau.operators.tableau.TableauHook")
+    def test_execute_tasks_run_with_match_with_name(self, mock_tableau_hook, mock_job_item):
+        """tasks.run should resolve the task id before fetching the TaskItem to run."""
+        mock_tableau_hook.get_all = Mock(return_value=self.mock_tasks)
+        mock_tableau_hook.return_value.__enter__ = Mock(return_value=mock_tableau_hook)
+
+        mock_task_item = Mock()
+        mock_tableau_hook.server.tasks.get_by_id = Mock(return_value=mock_task_item)
+        mock_tableau_hook.server.tasks.run = Mock(return_value=b"<tsResponse/>")
+
+        mock_job = Mock()
+        mock_job.id = "job-456"
+        mock_job_item.from_response = Mock(return_value=[mock_job])
+
+        kwargs = {**self.kwargs, "method": "run"}
+        operator = TableauOperator(find="task_2", resource="tasks", **kwargs)
+
+        job_id = operator.execute(context={})
+
+        mock_tableau_hook.server.tasks.get_by_id.assert_called_once_with(self.mock_tasks[2].id)
+        mock_tableau_hook.server.tasks.run.assert_called_once_with(mock_task_item)
+        assert job_id == "job-456"
+
+    @patch("airflow.providers.tableau.operators.tableau.JobItem")
+    @patch("airflow.providers.tableau.operators.tableau.TableauHook")
+    def test_execute_tasks_run_empty_job_response_raises(self, mock_tableau_hook, mock_job_item):
+        """A tasks.run response without a JobItem should raise a ValueError."""
+        mock_tableau_hook.return_value.__enter__ = Mock(return_value=mock_tableau_hook)
+        mock_tableau_hook.server.tasks.get_by_id = Mock(return_value=Mock())
+        mock_tableau_hook.server.tasks.run = Mock(return_value=b"<tsResponse/>")
+        mock_job_item.from_response = Mock(return_value=[])
+
+        kwargs = {**self.kwargs, "method": "run", "match_with": "id"}
+        operator = TableauOperator(find="task-abc", resource="tasks", **kwargs)
+
+        with pytest.raises(ValueError, match="no JobItem"):
+            operator.execute(context={})
+
+    @patch("airflow.providers.tableau.operators.tableau.TableauHook")
+    def test_execute_missing_task(self, mock_tableau_hook):
+        """Test execute missing task."""
+        mock_tableau_hook.get_all = Mock(return_value=self.mock_tasks)
+        mock_tableau_hook.return_value.__enter__ = Mock(return_value=mock_tableau_hook)
+        kwargs = {**self.kwargs, "method": "run"}
+        operator = TableauOperator(find="test", resource="tasks", **kwargs)
+
+        with pytest.raises(AirflowException):
+            operator.execute({})
+
     def test_execute_unavailable_resource(self):
         """
         Test execute unavailable resource
@@ -193,5 +275,5 @@ class TestTableauOperator:
         Test get resource id
         """
         resource_id = "res_id"
-        operator = TableauOperator(resource="task", find=resource_id, method="run", task_id="t", dag=None)
-        assert operator._get_resource_id(resource_id) == resource_id
+        operator = TableauOperator(resource="tasks", find=resource_id, method="run", task_id="t", dag=None)
+        assert operator._get_resource_id(Mock()) == resource_id

Reply via email to