This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 82f07ca1153 Suppress deprecation warning when unpacking context in 
default_event_handler of MSGraphAsyncOperator (#47069)
82f07ca1153 is described below

commit 82f07ca115307e5452536643d8a94260bf918b53
Author: David Blain <[email protected]>
AuthorDate: Fri Apr 18 15:33:05 2025 +0200

    Suppress deprecation warning when unpacking context in 
default_event_handler of MSGraphAsyncOperator (#47069)
    
    * refactor: Make a copy of context items as dict to avoid deprecation 
warnings when unpacking in default_event_handler method of MSGraphAsyncOperator
    
    * refactor: Updated TestMSGraphAsyncOperator
    
    * refactor: Try to ignore the TypeError on warns(None)
    
    * refactor: Use warnings.catch_warnings(record=True) instead of 
pytest.warn(None) to avoid TypeError: exceptions must be derived from Warning, 
not <class 'NoneType'>
    
    * refactor: Don't copy context but suppress warnings message instead
    
    * refactor: Forgot to filter deprecation warnings
    
    * refactor: Also filter UserWarnings as AirflowContextDeprecationWarning 
raise by Context extends from it
    
    * refactor: Try to make import of AirflowContextDeprecationWarning cross 
Airflow version compatible
    
    * refactor: Added openlineage module to Microsoft Azure provider
    
    * refactor: Added microsoft.aure provider in 
selected-providers-list-as-string for trigger openlineage and related providers 
tests when Assets files changed
    
    * refactor: Added microsoft.aure provider in docs-list-as-string for 
trigger openlineage and related providers tests when Assets files changed
    
    * refactor: Adapted providers-test-types-list-as-strings-in-json
    
    * refactor: Added openlineage as optional dependency in provider_info
    
    * refactor: Invert import of AirflowContextDeprecationWarning
    
    * refactor: Redefine AirflowContextDeprecationWarning if Airflow 3 or higher
    
    * Revert "refactor: Added openlineage as optional dependency in 
provider_info"
    
    This reverts commit b8cdad5ba47c2d3825623b17b7fe67d7621dc0bf.
    
    * Revert "refactor: Adapted providers-test-types-list-as-strings-in-json"
    
    This reverts commit b4dbfc0407815dcc26721345022f94c8c1ae6746.
    
    * Revert "refactor: Added microsoft.aure provider in docs-list-as-string 
for trigger openlineage and related providers tests when Assets files changed"
    
    This reverts commit a3e6ac694d2709c78f3c9f1e094ed1e5ea8ada51.
    
    * Revert "refactor: Added microsoft.aure provider in 
selected-providers-list-as-string for trigger openlineage and related providers 
tests when Assets files changed"
    
    This reverts commit 32abded7af06b81460c8b1955f78458b5ca96ed4.
    
    * refactor: Also filter UserWarnings
    
    * refactor: Catch AttributeError instead of ImportError
    
    * refactor: Try to satisfy mypy regarding AirflowContextDeprecationWarning
    
    * refactor: Removed openlineage from README.rst
    
    * refactor: Removed openlineage from pyproject.toml
    
    * refactor: Handle AirflowContextDeprecationWarning directly within 
execute_callable method
    
    * refactor: Ignore MyPy AttributeError
    
    * refactor: Suppress AttributeError: module 'os' has no attribute 
'register_at_fork' when running test on Windows machine
    
    * refactor: Also suppress ImportError for AirflowContextDeprecationWarning 
in execute_callback
    
    * refactor: Instead of suppressing AttributeError, which is dangerous, 
check platform instead and if not windows then register fork
    
    * refactor: Reorganized imports settings
    
    * refactor: Reformatted execute_callback
    
    * refactor: Move mypy ignore one line up
    
    ---------
    
    Co-authored-by: David Blain <[email protected]>
    Co-authored-by: David Blain <[email protected]>
---
 airflow-core/src/airflow/settings.py               | 21 +++++++------
 .../providers/microsoft/azure/operators/msgraph.py | 27 +++++++++++++---
 .../unit/microsoft/azure/operators/test_msgraph.py | 36 ++++++++++++----------
 3 files changed, 54 insertions(+), 30 deletions(-)

diff --git a/airflow-core/src/airflow/settings.py 
b/airflow-core/src/airflow/settings.py
index ce36f1e676a..076aab43e27 100644
--- a/airflow-core/src/airflow/settings.py
+++ b/airflow-core/src/airflow/settings.py
@@ -22,6 +22,7 @@ import functools
 import json
 import logging
 import os
+import platform
 import sys
 import warnings
 from importlib import metadata
@@ -384,15 +385,17 @@ def configure_orm(disable_connection_pool=False, 
pool_class=None):
     NonScopedSession = _session_maker(engine)
     Session = scoped_session(NonScopedSession)
 
-    # 
https://docs.sqlalchemy.org/en/20/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork
-    def clean_in_fork():
-        _globals = globals()
-        if engine := _globals.get("engine"):
-            engine.dispose(close=False)
-        if async_engine := _globals.get("async_engine"):
-            async_engine.sync_engine.dispose(close=False)
-
-    os.register_at_fork(after_in_child=clean_in_fork)
+    if not platform.system() == "Windows":
+        # 
https://docs.sqlalchemy.org/en/20/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork
+        def clean_in_fork():
+            _globals = globals()
+            if engine := _globals.get("engine"):
+                engine.dispose(close=False)
+            if async_engine := _globals.get("async_engine"):
+                async_engine.sync_engine.dispose(close=False)
+
+        # Won't work on Windows
+        os.register_at_fork(after_in_child=clean_in_fork)
 
 
 DEFAULT_ENGINE_ARGS = {
diff --git 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/msgraph.py
 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/msgraph.py
index 241b98431f0..31d342ccc14 100644
--- 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/msgraph.py
+++ 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/msgraph.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 
 import warnings
 from collections.abc import Sequence
+from contextlib import suppress
 from copy import deepcopy
 from typing import (
     TYPE_CHECKING,
@@ -58,7 +59,16 @@ def execute_callable(
     message: str,
 ) -> Any:
     try:
-        return func(value, **context)  # type: ignore
+        with warnings.catch_warnings():
+            with suppress(AttributeError, ImportError):
+                from airflow.utils.context import (  # type: 
ignore[attr-defined]
+                    AirflowContextDeprecationWarning,
+                )
+
+                warnings.filterwarnings("ignore", 
category=AirflowContextDeprecationWarning)
+            warnings.simplefilter("ignore", category=DeprecationWarning)
+            warnings.simplefilter("ignore", category=UserWarning)
+            return func(value, **context)  # type: ignore
     except TypeError:
         warnings.warn(
             message,
@@ -216,8 +226,6 @@ class MSGraphAsyncOperator(BaseOperator):
 
                 self.log.debug("processed response: %s", result)
 
-                event["response"] = result
-
                 try:
                     self.trigger_next_link(
                         response=response, 
method_name=self.execute_complete.__name__, context=context
@@ -293,7 +301,13 @@ class MSGraphAsyncOperator(BaseOperator):
     def push_xcom(self, context: Any, value) -> None:
         self.log.debug("do_xcom_push: %s", self.do_xcom_push)
         if self.do_xcom_push:
-            self.log.info("Pushing XCom with key '%s': %s", self.key, value)
+            self.log.info(
+                "Pushing XCom with task_id '%s' and dag_id '%s' and key '%s': 
%s",
+                self.task_id,
+                self.dag_id,
+                self.key,
+                value,
+            )
             self.xcom_push(context=context, key=self.key, value=value)
 
     @staticmethod
@@ -316,7 +330,10 @@ class MSGraphAsyncOperator(BaseOperator):
     def trigger_next_link(self, response, method_name: str, context: Context) 
-> None:
         if isinstance(response, dict):
             try:
-                url, query_parameters = self.pagination_function(self, 
response, **dict(context.items()))  # type: ignore
+                with warnings.catch_warnings():
+                    warnings.simplefilter("ignore", 
category=DeprecationWarning)
+                    warnings.filterwarnings("ignore", category=UserWarning)
+                    url, query_parameters = self.pagination_function(self, 
response, **context)  # type: ignore
             except TypeError:
                 warnings.warn(
                     "pagination_function signature has changed, context 
parameter should be a kwargs argument!",
diff --git 
a/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_msgraph.py
 
b/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_msgraph.py
index 458171ec6f7..e33c71f39f1 100644
--- 
a/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_msgraph.py
+++ 
b/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_msgraph.py
@@ -18,9 +18,10 @@ from __future__ import annotations
 
 import json
 import locale
+import warnings
 from base64 import b64encode
 from os.path import dirname
-from typing import TYPE_CHECKING, Any
+from typing import Any
 
 import pytest
 
@@ -36,12 +37,11 @@ from tests_common.test_utils.version_compat import 
AIRFLOW_V_2_10_PLUS
 from unit.microsoft.azure.base import Base
 from unit.microsoft.azure.test_utils import mock_json_response, mock_response
 
-if TYPE_CHECKING:
-    try:
-        from airflow.sdk.definitions.context import Context
-    except ImportError:
-        # TODO: Remove once provider drops support for Airflow 2
-        from airflow.utils.context import Context
+try:
+    from airflow.sdk.definitions.context import Context
+except ImportError:
+    # TODO: Remove once provider drops support for Airflow 2
+    from airflow.utils.context import Context
 
 
 class TestMSGraphAsyncOperator(Base):
@@ -336,17 +336,21 @@ class TestMSGraphAsyncOperator(Base):
                 execute_callable(
                     lambda context, response: response,
                     "response",
-                    {"execution_date": timezone.utcnow()},
+                    Context({"execution_date": timezone.utcnow()}),
                     "result_processor signature has changed, result parameter 
should be defined before context!",
                 )
                 == "response"
             )
-        assert (
-            execute_callable(
-                lambda response, **context: response,
-                "response",
-                {"execution_date": timezone.utcnow()},
-                "result_processor signature has changed, result parameter 
should be defined before context!",
+
+        with warnings.catch_warnings(record=True) as recorded_warnings:
+            warnings.simplefilter("error")  # Treat warnings as errors
+            assert (
+                execute_callable(
+                    lambda response, **context: response,
+                    "response",
+                    Context({"execution_date": timezone.utcnow()}),
+                    "result_processor signature has changed, result parameter 
should be defined before context!",
+                )
+                == "response"
             )
-            == "response"
-        )
+            assert len(recorded_warnings) == 0

Reply via email to