This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ee785a89ba2 Fixes compat issue HTTPX proxy configuration in 
KiotaRequestAdapterHook and fixed retry in MSGraphSensor (#45746)
ee785a89ba2 is described below

commit ee785a89ba27a59246cdfcc0d83129b691a39f3e
Author: David Blain <[email protected]>
AuthorDate: Sun Jan 19 14:33:17 2025 +0100

    Fixes compat issue HTTPX proxy configuration in KiotaRequestAdapterHook and 
fixed retry in MSGraphSensor (#45746)
    
    
    ---------
    
    Co-authored-by: David Blain <[email protected]>
---
 .../providers/microsoft/azure/hooks/msgraph.py     |  8 +++----
 .../providers/microsoft/azure/sensors/msgraph.py   |  1 +
 .../tests/microsoft/azure/resources/status.json    |  2 +-
 .../tests/microsoft/azure/sensors/test_msgraph.py  | 27 +++++++++++++++++-----
 providers/tests/microsoft/conftest.py              |  6 +++--
 5 files changed, 31 insertions(+), 13 deletions(-)

diff --git a/providers/src/airflow/providers/microsoft/azure/hooks/msgraph.py 
b/providers/src/airflow/providers/microsoft/azure/hooks/msgraph.py
index 0e12696d9f3..f01fa1c5858 100644
--- a/providers/src/airflow/providers/microsoft/azure/hooks/msgraph.py
+++ b/providers/src/airflow/providers/microsoft/azure/hooks/msgraph.py
@@ -27,7 +27,7 @@ from urllib.parse import quote, urljoin, urlparse
 
 import httpx
 from azure.identity import ClientSecretCredential
-from httpx import Timeout
+from httpx import AsyncHTTPTransport, Timeout
 from kiota_abstractions.api_error import APIError
 from kiota_abstractions.method import Method
 from kiota_abstractions.request_information import RequestInformation
@@ -208,9 +208,9 @@ class KiotaRequestAdapterHook(BaseHook):
     def to_httpx_proxies(cls, proxies: dict) -> dict:
         proxies = proxies.copy()
         if proxies.get("http"):
-            proxies["http://";] = proxies.pop("http")
+            proxies["http://";] = AsyncHTTPTransport(proxy=proxies.pop("http"))
         if proxies.get("https"):
-            proxies["https://";] = proxies.pop("https")
+            proxies["https://";] = 
AsyncHTTPTransport(proxy=proxies.pop("https"))
         if proxies.get("no"):
             for url in proxies.pop("no", "").split(","):
                 proxies[cls.format_no_proxy_url(url.strip())] = None
@@ -288,7 +288,7 @@ class KiotaRequestAdapterHook(BaseHook):
             http_client = GraphClientFactory.create_with_default_middleware(
                 api_version=api_version,  # type: ignore
                 client=httpx.AsyncClient(
-                    proxy=httpx_proxies,  # type: ignore
+                    mounts=httpx_proxies,
                     timeout=Timeout(timeout=self.timeout),
                     verify=verify,
                     trust_env=trust_env,
diff --git a/providers/src/airflow/providers/microsoft/azure/sensors/msgraph.py 
b/providers/src/airflow/providers/microsoft/azure/sensors/msgraph.py
index 42c58529005..6b5622e2d7a 100644
--- a/providers/src/airflow/providers/microsoft/azure/sensors/msgraph.py
+++ b/providers/src/airflow/providers/microsoft/azure/sensors/msgraph.py
@@ -129,6 +129,7 @@ class MSGraphSensor(BaseSensorOperator):
     def retry_execute(
         self,
         context: Context,
+        **kwargs,
     ) -> Any:
         self.execute(context=context)
 
diff --git a/providers/tests/microsoft/azure/resources/status.json 
b/providers/tests/microsoft/azure/resources/status.json
index 6bff9e29afb..bfece9ed6a9 100644
--- a/providers/tests/microsoft/azure/resources/status.json
+++ b/providers/tests/microsoft/azure/resources/status.json
@@ -1 +1 @@
-{"id": "0a1b1bf3-37de-48f7-9863-ed4cda97a9ef", "createdDateTime": 
"2024-04-10T15:05:17.357", "status": "Succeeded"}
+[{"id": "0a1b1bf3-37de-48f7-9863-ed4cda97a9ef", "createdDateTime": 
"2024-04-10T15:05:17.357", "status": "InProgress"},{"id": 
"0a1b1bf3-37de-48f7-9863-ed4cda97a9ef", "createdDateTime": 
"2024-04-10T15:05:17.357", "status": "Succeeded"}]
diff --git a/providers/tests/microsoft/azure/sensors/test_msgraph.py 
b/providers/tests/microsoft/azure/sensors/test_msgraph.py
index 9ad03ccf170..4240a88e403 100644
--- a/providers/tests/microsoft/azure/sensors/test_msgraph.py
+++ b/providers/tests/microsoft/azure/sensors/test_msgraph.py
@@ -17,6 +17,7 @@
 from __future__ import annotations
 
 import json
+from datetime import datetime
 
 import pytest
 
@@ -31,7 +32,7 @@ from tests_common.test_utils.version_compat import 
AIRFLOW_V_2_10_PLUS
 class TestMSGraphSensor(Base):
     def test_execute(self):
         status = load_json("resources", "status.json")
-        response = mock_json_response(200, status)
+        response = mock_json_response(200, *status)
 
         with self.patch_hook_and_request_adapter(response):
             sensor = MSGraphSensor(
@@ -40,6 +41,7 @@ class TestMSGraphSensor(Base):
                 url="myorg/admin/workspaces/scanStatus/{scanId}",
                 path_parameters={"scanId": 
"0a1b1bf3-37de-48f7-9863-ed4cda97a9ef"},
                 result_processor=lambda context, result: result["id"],
+                retry_delay=5,
                 timeout=350.0,
             )
 
@@ -48,16 +50,22 @@ class TestMSGraphSensor(Base):
             assert sensor.path_parameters == {"scanId": 
"0a1b1bf3-37de-48f7-9863-ed4cda97a9ef"}
             assert isinstance(results, str)
             assert results == "0a1b1bf3-37de-48f7-9863-ed4cda97a9ef"
-            assert len(events) == 1
+            assert len(events) == 3
             assert isinstance(events[0], TriggerEvent)
             assert events[0].payload["status"] == "success"
             assert events[0].payload["type"] == "builtins.dict"
-            assert events[0].payload["response"] == json.dumps(status)
+            assert events[0].payload["response"] == json.dumps(status[0])
+            assert isinstance(events[1], TriggerEvent)
+            assert isinstance(events[1].payload, datetime)
+            assert isinstance(events[2], TriggerEvent)
+            assert events[2].payload["status"] == "success"
+            assert events[2].payload["type"] == "builtins.dict"
+            assert events[2].payload["response"] == json.dumps(status[1])
 
     @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Lambda parameters 
works in Airflow >= 2.10.0")
     def test_execute_with_lambda_parameter(self):
         status = load_json("resources", "status.json")
-        response = mock_json_response(200, status)
+        response = mock_json_response(200, *status)
 
         with self.patch_hook_and_request_adapter(response):
             sensor = MSGraphSensor(
@@ -66,6 +74,7 @@ class TestMSGraphSensor(Base):
                 url="myorg/admin/workspaces/scanStatus/{scanId}",
                 path_parameters=lambda context, jinja_env: {"scanId": 
"0a1b1bf3-37de-48f7-9863-ed4cda97a9ef"},
                 result_processor=lambda context, result: result["id"],
+                retry_delay=5,
                 timeout=350.0,
             )
 
@@ -74,11 +83,17 @@ class TestMSGraphSensor(Base):
             assert sensor.path_parameters == {"scanId": 
"0a1b1bf3-37de-48f7-9863-ed4cda97a9ef"}
             assert isinstance(results, str)
             assert results == "0a1b1bf3-37de-48f7-9863-ed4cda97a9ef"
-            assert len(events) == 1
+            assert len(events) == 3
             assert isinstance(events[0], TriggerEvent)
             assert events[0].payload["status"] == "success"
             assert events[0].payload["type"] == "builtins.dict"
-            assert events[0].payload["response"] == json.dumps(status)
+            assert events[0].payload["response"] == json.dumps(status[0])
+            assert isinstance(events[1], TriggerEvent)
+            assert isinstance(events[1].payload, datetime)
+            assert isinstance(events[2], TriggerEvent)
+            assert events[2].payload["status"] == "success"
+            assert events[2].payload["type"] == "builtins.dict"
+            assert events[2].payload["response"] == json.dumps(status[1])
 
     def test_template_fields(self):
         sensor = MSGraphSensor(
diff --git a/providers/tests/microsoft/conftest.py 
b/providers/tests/microsoft/conftest.py
index d875096402b..d4a65075f50 100644
--- a/providers/tests/microsoft/conftest.py
+++ b/providers/tests/microsoft/conftest.py
@@ -149,8 +149,10 @@ def mock_context(task) -> Context:
             run_id: str | None = None,
         ) -> Any:
             if map_indexes:
-                return values.get(f"{task_ids or self.task_id}_{dag_id or 
self.dag_id}_{key}_{map_indexes}")
-            return values.get(f"{task_ids or self.task_id}_{dag_id or 
self.dag_id}_{key}")
+                return values.get(
+                    f"{task_ids or self.task_id}_{dag_id or 
self.dag_id}_{key}_{map_indexes}", default
+                )
+            return values.get(f"{task_ids or self.task_id}_{dag_id or 
self.dag_id}_{key}", default)
 
         def xcom_push(self, key: str, value: Any, session: Session = 
NEW_SESSION, **kwargs) -> None:
             values[f"{self.task_id}_{self.dag_id}_{key}_{self.map_index}"] = 
value

Reply via email to