This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6b87d1cef91 Fixed retry of PowerBIDatasetRefreshOperator when dataset refresh wasn't directly available (#45513)
6b87d1cef91 is described below

commit 6b87d1cef91150984648f25bb2163a13bc205d6c
Author: Emma <[email protected]>
AuthorDate: Sun Jan 26 18:42:42 2025 +0100

    Fixed retry of PowerBIDatasetRefreshOperator when dataset refresh wasn't directly available (#45513)
---
 .../providers/microsoft/azure/operators/powerbi.py | 39 +++++++++++--
 .../providers/microsoft/azure/triggers/powerbi.py  | 66 ++++++++++++++++++----
 .../microsoft/azure/operators/test_powerbi.py      | 35 ++++++++++--
 .../tests/microsoft/azure/triggers/test_powerbi.py | 60 +++++++++++++++++++-
 4 files changed, 177 insertions(+), 23 deletions(-)

diff --git a/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py b/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py
index b334a1708a6..1c6878c27af 100644
--- a/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py
+++ b/providers/src/airflow/providers/microsoft/azure/operators/powerbi.py
@@ -114,6 +114,35 @@ class PowerBIDatasetRefreshOperator(BaseOperator):
                     check_interval=self.check_interval,
                     wait_for_termination=self.wait_for_termination,
                 ),
+                method_name=self.get_refresh_status.__name__,
+            )
+
+    def get_refresh_status(self, context: Context, event: dict[str, str] | 
None = None):
+        """Push the refresh Id to XCom then runs the Trigger to wait for 
refresh completion."""
+        if event:
+            if event["status"] == "error":
+                raise AirflowException(event["message"])
+
+            dataset_refresh_id = event["dataset_refresh_id"]
+
+        if dataset_refresh_id:
+            self.xcom_push(
+                context=context,
+                key=f"{self.task_id}.powerbi_dataset_refresh_Id",
+                value=dataset_refresh_id,
+            )
+            self.defer(
+                trigger=PowerBITrigger(
+                    conn_id=self.conn_id,
+                    group_id=self.group_id,
+                    dataset_id=self.dataset_id,
+                    dataset_refresh_id=dataset_refresh_id,
+                    timeout=self.timeout,
+                    proxies=self.proxies,
+                    api_version=self.api_version,
+                    check_interval=self.check_interval,
+                    wait_for_termination=self.wait_for_termination,
+                ),
                 method_name=self.execute_complete.__name__,
             )
 
@@ -124,10 +153,10 @@ class PowerBIDatasetRefreshOperator(BaseOperator):
         Relies on trigger to throw an exception, otherwise it assumes 
execution was successful.
         """
         if event:
-            if event["status"] == "error":
-                raise AirflowException(event["message"])
-
             self.xcom_push(
-                context=context, key="powerbi_dataset_refresh_Id", 
value=event["dataset_refresh_id"]
+                context=context,
+                key=f"{self.task_id}.powerbi_dataset_refresh_status",
+                value=event["dataset_refresh_status"],
             )
-            self.xcom_push(context=context, 
key="powerbi_dataset_refresh_status", value=event["status"])
+            if event["status"] == "error":
+                raise AirflowException(event["message"])
diff --git a/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py b/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py
index a2132f6c393..8f749311806 100644
--- a/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py
+++ b/providers/src/airflow/providers/microsoft/azure/triggers/powerbi.py
@@ -22,7 +22,13 @@ import time
 from collections.abc import AsyncIterator
 from typing import TYPE_CHECKING
 
-from airflow.providers.microsoft.azure.hooks.powerbi import 
PowerBIDatasetRefreshStatus, PowerBIHook
+import tenacity
+
+from airflow.providers.microsoft.azure.hooks.powerbi import (
+    PowerBIDatasetRefreshException,
+    PowerBIDatasetRefreshStatus,
+    PowerBIHook,
+)
 from airflow.triggers.base import BaseTrigger, TriggerEvent
 
 if TYPE_CHECKING:
@@ -43,6 +49,7 @@ class PowerBITrigger(BaseTrigger):
         You can pass an enum named APIVersion which has 2 possible members v1 
and beta,
         or you can pass a string as `v1.0` or `beta`.
     :param dataset_id: The dataset Id to refresh.
+    :param dataset_refresh_id: The dataset refresh Id to poll for the status, 
if not provided a new refresh will be triggered.
     :param group_id: The workspace Id where dataset is located.
     :param end_time: Time in seconds when trigger should stop polling.
     :param check_interval: Time in seconds to wait between each poll.
@@ -55,6 +62,7 @@ class PowerBITrigger(BaseTrigger):
         dataset_id: str,
         group_id: str,
         timeout: float = 60 * 60 * 24 * 7,
+        dataset_refresh_id: str | None = None,
         proxies: dict | None = None,
         api_version: APIVersion | str | None = None,
         check_interval: int = 60,
@@ -63,6 +71,7 @@ class PowerBITrigger(BaseTrigger):
         super().__init__()
         self.hook = PowerBIHook(conn_id=conn_id, proxies=proxies, 
api_version=api_version, timeout=timeout)
         self.dataset_id = dataset_id
+        self.dataset_refresh_id = dataset_refresh_id
         self.timeout = timeout
         self.group_id = group_id
         self.check_interval = check_interval
@@ -77,6 +86,7 @@ class PowerBITrigger(BaseTrigger):
                 "proxies": self.proxies,
                 "api_version": self.api_version,
                 "dataset_id": self.dataset_id,
+                "dataset_refresh_id": self.dataset_refresh_id,
                 "group_id": self.group_id,
                 "timeout": self.timeout,
                 "check_interval": self.check_interval,
@@ -98,19 +108,53 @@ class PowerBITrigger(BaseTrigger):
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
         """Make async connection to the PowerBI and polls for the dataset 
