This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 47faf3c0723 Fix coroutine serialization error in PowerBIDatasetRefreshOperator (#63829)
47faf3c0723 is described below

commit 47faf3c07231687ae1b9c3b2e2a695c3a5c56a0a
Author: Henry Chen <[email protected]>
AuthorDate: Fri Jun 5 18:08:53 2026 +0800

    Fix coroutine serialization error in PowerBIDatasetRefreshOperator (#63829)
    
    * Fix coroutine serialization error in PowerBIDatasetRefreshOperator
    
    * add unit test
---
 .../providers/microsoft/azure/operators/powerbi.py |  63 +++++---
 .../unit/microsoft/azure/operators/test_powerbi.py | 177 +++++++++++++++------
 2 files changed, 163 insertions(+), 77 deletions(-)

diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py
index a3f9792d456..ccbf025effe 100644
--- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py
+++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py
@@ -66,12 +66,13 @@ class PowerBIDatasetRefreshOperator(BaseOperator):
     :param dataset_id: The dataset id.
     :param group_id: The workspace id.
     :param conn_id: Airflow Connection ID that contains the connection 
information for the Power BI account used for authentication.
-    :param timeout: Time in seconds to wait for a dataset to reach a terminal 
status for asynchronous waits. Used only if ``wait_for_completion`` is True.
+    :param timeout: Time in seconds to wait for a dataset to reach a terminal 
status for asynchronous waits.
     :param check_interval: Number of seconds to wait before rechecking the
         refresh status.
     :param request_body: Additional arguments to pass to the request body, as 
described in 
https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/refresh-dataset-in-group#request-body.
     :param wait_for_completion: If True, wait for the dataset refresh to 
complete. If False, trigger the refresh and return immediately without waiting.
-    :param deferrable: This parameter is deprecated and no longer has any 
effect. The operator now always uses deferrable execution when 
``wait_for_completion=True``.
+    :param deferrable: This parameter is deprecated and no longer has any 
effect. The operator now always
+        uses deferrable execution.
     """
 
     template_fields: Sequence[str] = (
@@ -99,9 +100,7 @@ class PowerBIDatasetRefreshOperator(BaseOperator):
     ) -> None:
         super().__init__(**kwargs)
         if "deferrable" in kwargs or deferrable is not True:
-            self.log.warning(
-                "The PowerBIDatasetRefreshOperator now always uses deferrable 
execution when wait_for_completion=True."
-            )
+            self.log.warning("The PowerBIDatasetRefreshOperator now always 
uses deferrable execution.")
         self.hook = PowerBIHook(conn_id=conn_id, proxies=proxies, 
api_version=api_version, timeout=timeout)
         self.dataset_id = dataset_id
         self.group_id = group_id
@@ -121,40 +120,56 @@ class PowerBIDatasetRefreshOperator(BaseOperator):
 
     def execute(self, context: Context):
         """Refresh the Power BI Dataset."""
-        if not self.wait_for_completion:
-            # Fire and forget - synchronous execution, no deferral
-            hook = PowerBIHook(
-                conn_id=self.conn_id, proxies=self.proxies, 
api_version=self.api_version, timeout=self.timeout
-            )
-
-            dataset_refresh_id = hook.trigger_dataset_refresh(
-                dataset_id=self.dataset_id,
+        self.defer(
+            trigger=PowerBITrigger(
+                conn_id=self.conn_id,
                 group_id=self.group_id,
+                dataset_id=self.dataset_id,
+                timeout=self.timeout,
+                proxies=self.proxies,
+                api_version=self.api_version,
+                check_interval=self.check_interval,
+                wait_for_termination=False,
                 request_body=self.request_body,
+            ),
+            method_name=self.handle_refresh.__name__,
+        )
+
+    def handle_refresh(self, context: Context, event: dict[str, str] | None) 
-> None:
+        """
+        Handle refresh-trigger event and optionally defer again to wait for 
refresh completion.
+
+        :param context: Airflow context dictionary
+        :param event: Event dict from trigger with status and 
dataset_refresh_id
+        """
+        if not event:
+            return
+
+        dataset_refresh_id = event.get("dataset_refresh_id")
+        if dataset_refresh_id:
+            context["ti"].xcom_push(
+                key=f"{self.task_id}.powerbi_dataset_refresh_id",
+                value=dataset_refresh_id,
             )
 
-            if dataset_refresh_id:
-                self.log.info("Triggered dataset refresh %s 
(fire-and-forget)", dataset_refresh_id)
-                context["ti"].xcom_push(
-                    key=f"{self.task_id}.powerbi_dataset_refresh_id",
-                    value=dataset_refresh_id,
-                )
-            else:
-                raise AirflowException("Failed to trigger dataset refresh")
+        if event["status"] == "error":
+            raise AirflowException(event["message"])
+
+        if not self.wait_for_completion:
+            self.log.info("Triggered dataset refresh %s (fire-and-forget)", 
dataset_refresh_id)
             return
 
-        # Wait for termination - use deferrable trigger
         self.defer(
             trigger=PowerBITrigger(
                 conn_id=self.conn_id,
                 group_id=self.group_id,
                 dataset_id=self.dataset_id,
+                dataset_refresh_id=dataset_refresh_id,
                 timeout=self.timeout,
                 proxies=self.proxies,
                 api_version=self.api_version,
                 check_interval=self.check_interval,
-                wait_for_termination=self.wait_for_completion,
-                request_body=self.request_body,
+                wait_for_termination=True,
             ),
             method_name=self.execute_complete.__name__,
         )
diff --git a/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_powerbi.py b/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_powerbi.py
index b5be85f3c7f..ad8408455f9 100644
--- a/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_powerbi.py
+++ b/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_powerbi.py
@@ -26,17 +26,19 @@ from airflow.providers.common.compat.sdk import AirflowException, BaseHook, Task
 from airflow.providers.microsoft.azure.hooks.powerbi import (
     PowerBIDatasetRefreshFields,
     PowerBIDatasetRefreshStatus,
+    PowerBIHook,
 )
 from airflow.providers.microsoft.azure.operators.powerbi import 
PowerBIDatasetRefreshOperator
 from airflow.providers.microsoft.azure.triggers.powerbi import PowerBITrigger
 
 from tests_common.test_utils.mock_context import mock_context
+from tests_common.test_utils.operators.run_deferrable import execute_operator
 from unit.microsoft.azure.test_utils import get_airflow_connection
 
 try:
     from airflow.sdk import timezone
 except ImportError:
-    from airflow.utils import timezone  # type: ignore[no-redef]
+    from airflow.utils import timezone  # type: ignore[attr-defined, no-redef]
 
 
 DEFAULT_CONNECTION_CLIENT_SECRET = "powerbi_conn_id"
@@ -109,10 +111,12 @@ class TestPowerBIDatasetRefreshOperator:
 
         assert isinstance(exc.value.trigger, PowerBITrigger)
         assert exc.value.trigger.dataset_refresh_id is None
+        assert exc.value.trigger.wait_for_termination is False
+        assert exc.value.method_name == "handle_refresh"
 
     @mock.patch.object(BaseHook, "get_connection", 
side_effect=get_airflow_connection)
     def test_powerbi_operator_async_get_refresh_status_success(self, 
connection):
-        """Test that execute defers once when wait_for_completion=True"""
+        """Test that execute first defers to trigger refresh when 
wait_for_completion=True"""
         operator = PowerBIDatasetRefreshOperator(
             **CONFIG,
             wait_for_completion=True,  # Explicitly set to True
@@ -128,14 +132,119 @@ class TestPowerBIDatasetRefreshOperator:
         # Verify trigger has correct parameters
         assert exc.value.trigger.dataset_id == DATASET_ID
         assert exc.value.trigger.group_id == GROUP_ID
-        assert exc.value.trigger.wait_for_termination is True
+        assert exc.value.trigger.wait_for_termination is False
 
         # Verify callback method name
-        assert exc.value.method_name == "execute_complete"
+        assert exc.value.method_name == "handle_refresh"
 
         # Verify dataset_refresh_id is None (trigger will create it)
         assert exc.value.trigger.dataset_refresh_id is None
 
+    @mock.patch.object(BaseHook, "get_connection", 
side_effect=get_airflow_connection)
+    def test_handle_refresh_wait_for_completion(self, connection):
+        """Test that handle_refresh defers to execute_complete when 
wait_for_completion=True."""
+        operator = PowerBIDatasetRefreshOperator(
+            **CONFIG,
+            wait_for_completion=True,
+        )
+        context = {"ti": MagicMock()}
+
+        with pytest.raises(TaskDeferred) as exc:
+            operator.handle_refresh(
+                context=context,
+                event=SUCCESS_TRIGGER_EVENT,
+            )
+
+        assert context["ti"].xcom_push.call_count == 1
+        assert isinstance(exc.value.trigger, PowerBITrigger)
+        assert exc.value.trigger.dataset_refresh_id == NEW_REFRESH_REQUEST_ID
+        assert exc.value.trigger.wait_for_termination is True
+        assert exc.value.method_name == "execute_complete"
+
+    @mock.patch.object(BaseHook, "get_connection", 
side_effect=get_airflow_connection)
+    def test_handle_refresh_fire_and_forget(self, connection):
+        """Test that handle_refresh finishes immediately for fire-and-forget 
mode."""
+        operator = PowerBIDatasetRefreshOperator(
+            **CONFIG,
+            wait_for_completion=False,
+        )
+        context = {"ti": MagicMock()}
+
+        operator.handle_refresh(
+            context=context,
+            event=SUCCESS_TRIGGER_EVENT,
+        )
+
+        assert context["ti"].xcom_push.call_count == 1
+
+    @mock.patch.object(BaseHook, "get_connection", 
side_effect=get_airflow_connection)
+    def test_handle_refresh_failure(self, connection):
+        """Test that handle_refresh raises exception on trigger error."""
+        operator = PowerBIDatasetRefreshOperator(
+            **CONFIG,
+            wait_for_completion=False,
+        )
+        context = {"ti": MagicMock()}
+
+        with pytest.raises(AirflowException, match="Failed to trigger the 
dataset refresh."):
+            operator.handle_refresh(
+                context=context,
+                event={
+                    "status": "error",
+                    "dataset_refresh_status": None,
+                    "message": "Failed to trigger the dataset refresh.",
+                    "dataset_refresh_id": None,
+                },
+            )
+
+        assert context["ti"].xcom_push.call_count == 0
+
+    @mock.patch.object(PowerBIHook, "get_refresh_details_by_refresh_id")
+    @mock.patch.object(PowerBIHook, "trigger_dataset_refresh")
+    def test_execute_operator_wait_for_completion_full_lifecycle(
+        self, mock_trigger_dataset_refresh, 
mock_get_refresh_details_by_refresh_id
+    ):
+        """Assert the full deferrable lifecycle completes successfully."""
+        mock_trigger_dataset_refresh.return_value = NEW_REFRESH_REQUEST_ID
+        mock_get_refresh_details_by_refresh_id.side_effect = [
+            {
+                PowerBIDatasetRefreshFields.STATUS.value: 
PowerBIDatasetRefreshStatus.IN_PROGRESS,
+                PowerBIDatasetRefreshFields.ERROR.value: None,
+            },
+            {
+                PowerBIDatasetRefreshFields.STATUS.value: 
PowerBIDatasetRefreshStatus.COMPLETED,
+                PowerBIDatasetRefreshFields.ERROR.value: None,
+            },
+        ]
+
+        operator = PowerBIDatasetRefreshOperator(
+            **CONFIG,
+            wait_for_completion=True,
+        )
+
+        result, events = execute_operator(operator)
+
+        assert result is None
+        mock_trigger_dataset_refresh.assert_called_once_with(
+            dataset_id=DATASET_ID,
+            group_id=GROUP_ID,
+            request_body=REQUEST_BODY,
+        )
+        assert mock_get_refresh_details_by_refresh_id.call_count == 2
+        assert len(events) == 2
+        assert events[0].payload == {
+            "status": "success",
+            "dataset_refresh_status": None,
+            "message": f"The dataset refresh {NEW_REFRESH_REQUEST_ID} has been 
triggered.",
+            "dataset_refresh_id": NEW_REFRESH_REQUEST_ID,
+        }
+        assert events[1].payload == {
+            "status": "success",
+            "dataset_refresh_status": PowerBIDatasetRefreshStatus.COMPLETED,
+            "message": f"The dataset refresh {NEW_REFRESH_REQUEST_ID} has 
{PowerBIDatasetRefreshStatus.COMPLETED}.",
+            "dataset_refresh_id": NEW_REFRESH_REQUEST_ID,
+        }
+
     def test_powerbi_operator_async_execute_complete_success(self):
         """Assert that execute_complete processes success event correctly"""
         operator = PowerBIDatasetRefreshOperator(**CONFIG)
@@ -232,63 +341,25 @@ class TestPowerBIDatasetRefreshOperator:
 
         assert url == EXPECTED_ITEM_RUN_OP_EXTRA_LINK
 
-    
@mock.patch("airflow.providers.microsoft.azure.operators.powerbi.PowerBIHook")
-    @mock.patch.object(BaseHook, "get_connection", 
side_effect=get_airflow_connection)
-    def test_execute_fire_and_forget_mode(self, mock_connection, 
mock_hook_class):
-        """Test fire-and-forget mode (wait_for_completion=False)"""
-        mock_hook_instance = mock_hook_class.return_value
-        mock_hook_instance.trigger_dataset_refresh.return_value = 
NEW_REFRESH_REQUEST_ID
-
-        operator = PowerBIDatasetRefreshOperator(
-            **CONFIG,
-            wait_for_completion=False,
-        )
-        context = {"ti": MagicMock()}
-        context["ti"].task_id = TASK_ID
-
-        # Should not raise TaskDeferred
-        result = operator.execute(context)
-
-        # Verify hook was called correctly
-        mock_hook_instance.trigger_dataset_refresh.assert_called_once_with(
-            dataset_id=DATASET_ID,
-            group_id=GROUP_ID,
-            request_body=REQUEST_BODY,
-        )
-
-        # Verify XCom push
-        assert context["ti"].xcom_push.call_count == 1
-        call_args = context["ti"].xcom_push.call_args
-        assert call_args[1]["key"] == f"{TASK_ID}.powerbi_dataset_refresh_id"
-        assert call_args[1]["value"] == NEW_REFRESH_REQUEST_ID
-
-        # Should return None (completes synchronously)
-        assert result is None
-
-    
@mock.patch("airflow.providers.microsoft.azure.operators.powerbi.PowerBIHook")
     @mock.patch.object(BaseHook, "get_connection", 
side_effect=get_airflow_connection)
-    def test_execute_fire_and_forget_mode_failure(self, mock_connection, 
mock_hook_class):
-        """Test fire-and-forget mode raises exception when trigger fails"""
-        mock_hook_instance = mock_hook_class.return_value
-        mock_hook_instance.trigger_dataset_refresh.return_value = None
-
+    def test_execute_fire_and_forget_mode(self, mock_connection):
+        """Test fire-and-forget mode defers to trigger refresh."""
         operator = PowerBIDatasetRefreshOperator(
             **CONFIG,
             wait_for_completion=False,
         )
-        context = {"ti": MagicMock()}
-        context["ti"].task_id = TASK_ID
+        context = mock_context(task=operator)
 
-        # Should raise AirflowException
-        with pytest.raises(AirflowException, match="Failed to trigger dataset 
refresh"):
+        with pytest.raises(TaskDeferred) as exc:
             operator.execute(context)
 
-        # Should not push to XCom on failure
-        assert context["ti"].xcom_push.call_count == 0
+        assert isinstance(exc.value.trigger, PowerBITrigger)
+        assert exc.value.trigger.wait_for_termination is False
+        assert exc.value.method_name == "handle_refresh"
 
     @mock.patch.object(BaseHook, "get_connection", 
side_effect=get_airflow_connection)
     def test_execute_default_behavior_waits_for_completion(self, 
mock_connection):
-        """Test that default behavior (wait_for_completion=True) defers and 
waits"""
+        """Test that default behavior (wait_for_completion=True) first defers 
to trigger refresh."""
         config_without_wait = {
             "task_id": TASK_ID,
             "conn_id": DEFAULT_CONNECTION_CLIENT_SECRET,
@@ -303,11 +374,11 @@ class TestPowerBIDatasetRefreshOperator:
         operator = PowerBIDatasetRefreshOperator(**config_without_wait)
         context = mock_context(task=operator)
 
-        # Should defer (because default is wait_for_completion=True)
+        # Should defer for initial trigger step (because default is 
wait_for_completion=True)
         with pytest.raises(TaskDeferred) as exc:
             operator.execute(context)
 
         # Verify it deferred with correct trigger
         assert isinstance(exc.value.trigger, PowerBITrigger)
-        assert exc.value.trigger.wait_for_termination is True
-        assert exc.value.method_name == "execute_complete"
+        assert exc.value.trigger.wait_for_termination is False
+        assert exc.value.method_name == "handle_refresh"

Reply via email to