This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ede34ebee7 Allow custom api versions in MSGraphAsyncOperator (#41331)
ede34ebee7 is described below
commit ede34ebee70648f83209c43e51d39ac6779c66b5
Author: David Blain <[email protected]>
AuthorDate: Thu Sep 5 11:11:01 2024 +0200
Allow custom api versions in MSGraphAsyncOperator (#41331)
---
airflow/providers/microsoft/azure/hooks/msgraph.py | 47 ++-
.../providers/microsoft/azure/operators/msgraph.py | 2 +-
.../providers/microsoft/azure/operators/powerbi.py | 12 +-
.../providers/microsoft/azure/sensors/msgraph.py | 2 +-
.../providers/microsoft/azure/triggers/msgraph.py | 7 +-
.../providers/microsoft/azure/triggers/powerbi.py | 7 +-
.../operators/msgraph.rst | 9 +
.../microsoft/azure/hooks/test_msgraph.py | 95 +++++-
.../microsoft/azure/hooks/test_powerbi.py | 320 ++++++++++---------
.../microsoft/azure/operators/test_powerbi.py | 147 +++++----
.../microsoft/azure/triggers/test_powerbi.py | 337 ++++++++++-----------
tests/providers/microsoft/conftest.py | 31 +-
.../providers/microsoft/azure/example_msfabric.py | 63 ++++
13 files changed, 613 insertions(+), 466 deletions(-)
diff --git a/airflow/providers/microsoft/azure/hooks/msgraph.py
b/airflow/providers/microsoft/azure/hooks/msgraph.py
index 8410d8d707..61e555f4ca 100644
--- a/airflow/providers/microsoft/azure/hooks/msgraph.py
+++ b/airflow/providers/microsoft/azure/hooks/msgraph.py
@@ -96,6 +96,8 @@ class KiotaRequestAdapterHook(BaseHook):
:param timeout: The HTTP timeout being used by the KiotaRequestAdapter
(default is None).
When no timeout is specified or set to None then no HTTP timeout is
applied on each request.
:param proxies: A Dict defining the HTTP proxies to be used (default is
None).
+ :param host: The host to be used (default is
"https://graph.microsoft.com").
+ :param scopes: The scopes to be used (default is
["https://graph.microsoft.com/.default"]).
:param api_version: The API version of the Microsoft Graph API to be used
(default is v1).
You can pass an enum named APIVersion which has 2 possible members v1
and beta,
or you can pass a string as "v1.0" or "beta".
@@ -123,27 +125,22 @@ class KiotaRequestAdapterHook(BaseHook):
self._api_version = self.resolve_api_version_from_value(api_version)
@property
- def api_version(self) -> APIVersion:
+ def api_version(self) -> str | None:
self.get_conn() # Make sure config has been loaded through get_conn
to have correct api version!
return self._api_version
@staticmethod
def resolve_api_version_from_value(
- api_version: APIVersion | str, default: APIVersion | None = None
- ) -> APIVersion:
+ api_version: APIVersion | str, default: str | None = None
+ ) -> str | None:
if isinstance(api_version, APIVersion):
- return api_version
- return next(
- filter(lambda version: version.value == api_version, APIVersion),
- default,
- )
+ return api_version.value
+ return api_version or default
- def get_api_version(self, config: dict) -> APIVersion:
- if self._api_version is None:
- return self.resolve_api_version_from_value(
- api_version=config.get("api_version"), default=APIVersion.v1
- )
- return self._api_version
+ def get_api_version(self, config: dict) -> str:
+ return self._api_version or self.resolve_api_version_from_value(
+ config.get("api_version"), APIVersion.v1.value
+ ) # type: ignore
def get_host(self, connection: Connection) -> str:
if connection.schema and connection.host:
@@ -169,15 +166,15 @@ class KiotaRequestAdapterHook(BaseHook):
return proxies
def to_msal_proxies(self, authority: str | None, proxies: dict):
- self.log.info("authority: %s", authority)
+ self.log.debug("authority: %s", authority)
if authority:
no_proxies = proxies.get("no")
- self.log.info("no_proxies: %s", no_proxies)
+ self.log.debug("no_proxies: %s", no_proxies)
if no_proxies:
for url in no_proxies.split(","):
self.log.info("url: %s", url)
domain_name = urlparse(url).path.replace("*", "")
- self.log.info("domain_name: %s", domain_name)
+ self.log.debug("domain_name: %s", domain_name)
if authority.endswith(domain_name):
return None
return proxies
@@ -193,10 +190,10 @@ class KiotaRequestAdapterHook(BaseHook):
client_id = connection.login
client_secret = connection.password
config = connection.extra_dejson if connection.extra else {}
- tenant_id = config.get("tenant_id")
+ tenant_id = config.get("tenant_id") or config.get("tenantId")
api_version = self.get_api_version(config)
host = self.get_host(connection)
- base_url = config.get("base_url", urljoin(host, api_version.value))
+ base_url = config.get("base_url", urljoin(host, api_version))
authority = config.get("authority")
proxies = self.proxies or config.get("proxies", {})
msal_proxies = self.to_msal_proxies(authority=authority,
proxies=proxies)
@@ -209,7 +206,7 @@ class KiotaRequestAdapterHook(BaseHook):
self.log.info(
"Creating Microsoft Graph SDK client %s for conn_id: %s",
- api_version.value,
+ api_version,
self.conn_id,
)
self.log.info("Host: %s", host)
@@ -217,7 +214,7 @@ class KiotaRequestAdapterHook(BaseHook):
self.log.info("Tenant id: %s", tenant_id)
self.log.info("Client id: %s", client_id)
self.log.info("Client secret: %s", client_secret)
- self.log.info("API version: %s", api_version.value)
+ self.log.info("API version: %s", api_version)
self.log.info("Scope: %s", scopes)
self.log.info("Verify: %s", verify)
self.log.info("Timeout: %s", self.timeout)
@@ -238,17 +235,17 @@ class KiotaRequestAdapterHook(BaseHook):
connection_verify=verify,
)
http_client = GraphClientFactory.create_with_default_middleware(
- api_version=api_version,
+ api_version=api_version, # type: ignore
client=httpx.AsyncClient(
proxies=httpx_proxies,
timeout=Timeout(timeout=self.timeout),
verify=verify,
trust_env=trust_env,
),
- host=host,
+ host=host, # type: ignore
)
auth_provider = AzureIdentityAuthenticationProvider(
- credentials=credentials,
+ credentials=credentials, # type: ignore
scopes=scopes,
allowed_hosts=allowed_hosts,
)
@@ -295,7 +292,7 @@ class KiotaRequestAdapterHook(BaseHook):
error_map=self.error_mapping(),
)
- self.log.info("response: %s", response)
+ self.log.debug("response: %s", response)
return response
diff --git a/airflow/providers/microsoft/azure/operators/msgraph.py
b/airflow/providers/microsoft/azure/operators/msgraph.py
index cd38795473..74409f3600 100644
--- a/airflow/providers/microsoft/azure/operators/msgraph.py
+++ b/airflow/providers/microsoft/azure/operators/msgraph.py
@@ -99,7 +99,7 @@ class MSGraphAsyncOperator(BaseOperator):
key: str = XCOM_RETURN_KEY,
timeout: float | None = None,
proxies: dict | None = None,
- api_version: APIVersion | None = None,
+ api_version: APIVersion | str | None = None,
pagination_function: Callable[[MSGraphAsyncOperator, dict], tuple[str,
dict]] | None = None,
result_processor: Callable[[Context, Any], Any] = lambda context,
result: result,
serializer: type[ResponseSerializer] = ResponseSerializer,
diff --git a/airflow/providers/microsoft/azure/operators/powerbi.py
b/airflow/providers/microsoft/azure/operators/powerbi.py
index e54ad250bd..fc812e852d 100644
--- a/airflow/providers/microsoft/azure/operators/powerbi.py
+++ b/airflow/providers/microsoft/azure/operators/powerbi.py
@@ -76,7 +76,7 @@ class PowerBIDatasetRefreshOperator(BaseOperator):
conn_id: str = PowerBIHook.default_conn_name,
timeout: float = 60 * 60 * 24 * 7,
proxies: dict | None = None,
- api_version: APIVersion | None = None,
+ api_version: APIVersion | str | None = None,
check_interval: int = 60,
**kwargs,
) -> None:
@@ -89,6 +89,14 @@ class PowerBIDatasetRefreshOperator(BaseOperator):
self.timeout = timeout
self.check_interval = check_interval
+ @property
+ def proxies(self) -> dict | None:
+ return self.hook.proxies
+
+ @property
+ def api_version(self) -> str | None:
+ return self.hook.api_version
+
def execute(self, context: Context):
"""Refresh the Power BI Dataset."""
if self.wait_for_termination:
@@ -98,6 +106,8 @@ class PowerBIDatasetRefreshOperator(BaseOperator):
group_id=self.group_id,
dataset_id=self.dataset_id,
timeout=self.timeout,
+ proxies=self.proxies,
+ api_version=self.api_version,
check_interval=self.check_interval,
wait_for_termination=self.wait_for_termination,
),
diff --git a/airflow/providers/microsoft/azure/sensors/msgraph.py
b/airflow/providers/microsoft/azure/sensors/msgraph.py
index 3e1b10cbeb..6736ea59c9 100644
--- a/airflow/providers/microsoft/azure/sensors/msgraph.py
+++ b/airflow/providers/microsoft/azure/sensors/msgraph.py
@@ -82,7 +82,7 @@ class MSGraphSensor(BaseSensorOperator):
data: dict[str, Any] | str | BytesIO | None = None,
conn_id: str = KiotaRequestAdapterHook.default_conn_name,
proxies: dict | None = None,
- api_version: APIVersion | None = None,
+ api_version: APIVersion | str | None = None,
event_processor: Callable[[Context, Any], bool] = lambda context, e:
e.get("status") == "Succeeded",
result_processor: Callable[[Context, Any], Any] = lambda context,
result: result,
serializer: type[ResponseSerializer] = ResponseSerializer,
diff --git a/airflow/providers/microsoft/azure/triggers/msgraph.py
b/airflow/providers/microsoft/azure/triggers/msgraph.py
index 4b9ccb7a71..0015964be8 100644
--- a/airflow/providers/microsoft/azure/triggers/msgraph.py
+++ b/airflow/providers/microsoft/azure/triggers/msgraph.py
@@ -122,7 +122,7 @@ class MSGraphTrigger(BaseTrigger):
conn_id: str = KiotaRequestAdapterHook.default_conn_name,
timeout: float | None = None,
proxies: dict | None = None,
- api_version: APIVersion | None = None,
+ api_version: APIVersion | str | None = None,
serializer: type[ResponseSerializer] = ResponseSerializer,
):
super().__init__()
@@ -152,14 +152,13 @@ class MSGraphTrigger(BaseTrigger):
def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serialize the HttpTrigger arguments and classpath."""
- api_version = self.api_version.value if self.api_version else None
return (
f"{self.__class__.__module__}.{self.__class__.__name__}",
{
"conn_id": self.conn_id,
"timeout": self.timeout,
"proxies": self.proxies,
- "api_version": api_version,
+ "api_version": self.api_version,
"serializer":
f"{self.serializer.__class__.__module__}.{self.serializer.__class__.__name__}",
"url": self.url,
"path_parameters": self.path_parameters,
@@ -188,7 +187,7 @@ class MSGraphTrigger(BaseTrigger):
return self.hook.proxies
@property
- def api_version(self) -> APIVersion:
+ def api_version(self) -> APIVersion | str:
return self.hook.api_version
async def run(self) -> AsyncIterator[TriggerEvent]:
diff --git a/airflow/providers/microsoft/azure/triggers/powerbi.py
b/airflow/providers/microsoft/azure/triggers/powerbi.py
index d25802b84f..a74898f55f 100644
--- a/airflow/providers/microsoft/azure/triggers/powerbi.py
+++ b/airflow/providers/microsoft/azure/triggers/powerbi.py
@@ -58,7 +58,7 @@ class PowerBITrigger(BaseTrigger):
group_id: str,
timeout: float = 60 * 60 * 24 * 7,
proxies: dict | None = None,
- api_version: APIVersion | None = None,
+ api_version: APIVersion | str | None = None,
check_interval: int = 60,
wait_for_termination: bool = True,
):
@@ -72,13 +72,12 @@ class PowerBITrigger(BaseTrigger):
def serialize(self):
"""Serialize the trigger instance."""
- api_version = self.api_version.value if self.api_version else None
return (
"airflow.providers.microsoft.azure.triggers.powerbi.PowerBITrigger",
{
"conn_id": self.conn_id,
"proxies": self.proxies,
- "api_version": api_version,
+ "api_version": self.api_version,
"dataset_id": self.dataset_id,
"group_id": self.group_id,
"timeout": self.timeout,
@@ -96,7 +95,7 @@ class PowerBITrigger(BaseTrigger):
return self.hook.proxies
@property
- def api_version(self) -> APIVersion:
+ def api_version(self) -> APIVersion | str:
return self.hook.api_version
async def run(self) -> AsyncIterator[TriggerEvent]:
diff --git
a/docs/apache-airflow-providers-microsoft-azure/operators/msgraph.rst
b/docs/apache-airflow-providers-microsoft-azure/operators/msgraph.rst
index 342bf54276..56a9259f93 100644
--- a/docs/apache-airflow-providers-microsoft-azure/operators/msgraph.rst
+++ b/docs/apache-airflow-providers-microsoft-azure/operators/msgraph.rst
@@ -72,6 +72,14 @@ Below is an example of using this operator to refresh
PowerBI dataset.
:start-after: [START howto_operator_powerbi_refresh_dataset]
:end-before: [END howto_operator_powerbi_refresh_dataset]
+Below is an example of using this operator to create an item schedule in
Fabric.
+
+.. exampleinclude::
/../../tests/system/providers/microsoft/azure/example_msfabric.py
+ :language: python
+ :dedent: 0
+ :start-after: [START howto_operator_ms_fabric_create_item_schedule]
+ :end-before: [END howto_operator_ms_fabric_create_item_schedule]
+
Reference
---------
@@ -80,3 +88,4 @@ For further information, look at:
* `Use the Microsoft Graph API
<https://learn.microsoft.com/en-us/graph/use-the-api/>`__
* `Using the Power BI REST APIs
<https://learn.microsoft.com/en-us/rest/api/power-bi/>`__
+* `Using the Fabric REST APIs
<https://learn.microsoft.com/en-us/rest/api/fabric/articles/using-fabric-apis/>`__
diff --git a/tests/providers/microsoft/azure/hooks/test_msgraph.py
b/tests/providers/microsoft/azure/hooks/test_msgraph.py
index 390be17ba7..04e8552561 100644
--- a/tests/providers/microsoft/azure/hooks/test_msgraph.py
+++ b/tests/providers/microsoft/azure/hooks/test_msgraph.py
@@ -18,6 +18,7 @@ from __future__ import annotations
import asyncio
from json import JSONDecodeError
+from typing import TYPE_CHECKING
from unittest.mock import patch
import pytest
@@ -38,8 +39,20 @@ from tests.providers.microsoft.conftest import (
mock_response,
)
+if TYPE_CHECKING:
+ from kiota_abstractions.request_adapter import RequestAdapter
+
class TestKiotaRequestAdapterHook:
+ def setup_method(self):
+ KiotaRequestAdapterHook.cached_request_adapters.clear()
+
+ @staticmethod
+ def assert_tenant_id(request_adapter: RequestAdapter, expected_tenant_id:
str):
+ assert isinstance(request_adapter, HttpxRequestAdapter)
+ tenant_id =
request_adapter._authentication_provider.access_token_provider._credentials._tenant_id
+ assert tenant_id == expected_tenant_id
+
def test_get_conn(self):
with patch(
"airflow.hooks.base.BaseHook.get_connection",
@@ -51,6 +64,23 @@ class TestKiotaRequestAdapterHook:
assert isinstance(actual, HttpxRequestAdapter)
assert actual.base_url == "https://graph.microsoft.com/v1.0"
+ def test_get_conn_with_custom_base_url(self):
+ connection = lambda conn_id: get_airflow_connection(
+ conn_id=conn_id,
+ host="api.fabric.microsoft.com",
+ api_version="v1",
+ )
+
+ with patch(
+ "airflow.hooks.base.BaseHook.get_connection",
+ side_effect=connection,
+ ):
+ hook = KiotaRequestAdapterHook(conn_id="msgraph_api")
+ actual = hook.get_conn()
+
+ assert isinstance(actual, HttpxRequestAdapter)
+ assert actual.base_url == "https://api.fabric.microsoft.com/v1"
+
def test_api_version(self):
with patch(
"airflow.hooks.base.BaseHook.get_connection",
@@ -58,7 +88,7 @@ class TestKiotaRequestAdapterHook:
):
hook = KiotaRequestAdapterHook(conn_id="msgraph_api")
- assert hook.api_version == APIVersion.v1
+ assert hook.api_version == APIVersion.v1.value
def test_get_api_version_when_empty_config_dict(self):
with patch(
@@ -68,7 +98,7 @@ class TestKiotaRequestAdapterHook:
hook = KiotaRequestAdapterHook(conn_id="msgraph_api")
actual = hook.get_api_version({})
- assert actual == APIVersion.v1
+ assert actual == APIVersion.v1.value
def test_get_api_version_when_api_version_in_config_dict(self):
with patch(
@@ -78,21 +108,64 @@ class TestKiotaRequestAdapterHook:
hook = KiotaRequestAdapterHook(conn_id="msgraph_api")
actual = hook.get_api_version({"api_version": "beta"})
- assert actual == APIVersion.beta
+ assert actual == APIVersion.beta.value
+
+ def test_get_api_version_when_custom_api_version_in_config_dict(self):
+ with patch(
+ "airflow.hooks.base.BaseHook.get_connection",
+ side_effect=get_airflow_connection,
+ ):
+ hook = KiotaRequestAdapterHook(conn_id="msgraph_api",
api_version="v1")
+ actual = hook.get_api_version({})
+
+ assert actual == "v1"
def test_get_host_when_connection_has_scheme_and_host(self):
- connection = mock_connection(schema="https", host="graph.microsoft.de")
- hook = KiotaRequestAdapterHook()
- actual = hook.get_host(connection)
+ with patch(
+ "airflow.hooks.base.BaseHook.get_connection",
+ side_effect=get_airflow_connection,
+ ):
+ hook = KiotaRequestAdapterHook(conn_id="msgraph_api")
+ connection = mock_connection(schema="https",
host="graph.microsoft.de")
+ actual = hook.get_host(connection)
- assert actual == NationalClouds.Germany.value
+ assert actual == NationalClouds.Germany.value
def test_get_host_when_connection_has_no_scheme_or_host(self):
- connection = mock_connection()
- hook = KiotaRequestAdapterHook()
- actual = hook.get_host(connection)
+ with patch(
+ "airflow.hooks.base.BaseHook.get_connection",
+ side_effect=get_airflow_connection,
+ ):
+ hook = KiotaRequestAdapterHook(conn_id="msgraph_api")
+ connection = mock_connection()
+ actual = hook.get_host(connection)
+
+ assert actual == NationalClouds.Global.value
+
+ def test_tenant_id(self):
+ with patch(
+ "airflow.hooks.base.BaseHook.get_connection",
+ side_effect=get_airflow_connection,
+ ):
+ hook = KiotaRequestAdapterHook(conn_id="msgraph_api")
+ actual = hook.get_conn()
+
+ self.assert_tenant_id(actual, "tenant-id")
+
+ def test_azure_tenant_id(self):
+ airflow_connection = lambda conn_id: get_airflow_connection(
+ conn_id=conn_id,
+ azure_tenant_id="azure-tenant-id",
+ )
+
+ with patch(
+ "airflow.hooks.base.BaseHook.get_connection",
+ side_effect=airflow_connection,
+ ):
+ hook = KiotaRequestAdapterHook(conn_id="msgraph_api")
+ actual = hook.get_conn()
- assert actual == NationalClouds.Global.value
+ self.assert_tenant_id(actual, "azure-tenant-id")
def test_encoded_query_parameters(self):
actual = KiotaRequestAdapterHook.encoded_query_parameters(
diff --git a/tests/providers/microsoft/azure/hooks/test_powerbi.py
b/tests/providers/microsoft/azure/hooks/test_powerbi.py
index a3a521b45e..f22116f6ef 100644
--- a/tests/providers/microsoft/azure/hooks/test_powerbi.py
+++ b/tests/providers/microsoft/azure/hooks/test_powerbi.py
@@ -54,176 +54,162 @@ DEFAULT_CONNECTION_CLIENT_SECRET = "powerbi_conn_id"
GROUP_ID = "group_id"
DATASET_ID = "dataset_id"
-CONFIG = {"conn_id": DEFAULT_CONNECTION_CLIENT_SECRET, "timeout": 3,
"api_version": "v1.0"}
-
[email protected]
-def powerbi_hook():
- return PowerBIHook(**CONFIG)
-
-
[email protected]
-async def test_get_refresh_history(powerbi_hook):
- response_data = {"value": [{"requestId": "1234", "status": "Completed",
"serviceExceptionJson": ""}]}
-
- with mock.patch.object(KiotaRequestAdapterHook, "run",
new_callable=mock.AsyncMock) as mock_run:
- mock_run.return_value = response_data
- result = await powerbi_hook.get_refresh_history(DATASET_ID, GROUP_ID)
-
- expected = [{"request_id": "1234", "status": "Completed", "error": ""}]
- assert result == expected
-
-
[email protected]
-async def test_get_refresh_history_airflow_exception(powerbi_hook):
- """Test handling of AirflowException in get_refresh_history."""
-
- with mock.patch.object(KiotaRequestAdapterHook, "run",
new_callable=mock.AsyncMock) as mock_run:
- mock_run.side_effect = AirflowException("Test exception")
-
- with pytest.raises(PowerBIDatasetRefreshException, match="Failed to
retrieve refresh history"):
- await powerbi_hook.get_refresh_history(DATASET_ID, GROUP_ID)
-
-
[email protected](
- "input_data, expected_output",
- [
- (
- {"requestId": "1234", "status": "Completed",
"serviceExceptionJson": ""},
- {
- PowerBIDatasetRefreshFields.REQUEST_ID.value: "1234",
- PowerBIDatasetRefreshFields.STATUS.value: "Completed",
- PowerBIDatasetRefreshFields.ERROR.value: "",
- },
- ),
- (
- {"requestId": "5678", "status": "Unknown", "serviceExceptionJson":
"Some error"},
- {
- PowerBIDatasetRefreshFields.REQUEST_ID.value: "5678",
- PowerBIDatasetRefreshFields.STATUS.value: "In Progress",
- PowerBIDatasetRefreshFields.ERROR.value: "Some error",
- },
- ),
- (
- {"requestId": None, "status": None, "serviceExceptionJson": None},
- {
- PowerBIDatasetRefreshFields.REQUEST_ID.value: "None",
- PowerBIDatasetRefreshFields.STATUS.value: "None",
- PowerBIDatasetRefreshFields.ERROR.value: "None",
- },
- ),
- (
- {}, # Empty input dictionary
- {
- PowerBIDatasetRefreshFields.REQUEST_ID.value: "None",
- PowerBIDatasetRefreshFields.STATUS.value: "None",
- PowerBIDatasetRefreshFields.ERROR.value: "None",
- },
- ),
- ],
-)
-def test_raw_to_refresh_details(input_data, expected_output):
- """Test raw_to_refresh_details method."""
- result = PowerBIHook.raw_to_refresh_details(input_data)
- assert result == expected_output
-
-
[email protected]
-async def test_get_refresh_details_by_refresh_id(powerbi_hook):
- # Mock the get_refresh_history method to return a list of refresh histories
- refresh_histories = FORMATTED_RESPONSE
- powerbi_hook.get_refresh_history =
mock.AsyncMock(return_value=refresh_histories)
-
- # Call the function with a valid request ID
- refresh_id = "5e2d9921-e91b-491f-b7e1-e7d8db49194c"
- result = await powerbi_hook.get_refresh_details_by_refresh_id(
- dataset_id=DATASET_ID, group_id=GROUP_ID, refresh_id=refresh_id
+class TestPowerBIHook:
+ @pytest.mark.asyncio
+ async def test_get_refresh_history(self, powerbi_hook):
+ response_data = {"value": [{"requestId": "1234", "status":
"Completed", "serviceExceptionJson": ""}]}
+
+ with mock.patch.object(KiotaRequestAdapterHook, "run",
new_callable=mock.AsyncMock) as mock_run:
+ mock_run.return_value = response_data
+ result = await powerbi_hook.get_refresh_history(DATASET_ID,
GROUP_ID)
+
+ expected = [{"request_id": "1234", "status": "Completed", "error":
""}]
+ assert result == expected
+
+ @pytest.mark.asyncio
+ async def test_get_refresh_history_airflow_exception(self, powerbi_hook):
+ """Test handling of AirflowException in get_refresh_history."""
+
+ with mock.patch.object(KiotaRequestAdapterHook, "run",
new_callable=mock.AsyncMock) as mock_run:
+ mock_run.side_effect = AirflowException("Test exception")
+
+ with pytest.raises(PowerBIDatasetRefreshException, match="Failed
to retrieve refresh history"):
+ await powerbi_hook.get_refresh_history(DATASET_ID, GROUP_ID)
+
+ @pytest.mark.parametrize(
+ "input_data, expected_output",
+ [
+ (
+ {"requestId": "1234", "status": "Completed",
"serviceExceptionJson": ""},
+ {
+ PowerBIDatasetRefreshFields.REQUEST_ID.value: "1234",
+ PowerBIDatasetRefreshFields.STATUS.value: "Completed",
+ PowerBIDatasetRefreshFields.ERROR.value: "",
+ },
+ ),
+ (
+ {"requestId": "5678", "status": "Unknown",
"serviceExceptionJson": "Some error"},
+ {
+ PowerBIDatasetRefreshFields.REQUEST_ID.value: "5678",
+ PowerBIDatasetRefreshFields.STATUS.value: "In Progress",
+ PowerBIDatasetRefreshFields.ERROR.value: "Some error",
+ },
+ ),
+ (
+ {"requestId": None, "status": None, "serviceExceptionJson":
None},
+ {
+ PowerBIDatasetRefreshFields.REQUEST_ID.value: "None",
+ PowerBIDatasetRefreshFields.STATUS.value: "None",
+ PowerBIDatasetRefreshFields.ERROR.value: "None",
+ },
+ ),
+ (
+ {}, # Empty input dictionary
+ {
+ PowerBIDatasetRefreshFields.REQUEST_ID.value: "None",
+ PowerBIDatasetRefreshFields.STATUS.value: "None",
+ PowerBIDatasetRefreshFields.ERROR.value: "None",
+ },
+ ),
+ ],
)
-
- # Assert that the correct refresh details are returned
- assert result == {
- PowerBIDatasetRefreshFields.REQUEST_ID.value:
"5e2d9921-e91b-491f-b7e1-e7d8db49194c",
- PowerBIDatasetRefreshFields.STATUS.value: "Completed",
- PowerBIDatasetRefreshFields.ERROR.value: "None",
- }
-
- # Call the function with an invalid request ID
- invalid_request_id = "invalid_request_id"
- with pytest.raises(PowerBIDatasetRefreshException):
- await powerbi_hook.get_refresh_details_by_refresh_id(
- dataset_id=DATASET_ID, group_id=GROUP_ID,
refresh_id=invalid_request_id
- )
-
-
[email protected]
-async def test_get_refresh_details_by_refresh_id_empty_history(powerbi_hook):
- """Test exception when refresh history is empty."""
- # Mock the get_refresh_history method to return an empty list
- powerbi_hook.get_refresh_history = mock.AsyncMock(return_value=[])
-
- # Call the function with a request ID
- refresh_id = "any_request_id"
- with pytest.raises(
- PowerBIDatasetRefreshException,
- match=f"Unable to fetch the details of dataset refresh with Request
Id: {refresh_id}",
- ):
- await powerbi_hook.get_refresh_details_by_refresh_id(
+ def test_raw_to_refresh_details(self, input_data, expected_output):
+ """Test raw_to_refresh_details method."""
+ result = PowerBIHook.raw_to_refresh_details(input_data)
+ assert result == expected_output
+
+ @pytest.mark.asyncio
+ async def test_get_refresh_details_by_refresh_id(self, powerbi_hook):
+ # Mock the get_refresh_history method to return a list of refresh
histories
+ refresh_histories = FORMATTED_RESPONSE
+ powerbi_hook.get_refresh_history =
mock.AsyncMock(return_value=refresh_histories)
+
+ # Call the function with a valid request ID
+ refresh_id = "5e2d9921-e91b-491f-b7e1-e7d8db49194c"
+ result = await powerbi_hook.get_refresh_details_by_refresh_id(
dataset_id=DATASET_ID, group_id=GROUP_ID, refresh_id=refresh_id
)
-
[email protected]
-async def test_get_refresh_details_by_refresh_id_not_found(powerbi_hook):
- """Test exception when the refresh ID is not found in the refresh
history."""
- # Mock the get_refresh_history method to return a list of refresh
histories without the specified ID
- powerbi_hook.get_refresh_history =
mock.AsyncMock(return_value=FORMATTED_RESPONSE)
-
- # Call the function with an invalid request ID
- invalid_request_id = "invalid_request_id"
- with pytest.raises(
- PowerBIDatasetRefreshException,
- match=f"Unable to fetch the details of dataset refresh with Request
Id: {invalid_request_id}",
- ):
- await powerbi_hook.get_refresh_details_by_refresh_id(
- dataset_id=DATASET_ID, group_id=GROUP_ID,
refresh_id=invalid_request_id
+ # Assert that the correct refresh details are returned
+ assert result == {
+ PowerBIDatasetRefreshFields.REQUEST_ID.value:
"5e2d9921-e91b-491f-b7e1-e7d8db49194c",
+ PowerBIDatasetRefreshFields.STATUS.value: "Completed",
+ PowerBIDatasetRefreshFields.ERROR.value: "None",
+ }
+
+ # Call the function with an invalid request ID
+ invalid_request_id = "invalid_request_id"
+ with pytest.raises(PowerBIDatasetRefreshException):
+ await powerbi_hook.get_refresh_details_by_refresh_id(
+ dataset_id=DATASET_ID, group_id=GROUP_ID,
refresh_id=invalid_request_id
+ )
+
+ @pytest.mark.asyncio
+ async def test_get_refresh_details_by_refresh_id_empty_history(self,
powerbi_hook):
+ """Test exception when refresh history is empty."""
+ # Mock the get_refresh_history method to return an empty list
+ powerbi_hook.get_refresh_history = mock.AsyncMock(return_value=[])
+
+ # Call the function with a request ID
+ refresh_id = "any_request_id"
+ with pytest.raises(
+ PowerBIDatasetRefreshException,
+ match=f"Unable to fetch the details of dataset refresh with
Request Id: {refresh_id}",
+ ):
+ await powerbi_hook.get_refresh_details_by_refresh_id(
+ dataset_id=DATASET_ID, group_id=GROUP_ID, refresh_id=refresh_id
+ )
+
+ @pytest.mark.asyncio
+ async def test_get_refresh_details_by_refresh_id_not_found(self,
powerbi_hook):
+ """Test exception when the refresh ID is not found in the refresh
history."""
+ # Mock the get_refresh_history method to return a list of refresh
histories without the specified ID
+ powerbi_hook.get_refresh_history =
mock.AsyncMock(return_value=FORMATTED_RESPONSE)
+
+ # Call the function with an invalid request ID
+ invalid_request_id = "invalid_request_id"
+ with pytest.raises(
+ PowerBIDatasetRefreshException,
+ match=f"Unable to fetch the details of dataset refresh with
Request Id: {invalid_request_id}",
+ ):
+ await powerbi_hook.get_refresh_details_by_refresh_id(
+ dataset_id=DATASET_ID, group_id=GROUP_ID,
refresh_id=invalid_request_id
+ )
+
+ @pytest.mark.asyncio
+ async def test_trigger_dataset_refresh_success(self, powerbi_hook):
+ response_data = {"requestid": "5e2d9921-e91b-491f-b7e1-e7d8db49194c"}
+
+ with mock.patch.object(KiotaRequestAdapterHook, "run",
new_callable=mock.AsyncMock) as mock_run:
+ mock_run.return_value = response_data
+ result = await
powerbi_hook.trigger_dataset_refresh(dataset_id=DATASET_ID, group_id=GROUP_ID)
+
+ assert result == "5e2d9921-e91b-491f-b7e1-e7d8db49194c"
+
+ @pytest.mark.asyncio
+ async def test_trigger_dataset_refresh_failure(self, powerbi_hook):
+ """Test failure to trigger dataset refresh due to AirflowException."""
+ with mock.patch.object(KiotaRequestAdapterHook, "run",
new_callable=mock.AsyncMock) as mock_run:
+ mock_run.side_effect = AirflowException("Test exception")
+
+ with pytest.raises(PowerBIDatasetRefreshException, match="Failed
to trigger dataset refresh."):
+ await
powerbi_hook.trigger_dataset_refresh(dataset_id=DATASET_ID, group_id=GROUP_ID)
+
+ @pytest.mark.asyncio
+ async def test_cancel_dataset_refresh(self, powerbi_hook):
+ dataset_refresh_id = "5e2d9921-e91b-491f-b7e1-e7d8db49194c"
+
+ with mock.patch.object(KiotaRequestAdapterHook, "run",
new_callable=mock.AsyncMock) as mock_run:
+ await powerbi_hook.cancel_dataset_refresh(DATASET_ID, GROUP_ID,
dataset_refresh_id)
+
+ mock_run.assert_called_once_with(
+
url="myorg/groups/{group_id}/datasets/{dataset_id}/refreshes/{dataset_refresh_id}",
+ response_type=None,
+ path_parameters={
+ "group_id": GROUP_ID,
+ "dataset_id": DATASET_ID,
+ "dataset_refresh_id": dataset_refresh_id,
+ },
+ method="DELETE",
)
-
-
[email protected]
-async def test_trigger_dataset_refresh_success(powerbi_hook):
- response_data = {"requestid": "5e2d9921-e91b-491f-b7e1-e7d8db49194c"}
-
- with mock.patch.object(KiotaRequestAdapterHook, "run",
new_callable=mock.AsyncMock) as mock_run:
- mock_run.return_value = response_data
- result = await
powerbi_hook.trigger_dataset_refresh(dataset_id=DATASET_ID, group_id=GROUP_ID)
-
- assert result == "5e2d9921-e91b-491f-b7e1-e7d8db49194c"
-
-
[email protected]
-async def test_trigger_dataset_refresh_failure(powerbi_hook):
- """Test failure to trigger dataset refresh due to AirflowException."""
- with mock.patch.object(KiotaRequestAdapterHook, "run",
new_callable=mock.AsyncMock) as mock_run:
- mock_run.side_effect = AirflowException("Test exception")
-
- with pytest.raises(PowerBIDatasetRefreshException, match="Failed to
trigger dataset refresh."):
- await powerbi_hook.trigger_dataset_refresh(dataset_id=DATASET_ID,
group_id=GROUP_ID)
-
-
[email protected]
-async def test_cancel_dataset_refresh(powerbi_hook):
- dataset_refresh_id = "5e2d9921-e91b-491f-b7e1-e7d8db49194c"
-
- with mock.patch.object(KiotaRequestAdapterHook, "run",
new_callable=mock.AsyncMock) as mock_run:
- await powerbi_hook.cancel_dataset_refresh(DATASET_ID, GROUP_ID,
dataset_refresh_id)
-
- mock_run.assert_called_once_with(
-
url="myorg/groups/{group_id}/datasets/{dataset_id}/refreshes/{dataset_refresh_id}",
- response_type=None,
- path_parameters={
- "group_id": GROUP_ID,
- "dataset_id": DATASET_ID,
- "dataset_refresh_id": dataset_refresh_id,
- },
- method="DELETE",
- )
diff --git a/tests/providers/microsoft/azure/operators/test_powerbi.py
b/tests/providers/microsoft/azure/operators/test_powerbi.py
index 2ee5ee723d..35bb76f782 100644
--- a/tests/providers/microsoft/azure/operators/test_powerbi.py
+++ b/tests/providers/microsoft/azure/operators/test_powerbi.py
@@ -17,6 +17,7 @@
from __future__ import annotations
+from unittest import mock
from unittest.mock import MagicMock
import pytest
@@ -25,11 +26,12 @@ from airflow.exceptions import AirflowException,
TaskDeferred
from airflow.providers.microsoft.azure.hooks.powerbi import (
PowerBIDatasetRefreshFields,
PowerBIDatasetRefreshStatus,
- PowerBIHook,
)
from airflow.providers.microsoft.azure.operators.powerbi import
PowerBIDatasetRefreshOperator
from airflow.providers.microsoft.azure.triggers.powerbi import PowerBITrigger
from airflow.utils import timezone
+from tests.providers.microsoft.azure.base import Base
+from tests.providers.microsoft.conftest import get_airflow_connection,
mock_context
DEFAULT_CONNECTION_CLIENT_SECRET = "powerbi_conn_id"
TASK_ID = "run_powerbi_operator"
@@ -72,86 +74,77 @@ IN_PROGRESS_REFRESH_DETAILS = {
}
[email protected]
-def mock_powerbi_hook():
- hook = PowerBIHook()
- return hook
-
-
-def test_execute_wait_for_termination_with_Deferrable(mock_powerbi_hook):
- operator = PowerBIDatasetRefreshOperator(
- **CONFIG,
- )
- operator.hook = mock_powerbi_hook
- context = {"ti": MagicMock()}
-
- with pytest.raises(TaskDeferred) as exc:
- operator.execute(context)
-
- assert isinstance(exc.value.trigger, PowerBITrigger)
+class TestPowerBIDatasetRefreshOperator(Base):
+ @mock.patch("airflow.hooks.base.BaseHook.get_connection",
side_effect=get_airflow_connection)
+ def test_execute_wait_for_termination_with_deferrable(self, connection):
+ operator = PowerBIDatasetRefreshOperator(
+ **CONFIG,
+ )
+ context = mock_context(task=operator)
+ with pytest.raises(TaskDeferred) as exc:
+ operator.execute(context)
-def test_powerbi_operator_async_execute_complete_success():
- """Assert that execute_complete log success message"""
- operator = PowerBIDatasetRefreshOperator(
- **CONFIG,
- )
- context = {"ti": MagicMock()}
- operator.execute_complete(
- context=context,
- event=SUCCESS_TRIGGER_EVENT,
- )
- assert context["ti"].xcom_push.call_count == 2
+ assert isinstance(exc.value.trigger, PowerBITrigger)
+ def test_powerbi_operator_async_execute_complete_success(self):
+ """Assert that execute_complete log success message"""
+ operator = PowerBIDatasetRefreshOperator(
+ **CONFIG,
+ )
+ context = {"ti": MagicMock()}
+ operator.execute_complete(
+ context=context,
+ event=SUCCESS_TRIGGER_EVENT,
+ )
+ assert context["ti"].xcom_push.call_count == 2
-def test_powerbi_operator_async_execute_complete_fail():
- """Assert that execute_complete raise exception on error"""
- operator = PowerBIDatasetRefreshOperator(
- **CONFIG,
- )
- context = {"ti": MagicMock()}
- with pytest.raises(AirflowException):
+ def test_powerbi_operator_async_execute_complete_fail(self):
+ """Assert that execute_complete raise exception on error"""
+ operator = PowerBIDatasetRefreshOperator(
+ **CONFIG,
+ )
+ context = {"ti": MagicMock()}
+ with pytest.raises(AirflowException):
+ operator.execute_complete(
+ context=context,
+ event={"status": "error", "message": "error",
"dataset_refresh_id": "1234"},
+ )
+ assert context["ti"].xcom_push.call_count == 0
+
+ def test_execute_complete_no_event(self):
+ """Test execute_complete when event is None or empty."""
+ operator = PowerBIDatasetRefreshOperator(
+ **CONFIG,
+ )
+ context = {"ti": MagicMock()}
operator.execute_complete(
context=context,
- event={"status": "error", "message": "error",
"dataset_refresh_id": "1234"},
+ event=None,
+ )
+ assert context["ti"].xcom_push.call_count == 0
+
+ @pytest.mark.db_test
+ def test_powerbi_link(self, create_task_instance_of_operator):
+ """Assert Power BI Extra link matches the expected URL."""
+ ti = create_task_instance_of_operator(
+ PowerBIDatasetRefreshOperator,
+ dag_id="test_powerbi_refresh_op_link",
+ execution_date=DEFAULT_DATE,
+ task_id=TASK_ID,
+ conn_id=DEFAULT_CONNECTION_CLIENT_SECRET,
+ group_id=GROUP_ID,
+ dataset_id=DATASET_ID,
+ check_interval=1,
+ timeout=3,
)
- assert context["ti"].xcom_push.call_count == 0
-
-
-def test_execute_complete_no_event():
- """Test execute_complete when event is None or empty."""
- operator = PowerBIDatasetRefreshOperator(
- **CONFIG,
- )
- context = {"ti": MagicMock()}
- operator.execute_complete(
- context=context,
- event=None,
- )
- assert context["ti"].xcom_push.call_count == 0
-
-
[email protected]_test
-def test_powerbilink(create_task_instance_of_operator):
- """Assert Power BI Extra link matches the expected URL."""
- ti = create_task_instance_of_operator(
- PowerBIDatasetRefreshOperator,
- dag_id="test_powerbi_refresh_op_link",
- execution_date=DEFAULT_DATE,
- task_id=TASK_ID,
- conn_id=DEFAULT_CONNECTION_CLIENT_SECRET,
- group_id=GROUP_ID,
- dataset_id=DATASET_ID,
- check_interval=1,
- timeout=3,
- )
-
- ti.xcom_push(key="powerbi_dataset_refresh_id",
value=NEW_REFRESH_REQUEST_ID)
- url = ti.task.get_extra_links(ti, "Monitor PowerBI Dataset")
- EXPECTED_ITEM_RUN_OP_EXTRA_LINK = (
- "https://app.powerbi.com" # type: ignore[attr-defined]
- f"/groups/{GROUP_ID}/datasets/{DATASET_ID}" # type:
ignore[attr-defined]
- "/details?experience=power-bi"
- )
-
- assert url == EXPECTED_ITEM_RUN_OP_EXTRA_LINK
+
+ ti.xcom_push(key="powerbi_dataset_refresh_id",
value=NEW_REFRESH_REQUEST_ID)
+ url = ti.task.get_extra_links(ti, "Monitor PowerBI Dataset")
+ EXPECTED_ITEM_RUN_OP_EXTRA_LINK = (
+ "https://app.powerbi.com" # type: ignore[attr-defined]
+ f"/groups/{GROUP_ID}/datasets/{DATASET_ID}" # type:
ignore[attr-defined]
+ "/details?experience=power-bi"
+ )
+
+ assert url == EXPECTED_ITEM_RUN_OP_EXTRA_LINK
diff --git a/tests/providers/microsoft/azure/triggers/test_powerbi.py
b/tests/providers/microsoft/azure/triggers/test_powerbi.py
index 5b44a84149..c3276e258b 100644
--- a/tests/providers/microsoft/azure/triggers/test_powerbi.py
+++ b/tests/providers/microsoft/azure/triggers/test_powerbi.py
@@ -19,11 +19,10 @@ from __future__ import annotations
import asyncio
from unittest import mock
-from unittest.mock import patch
import pytest
-from airflow.providers.microsoft.azure.hooks.powerbi import
PowerBIDatasetRefreshStatus, PowerBIHook
+from airflow.providers.microsoft.azure.hooks.powerbi import
PowerBIDatasetRefreshStatus
from airflow.providers.microsoft.azure.triggers.powerbi import PowerBITrigger
from airflow.triggers.base import TriggerEvent
from tests.providers.microsoft.conftest import get_airflow_connection
@@ -54,19 +53,10 @@ def powerbi_trigger():
return trigger
[email protected]
-def mock_powerbi_hook():
- hook = PowerBIHook()
- return hook
-
-
-def test_powerbi_trigger_serialization():
- """Asserts that the PowerBI Trigger correctly serializes its arguments and
classpath."""
-
- with patch(
- "airflow.hooks.base.BaseHook.get_connection",
- side_effect=get_airflow_connection,
- ):
+class TestPowerBITrigger:
+ @mock.patch("airflow.hooks.base.BaseHook.get_connection",
side_effect=get_airflow_connection)
+ def test_powerbi_trigger_serialization(self, connection):
+ """Asserts that the PowerBI Trigger correctly serializes its arguments
and classpath."""
powerbi_trigger = PowerBITrigger(
conn_id=POWERBI_CONN_ID,
proxies=None,
@@ -91,167 +81,170 @@ def test_powerbi_trigger_serialization():
"wait_for_termination": True,
}
-
[email protected]
[email protected](f"{MODULE}.hooks.powerbi.PowerBIHook.get_refresh_details_by_refresh_id")
[email protected](f"{MODULE}.hooks.powerbi.PowerBIHook.trigger_dataset_refresh")
-async def test_powerbi_trigger_run_inprogress(
- mock_trigger_dataset_refresh, mock_get_refresh_details_by_refresh_id,
powerbi_trigger
-):
- """Assert task isn't completed until timeout if dataset refresh is in
progress."""
- mock_get_refresh_details_by_refresh_id.return_value = {"status":
PowerBIDatasetRefreshStatus.IN_PROGRESS}
- mock_trigger_dataset_refresh.return_value = DATASET_REFRESH_ID
- task = asyncio.create_task(powerbi_trigger.run().__anext__())
- await asyncio.sleep(0.5)
-
- # Assert TriggerEvent was not returned
- assert task.done() is False
- asyncio.get_event_loop().stop()
-
-
[email protected]
[email protected](f"{MODULE}.hooks.powerbi.PowerBIHook.get_refresh_details_by_refresh_id")
[email protected](f"{MODULE}.hooks.powerbi.PowerBIHook.trigger_dataset_refresh")
-async def test_powerbi_trigger_run_failed(
- mock_trigger_dataset_refresh, mock_get_refresh_details_by_refresh_id,
powerbi_trigger
-):
- """Assert event is triggered upon failed dataset refresh."""
- mock_get_refresh_details_by_refresh_id.return_value = {"status":
PowerBIDatasetRefreshStatus.FAILED}
- mock_trigger_dataset_refresh.return_value = DATASET_REFRESH_ID
-
- generator = powerbi_trigger.run()
- actual = await generator.asend(None)
- expected = TriggerEvent(
- {
- "status": "Failed",
- "message": f"The dataset refresh {DATASET_REFRESH_ID} has "
- f"{PowerBIDatasetRefreshStatus.FAILED}.",
- "dataset_refresh_id": DATASET_REFRESH_ID,
- }
- )
- assert expected == actual
-
-
[email protected]
[email protected](f"{MODULE}.hooks.powerbi.PowerBIHook.get_refresh_details_by_refresh_id")
[email protected](f"{MODULE}.hooks.powerbi.PowerBIHook.trigger_dataset_refresh")
-async def test_powerbi_trigger_run_completed(
- mock_trigger_dataset_refresh, mock_get_refresh_details_by_refresh_id,
powerbi_trigger
-):
- """Assert event is triggered upon successful dataset refresh."""
- mock_get_refresh_details_by_refresh_id.return_value = {"status":
PowerBIDatasetRefreshStatus.COMPLETED}
- mock_trigger_dataset_refresh.return_value = DATASET_REFRESH_ID
-
- generator = powerbi_trigger.run()
- actual = await generator.asend(None)
- expected = TriggerEvent(
- {
- "status": "Completed",
- "message": f"The dataset refresh {DATASET_REFRESH_ID} has "
- f"{PowerBIDatasetRefreshStatus.COMPLETED}.",
- "dataset_refresh_id": DATASET_REFRESH_ID,
- }
- )
- assert expected == actual
-
-
[email protected]
[email protected](f"{MODULE}.hooks.powerbi.PowerBIHook.cancel_dataset_refresh")
[email protected](f"{MODULE}.hooks.powerbi.PowerBIHook.get_refresh_details_by_refresh_id")
[email protected](f"{MODULE}.hooks.powerbi.PowerBIHook.trigger_dataset_refresh")
-async def test_powerbi_trigger_run_exception_during_refresh_check_loop(
- mock_trigger_dataset_refresh,
- mock_get_refresh_details_by_refresh_id,
- mock_cancel_dataset_refresh,
- powerbi_trigger,
-):
- """Assert that run catch exception if Power BI API throw exception"""
- mock_get_refresh_details_by_refresh_id.side_effect = Exception("Test
exception")
- mock_trigger_dataset_refresh.return_value = DATASET_REFRESH_ID
-
- task = [i async for i in powerbi_trigger.run()]
- response = TriggerEvent(
- {
- "status": "error",
- "message": "An error occurred: Test exception",
- "dataset_refresh_id": DATASET_REFRESH_ID,
- }
- )
- assert len(task) == 1
- assert response in task
- mock_cancel_dataset_refresh.assert_called_once()
-
-
[email protected]
[email protected](f"{MODULE}.hooks.powerbi.PowerBIHook.cancel_dataset_refresh")
[email protected](f"{MODULE}.hooks.powerbi.PowerBIHook.get_refresh_details_by_refresh_id")
[email protected](f"{MODULE}.hooks.powerbi.PowerBIHook.trigger_dataset_refresh")
-async def test_powerbi_trigger_run_exception_during_refresh_cancellation(
- mock_trigger_dataset_refresh,
- mock_get_refresh_details_by_refresh_id,
- mock_cancel_dataset_refresh,
- powerbi_trigger,
-):
- """Assert that run catch exception if Power BI API throw exception"""
- mock_get_refresh_details_by_refresh_id.side_effect = Exception("Test
exception")
- mock_cancel_dataset_refresh.side_effect = Exception("Exception caused by
cancel_dataset_refresh")
- mock_trigger_dataset_refresh.return_value = DATASET_REFRESH_ID
-
- task = [i async for i in powerbi_trigger.run()]
- response = TriggerEvent(
- {
- "status": "error",
- "message": "An error occurred while canceling dataset: Exception
caused by cancel_dataset_refresh",
- "dataset_refresh_id": DATASET_REFRESH_ID,
+ @pytest.mark.asyncio
+
@mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.get_refresh_details_by_refresh_id")
+ @mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.trigger_dataset_refresh")
+ async def test_powerbi_trigger_run_inprogress(
+ self, mock_trigger_dataset_refresh,
mock_get_refresh_details_by_refresh_id, powerbi_trigger
+ ):
+ """Assert task isn't completed until timeout if dataset refresh is in
progress."""
+ mock_get_refresh_details_by_refresh_id.return_value = {
+ "status": PowerBIDatasetRefreshStatus.IN_PROGRESS
}
- )
-
- assert len(task) == 1
- assert response in task
- mock_cancel_dataset_refresh.assert_called_once()
-
-
[email protected]
[email protected](f"{MODULE}.hooks.powerbi.PowerBIHook.get_refresh_details_by_refresh_id")
[email protected](f"{MODULE}.hooks.powerbi.PowerBIHook.trigger_dataset_refresh")
-async def test_powerbi_trigger_run_exception_without_refresh_id(
- mock_trigger_dataset_refresh, mock_get_refresh_details_by_refresh_id,
powerbi_trigger
-):
- """Assert handling of exception when there is no dataset_refresh_id"""
- powerbi_trigger.dataset_refresh_id = None
- mock_get_refresh_details_by_refresh_id.side_effect = Exception("Test
exception for no dataset_refresh_id")
- mock_trigger_dataset_refresh.return_value = None
+ mock_trigger_dataset_refresh.return_value = DATASET_REFRESH_ID
+ task = asyncio.create_task(powerbi_trigger.run().__anext__())
+ await asyncio.sleep(0.5)
+
+ # Assert TriggerEvent was not returned
+ assert task.done() is False
+ asyncio.get_event_loop().stop()
+
+ @pytest.mark.asyncio
+
@mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.get_refresh_details_by_refresh_id")
+ @mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.trigger_dataset_refresh")
+ async def test_powerbi_trigger_run_failed(
+ self, mock_trigger_dataset_refresh,
mock_get_refresh_details_by_refresh_id, powerbi_trigger
+ ):
+ """Assert event is triggered upon failed dataset refresh."""
+ mock_get_refresh_details_by_refresh_id.return_value = {"status":
PowerBIDatasetRefreshStatus.FAILED}
+ mock_trigger_dataset_refresh.return_value = DATASET_REFRESH_ID
+
+ generator = powerbi_trigger.run()
+ actual = await generator.asend(None)
+ expected = TriggerEvent(
+ {
+ "status": "Failed",
+ "message": f"The dataset refresh {DATASET_REFRESH_ID} has "
+ f"{PowerBIDatasetRefreshStatus.FAILED}.",
+ "dataset_refresh_id": DATASET_REFRESH_ID,
+ }
+ )
+ assert expected == actual
- task = [i async for i in powerbi_trigger.run()]
- response = TriggerEvent(
- {
- "status": "error",
- "message": "An error occurred: Test exception for no
dataset_refresh_id",
- "dataset_refresh_id": None,
+ @pytest.mark.asyncio
+
@mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.get_refresh_details_by_refresh_id")
+ @mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.trigger_dataset_refresh")
+ async def test_powerbi_trigger_run_completed(
+ self, mock_trigger_dataset_refresh,
mock_get_refresh_details_by_refresh_id, powerbi_trigger
+ ):
+ """Assert event is triggered upon successful dataset refresh."""
+ mock_get_refresh_details_by_refresh_id.return_value = {
+ "status": PowerBIDatasetRefreshStatus.COMPLETED
}
- )
- assert len(task) == 1
- assert response in task
-
+ mock_trigger_dataset_refresh.return_value = DATASET_REFRESH_ID
+
+ generator = powerbi_trigger.run()
+ actual = await generator.asend(None)
+ expected = TriggerEvent(
+ {
+ "status": "Completed",
+ "message": f"The dataset refresh {DATASET_REFRESH_ID} has "
+ f"{PowerBIDatasetRefreshStatus.COMPLETED}.",
+ "dataset_refresh_id": DATASET_REFRESH_ID,
+ }
+ )
+ assert expected == actual
+
+ @pytest.mark.asyncio
+ @mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.cancel_dataset_refresh")
+
@mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.get_refresh_details_by_refresh_id")
+ @mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.trigger_dataset_refresh")
+ async def test_powerbi_trigger_run_exception_during_refresh_check_loop(
+ self,
+ mock_trigger_dataset_refresh,
+ mock_get_refresh_details_by_refresh_id,
+ mock_cancel_dataset_refresh,
+ powerbi_trigger,
+ ):
+ """Assert that run catch exception if Power BI API throw exception"""
+ mock_get_refresh_details_by_refresh_id.side_effect = Exception("Test
exception")
+ mock_trigger_dataset_refresh.return_value = DATASET_REFRESH_ID
+
+ task = [i async for i in powerbi_trigger.run()]
+ response = TriggerEvent(
+ {
+ "status": "error",
+ "message": "An error occurred: Test exception",
+ "dataset_refresh_id": DATASET_REFRESH_ID,
+ }
+ )
+ assert len(task) == 1
+ assert response in task
+ mock_cancel_dataset_refresh.assert_called_once()
+
+ @pytest.mark.asyncio
+ @mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.cancel_dataset_refresh")
+
@mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.get_refresh_details_by_refresh_id")
+ @mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.trigger_dataset_refresh")
+ async def test_powerbi_trigger_run_exception_during_refresh_cancellation(
+ self,
+ mock_trigger_dataset_refresh,
+ mock_get_refresh_details_by_refresh_id,
+ mock_cancel_dataset_refresh,
+ powerbi_trigger,
+ ):
+ """Assert that run catch exception if Power BI API throw exception"""
+ mock_get_refresh_details_by_refresh_id.side_effect = Exception("Test
exception")
+ mock_cancel_dataset_refresh.side_effect = Exception("Exception caused
by cancel_dataset_refresh")
+ mock_trigger_dataset_refresh.return_value = DATASET_REFRESH_ID
+
+ task = [i async for i in powerbi_trigger.run()]
+ response = TriggerEvent(
+ {
+ "status": "error",
+ "message": "An error occurred while canceling dataset:
Exception caused by cancel_dataset_refresh",
+ "dataset_refresh_id": DATASET_REFRESH_ID,
+ }
+ )
[email protected]
[email protected](f"{MODULE}.hooks.powerbi.PowerBIHook.get_refresh_details_by_refresh_id")
[email protected](f"{MODULE}.hooks.powerbi.PowerBIHook.trigger_dataset_refresh")
-async def test_powerbi_trigger_run_timeout(
- mock_trigger_dataset_refresh, mock_get_refresh_details_by_refresh_id,
powerbi_trigger
-):
- """Assert that powerbi run timesout after end_time elapses"""
- mock_get_refresh_details_by_refresh_id.return_value = {"status":
PowerBIDatasetRefreshStatus.IN_PROGRESS}
- mock_trigger_dataset_refresh.return_value = DATASET_REFRESH_ID
+ assert len(task) == 1
+ assert response in task
+ mock_cancel_dataset_refresh.assert_called_once()
- generator = powerbi_trigger.run()
- actual = await generator.asend(None)
- expected = TriggerEvent(
- {
- "status": "error",
- "message": f"Timeout occurred while waiting for dataset refresh to
complete: The dataset refresh {DATASET_REFRESH_ID} has status In Progress.",
- "dataset_refresh_id": DATASET_REFRESH_ID,
+ @pytest.mark.asyncio
+
@mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.get_refresh_details_by_refresh_id")
+ @mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.trigger_dataset_refresh")
+ async def test_powerbi_trigger_run_exception_without_refresh_id(
+ self, mock_trigger_dataset_refresh,
mock_get_refresh_details_by_refresh_id, powerbi_trigger
+ ):
+ """Assert handling of exception when there is no dataset_refresh_id"""
+ powerbi_trigger.dataset_refresh_id = None
+ mock_get_refresh_details_by_refresh_id.side_effect = Exception(
+ "Test exception for no dataset_refresh_id"
+ )
+ mock_trigger_dataset_refresh.return_value = None
+
+ task = [i async for i in powerbi_trigger.run()]
+ response = TriggerEvent(
+ {
+ "status": "error",
+ "message": "An error occurred: Test exception for no
dataset_refresh_id",
+ "dataset_refresh_id": None,
+ }
+ )
+ assert len(task) == 1
+ assert response in task
+
+ @pytest.mark.asyncio
+
@mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.get_refresh_details_by_refresh_id")
+ @mock.patch(f"{MODULE}.hooks.powerbi.PowerBIHook.trigger_dataset_refresh")
+ async def test_powerbi_trigger_run_timeout(
+ self, mock_trigger_dataset_refresh,
mock_get_refresh_details_by_refresh_id, powerbi_trigger
+ ):
+ """Assert that powerbi run timesout after end_time elapses"""
+ mock_get_refresh_details_by_refresh_id.return_value = {
+ "status": PowerBIDatasetRefreshStatus.IN_PROGRESS
}
- )
+ mock_trigger_dataset_refresh.return_value = DATASET_REFRESH_ID
+
+ generator = powerbi_trigger.run()
+ actual = await generator.asend(None)
+ expected = TriggerEvent(
+ {
+ "status": "error",
+ "message": f"Timeout occurred while waiting for dataset
refresh to complete: The dataset refresh {DATASET_REFRESH_ID} has status In
Progress.",
+ "dataset_refresh_id": DATASET_REFRESH_ID,
+ }
+ )
- assert expected == actual
+ assert expected == actual
diff --git a/tests/providers/microsoft/conftest.py
b/tests/providers/microsoft/conftest.py
index 8a25873529..de25d24fb0 100644
--- a/tests/providers/microsoft/conftest.py
+++ b/tests/providers/microsoft/conftest.py
@@ -32,6 +32,7 @@ from httpx import Headers, Response
from msgraph_core import APIVersion
from airflow.models import Connection
+from airflow.providers.microsoft.azure.hooks.powerbi import PowerBIHook
from airflow.utils.context import Context
if TYPE_CHECKING:
@@ -183,21 +184,45 @@ def load_file(*args: str, mode="r", encoding="utf-8"):
def get_airflow_connection(
conn_id: str,
+ host: str = "graph.microsoft.com",
login: str = "client_id",
password: str = "client_secret",
tenant_id: str = "tenant-id",
+ azure_tenant_id: str | None = None,
proxies: dict | None = None,
- api_version: APIVersion = APIVersion.v1,
+ scopes: list[str] | None = None,
+ api_version: APIVersion | str | None = APIVersion.v1.value,
+ authority: str | None = None,
+ disable_instance_discovery: bool = False,
):
from airflow.models import Connection
+ extra = {
+ "api_version": api_version,
+ "proxies": proxies or {},
+ "verify": False,
+ "scopes": scopes or [],
+ "authority": authority,
+ "disable_instance_discovery": disable_instance_discovery,
+ }
+
+ if azure_tenant_id:
+ extra["tenantId"] = azure_tenant_id
+ else:
+ extra["tenant_id"] = tenant_id
+
return Connection(
schema="https",
conn_id=conn_id,
conn_type="http",
- host="graph.microsoft.com",
+ host=host,
port=80,
login=login,
password=password,
- extra={"tenant_id": tenant_id, "api_version": api_version.value,
"proxies": proxies or {}},
+ extra=extra,
)
+
+
[email protected]
+def powerbi_hook():
+ return PowerBIHook(**{"conn_id": "powerbi_conn_id", "timeout": 3,
"api_version": "v1.0"})
diff --git a/tests/system/providers/microsoft/azure/example_msfabric.py
b/tests/system/providers/microsoft/azure/example_msfabric.py
new file mode 100644
index 0000000000..7d62a49e0b
--- /dev/null
+++ b/tests/system/providers/microsoft/azure/example_msfabric.py
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+
+from airflow import models
+from airflow.datasets import Dataset
+from airflow.providers.microsoft.azure.operators.msgraph import
MSGraphAsyncOperator
+
+DAG_ID = "example_msfabric"
+
+with models.DAG(
+ DAG_ID,
+ start_date=datetime(2021, 1, 1),
+ schedule=None,
+ tags=["example"],
+) as dag:
+ # [START howto_operator_ms_fabric_create_item_schedule]
+ #
https://learn.microsoft.com/en-us/rest/api/fabric/core/job-scheduler/create-item-schedule?tabs=HTTP
+ workspaces_task = MSGraphAsyncOperator(
+ task_id="schedule_datapipeline",
+ conn_id="powerbi",
+ method="POST",
+ url="workspaces/{workspaceId}/items/{itemId}/jobs/instances",
+ path_parameters={
+ "workspaceId": "e90b2873-4812-4dfb-9246-593638165644",
+ "itemId": "65448530-e5ec-4aeb-a97e-7cebf5d67c18",
+ },
+ query_parameters={"jobType": "Pipeline"},
+ dag=dag,
+ outlets=[
+ Dataset(
+
"workspaces/e90b2873-4812-4dfb-9246-593638165644/items/65448530-e5ec-4aeb-a97e-7cebf5d67c18/jobs/instances?jobType=Pipeline"
+ )
+ ],
+ )
+ # [END howto_operator_ms_fabric_create_item_schedule]
+
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see:
tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)