refresh status."""
-        self.dataset_refresh_id = await self.hook.trigger_dataset_refresh(
-            dataset_id=self.dataset_id,
-            group_id=self.group_id,
-        )
-
-        async def fetch_refresh_status_and_error() -> tuple[str, str]:
-            """Fetch the current status and error of the dataset refresh."""
-            refresh_details = await 
self.hook.get_refresh_details_by_refresh_id(
+        if not self.dataset_refresh_id:
+            # Trigger the dataset refresh
+            dataset_refresh_id = await self.hook.trigger_dataset_refresh(
                 dataset_id=self.dataset_id,
                 group_id=self.group_id,
-                refresh_id=self.dataset_refresh_id,
             )
-            return refresh_details["status"], refresh_details["error"]
+
+            if dataset_refresh_id:
+                self.log.info("Triggered dataset refresh %s", 
dataset_refresh_id)
+                yield TriggerEvent(
+                    {
+                        "status": "success",
+                        "dataset_refresh_status": None,
+                        "message": f"The dataset refresh {dataset_refresh_id} 
has been triggered.",
+                        "dataset_refresh_id": dataset_refresh_id,
+                    }
+                )
+                return
+
+            yield TriggerEvent(
+                {
+                    "status": "error",
+                    "dataset_refresh_status": None,
+                    "message": "Failed to trigger the dataset refresh.",
+                    "dataset_refresh_id": None,
+                }
+            )
+            return
+
+        # The dataset refresh is already triggered. Poll for the dataset 
refresh status.
+        @tenacity.retry(
+            stop=tenacity.stop_after_attempt(3),
+            wait=tenacity.wait_exponential(min=5, multiplier=2),
+            reraise=True,
+            
retry=tenacity.retry_if_exception_type(PowerBIDatasetRefreshException),
+        )
+        async def fetch_refresh_status_and_error() -> tuple[str, str]:
+            """Fetch the current status and error of the dataset refresh."""
+            if self.dataset_refresh_id:
+                refresh_details = await 
self.hook.get_refresh_details_by_refresh_id(
+                    dataset_id=self.dataset_id,
+                    group_id=self.group_id,
+                    refresh_id=self.dataset_refresh_id,
+                )
+                return refresh_details["status"], refresh_details["error"]
+
+            raise PowerBIDatasetRefreshException("Dataset refresh Id is 
missing.")
 
         try:
             dataset_refresh_status, dataset_refresh_error = await 
fetch_refresh_status_and_error()
diff --git a/providers/tests/microsoft/azure/operators/test_powerbi.py b/providers/tests/microsoft/azure/operators/test_powerbi.py
index a115b4c52dc..7975411d50c 100644
--- a/providers/tests/microsoft/azure/operators/test_powerbi.py
+++ b/providers/tests/microsoft/azure/operators/test_powerbi.py
@@ -49,6 +49,13 @@ CONFIG = {
 NEW_REFRESH_REQUEST_ID = "5e2d9921-e91b-491f-b7e1-e7d8db49194c"
 
 SUCCESS_TRIGGER_EVENT = {
+    "status": "success",
+    "dataset_refresh_status": None,
+    "message": "success",
+    "dataset_refresh_id": NEW_REFRESH_REQUEST_ID,
+}
+
+SUCCESS_REFRESH_EVENT = {
     "status": "success",
     "dataset_refresh_status": PowerBIDatasetRefreshStatus.COMPLETED,
     "message": "success",
@@ -88,6 +95,26 @@ class TestPowerBIDatasetRefreshOperator(Base):
             operator.execute(context)
 
         assert isinstance(exc.value.trigger, PowerBITrigger)
+        assert exc.value.trigger.dataset_refresh_id is None
+
+    @mock.patch("airflow.hooks.base.BaseHook.get_connection", 
side_effect=get_airflow_connection)
+    def test_powerbi_operator_async_get_refresh_status_success(self, 
connection):
+        """Assert that get_refresh_status log success message"""
+        operator = PowerBIDatasetRefreshOperator(
+            **CONFIG,
+        )
+        context = {"ti": MagicMock()}
+        context["ti"].task_id = TASK_ID
+
+        with pytest.raises(TaskDeferred) as exc:
+            operator.get_refresh_status(
+                context=context,
+                event=SUCCESS_TRIGGER_EVENT,
+            )
+
+        assert isinstance(exc.value.trigger, PowerBITrigger)
+        assert exc.value.trigger.dataset_refresh_id is NEW_REFRESH_REQUEST_ID
+        assert context["ti"].xcom_push.call_count == 1
 
     def test_powerbi_operator_async_execute_complete_success(self):
         """Assert that execute_complete log success message"""
@@ -97,9 +124,9 @@ class TestPowerBIDatasetRefreshOperator(Base):
         context = {"ti": MagicMock()}
         operator.execute_complete(
             context=context,
-            event=SUCCESS_TRIGGER_EVENT,
+            event=SUCCESS_REFRESH_EVENT,
         )
-        assert context["ti"].xcom_push.call_count == 2
+        assert context["ti"].xcom_push.call_count == 1
 
     def test_powerbi_operator_async_execute_complete_fail(self):
         """Assert that execute_complete raise exception on error"""
@@ -117,7 +144,7 @@ class TestPowerBIDatasetRefreshOperator(Base):
                     "dataset_refresh_id": "1234",
                 },
             )
-        assert context["ti"].xcom_push.call_count == 0
+        assert context["ti"].xcom_push.call_count == 1
         assert str(exc.value) == "error"
 
     def test_powerbi_operator_refresh_fail(self):
@@ -136,7 +163,7 @@ class TestPowerBIDatasetRefreshOperator(Base):
                     "dataset_refresh_id": "1234",
                 },
             )
-        assert context["ti"].xcom_push.call_count == 0
+        assert context["ti"].xcom_push.call_count == 1
         assert str(exc.value) == "error message"
 
     def test_execute_complete_no_event(self):
diff --git a/providers/tests/microsoft/azure/triggers/test_powerbi.py b/providers/tests/microsoft/azure/triggers/test_powerbi.py
index 303b7d06c80..58bb3489fd5 100644
--- a/providers/tests/microsoft/azure/triggers/test_powerbi.py
+++ b/providers/tests/microsoft/azure/triggers/test_powerbi.py
@@ -22,7 +22,10 @@ from unittest import mock
 
 import pytest
 
-from airflow.providers.microsoft.azure.hooks.powerbi import 
PowerBIDatasetRefreshStatus
+from airflow.providers.microsoft.azure.hooks.powerbi import (
+    PowerBIDatasetRefreshException,
+    PowerBIDatasetRefreshStatus,
+)
 from airflow.providers.microsoft.azure.triggers.powerbi import PowerBITrigger
 from airflow.triggers.base import TriggerEvent
 
@@ -46,6 +49,7 @@ def powerbi_trigger(timeout=TIMEOUT, 
check_interval=CHECK_INTERVAL) -> PowerBITr
         proxies=None,
         api_version=API_VERSION,
         dataset_id=DATASET_ID,
+        dataset_refresh_id=DATASET_REFRESH_ID,
         group_id=GROUP_ID,
         check_interval=check_interval,
         wait_for_termination=True,
@@ -62,6 +66,7 @@ class TestPowerBITrigger:
             proxies=None,
             api_version=API_VERSION,
             dataset_id=DATASET_ID,
+            dataset_refresh_id=DATASET_REFRESH_ID,
             group_id=GROUP_ID,
             check_interval=CHECK_INTERVAL,
             wait_for_termination=True,
@@ -73,6 +78,7 @@ class TestPowerBITrigger:
         assert kwargs == {
             "conn_id": POWERBI_CONN_ID,
             "dataset_id": DATASET_ID,
+            "dataset_refresh_id": DATASET_REFRESH_ID,
             "timeout": TIMEOUT,
             "group_id": GROUP_ID,
             "proxies": None,
@@ -126,13 +132,32 @@ class TestPowerBITrigger:
         )
         assert expected == actual
 
+    @pytest.mark.asyncio
+    @mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.trigger_dataset_refresh")
+    async def test_powerbi_trigger_run_trigger_refresh(self, 
mock_trigger_dataset_refresh, powerbi_trigger):
+        """Assert event is triggered upon successful new refresh trigger."""
+        powerbi_trigger.dataset_refresh_id = None
+        mock_trigger_dataset_refresh.return_value = DATASET_REFRESH_ID
+
+        task = [i async for i in powerbi_trigger.run()]
+        response = TriggerEvent(
+            {
+                "status": "success",
+                "dataset_refresh_status": None,
+                "message": f"The dataset refresh {DATASET_REFRESH_ID} has been 
triggered.",
+                "dataset_refresh_id": DATASET_REFRESH_ID,
+            }
+        )
+        assert len(task) == 1
+        assert response in task
+
     @pytest.mark.asyncio
     
@mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.get_refresh_details_by_refresh_id")
     @mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.trigger_dataset_refresh")
     async def test_powerbi_trigger_run_completed(
         self, mock_trigger_dataset_refresh, 
mock_get_refresh_details_by_refresh_id, powerbi_trigger
     ):
-        """Assert event is triggered upon successful dataset refresh."""
+        """Assert event is triggered upon successful dataset refresh 
completion."""
         mock_get_refresh_details_by_refresh_id.return_value = {
             "status": PowerBIDatasetRefreshStatus.COMPLETED,
             "error": None,
@@ -180,6 +205,35 @@ class TestPowerBITrigger:
         assert response in task
         mock_cancel_dataset_refresh.assert_called_once()
 
+    @pytest.mark.asyncio
+    @mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.cancel_dataset_refresh")
+    
@mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.get_refresh_details_by_refresh_id")
+    @mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.trigger_dataset_refresh")
+    async def 
test_powerbi_trigger_run_PowerBIDatasetRefreshException_during_refresh_check_loop(
+        self,
+        mock_trigger_dataset_refresh,
+        mock_get_refresh_details_by_refresh_id,
+        mock_cancel_dataset_refresh,
+        powerbi_trigger,
+    ):
+        """Assert that run catch PowerBIDatasetRefreshException and triggers 
retry mechanism"""
+        mock_get_refresh_details_by_refresh_id.side_effect = 
PowerBIDatasetRefreshException("Test exception")
+        mock_trigger_dataset_refresh.return_value = DATASET_REFRESH_ID
+
+        task = [i async for i in powerbi_trigger.run()]
+        response = TriggerEvent(
+            {
+                "status": "error",
+                "dataset_refresh_status": None,
+                "message": "An error occurred: Test exception",
+                "dataset_refresh_id": DATASET_REFRESH_ID,
+            }
+        )
+        assert mock_get_refresh_details_by_refresh_id.call_count == 3
+        assert len(task) == 1
+        assert response in task
+        assert mock_cancel_dataset_refresh.call_count == 1
+
     @pytest.mark.asyncio
     @mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.cancel_dataset_refresh")
     
@mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.get_refresh_details_by_refresh_id")
@@ -228,7 +282,7 @@ class TestPowerBITrigger:
             {
                 "status": "error",
                 "dataset_refresh_status": None,
-                "message": "An error occurred: Test exception for no 
dataset_refresh_id",
+                "message": "Failed to trigger the dataset refresh.",
                 "dataset_refresh_id": None,
             }
         )

Reply via email to