This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 01390839dd Microsoft Power BI operator to refresh the dataset (#40356)
01390839dd is described below
commit 01390839dd57bc8a104a8d41c923beb5455fe489
Author: ambikagarg <[email protected]>
AuthorDate: Wed Aug 14 18:41:15 2024 +0530
Microsoft Power BI operator to refresh the dataset (#40356)
* Add Power BI operator that refreshes the powerbi dataset
Add Power BI integration to the provider.yaml
* Extend Power BI Operator to support async mode
* Extend PowerBIHook call to msgraph operator
* Add the trigger class to enable deferring
* Enable cache token
* refactor: Refactored PowerBIHook based on the KiotaRequestAdapterHook
into one hook, also taking proxies into account. This is how I would do it; it
isn't finished, of course, but it should put you in the right direction. As there
is a lot of polling involved, I would, just like the MSGraphOperator, make it a
pure async operator, but that's my opinion.
* Refactor: To support operator's async behavior
* Add unit tests for the power bi trigger and refactor the code
* unit tests for powerbi operator
* refactor: Did some small changes to PowerBIOperator, removed unnecessary
logging statements (don't just log info statements to log them, those can have
performance/cost implications)
* Fixed the unit test
* Added more tests for full code coverage
* Added system test for operator
* Fix system test
* Refactor: To make more use of the deferrable mechanism, shifted all the async code
into the trigger
* Fix unit tests and remove unnecessary parameters
* refactor: Initialize hosts within constructor to make sure it's
initialized correctly and immutable
* fix: Changed the 'powerbi_conn_id' parameter to 'conn_id' for the dataset
refresh example in PowerBI
* Remove redundant system test for powerbi dataset refresh operator and
rename the existing test more meaningfully
* remove extra comments
* Fix msgraph hook tests
* Fix powerbi trigger tests
* Refactor to pass the provider[microsoft.azure] tests
* refactor: Removed commented out (dead) code
* Refactor: Remove unused parameters and dead code
---------
Co-authored-by: David Blain <[email protected]>
Co-authored-by: David Blain <[email protected]>
---
airflow/providers/microsoft/azure/hooks/msgraph.py | 11 +-
airflow/providers/microsoft/azure/hooks/powerbi.py | 218 +++++++++++++++++
.../providers/microsoft/azure/operators/powerbi.py | 120 ++++++++++
airflow/providers/microsoft/azure/provider.yaml | 15 ++
.../providers/microsoft/azure/triggers/powerbi.py | 181 +++++++++++++++
.../microsoft/azure/hooks/test_msgraph.py | 6 +-
.../microsoft/azure/hooks/test_powerbi.py | 229 ++++++++++++++++++
.../microsoft/azure/operators/test_powerbi.py | 157 +++++++++++++
.../microsoft/azure/triggers/test_powerbi.py | 257 +++++++++++++++++++++
.../azure/example_powerbi_dataset_refresh.py | 88 +++++++
10 files changed, 1276 insertions(+), 6 deletions(-)
diff --git a/airflow/providers/microsoft/azure/hooks/msgraph.py
b/airflow/providers/microsoft/azure/hooks/msgraph.py
index 56abfa155d..8410d8d707 100644
--- a/airflow/providers/microsoft/azure/hooks/msgraph.py
+++ b/airflow/providers/microsoft/azure/hooks/msgraph.py
@@ -110,12 +110,16 @@ class KiotaRequestAdapterHook(BaseHook):
conn_id: str = default_conn_name,
timeout: float | None = None,
proxies: dict | None = None,
+ host: str = NationalClouds.Global.value,
+ scopes: list[str] | None = None,
api_version: APIVersion | str | None = None,
):
super().__init__()
self.conn_id = conn_id
self.timeout = timeout
self.proxies = proxies
+ self.host = host
+ self.scopes = scopes or ["https://graph.microsoft.com/.default"]
self._api_version = self.resolve_api_version_from_value(api_version)
@property
@@ -141,11 +145,10 @@ class KiotaRequestAdapterHook(BaseHook):
)
return self._api_version
- @staticmethod
- def get_host(connection: Connection) -> str:
+ def get_host(self, connection: Connection) -> str:
if connection.schema and connection.host:
return f"{connection.schema}://{connection.host}"
- return NationalClouds.Global.value
+ return self.host
@staticmethod
def format_no_proxy_url(url: str) -> str:
@@ -198,7 +201,7 @@ class KiotaRequestAdapterHook(BaseHook):
proxies = self.proxies or config.get("proxies", {})
msal_proxies = self.to_msal_proxies(authority=authority,
proxies=proxies)
httpx_proxies = self.to_httpx_proxies(proxies=proxies)
- scopes = config.get("scopes",
["https://graph.microsoft.com/.default"])
+ scopes = config.get("scopes", self.scopes)
verify = config.get("verify", True)
trust_env = config.get("trust_env", False)
disable_instance_discovery =
config.get("disable_instance_discovery", False)
diff --git a/airflow/providers/microsoft/azure/hooks/powerbi.py
b/airflow/providers/microsoft/azure/hooks/powerbi.py
new file mode 100644
index 0000000000..04326f4fec
--- /dev/null
+++ b/airflow/providers/microsoft/azure/hooks/powerbi.py
@@ -0,0 +1,218 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from enum import Enum
+from typing import TYPE_CHECKING, Any
+
+from airflow.exceptions import AirflowException
+from airflow.providers.microsoft.azure.hooks.msgraph import
KiotaRequestAdapterHook
+
+if TYPE_CHECKING:
+ from msgraph_core import APIVersion
+
+
+class PowerBIDatasetRefreshFields(Enum):
+ """Power BI refresh dataset details."""
+
+ REQUEST_ID = "request_id"
+ STATUS = "status"
+ ERROR = "error"
+
+
+class PowerBIDatasetRefreshStatus:
+ """Power BI refresh dataset statuses."""
+
+ IN_PROGRESS = "In Progress"
+ FAILED = "Failed"
+ COMPLETED = "Completed"
+ DISABLED = "Disabled"
+
+ TERMINAL_STATUSES = {FAILED, COMPLETED}
+
+
+class PowerBIDatasetRefreshException(AirflowException):
+ """An exception that indicates a dataset refresh failed to complete."""
+
+
+class PowerBIHook(KiotaRequestAdapterHook):
+ """
+ A async hook to interact with Power BI.
+
+ :param conn_id: The Power BI connection id.
+ """
+
+ conn_type: str = "powerbi"
+ conn_name_attr: str = "conn_id"
+ default_conn_name: str = "powerbi_default"
+ hook_name: str = "Power BI"
+
+ def __init__(
+ self,
+ conn_id: str = default_conn_name,
+ proxies: dict | None = None,
+ timeout: float = 60 * 60 * 24 * 7,
+ api_version: APIVersion | str | None = None,
+ ):
+ super().__init__(
+ conn_id=conn_id,
+ proxies=proxies,
+ timeout=timeout,
+ host="https://api.powerbi.com",
+ scopes=["https://analysis.windows.net/powerbi/api/.default"],
+ api_version=api_version,
+ )
+
+ @classmethod
+ def get_connection_form_widgets(cls) -> dict[str, Any]:
+ """Return connection widgets to add to connection form."""
+ from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
+ from flask_babel import lazy_gettext
+ from wtforms import StringField
+
+ return {
+ "tenant_id": StringField(lazy_gettext("Tenant ID"),
widget=BS3TextFieldWidget()),
+ }
+
+ @classmethod
+ def get_ui_field_behaviour(cls) -> dict[str, Any]:
+ """Return custom field behaviour."""
+ return {
+ "hidden_fields": ["schema", "port", "host", "extra"],
+ "relabeling": {
+ "login": "Client ID",
+ "password": "Client Secret",
+ },
+ }
+
+ async def get_refresh_history(
+ self,
+ dataset_id: str,
+ group_id: str,
+ ) -> list[dict[str, str]]:
+ """
+ Retrieve the refresh history of the specified dataset from the given
group ID.
+
+ :param dataset_id: The dataset ID.
+ :param group_id: The workspace ID.
+
+ :return: Dictionary containing all the refresh histories of the
dataset.
+ """
+ try:
+ response = await self.run(
+ url="myorg/groups/{group_id}/datasets/{dataset_id}/refreshes",
+ path_parameters={
+ "group_id": group_id,
+ "dataset_id": dataset_id,
+ },
+ )
+
+ refresh_histories = response.get("value")
+ return [self.raw_to_refresh_details(refresh_history) for
refresh_history in refresh_histories]
+
+ except AirflowException:
+ raise PowerBIDatasetRefreshException("Failed to retrieve refresh
history")
+
+ @classmethod
+ def raw_to_refresh_details(cls, refresh_details: dict) -> dict[str, str]:
+ """
+ Convert raw refresh details into a dictionary containing required
fields.
+
+ :param refresh_details: Raw object of refresh details.
+ """
+ return {
+ PowerBIDatasetRefreshFields.REQUEST_ID.value:
str(refresh_details.get("requestId")),
+ PowerBIDatasetRefreshFields.STATUS.value: (
+ "In Progress"
+ if str(refresh_details.get("status")) == "Unknown"
+ else str(refresh_details.get("status"))
+ ),
+ PowerBIDatasetRefreshFields.ERROR.value:
str(refresh_details.get("serviceExceptionJson")),
+ }
+
+ async def get_refresh_details_by_refresh_id(
+ self, dataset_id: str, group_id: str, refresh_id: str
+ ) -> dict[str, str]:
+ """
+ Get the refresh details of the given request Id.
+
+ :param refresh_id: Request Id of the Dataset refresh.
+ """
+ refresh_histories = await
self.get_refresh_history(dataset_id=dataset_id, group_id=group_id)
+
+ if len(refresh_histories) == 0:
+ raise PowerBIDatasetRefreshException(
+ f"Unable to fetch the details of dataset refresh with Request
Id: {refresh_id}"
+ )
+
+ refresh_ids = [
+ refresh_history.get(PowerBIDatasetRefreshFields.REQUEST_ID.value)
+ for refresh_history in refresh_histories
+ ]
+
+ if refresh_id not in refresh_ids:
+ raise PowerBIDatasetRefreshException(
+ f"Unable to fetch the details of dataset refresh with Request
Id: {refresh_id}"
+ )
+
+ refresh_details = refresh_histories[refresh_ids.index(refresh_id)]
+
+ return refresh_details
+
+ async def trigger_dataset_refresh(self, *, dataset_id: str, group_id: str)
-> str:
+ """
+ Triggers a refresh for the specified dataset from the given group id.
+
+ :param dataset_id: The dataset id.
+ :param group_id: The workspace id.
+
+ :return: Request id of the dataset refresh request.
+ """
+ try:
+ response = await self.run(
+ url="myorg/groups/{group_id}/datasets/{dataset_id}/refreshes",
+ method="POST",
+ path_parameters={
+ "group_id": group_id,
+ "dataset_id": dataset_id,
+ },
+ )
+
+ request_id = response.get("requestid")
+ return request_id
+ except AirflowException:
+ raise PowerBIDatasetRefreshException("Failed to trigger dataset
refresh.")
+
+ async def cancel_dataset_refresh(self, dataset_id: str, group_id: str,
dataset_refresh_id: str) -> None:
+ """
+ Cancel the dataset refresh.
+
+ :param dataset_id: The dataset Id.
+ :param group_id: The workspace Id.
+ :param dataset_refresh_id: The dataset refresh Id.
+ """
+ await self.run(
+
url="myorg/groups/{group_id}/datasets/{dataset_id}/refreshes/{dataset_refresh_id}",
+ response_type=None,
+ path_parameters={
+ "group_id": group_id,
+ "dataset_id": dataset_id,
+ "dataset_refresh_id": dataset_refresh_id,
+ },
+ method="DELETE",
+ )
diff --git a/airflow/providers/microsoft/azure/operators/powerbi.py
b/airflow/providers/microsoft/azure/operators/powerbi.py
new file mode 100644
index 0000000000..e54ad250bd
--- /dev/null
+++ b/airflow/providers/microsoft/azure/operators/powerbi.py
@@ -0,0 +1,120 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any, Sequence
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator, BaseOperatorLink
+from airflow.providers.microsoft.azure.hooks.powerbi import (
+ PowerBIHook,
+)
+from airflow.providers.microsoft.azure.triggers.powerbi import PowerBITrigger
+
+if TYPE_CHECKING:
+ from msgraph_core import APIVersion
+
+ from airflow.models.taskinstancekey import TaskInstanceKey
+ from airflow.utils.context import Context
+
+
+class PowerBILink(BaseOperatorLink):
+ """Construct a link to monitor a dataset in Power BI."""
+
+ name = "Monitor PowerBI Dataset"
+
+ def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
+ url = (
+ "https://app.powerbi.com" # type: ignore[attr-defined]
+ f"/groups/{operator.group_id}/datasets/{operator.dataset_id}" #
type: ignore[attr-defined]
+ "/details?experience=power-bi"
+ )
+
+ return url
+
+
+class PowerBIDatasetRefreshOperator(BaseOperator):
+ """
+ Refreshes a Power BI dataset.
+
+ :param dataset_id: The dataset id.
+ :param group_id: The workspace id.
+ :param conn_id: Airflow Connection ID that contains the connection
information for the Power BI account used for authentication.
+ :param timeout: Time in seconds to wait for a dataset to reach a terminal
status for asynchronous waits. Used only if ``wait_for_termination`` is True.
+ :param check_interval: Number of seconds to wait before rechecking the
+ refresh status.
+ """
+
+ template_fields: Sequence[str] = (
+ "dataset_id",
+ "group_id",
+ )
+ template_fields_renderers = {"parameters": "json"}
+
+ operator_extra_links = (PowerBILink(),)
+
+ def __init__(
+ self,
+ *,
+ dataset_id: str,
+ group_id: str,
+ conn_id: str = PowerBIHook.default_conn_name,
+ timeout: float = 60 * 60 * 24 * 7,
+ proxies: dict | None = None,
+ api_version: APIVersion | None = None,
+ check_interval: int = 60,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.hook = PowerBIHook(conn_id=conn_id, proxies=proxies,
api_version=api_version, timeout=timeout)
+ self.dataset_id = dataset_id
+ self.group_id = group_id
+ self.wait_for_termination = True
+ self.conn_id = conn_id
+ self.timeout = timeout
+ self.check_interval = check_interval
+
+ def execute(self, context: Context):
+ """Refresh the Power BI Dataset."""
+ if self.wait_for_termination:
+ self.defer(
+ trigger=PowerBITrigger(
+ conn_id=self.conn_id,
+ group_id=self.group_id,
+ dataset_id=self.dataset_id,
+ timeout=self.timeout,
+ check_interval=self.check_interval,
+ wait_for_termination=self.wait_for_termination,
+ ),
+ method_name=self.execute_complete.__name__,
+ )
+
+ def execute_complete(self, context: Context, event: dict[str, str]) -> Any:
+ """
+ Return immediately - callback for when the trigger fires.
+
+ Relies on trigger to throw an exception, otherwise it assumes
execution was successful.
+ """
+ if event:
+ if event["status"] == "error":
+ raise AirflowException(event["message"])
+
+ self.xcom_push(
+ context=context, key="powerbi_dataset_refresh_Id",
value=event["dataset_refresh_id"]
+ )
+ self.xcom_push(context=context,
key="powerbi_dataset_refresh_status", value=event["status"])
diff --git a/airflow/providers/microsoft/azure/provider.yaml
b/airflow/providers/microsoft/azure/provider.yaml
index 04e7311b44..a2bc784173 100644
--- a/airflow/providers/microsoft/azure/provider.yaml
+++ b/airflow/providers/microsoft/azure/provider.yaml
@@ -176,6 +176,9 @@ integrations:
how-to-guide:
- /docs/apache-airflow-providers-microsoft-azure/operators/msgraph.rst
tags: [azure]
+ - integration-name: Microsoft Power BI
+ external-doc-url: https://learn.microsoft.com/en-us/rest/api/power-bi/
+ tags: [azure]
operators:
- integration-name: Microsoft Azure Data Lake Storage
@@ -208,6 +211,9 @@ operators:
- integration-name: Microsoft Graph API
python-modules:
- airflow.providers.microsoft.azure.operators.msgraph
+ - integration-name: Microsoft Power BI
+ python-modules:
+ - airflow.providers.microsoft.azure.operators.powerbi
sensors:
- integration-name: Microsoft Azure Cosmos DB
@@ -268,6 +274,9 @@ hooks:
- integration-name: Microsoft Graph API
python-modules:
- airflow.providers.microsoft.azure.hooks.msgraph
+ - integration-name: Microsoft Power BI
+ python-modules:
+ - airflow.providers.microsoft.azure.hooks.powerbi
triggers:
- integration-name: Microsoft Azure Data Factory
@@ -279,6 +288,9 @@ triggers:
- integration-name: Microsoft Graph API
python-modules:
- airflow.providers.microsoft.azure.triggers.msgraph
+ - integration-name: Microsoft Power BI
+ python-modules:
+ - airflow.providers.microsoft.azure.triggers.powerbi
transfers:
- source-integration-name: Local
@@ -334,6 +346,8 @@ connection-types:
connection-type: azure_synapse
- hook-class-name:
airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook
connection-type: adls
+ - hook-class-name:
airflow.providers.microsoft.azure.hooks.powerbi.PowerBIHook
+ connection-type: powerbi
secrets-backends:
- airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend
@@ -344,6 +358,7 @@ logging:
extra-links:
-
airflow.providers.microsoft.azure.operators.data_factory.AzureDataFactoryPipelineRunLink
-
airflow.providers.microsoft.azure.operators.synapse.AzureSynapsePipelineRunLink
+ - airflow.providers.microsoft.azure.operators.powerbi.PowerBILink
config:
azure_remote_logging:
diff --git a/airflow/providers/microsoft/azure/triggers/powerbi.py
b/airflow/providers/microsoft/azure/triggers/powerbi.py
new file mode 100644
index 0000000000..d25802b84f
--- /dev/null
+++ b/airflow/providers/microsoft/azure/triggers/powerbi.py
@@ -0,0 +1,181 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import asyncio
+import time
+from typing import TYPE_CHECKING, AsyncIterator
+
+from airflow.providers.microsoft.azure.hooks.powerbi import (
+ PowerBIDatasetRefreshStatus,
+ PowerBIHook,
+)
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+
+if TYPE_CHECKING:
+ from msgraph_core import APIVersion
+
+
+class PowerBITrigger(BaseTrigger):
+ """
+ Triggers when Power BI dataset refresh is completed.
+
+ Wait for termination will always be True.
+
+ :param conn_id: The connection Id to connect to PowerBI.
+ :param timeout: The HTTP timeout being used by the `KiotaRequestAdapter`
(default is None).
+ When no timeout is specified or set to None then there is no HTTP
timeout on each request.
+ :param proxies: A dict defining the HTTP proxies to be used (default is
None).
+ :param api_version: The API version of the Microsoft Graph API to be used
(default is v1).
+ You can pass an enum named APIVersion which has 2 possible members v1
and beta,
+ or you can pass a string as `v1.0` or `beta`.
+ :param dataset_id: The dataset Id to refresh.
+ :param group_id: The workspace Id where dataset is located.
+ :param end_time: Time in seconds when trigger should stop polling.
+ :param check_interval: Time in seconds to wait between each poll.
+ :param wait_for_termination: Wait for the dataset refresh to complete or
fail.
+ """
+
+ def __init__(
+ self,
+ conn_id: str,
+ dataset_id: str,
+ group_id: str,
+ timeout: float = 60 * 60 * 24 * 7,
+ proxies: dict | None = None,
+ api_version: APIVersion | None = None,
+ check_interval: int = 60,
+ wait_for_termination: bool = True,
+ ):
+ super().__init__()
+ self.hook = PowerBIHook(conn_id=conn_id, proxies=proxies,
api_version=api_version, timeout=timeout)
+ self.dataset_id = dataset_id
+ self.timeout = timeout
+ self.group_id = group_id
+ self.check_interval = check_interval
+ self.wait_for_termination = wait_for_termination
+
+ def serialize(self):
+ """Serialize the trigger instance."""
+ api_version = self.api_version.value if self.api_version else None
+ return (
+
"airflow.providers.microsoft.azure.triggers.powerbi.PowerBITrigger",
+ {
+ "conn_id": self.conn_id,
+ "proxies": self.proxies,
+ "api_version": api_version,
+ "dataset_id": self.dataset_id,
+ "group_id": self.group_id,
+ "timeout": self.timeout,
+ "check_interval": self.check_interval,
+ "wait_for_termination": self.wait_for_termination,
+ },
+ )
+
+ @property
+ def conn_id(self) -> str:
+ return self.hook.conn_id
+
+ @property
+ def proxies(self) -> dict | None:
+ return self.hook.proxies
+
+ @property
+ def api_version(self) -> APIVersion:
+ return self.hook.api_version
+
+ async def run(self) -> AsyncIterator[TriggerEvent]:
+ """Make async connection to the PowerBI and polls for the dataset
refresh status."""
+ self.dataset_refresh_id = await self.hook.trigger_dataset_refresh(
+ dataset_id=self.dataset_id,
+ group_id=self.group_id,
+ )
+ try:
+ dataset_refresh_status = None
+ start_time = time.monotonic()
+ while start_time + self.timeout > time.monotonic():
+ refresh_details = await
self.hook.get_refresh_details_by_refresh_id(
+ dataset_id=self.dataset_id,
+ group_id=self.group_id,
+ refresh_id=self.dataset_refresh_id,
+ )
+ dataset_refresh_status = refresh_details.get("status")
+
+ if dataset_refresh_status ==
PowerBIDatasetRefreshStatus.COMPLETED:
+ yield TriggerEvent(
+ {
+ "status": dataset_refresh_status,
+ "message": f"The dataset refresh
{self.dataset_refresh_id} has {dataset_refresh_status}.",
+ "dataset_refresh_id": self.dataset_refresh_id,
+ }
+ )
+ return
+ elif dataset_refresh_status ==
PowerBIDatasetRefreshStatus.FAILED:
+ yield TriggerEvent(
+ {
+ "status": dataset_refresh_status,
+ "message": f"The dataset refresh
{self.dataset_refresh_id} has {dataset_refresh_status}.",
+ "dataset_refresh_id": self.dataset_refresh_id,
+ }
+ )
+ return
+
+ self.log.info(
+ "Sleeping for %s. The dataset refresh status is %s.",
+ self.check_interval,
+ dataset_refresh_status,
+ )
+ await asyncio.sleep(self.check_interval)
+
+ yield TriggerEvent(
+ {
+ "status": "error",
+ "message": f"Timeout occurred while waiting for dataset
refresh to complete: The dataset refresh {self.dataset_refresh_id} has status
{dataset_refresh_status}.",
+ "dataset_refresh_id": self.dataset_refresh_id,
+ }
+ )
+ return
+ except Exception as error:
+ if self.dataset_refresh_id:
+ try:
+ self.log.info(
+ "Unexpected error %s caught. Canceling dataset refresh
%s",
+ error,
+ self.dataset_refresh_id,
+ )
+ await self.hook.cancel_dataset_refresh(
+ dataset_id=self.dataset_id,
+ group_id=self.group_id,
+ dataset_refresh_id=self.dataset_refresh_id,
+ )
+ except Exception as e:
+ yield TriggerEvent(
+ {
+ "status": "error",
+ "message": f"An error occurred while canceling
dataset: {e}",
+ "dataset_refresh_id": self.dataset_refresh_id,
+ }
+ )
+ return
+ yield TriggerEvent(
+ {
+ "status": "error",
+ "message": f"An error occurred: {error}",
+ "dataset_refresh_id": self.dataset_refresh_id,
+ }
+ )
diff --git a/tests/providers/microsoft/azure/hooks/test_msgraph.py
b/tests/providers/microsoft/azure/hooks/test_msgraph.py
index 71d280a197..390be17ba7 100644
--- a/tests/providers/microsoft/azure/hooks/test_msgraph.py
+++ b/tests/providers/microsoft/azure/hooks/test_msgraph.py
@@ -82,13 +82,15 @@ class TestKiotaRequestAdapterHook:
def test_get_host_when_connection_has_scheme_and_host(self):
connection = mock_connection(schema="https", host="graph.microsoft.de")
- actual = KiotaRequestAdapterHook.get_host(connection)
+ hook = KiotaRequestAdapterHook()
+ actual = hook.get_host(connection)
assert actual == NationalClouds.Germany.value
def test_get_host_when_connection_has_no_scheme_or_host(self):
connection = mock_connection()
- actual = KiotaRequestAdapterHook.get_host(connection)
+ hook = KiotaRequestAdapterHook()
+ actual = hook.get_host(connection)
assert actual == NationalClouds.Global.value
diff --git a/tests/providers/microsoft/azure/hooks/test_powerbi.py
b/tests/providers/microsoft/azure/hooks/test_powerbi.py
new file mode 100644
index 0000000000..a3a521b45e
--- /dev/null
+++ b/tests/providers/microsoft/azure/hooks/test_powerbi.py
@@ -0,0 +1,229 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+import pytest
+
+from airflow.exceptions import AirflowException
+from airflow.providers.microsoft.azure.hooks.msgraph import
KiotaRequestAdapterHook
+from airflow.providers.microsoft.azure.hooks.powerbi import (
+ PowerBIDatasetRefreshException,
+ PowerBIDatasetRefreshFields,
+ PowerBIDatasetRefreshStatus,
+ PowerBIHook,
+)
+
+FORMATTED_RESPONSE = [
+ # Completed refresh
+ {
+ PowerBIDatasetRefreshFields.REQUEST_ID.value:
"5e2d9921-e91b-491f-b7e1-e7d8db49194c",
+ PowerBIDatasetRefreshFields.STATUS.value:
PowerBIDatasetRefreshStatus.COMPLETED,
+ PowerBIDatasetRefreshFields.ERROR.value: "None",
+ },
+ # In-progress refresh
+ {
+ PowerBIDatasetRefreshFields.REQUEST_ID.value:
"6b6536c1-cfcb-4148-9c21-402c3f5241e4",
+ PowerBIDatasetRefreshFields.STATUS.value:
PowerBIDatasetRefreshStatus.IN_PROGRESS,
+ PowerBIDatasetRefreshFields.ERROR.value: "None",
+ },
+ # Failed refresh
+ {
+ PowerBIDatasetRefreshFields.REQUEST_ID.value:
"11bf290a-346b-48b7-8973-c5df149337ff",
+ PowerBIDatasetRefreshFields.STATUS.value:
PowerBIDatasetRefreshStatus.FAILED,
+ PowerBIDatasetRefreshFields.ERROR.value:
'{"errorCode":"ModelRefreshFailed_CredentialsNotSpecified"}',
+ },
+]
+
+DEFAULT_CONNECTION_CLIENT_SECRET = "powerbi_conn_id"
+GROUP_ID = "group_id"
+DATASET_ID = "dataset_id"
+
+CONFIG = {"conn_id": DEFAULT_CONNECTION_CLIENT_SECRET, "timeout": 3,
"api_version": "v1.0"}
+
+
[email protected]
+def powerbi_hook():
+ return PowerBIHook(**CONFIG)
+
+
[email protected]
+async def test_get_refresh_history(powerbi_hook):
+ response_data = {"value": [{"requestId": "1234", "status": "Completed",
"serviceExceptionJson": ""}]}
+
+ with mock.patch.object(KiotaRequestAdapterHook, "run",
new_callable=mock.AsyncMock) as mock_run:
+ mock_run.return_value = response_data
+ result = await powerbi_hook.get_refresh_history(DATASET_ID, GROUP_ID)
+
+ expected = [{"request_id": "1234", "status": "Completed", "error": ""}]
+ assert result == expected
+
+
[email protected]
+async def test_get_refresh_history_airflow_exception(powerbi_hook):
+ """Test handling of AirflowException in get_refresh_history."""
+
+ with mock.patch.object(KiotaRequestAdapterHook, "run",
new_callable=mock.AsyncMock) as mock_run:
+ mock_run.side_effect = AirflowException("Test exception")
+
+ with pytest.raises(PowerBIDatasetRefreshException, match="Failed to
retrieve refresh history"):
+ await powerbi_hook.get_refresh_history(DATASET_ID, GROUP_ID)
+
+
[email protected](
+ "input_data, expected_output",
+ [
+ (
+ {"requestId": "1234", "status": "Completed",
"serviceExceptionJson": ""},
+ {
+ PowerBIDatasetRefreshFields.REQUEST_ID.value: "1234",
+ PowerBIDatasetRefreshFields.STATUS.value: "Completed",
+ PowerBIDatasetRefreshFields.ERROR.value: "",
+ },
+ ),
+ (
+ {"requestId": "5678", "status": "Unknown", "serviceExceptionJson":
"Some error"},
+ {
+ PowerBIDatasetRefreshFields.REQUEST_ID.value: "5678",
+ PowerBIDatasetRefreshFields.STATUS.value: "In Progress",
+ PowerBIDatasetRefreshFields.ERROR.value: "Some error",
+ },
+ ),
+ (
+ {"requestId": None, "status": None, "serviceExceptionJson": None},
+ {
+ PowerBIDatasetRefreshFields.REQUEST_ID.value: "None",
+ PowerBIDatasetRefreshFields.STATUS.value: "None",
+ PowerBIDatasetRefreshFields.ERROR.value: "None",
+ },
+ ),
+ (
+ {}, # Empty input dictionary
+ {
+ PowerBIDatasetRefreshFields.REQUEST_ID.value: "None",
+ PowerBIDatasetRefreshFields.STATUS.value: "None",
+ PowerBIDatasetRefreshFields.ERROR.value: "None",
+ },
+ ),
+ ],
+)
+def test_raw_to_refresh_details(input_data, expected_output):
+ """Test raw_to_refresh_details method."""
+ result = PowerBIHook.raw_to_refresh_details(input_data)
+ assert result == expected_output
+
+
[email protected]
+async def test_get_refresh_details_by_refresh_id(powerbi_hook):
+ # Mock the get_refresh_history method to return a list of refresh histories
+ refresh_histories = FORMATTED_RESPONSE
+ powerbi_hook.get_refresh_history =
mock.AsyncMock(return_value=refresh_histories)
+
+ # Call the function with a valid request ID
+ refresh_id = "5e2d9921-e91b-491f-b7e1-e7d8db49194c"
+ result = await powerbi_hook.get_refresh_details_by_refresh_id(
+ dataset_id=DATASET_ID, group_id=GROUP_ID, refresh_id=refresh_id
+ )
+
+ # Assert that the correct refresh details are returned
+ assert result == {
+ PowerBIDatasetRefreshFields.REQUEST_ID.value:
"5e2d9921-e91b-491f-b7e1-e7d8db49194c",
+ PowerBIDatasetRefreshFields.STATUS.value: "Completed",
+ PowerBIDatasetRefreshFields.ERROR.value: "None",
+ }
+
+ # Call the function with an invalid request ID
+ invalid_request_id = "invalid_request_id"
+ with pytest.raises(PowerBIDatasetRefreshException):
+ await powerbi_hook.get_refresh_details_by_refresh_id(
+ dataset_id=DATASET_ID, group_id=GROUP_ID,
refresh_id=invalid_request_id
+ )
+
+
[email protected]
+async def test_get_refresh_details_by_refresh_id_empty_history(powerbi_hook):
+ """Test exception when refresh history is empty."""
+ # Mock the get_refresh_history method to return an empty list
+ powerbi_hook.get_refresh_history = mock.AsyncMock(return_value=[])
+
+ # Call the function with a request ID
+ refresh_id = "any_request_id"
+ with pytest.raises(
+ PowerBIDatasetRefreshException,
+ match=f"Unable to fetch the details of dataset refresh with Request
Id: {refresh_id}",
+ ):
+ await powerbi_hook.get_refresh_details_by_refresh_id(
+ dataset_id=DATASET_ID, group_id=GROUP_ID, refresh_id=refresh_id
+ )
+
+
[email protected]
+async def test_get_refresh_details_by_refresh_id_not_found(powerbi_hook):
+ """Test exception when the refresh ID is not found in the refresh
history."""
+ # Mock the get_refresh_history method to return a list of refresh
histories without the specified ID
+ powerbi_hook.get_refresh_history =
mock.AsyncMock(return_value=FORMATTED_RESPONSE)
+
+ # Call the function with an invalid request ID
+ invalid_request_id = "invalid_request_id"
+ with pytest.raises(
+ PowerBIDatasetRefreshException,
+ match=f"Unable to fetch the details of dataset refresh with Request
Id: {invalid_request_id}",
+ ):
+ await powerbi_hook.get_refresh_details_by_refresh_id(
+ dataset_id=DATASET_ID, group_id=GROUP_ID,
refresh_id=invalid_request_id
+ )
+
+
[email protected]
+async def test_trigger_dataset_refresh_success(powerbi_hook):
+ response_data = {"requestid": "5e2d9921-e91b-491f-b7e1-e7d8db49194c"}
+
+ with mock.patch.object(KiotaRequestAdapterHook, "run",
new_callable=mock.AsyncMock) as mock_run:
+ mock_run.return_value = response_data
+ result = await
powerbi_hook.trigger_dataset_refresh(dataset_id=DATASET_ID, group_id=GROUP_ID)
+
+ assert result == "5e2d9921-e91b-491f-b7e1-e7d8db49194c"
+
+
[email protected]
+async def test_trigger_dataset_refresh_failure(powerbi_hook):
+ """Test failure to trigger dataset refresh due to AirflowException."""
+ with mock.patch.object(KiotaRequestAdapterHook, "run",
new_callable=mock.AsyncMock) as mock_run:
+ mock_run.side_effect = AirflowException("Test exception")
+
+ with pytest.raises(PowerBIDatasetRefreshException, match="Failed to
trigger dataset refresh."):
+ await powerbi_hook.trigger_dataset_refresh(dataset_id=DATASET_ID,
group_id=GROUP_ID)
+
+
[email protected]
+async def test_cancel_dataset_refresh(powerbi_hook):
+ dataset_refresh_id = "5e2d9921-e91b-491f-b7e1-e7d8db49194c"
+
+ with mock.patch.object(KiotaRequestAdapterHook, "run",
new_callable=mock.AsyncMock) as mock_run:
+ await powerbi_hook.cancel_dataset_refresh(DATASET_ID, GROUP_ID,
dataset_refresh_id)
+
+ mock_run.assert_called_once_with(
+
url="myorg/groups/{group_id}/datasets/{dataset_id}/refreshes/{dataset_refresh_id}",
+ response_type=None,
+ path_parameters={
+ "group_id": GROUP_ID,
+ "dataset_id": DATASET_ID,
+ "dataset_refresh_id": dataset_refresh_id,
+ },
+ method="DELETE",
+ )
diff --git a/tests/providers/microsoft/azure/operators/test_powerbi.py
b/tests/providers/microsoft/azure/operators/test_powerbi.py
new file mode 100644
index 0000000000..2ee5ee723d
--- /dev/null
+++ b/tests/providers/microsoft/azure/operators/test_powerbi.py
@@ -0,0 +1,157 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from unittest.mock import MagicMock
+
+import pytest
+
+from airflow.exceptions import AirflowException, TaskDeferred
+from airflow.providers.microsoft.azure.hooks.powerbi import (
+ PowerBIDatasetRefreshFields,
+ PowerBIDatasetRefreshStatus,
+ PowerBIHook,
+)
+from airflow.providers.microsoft.azure.operators.powerbi import
PowerBIDatasetRefreshOperator
+from airflow.providers.microsoft.azure.triggers.powerbi import PowerBITrigger
+from airflow.utils import timezone
+
# Connection id used throughout these operator tests.
DEFAULT_CONNECTION_CLIENT_SECRET = "powerbi_conn_id"
TASK_ID = "run_powerbi_operator"
GROUP_ID = "group_id"
DATASET_ID = "dataset_id"
# Keyword arguments shared by every PowerBIDatasetRefreshOperator built in this module.
CONFIG = {
    "task_id": TASK_ID,
    "conn_id": DEFAULT_CONNECTION_CLIENT_SECRET,
    "group_id": GROUP_ID,
    "dataset_id": DATASET_ID,
    "check_interval": 1,
    "timeout": 3,
}
# Refresh request id returned by the (mocked) Power BI API in these tests.
NEW_REFRESH_REQUEST_ID = "5e2d9921-e91b-491f-b7e1-e7d8db49194c"

# Trigger event payload that PowerBITrigger is expected to emit on success.
SUCCESS_TRIGGER_EVENT = {
    "status": "success",
    "message": "success",
    "dataset_refresh_id": NEW_REFRESH_REQUEST_ID,
}

DEFAULT_DATE = timezone.datetime(2021, 1, 1)


# Sample responses from PowerBI API
COMPLETED_REFRESH_DETAILS = {
    PowerBIDatasetRefreshFields.REQUEST_ID.value: NEW_REFRESH_REQUEST_ID,
    PowerBIDatasetRefreshFields.STATUS.value: PowerBIDatasetRefreshStatus.COMPLETED,
}

FAILED_REFRESH_DETAILS = {
    PowerBIDatasetRefreshFields.REQUEST_ID.value: NEW_REFRESH_REQUEST_ID,
    PowerBIDatasetRefreshFields.STATUS.value: PowerBIDatasetRefreshStatus.FAILED,
    PowerBIDatasetRefreshFields.ERROR.value: '{"errorCode":"ModelRefreshFailed_CredentialsNotSpecified"}',
}

IN_PROGRESS_REFRESH_DETAILS = {
    PowerBIDatasetRefreshFields.REQUEST_ID.value: NEW_REFRESH_REQUEST_ID,
    PowerBIDatasetRefreshFields.STATUS.value: PowerBIDatasetRefreshStatus.IN_PROGRESS,  # endtime is not available.
}
+
+
@pytest.fixture
def mock_powerbi_hook():
    """Provide a bare PowerBIHook instance to wire into the operator under test."""
    return PowerBIHook()
+
+
def test_execute_wait_for_termination_with_Deferrable(mock_powerbi_hook):
    """Assert that executing the operator defers itself with a PowerBITrigger."""
    refresh_operator = PowerBIDatasetRefreshOperator(**CONFIG)
    refresh_operator.hook = mock_powerbi_hook
    context = {"ti": MagicMock()}

    # Deferrable operators signal deferral by raising TaskDeferred.
    with pytest.raises(TaskDeferred) as deferred:
        refresh_operator.execute(context)

    assert isinstance(deferred.value.trigger, PowerBITrigger)
+
+
def test_powerbi_operator_async_execute_complete_success():
    """Assert that execute_complete log success message"""
    refresh_operator = PowerBIDatasetRefreshOperator(**CONFIG)
    task_instance = MagicMock()

    refresh_operator.execute_complete(
        context={"ti": task_instance},
        event=SUCCESS_TRIGGER_EVENT,
    )

    # On success the operator pushes two values (refresh id and status) to XCom.
    assert task_instance.xcom_push.call_count == 2
+
+
def test_powerbi_operator_async_execute_complete_fail():
    """Assert that execute_complete raise exception on error"""
    refresh_operator = PowerBIDatasetRefreshOperator(**CONFIG)
    task_instance = MagicMock()
    error_event = {"status": "error", "message": "error", "dataset_refresh_id": "1234"}

    with pytest.raises(AirflowException):
        refresh_operator.execute_complete(
            context={"ti": task_instance},
            event=error_event,
        )

    # Nothing must be pushed to XCom when the trigger reports an error.
    assert task_instance.xcom_push.call_count == 0
+
+
def test_execute_complete_no_event():
    """Test execute_complete when event is None or empty."""
    refresh_operator = PowerBIDatasetRefreshOperator(**CONFIG)
    task_instance = MagicMock()

    # A missing event must be a no-op: no exception, no XCom pushes.
    refresh_operator.execute_complete(context={"ti": task_instance}, event=None)

    assert task_instance.xcom_push.call_count == 0
+
+
@pytest.mark.db_test
def test_powerbilink(create_task_instance_of_operator):
    """Assert Power BI Extra link matches the expected URL."""
    ti = create_task_instance_of_operator(
        PowerBIDatasetRefreshOperator,
        dag_id="test_powerbi_refresh_op_link",
        execution_date=DEFAULT_DATE,
        task_id=TASK_ID,
        conn_id=DEFAULT_CONNECTION_CLIENT_SECRET,
        group_id=GROUP_ID,
        dataset_id=DATASET_ID,
        check_interval=1,
        timeout=3,
    )

    # The extra link reads the refresh id back from XCom, so seed it first.
    ti.xcom_push(key="powerbi_dataset_refresh_id", value=NEW_REFRESH_REQUEST_ID)
    actual_url = ti.task.get_extra_links(ti, "Monitor PowerBI Dataset")

    expected_url = (
        f"https://app.powerbi.com/groups/{GROUP_ID}/datasets/{DATASET_ID}"
        "/details?experience=power-bi"
    )
    assert actual_url == expected_url
diff --git a/tests/providers/microsoft/azure/triggers/test_powerbi.py
b/tests/providers/microsoft/azure/triggers/test_powerbi.py
new file mode 100644
index 0000000000..5b44a84149
--- /dev/null
+++ b/tests/providers/microsoft/azure/triggers/test_powerbi.py
@@ -0,0 +1,257 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import asyncio
+from unittest import mock
+from unittest.mock import patch
+
+import pytest
+
+from airflow.providers.microsoft.azure.hooks.powerbi import
PowerBIDatasetRefreshStatus, PowerBIHook
+from airflow.providers.microsoft.azure.triggers.powerbi import PowerBITrigger
+from airflow.triggers.base import TriggerEvent
+from tests.providers.microsoft.conftest import get_airflow_connection
+
# Connection id of the Power BI connection used by the trigger under test.
POWERBI_CONN_ID = "powerbi_default"
DATASET_ID = "dataset_id"
GROUP_ID = "group_id"
DATASET_REFRESH_ID = "dataset_refresh_id"
# Trigger timeout (seconds) and polling interval used by the fixture below.
TIMEOUT = 30
# Import root used to build mock.patch targets for the provider package.
MODULE = "airflow.providers.microsoft.azure"
CHECK_INTERVAL = 10
API_VERSION = "v1.0"
+
+
@pytest.fixture
def powerbi_trigger():
    """Build a PowerBITrigger wired with the module-level test constants."""
    return PowerBITrigger(
        conn_id=POWERBI_CONN_ID,
        proxies=None,
        api_version=API_VERSION,
        dataset_id=DATASET_ID,
        group_id=GROUP_ID,
        check_interval=CHECK_INTERVAL,
        wait_for_termination=True,
        timeout=TIMEOUT,
    )
+
+
@pytest.fixture
def mock_powerbi_hook():
    """Provide a bare PowerBIHook instance.

    NOTE(review): no test in the visible portion of this module requests this
    fixture — confirm it is actually needed before keeping it.
    """
    return PowerBIHook()
+
+
def test_powerbi_trigger_serialization():
    """Asserts that the PowerBI Trigger correctly serializes its arguments and classpath."""
    expected_kwargs = {
        "conn_id": POWERBI_CONN_ID,
        "dataset_id": DATASET_ID,
        "timeout": TIMEOUT,
        "group_id": GROUP_ID,
        "proxies": None,
        "api_version": API_VERSION,
        "check_interval": CHECK_INTERVAL,
        "wait_for_termination": True,
    }

    # Building the trigger resolves the connection, so stub connection lookup.
    with patch(
        "airflow.hooks.base.BaseHook.get_connection",
        side_effect=get_airflow_connection,
    ):
        trigger = PowerBITrigger(
            conn_id=POWERBI_CONN_ID,
            proxies=None,
            api_version=API_VERSION,
            dataset_id=DATASET_ID,
            group_id=GROUP_ID,
            check_interval=CHECK_INTERVAL,
            wait_for_termination=True,
            timeout=TIMEOUT,
        )

        classpath, kwargs = trigger.serialize()
        assert classpath == f"{MODULE}.triggers.powerbi.PowerBITrigger"
        assert kwargs == expected_kwargs
+
+
@pytest.mark.asyncio
@mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.get_refresh_details_by_refresh_id")
@mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.trigger_dataset_refresh")
async def test_powerbi_trigger_run_inprogress(
    mock_trigger_dataset_refresh, mock_get_refresh_details_by_refresh_id, powerbi_trigger
):
    """Assert task isn't completed until timeout if dataset refresh is in progress."""
    mock_get_refresh_details_by_refresh_id.return_value = {"status": PowerBIDatasetRefreshStatus.IN_PROGRESS}
    mock_trigger_dataset_refresh.return_value = DATASET_REFRESH_ID

    # Drive one step of the trigger's async generator in a background task.
    task = asyncio.create_task(powerbi_trigger.run().__anext__())
    await asyncio.sleep(0.5)

    # Assert TriggerEvent was not returned: the trigger is still polling.
    assert task.done() is False

    # Clean up the still-running poller. The original called
    # asyncio.get_event_loop().stop(), which is deprecated inside a running
    # coroutine and leaves the task pending ("Task was destroyed but it is
    # pending"); cancelling and awaiting the task shuts it down cleanly.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
+
+
@pytest.mark.asyncio
@mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.get_refresh_details_by_refresh_id")
@mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.trigger_dataset_refresh")
async def test_powerbi_trigger_run_failed(
    mock_trigger_dataset_refresh, mock_get_refresh_details_by_refresh_id, powerbi_trigger
):
    """Assert event is triggered upon failed dataset refresh."""
    mock_get_refresh_details_by_refresh_id.return_value = {"status": PowerBIDatasetRefreshStatus.FAILED}
    mock_trigger_dataset_refresh.return_value = DATASET_REFRESH_ID

    event = await powerbi_trigger.run().asend(None)

    assert event == TriggerEvent(
        {
            "status": "Failed",
            "message": (
                f"The dataset refresh {DATASET_REFRESH_ID} has "
                f"{PowerBIDatasetRefreshStatus.FAILED}."
            ),
            "dataset_refresh_id": DATASET_REFRESH_ID,
        }
    )
+
+
@pytest.mark.asyncio
@mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.get_refresh_details_by_refresh_id")
@mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.trigger_dataset_refresh")
async def test_powerbi_trigger_run_completed(
    mock_trigger_dataset_refresh, mock_get_refresh_details_by_refresh_id, powerbi_trigger
):
    """Assert event is triggered upon successful dataset refresh."""
    mock_get_refresh_details_by_refresh_id.return_value = {"status": PowerBIDatasetRefreshStatus.COMPLETED}
    mock_trigger_dataset_refresh.return_value = DATASET_REFRESH_ID

    event = await powerbi_trigger.run().asend(None)

    assert event == TriggerEvent(
        {
            "status": "Completed",
            "message": (
                f"The dataset refresh {DATASET_REFRESH_ID} has "
                f"{PowerBIDatasetRefreshStatus.COMPLETED}."
            ),
            "dataset_refresh_id": DATASET_REFRESH_ID,
        }
    )
+
+
@pytest.mark.asyncio
@mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.cancel_dataset_refresh")
@mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.get_refresh_details_by_refresh_id")
@mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.trigger_dataset_refresh")
async def test_powerbi_trigger_run_exception_during_refresh_check_loop(
    mock_trigger_dataset_refresh,
    mock_get_refresh_details_by_refresh_id,
    mock_cancel_dataset_refresh,
    powerbi_trigger,
):
    """Assert that run catch exception if Power BI API throw exception"""
    mock_get_refresh_details_by_refresh_id.side_effect = Exception("Test exception")
    mock_trigger_dataset_refresh.return_value = DATASET_REFRESH_ID

    events = [event async for event in powerbi_trigger.run()]

    expected_event = TriggerEvent(
        {
            "status": "error",
            "message": "An error occurred: Test exception",
            "dataset_refresh_id": DATASET_REFRESH_ID,
        }
    )
    # Exactly one error event, and the in-flight refresh must be cancelled.
    assert events == [expected_event]
    mock_cancel_dataset_refresh.assert_called_once()
+
+
@pytest.mark.asyncio
@mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.cancel_dataset_refresh")
@mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.get_refresh_details_by_refresh_id")
@mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.trigger_dataset_refresh")
async def test_powerbi_trigger_run_exception_during_refresh_cancellation(
    mock_trigger_dataset_refresh,
    mock_get_refresh_details_by_refresh_id,
    mock_cancel_dataset_refresh,
    powerbi_trigger,
):
    """Assert that run catch exception if Power BI API throw exception"""
    # Both the status check and the follow-up cancellation fail.
    mock_get_refresh_details_by_refresh_id.side_effect = Exception("Test exception")
    mock_cancel_dataset_refresh.side_effect = Exception("Exception caused by cancel_dataset_refresh")
    mock_trigger_dataset_refresh.return_value = DATASET_REFRESH_ID

    events = [event async for event in powerbi_trigger.run()]

    expected_event = TriggerEvent(
        {
            "status": "error",
            "message": "An error occurred while canceling dataset: Exception caused by cancel_dataset_refresh",
            "dataset_refresh_id": DATASET_REFRESH_ID,
        }
    )
    assert events == [expected_event]
    mock_cancel_dataset_refresh.assert_called_once()
+
+
@pytest.mark.asyncio
@mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.get_refresh_details_by_refresh_id")
@mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.trigger_dataset_refresh")
async def test_powerbi_trigger_run_exception_without_refresh_id(
    mock_trigger_dataset_refresh, mock_get_refresh_details_by_refresh_id, powerbi_trigger
):
    """Assert handling of exception when there is no dataset_refresh_id"""
    powerbi_trigger.dataset_refresh_id = None
    mock_get_refresh_details_by_refresh_id.side_effect = Exception("Test exception for no dataset_refresh_id")
    mock_trigger_dataset_refresh.return_value = None

    events = [event async for event in powerbi_trigger.run()]

    assert events == [
        TriggerEvent(
            {
                "status": "error",
                "message": "An error occurred: Test exception for no dataset_refresh_id",
                "dataset_refresh_id": None,
            }
        )
    ]
+
+
@pytest.mark.asyncio
@mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.get_refresh_details_by_refresh_id")
@mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.trigger_dataset_refresh")
async def test_powerbi_trigger_run_timeout(
    mock_trigger_dataset_refresh, mock_get_refresh_details_by_refresh_id, powerbi_trigger
):
    """Assert that powerbi run timesout after end_time elapses"""
    # Refresh never leaves IN_PROGRESS, so the trigger must hit its timeout.
    mock_get_refresh_details_by_refresh_id.return_value = {"status": PowerBIDatasetRefreshStatus.IN_PROGRESS}
    mock_trigger_dataset_refresh.return_value = DATASET_REFRESH_ID

    event = await powerbi_trigger.run().asend(None)

    assert event == TriggerEvent(
        {
            "status": "error",
            "message": f"Timeout occurred while waiting for dataset refresh to complete: The dataset refresh {DATASET_REFRESH_ID} has status In Progress.",
            "dataset_refresh_id": DATASET_REFRESH_ID,
        }
    )
diff --git
a/tests/system/providers/microsoft/azure/example_powerbi_dataset_refresh.py
b/tests/system/providers/microsoft/azure/example_powerbi_dataset_refresh.py
new file mode 100644
index 0000000000..52f1f001e9
--- /dev/null
+++ b/tests/system/providers/microsoft/azure/example_powerbi_dataset_refresh.py
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+from datetime import datetime
+
+from airflow import DAG, settings
+from airflow.decorators import task
+from airflow.models import Connection
+from airflow.models.baseoperator import chain
+from airflow.providers.microsoft.azure.operators.powerbi import
PowerBIDatasetRefreshOperator
+
DAG_ID = "example_refresh_powerbi_dataset"
# Connection id created by the setup task and consumed by the operator below.
CONN_ID = "powerbi_default"

# Before running this system test, you should set following environment variables:
# NOTE(review): DATASET_ID/GROUP_ID fall back to the literal string "None",
# while the credential variables fall back to the None object — confirm the
# string default is intentional and not a typo for None.
DATASET_ID = os.environ.get("DATASET_ID", "None")
GROUP_ID = os.environ.get("GROUP_ID", "None")
CLIENT_ID = os.environ.get("CLIENT_ID", None)
CLIENT_SECRET = os.environ.get("CLIENT_SECRET", None)
TENANT_ID = os.environ.get("TENANT_ID", None)
+
+
@task
def create_connection(conn_id_name: str):
    """Persist an Airflow connection holding the Power BI service-principal credentials.

    :param conn_id_name: Connection id under which the credentials are stored.
    """
    conn = Connection(
        conn_id=conn_id_name,
        conn_type="powerbi",
        login=CLIENT_ID,
        password=CLIENT_SECRET,
        extra={"tenant_id": TENANT_ID},
    )
    session = settings.Session()
    try:
        session.add(conn)
        session.commit()
    finally:
        # Always release the SQLAlchemy session, even if the commit raises;
        # the original leaked the session on every run.
        session.close()
+
+
# Manually-triggered example DAG: create the connection, then refresh the dataset.
with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2021, 1, 1),
    schedule=None,  # run on demand only
    tags=["example"],
) as dag:
    set_up_connection = create_connection(CONN_ID)

    # [START howto_operator_powerbi_refresh_async]
    refresh_powerbi_dataset = PowerBIDatasetRefreshOperator(
        conn_id="powerbi_default",
        task_id="refresh_powerbi_dataset",
        dataset_id=DATASET_ID,
        group_id=GROUP_ID,
        check_interval=30,  # poll refresh status every 30s
        timeout=120,  # give up after 2 minutes
    )
    # [END howto_operator_powerbi_refresh_async]

    chain(
        # TEST SETUP
        set_up_connection,
        # TEST BODY
        refresh_powerbi_dataset,
    )

    from tests.system.utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "tearDown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)