This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ee785a89ba2 Fixes compat issue HTTPX proxy configuration in
KiotaRequestAdapterHook and fixed retry in MSGraphSensor (#45746)
ee785a89ba2 is described below
commit ee785a89ba27a59246cdfcc0d83129b691a39f3e
Author: David Blain <[email protected]>
AuthorDate: Sun Jan 19 14:33:17 2025 +0100
Fixes compat issue HTTPX proxy configuration in KiotaRequestAdapterHook and
fixed retry in MSGraphSensor (#45746)
---------
Co-authored-by: David Blain <[email protected]>
---
.../providers/microsoft/azure/hooks/msgraph.py | 8 +++----
.../providers/microsoft/azure/sensors/msgraph.py | 1 +
.../tests/microsoft/azure/resources/status.json | 2 +-
.../tests/microsoft/azure/sensors/test_msgraph.py | 27 +++++++++++++++++-----
providers/tests/microsoft/conftest.py | 6 +++--
5 files changed, 31 insertions(+), 13 deletions(-)
diff --git a/providers/src/airflow/providers/microsoft/azure/hooks/msgraph.py b/providers/src/airflow/providers/microsoft/azure/hooks/msgraph.py
index 0e12696d9f3..f01fa1c5858 100644
--- a/providers/src/airflow/providers/microsoft/azure/hooks/msgraph.py
+++ b/providers/src/airflow/providers/microsoft/azure/hooks/msgraph.py
@@ -27,7 +27,7 @@ from urllib.parse import quote, urljoin, urlparse
import httpx
from azure.identity import ClientSecretCredential
-from httpx import Timeout
+from httpx import AsyncHTTPTransport, Timeout
from kiota_abstractions.api_error import APIError
from kiota_abstractions.method import Method
from kiota_abstractions.request_information import RequestInformation
@@ -208,9 +208,9 @@ class KiotaRequestAdapterHook(BaseHook):
def to_httpx_proxies(cls, proxies: dict) -> dict:
proxies = proxies.copy()
if proxies.get("http"):
- proxies["http://"] = proxies.pop("http")
+ proxies["http://"] = AsyncHTTPTransport(proxy=proxies.pop("http"))
if proxies.get("https"):
- proxies["https://"] = proxies.pop("https")
+ proxies["https://"] = AsyncHTTPTransport(proxy=proxies.pop("https"))
if proxies.get("no"):
for url in proxies.pop("no", "").split(","):
proxies[cls.format_no_proxy_url(url.strip())] = None
@@ -288,7 +288,7 @@ class KiotaRequestAdapterHook(BaseHook):
http_client = GraphClientFactory.create_with_default_middleware(
api_version=api_version, # type: ignore
client=httpx.AsyncClient(
- proxy=httpx_proxies, # type: ignore
+ mounts=httpx_proxies,
timeout=Timeout(timeout=self.timeout),
verify=verify,
trust_env=trust_env,
diff --git a/providers/src/airflow/providers/microsoft/azure/sensors/msgraph.py b/providers/src/airflow/providers/microsoft/azure/sensors/msgraph.py
index 42c58529005..6b5622e2d7a 100644
--- a/providers/src/airflow/providers/microsoft/azure/sensors/msgraph.py
+++ b/providers/src/airflow/providers/microsoft/azure/sensors/msgraph.py
@@ -129,6 +129,7 @@ class MSGraphSensor(BaseSensorOperator):
def retry_execute(
self,
context: Context,
+ **kwargs,
) -> Any:
self.execute(context=context)
diff --git a/providers/tests/microsoft/azure/resources/status.json b/providers/tests/microsoft/azure/resources/status.json
index 6bff9e29afb..bfece9ed6a9 100644
--- a/providers/tests/microsoft/azure/resources/status.json
+++ b/providers/tests/microsoft/azure/resources/status.json
@@ -1 +1 @@
-{"id": "0a1b1bf3-37de-48f7-9863-ed4cda97a9ef", "createdDateTime": "2024-04-10T15:05:17.357", "status": "Succeeded"}
+[{"id": "0a1b1bf3-37de-48f7-9863-ed4cda97a9ef", "createdDateTime": "2024-04-10T15:05:17.357", "status": "InProgress"},{"id": "0a1b1bf3-37de-48f7-9863-ed4cda97a9ef", "createdDateTime": "2024-04-10T15:05:17.357", "status": "Succeeded"}]
diff --git a/providers/tests/microsoft/azure/sensors/test_msgraph.py b/providers/tests/microsoft/azure/sensors/test_msgraph.py
index 9ad03ccf170..4240a88e403 100644
--- a/providers/tests/microsoft/azure/sensors/test_msgraph.py
+++ b/providers/tests/microsoft/azure/sensors/test_msgraph.py
@@ -17,6 +17,7 @@
from __future__ import annotations
import json
+from datetime import datetime
import pytest
@@ -31,7 +32,7 @@ from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS
class TestMSGraphSensor(Base):
def test_execute(self):
status = load_json("resources", "status.json")
- response = mock_json_response(200, status)
+ response = mock_json_response(200, *status)
with self.patch_hook_and_request_adapter(response):
sensor = MSGraphSensor(
@@ -40,6 +41,7 @@ class TestMSGraphSensor(Base):
url="myorg/admin/workspaces/scanStatus/{scanId}",
path_parameters={"scanId": "0a1b1bf3-37de-48f7-9863-ed4cda97a9ef"},
result_processor=lambda context, result: result["id"],
+ retry_delay=5,
timeout=350.0,
)
@@ -48,16 +50,22 @@ class TestMSGraphSensor(Base):
assert sensor.path_parameters == {"scanId": "0a1b1bf3-37de-48f7-9863-ed4cda97a9ef"}
assert isinstance(results, str)
assert results == "0a1b1bf3-37de-48f7-9863-ed4cda97a9ef"
- assert len(events) == 1
+ assert len(events) == 3
assert isinstance(events[0], TriggerEvent)
assert events[0].payload["status"] == "success"
assert events[0].payload["type"] == "builtins.dict"
- assert events[0].payload["response"] == json.dumps(status)
+ assert events[0].payload["response"] == json.dumps(status[0])
+ assert isinstance(events[1], TriggerEvent)
+ assert isinstance(events[1].payload, datetime)
+ assert isinstance(events[2], TriggerEvent)
+ assert events[2].payload["status"] == "success"
+ assert events[2].payload["type"] == "builtins.dict"
+ assert events[2].payload["response"] == json.dumps(status[1])
@pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Lambda parameters works in Airflow >= 2.10.0")
def test_execute_with_lambda_parameter(self):
status = load_json("resources", "status.json")
- response = mock_json_response(200, status)
+ response = mock_json_response(200, *status)
with self.patch_hook_and_request_adapter(response):
sensor = MSGraphSensor(
@@ -66,6 +74,7 @@ class TestMSGraphSensor(Base):
url="myorg/admin/workspaces/scanStatus/{scanId}",
path_parameters=lambda context, jinja_env: {"scanId": "0a1b1bf3-37de-48f7-9863-ed4cda97a9ef"},
result_processor=lambda context, result: result["id"],
+ retry_delay=5,
timeout=350.0,
)
@@ -74,11 +83,17 @@ class TestMSGraphSensor(Base):
assert sensor.path_parameters == {"scanId": "0a1b1bf3-37de-48f7-9863-ed4cda97a9ef"}
assert isinstance(results, str)
assert results == "0a1b1bf3-37de-48f7-9863-ed4cda97a9ef"
- assert len(events) == 1
+ assert len(events) == 3
assert isinstance(events[0], TriggerEvent)
assert events[0].payload["status"] == "success"
assert events[0].payload["type"] == "builtins.dict"
- assert events[0].payload["response"] == json.dumps(status)
+ assert events[0].payload["response"] == json.dumps(status[0])
+ assert isinstance(events[1], TriggerEvent)
+ assert isinstance(events[1].payload, datetime)
+ assert isinstance(events[2], TriggerEvent)
+ assert events[2].payload["status"] == "success"
+ assert events[2].payload["type"] == "builtins.dict"
+ assert events[2].payload["response"] == json.dumps(status[1])
def test_template_fields(self):
sensor = MSGraphSensor(
diff --git a/providers/tests/microsoft/conftest.py b/providers/tests/microsoft/conftest.py
index d875096402b..d4a65075f50 100644
--- a/providers/tests/microsoft/conftest.py
+++ b/providers/tests/microsoft/conftest.py
@@ -149,8 +149,10 @@ def mock_context(task) -> Context:
run_id: str | None = None,
) -> Any:
if map_indexes:
- return values.get(f"{task_ids or self.task_id}_{dag_id or self.dag_id}_{key}_{map_indexes}")
- return values.get(f"{task_ids or self.task_id}_{dag_id or self.dag_id}_{key}")
+ return values.get(
+ f"{task_ids or self.task_id}_{dag_id or self.dag_id}_{key}_{map_indexes}", default
+ )
+ return values.get(f"{task_ids or self.task_id}_{dag_id or self.dag_id}_{key}", default)
def xcom_push(self, key: str, value: Any, session: Session = NEW_SESSION, **kwargs) -> None:
values[f"{self.task_id}_{self.dag_id}_{key}_{self.map_index}"] = value