This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 47faf3c0723 Fix coroutine serialization error in PowerBIDatasetRefreshOperator (#63829)
47faf3c0723 is described below
commit 47faf3c07231687ae1b9c3b2e2a695c3a5c56a0a
Author: Henry Chen <[email protected]>
AuthorDate: Fri Jun 5 18:08:53 2026 +0800
Fix coroutine serialization error in PowerBIDatasetRefreshOperator (#63829)
* Fix coroutine serialization error in PowerBIDatasetRefreshOperator
* add unit test
---
.../providers/microsoft/azure/operators/powerbi.py | 63 +++++---
.../unit/microsoft/azure/operators/test_powerbi.py | 177 +++++++++++++++------
2 files changed, 163 insertions(+), 77 deletions(-)
diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py
index a3f9792d456..ccbf025effe 100644
--- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py
+++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py
@@ -66,12 +66,13 @@ class PowerBIDatasetRefreshOperator(BaseOperator):
:param dataset_id: The dataset id.
:param group_id: The workspace id.
:param conn_id: Airflow Connection ID that contains the connection
information for the Power BI account used for authentication.
- :param timeout: Time in seconds to wait for a dataset to reach a terminal
status for asynchronous waits. Used only if ``wait_for_completion`` is True.
+ :param timeout: Time in seconds to wait for a dataset to reach a terminal
status for asynchronous waits.
:param check_interval: Number of seconds to wait before rechecking the
refresh status.
:param request_body: Additional arguments to pass to the request body, as
described in
https://learn.microsoft.com/en-us/rest/api/power-bi/datasets/refresh-dataset-in-group#request-body.
:param wait_for_completion: If True, wait for the dataset refresh to
complete. If False, trigger the refresh and return immediately without waiting.
- :param deferrable: This parameter is deprecated and no longer has any
effect. The operator now always uses deferrable execution when
``wait_for_completion=True``.
+ :param deferrable: This parameter is deprecated and no longer has any
effect. The operator now always
+ uses deferrable execution.
"""
template_fields: Sequence[str] = (
@@ -99,9 +100,7 @@ class PowerBIDatasetRefreshOperator(BaseOperator):
) -> None:
super().__init__(**kwargs)
if "deferrable" in kwargs or deferrable is not True:
- self.log.warning(
- "The PowerBIDatasetRefreshOperator now always uses deferrable
execution when wait_for_completion=True."
- )
+ self.log.warning("The PowerBIDatasetRefreshOperator now always
uses deferrable execution.")
self.hook = PowerBIHook(conn_id=conn_id, proxies=proxies,
api_version=api_version, timeout=timeout)
self.dataset_id = dataset_id
self.group_id = group_id
@@ -121,40 +120,56 @@ class PowerBIDatasetRefreshOperator(BaseOperator):
def execute(self, context: Context):
"""Refresh the Power BI Dataset."""
- if not self.wait_for_completion:
- # Fire and forget - synchronous execution, no deferral
- hook = PowerBIHook(
- conn_id=self.conn_id, proxies=self.proxies,
api_version=self.api_version, timeout=self.timeout
- )
-
- dataset_refresh_id = hook.trigger_dataset_refresh(
- dataset_id=self.dataset_id,
+ self.defer(
+ trigger=PowerBITrigger(
+ conn_id=self.conn_id,
group_id=self.group_id,
+ dataset_id=self.dataset_id,
+ timeout=self.timeout,
+ proxies=self.proxies,
+ api_version=self.api_version,
+ check_interval=self.check_interval,
+ wait_for_termination=False,
request_body=self.request_body,
+ ),
+ method_name=self.handle_refresh.__name__,
+ )
+
+ def handle_refresh(self, context: Context, event: dict[str, str] | None)
-> None:
+ """
+ Handle refresh-trigger event and optionally defer again to wait for
refresh completion.
+
+ :param context: Airflow context dictionary
+ :param event: Event dict from trigger with status and
dataset_refresh_id
+ """
+ if not event:
+ return
+
+ dataset_refresh_id = event.get("dataset_refresh_id")
+ if dataset_refresh_id:
+ context["ti"].xcom_push(
+ key=f"{self.task_id}.powerbi_dataset_refresh_id",
+ value=dataset_refresh_id,
)
- if dataset_refresh_id:
- self.log.info("Triggered dataset refresh %s
(fire-and-forget)", dataset_refresh_id)
- context["ti"].xcom_push(
- key=f"{self.task_id}.powerbi_dataset_refresh_id",
- value=dataset_refresh_id,
- )
- else:
- raise AirflowException("Failed to trigger dataset refresh")
+ if event["status"] == "error":
+ raise AirflowException(event["message"])
+
+ if not self.wait_for_completion:
+ self.log.info("Triggered dataset refresh %s (fire-and-forget)",
dataset_refresh_id)
return
- # Wait for termination - use deferrable trigger
self.defer(
trigger=PowerBITrigger(
conn_id=self.conn_id,
group_id=self.group_id,
dataset_id=self.dataset_id,
+ dataset_refresh_id=dataset_refresh_id,
timeout=self.timeout,
proxies=self.proxies,
api_version=self.api_version,
check_interval=self.check_interval,
- wait_for_termination=self.wait_for_completion,
- request_body=self.request_body,
+ wait_for_termination=True,
),
method_name=self.execute_complete.__name__,
)
diff --git a/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_powerbi.py b/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_powerbi.py
index b5be85f3c7f..ad8408455f9 100644
--- a/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_powerbi.py
+++ b/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_powerbi.py
@@ -26,17 +26,19 @@ from airflow.providers.common.compat.sdk import
AirflowException, BaseHook, Task
from airflow.providers.microsoft.azure.hooks.powerbi import (
PowerBIDatasetRefreshFields,
PowerBIDatasetRefreshStatus,
+ PowerBIHook,
)
from airflow.providers.microsoft.azure.operators.powerbi import
PowerBIDatasetRefreshOperator
from airflow.providers.microsoft.azure.triggers.powerbi import PowerBITrigger
from tests_common.test_utils.mock_context import mock_context
+from tests_common.test_utils.operators.run_deferrable import execute_operator
from unit.microsoft.azure.test_utils import get_airflow_connection
try:
from airflow.sdk import timezone
except ImportError:
- from airflow.utils import timezone # type: ignore[no-redef]
+ from airflow.utils import timezone # type: ignore[attr-defined, no-redef]
DEFAULT_CONNECTION_CLIENT_SECRET = "powerbi_conn_id"
@@ -109,10 +111,12 @@ class TestPowerBIDatasetRefreshOperator:
assert isinstance(exc.value.trigger, PowerBITrigger)
assert exc.value.trigger.dataset_refresh_id is None
+ assert exc.value.trigger.wait_for_termination is False
+ assert exc.value.method_name == "handle_refresh"
@mock.patch.object(BaseHook, "get_connection",
side_effect=get_airflow_connection)
def test_powerbi_operator_async_get_refresh_status_success(self,
connection):
- """Test that execute defers once when wait_for_completion=True"""
+ """Test that execute first defers to trigger refresh when
wait_for_completion=True"""
operator = PowerBIDatasetRefreshOperator(
**CONFIG,
wait_for_completion=True, # Explicitly set to True
@@ -128,14 +132,119 @@ class TestPowerBIDatasetRefreshOperator:
# Verify trigger has correct parameters
assert exc.value.trigger.dataset_id == DATASET_ID
assert exc.value.trigger.group_id == GROUP_ID
- assert exc.value.trigger.wait_for_termination is True
+ assert exc.value.trigger.wait_for_termination is False
# Verify callback method name
- assert exc.value.method_name == "execute_complete"
+ assert exc.value.method_name == "handle_refresh"
# Verify dataset_refresh_id is None (trigger will create it)
assert exc.value.trigger.dataset_refresh_id is None
+ @mock.patch.object(BaseHook, "get_connection",
side_effect=get_airflow_connection)
+ def test_handle_refresh_wait_for_completion(self, connection):
+ """Test that handle_refresh defers to execute_complete when
wait_for_completion=True."""
+ operator = PowerBIDatasetRefreshOperator(
+ **CONFIG,
+ wait_for_completion=True,
+ )
+ context = {"ti": MagicMock()}
+
+ with pytest.raises(TaskDeferred) as exc:
+ operator.handle_refresh(
+ context=context,
+ event=SUCCESS_TRIGGER_EVENT,
+ )
+
+ assert context["ti"].xcom_push.call_count == 1
+ assert isinstance(exc.value.trigger, PowerBITrigger)
+ assert exc.value.trigger.dataset_refresh_id == NEW_REFRESH_REQUEST_ID
+ assert exc.value.trigger.wait_for_termination is True
+ assert exc.value.method_name == "execute_complete"
+
+ @mock.patch.object(BaseHook, "get_connection",
side_effect=get_airflow_connection)
+ def test_handle_refresh_fire_and_forget(self, connection):
+ """Test that handle_refresh finishes immediately for fire-and-forget
mode."""
+ operator = PowerBIDatasetRefreshOperator(
+ **CONFIG,
+ wait_for_completion=False,
+ )
+ context = {"ti": MagicMock()}
+
+ operator.handle_refresh(
+ context=context,
+ event=SUCCESS_TRIGGER_EVENT,
+ )
+
+ assert context["ti"].xcom_push.call_count == 1
+
+ @mock.patch.object(BaseHook, "get_connection",
side_effect=get_airflow_connection)
+ def test_handle_refresh_failure(self, connection):
+ """Test that handle_refresh raises exception on trigger error."""
+ operator = PowerBIDatasetRefreshOperator(
+ **CONFIG,
+ wait_for_completion=False,
+ )
+ context = {"ti": MagicMock()}
+
+ with pytest.raises(AirflowException, match="Failed to trigger the
dataset refresh."):
+ operator.handle_refresh(
+ context=context,
+ event={
+ "status": "error",
+ "dataset_refresh_status": None,
+ "message": "Failed to trigger the dataset refresh.",
+ "dataset_refresh_id": None,
+ },
+ )
+
+ assert context["ti"].xcom_push.call_count == 0
+
+ @mock.patch.object(PowerBIHook, "get_refresh_details_by_refresh_id")
+ @mock.patch.object(PowerBIHook, "trigger_dataset_refresh")
+ def test_execute_operator_wait_for_completion_full_lifecycle(
+ self, mock_trigger_dataset_refresh,
mock_get_refresh_details_by_refresh_id
+ ):
+ """Assert the full deferrable lifecycle completes successfully."""
+ mock_trigger_dataset_refresh.return_value = NEW_REFRESH_REQUEST_ID
+ mock_get_refresh_details_by_refresh_id.side_effect = [
+ {
+ PowerBIDatasetRefreshFields.STATUS.value:
PowerBIDatasetRefreshStatus.IN_PROGRESS,
+ PowerBIDatasetRefreshFields.ERROR.value: None,
+ },
+ {
+ PowerBIDatasetRefreshFields.STATUS.value:
PowerBIDatasetRefreshStatus.COMPLETED,
+ PowerBIDatasetRefreshFields.ERROR.value: None,
+ },
+ ]
+
+ operator = PowerBIDatasetRefreshOperator(
+ **CONFIG,
+ wait_for_completion=True,
+ )
+
+ result, events = execute_operator(operator)
+
+ assert result is None
+ mock_trigger_dataset_refresh.assert_called_once_with(
+ dataset_id=DATASET_ID,
+ group_id=GROUP_ID,
+ request_body=REQUEST_BODY,
+ )
+ assert mock_get_refresh_details_by_refresh_id.call_count == 2
+ assert len(events) == 2
+ assert events[0].payload == {
+ "status": "success",
+ "dataset_refresh_status": None,
+ "message": f"The dataset refresh {NEW_REFRESH_REQUEST_ID} has been
triggered.",
+ "dataset_refresh_id": NEW_REFRESH_REQUEST_ID,
+ }
+ assert events[1].payload == {
+ "status": "success",
+ "dataset_refresh_status": PowerBIDatasetRefreshStatus.COMPLETED,
+ "message": f"The dataset refresh {NEW_REFRESH_REQUEST_ID} has
{PowerBIDatasetRefreshStatus.COMPLETED}.",
+ "dataset_refresh_id": NEW_REFRESH_REQUEST_ID,
+ }
+
def test_powerbi_operator_async_execute_complete_success(self):
"""Assert that execute_complete processes success event correctly"""
operator = PowerBIDatasetRefreshOperator(**CONFIG)
@@ -232,63 +341,25 @@ class TestPowerBIDatasetRefreshOperator:
assert url == EXPECTED_ITEM_RUN_OP_EXTRA_LINK
-
@mock.patch("airflow.providers.microsoft.azure.operators.powerbi.PowerBIHook")
- @mock.patch.object(BaseHook, "get_connection",
side_effect=get_airflow_connection)
- def test_execute_fire_and_forget_mode(self, mock_connection,
mock_hook_class):
- """Test fire-and-forget mode (wait_for_completion=False)"""
- mock_hook_instance = mock_hook_class.return_value
- mock_hook_instance.trigger_dataset_refresh.return_value =
NEW_REFRESH_REQUEST_ID
-
- operator = PowerBIDatasetRefreshOperator(
- **CONFIG,
- wait_for_completion=False,
- )
- context = {"ti": MagicMock()}
- context["ti"].task_id = TASK_ID
-
- # Should not raise TaskDeferred
- result = operator.execute(context)
-
- # Verify hook was called correctly
- mock_hook_instance.trigger_dataset_refresh.assert_called_once_with(
- dataset_id=DATASET_ID,
- group_id=GROUP_ID,
- request_body=REQUEST_BODY,
- )
-
- # Verify XCom push
- assert context["ti"].xcom_push.call_count == 1
- call_args = context["ti"].xcom_push.call_args
- assert call_args[1]["key"] == f"{TASK_ID}.powerbi_dataset_refresh_id"
- assert call_args[1]["value"] == NEW_REFRESH_REQUEST_ID
-
- # Should return None (completes synchronously)
- assert result is None
-
-
@mock.patch("airflow.providers.microsoft.azure.operators.powerbi.PowerBIHook")
@mock.patch.object(BaseHook, "get_connection",
side_effect=get_airflow_connection)
- def test_execute_fire_and_forget_mode_failure(self, mock_connection,
mock_hook_class):
- """Test fire-and-forget mode raises exception when trigger fails"""
- mock_hook_instance = mock_hook_class.return_value
- mock_hook_instance.trigger_dataset_refresh.return_value = None
-
+ def test_execute_fire_and_forget_mode(self, mock_connection):
+ """Test fire-and-forget mode defers to trigger refresh."""
operator = PowerBIDatasetRefreshOperator(
**CONFIG,
wait_for_completion=False,
)
- context = {"ti": MagicMock()}
- context["ti"].task_id = TASK_ID
+ context = mock_context(task=operator)
- # Should raise AirflowException
- with pytest.raises(AirflowException, match="Failed to trigger dataset
refresh"):
+ with pytest.raises(TaskDeferred) as exc:
operator.execute(context)
- # Should not push to XCom on failure
- assert context["ti"].xcom_push.call_count == 0
+ assert isinstance(exc.value.trigger, PowerBITrigger)
+ assert exc.value.trigger.wait_for_termination is False
+ assert exc.value.method_name == "handle_refresh"
@mock.patch.object(BaseHook, "get_connection",
side_effect=get_airflow_connection)
def test_execute_default_behavior_waits_for_completion(self,
mock_connection):
- """Test that default behavior (wait_for_completion=True) defers and
waits"""
+ """Test that default behavior (wait_for_completion=True) first defers
to trigger refresh."""
config_without_wait = {
"task_id": TASK_ID,
"conn_id": DEFAULT_CONNECTION_CLIENT_SECRET,
@@ -303,11 +374,11 @@ class TestPowerBIDatasetRefreshOperator:
operator = PowerBIDatasetRefreshOperator(**config_without_wait)
context = mock_context(task=operator)
- # Should defer (because default is wait_for_completion=True)
+ # Should defer for initial trigger step (because default is
wait_for_completion=True)
with pytest.raises(TaskDeferred) as exc:
operator.execute(context)
# Verify it deferred with correct trigger
assert isinstance(exc.value.trigger, PowerBITrigger)
- assert exc.value.trigger.wait_for_termination is True
- assert exc.value.method_name == "execute_complete"
+ assert exc.value.trigger.wait_for_termination is False
+ assert exc.value.method_name == "handle_refresh"