This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 82f07ca1153 Suppress deprecation warning when unpacking context in
default_event_handler of MSGraphAsyncOperator (#47069)
82f07ca1153 is described below
commit 82f07ca115307e5452536643d8a94260bf918b53
Author: David Blain <[email protected]>
AuthorDate: Fri Apr 18 15:33:05 2025 +0200
Suppress deprecation warning when unpacking context in
default_event_handler of MSGraphAsyncOperator (#47069)
* refactor: Make a copy of context items as dict to avoid deprecation
warnings when unpacking in default_event_handler method of MSGraphAsyncOperator
* refactor: Updated TestMSGraphAsyncOperator
* refactor: Try to ignore the TypeError on warns(None)
* refactor: Use warnings.catch_warnings(record=True) instead of
pytest.warn(None) to avoid TypeError: exceptions must be derived from Warning,
not <class 'NoneType'>
* refactor: Don't copy context but suppress warnings message instead
* refactor: Forgot to filter deprecation warnings
* refactor: Also filter UserWarnings as AirflowContextDeprecationWarning
raise by Context extends from it
* refactor: Try to make import of AirflowContextDeprecationWarning cross
Airflow version compatible
* refactor: Added openlineage module to Microsoft Azure provider
* refactor: Added microsoft.aure provider in
selected-providers-list-as-string for trigger openlineage and related providers
tests when Assets files changed
* refactor: Added microsoft.aure provider in docs-list-as-string for
trigger openlineage and related providers tests when Assets files changed
* refactor: Adapted providers-test-types-list-as-strings-in-json
* refactor: Added openlineage as optional dependency in provider_info
* refactor: Invert import of AirflowContextDeprecationWarning
* refactor: Redefine AirflowContextDeprecationWarning if Airflow 3 or higher
* Revert "refactor: Added openlineage as optional dependency in
provider_info"
This reverts commit b8cdad5ba47c2d3825623b17b7fe67d7621dc0bf.
* Revert "refactor: Adapted providers-test-types-list-as-strings-in-json"
This reverts commit b4dbfc0407815dcc26721345022f94c8c1ae6746.
* Revert "refactor: Added microsoft.aure provider in docs-list-as-string
for trigger openlineage and related providers tests when Assets files changed"
This reverts commit a3e6ac694d2709c78f3c9f1e094ed1e5ea8ada51.
* Revert "refactor: Added microsoft.aure provider in
selected-providers-list-as-string for trigger openlineage and related providers
tests when Assets files changed"
This reverts commit 32abded7af06b81460c8b1955f78458b5ca96ed4.
* refactor: Also filter UserWarnings
* refactor: Catch AttributeError instead of ImportError
* refactor: Try to satisfy mypy regarding AirflowContextDeprecationWarning
* refactor: Removed openlineage from README.rst
* refactor: Removed openlineage from pyproject.toml
* refactor: Handle AirflowContextDeprecationWarning directly within
execute_callable method
* refactor: Ignore MyPy AttributeError
* refactor: Suppress AttributeError: module 'os' has no attribute
'register_at_fork' when running test on Windows machine
* refactor: Also suppress ImportError for AirflowContextDeprecationWarning
in execute_callback
* refactor: Instead of suppressing AttributeError, which is dangerous,
check platform instead and if not windows then register fork
* refactor: Reorganized imports settings
* refactor: Reformatted execute_callback
* refactor: Move mypy ignore one line up
---------
Co-authored-by: David Blain <[email protected]>
Co-authored-by: David Blain <[email protected]>
---
airflow-core/src/airflow/settings.py | 21 +++++++------
.../providers/microsoft/azure/operators/msgraph.py | 27 +++++++++++++---
.../unit/microsoft/azure/operators/test_msgraph.py | 36 ++++++++++++----------
3 files changed, 54 insertions(+), 30 deletions(-)
diff --git a/airflow-core/src/airflow/settings.py
b/airflow-core/src/airflow/settings.py
index ce36f1e676a..076aab43e27 100644
--- a/airflow-core/src/airflow/settings.py
+++ b/airflow-core/src/airflow/settings.py
@@ -22,6 +22,7 @@ import functools
import json
import logging
import os
+import platform
import sys
import warnings
from importlib import metadata
@@ -384,15 +385,17 @@ def configure_orm(disable_connection_pool=False,
pool_class=None):
NonScopedSession = _session_maker(engine)
Session = scoped_session(NonScopedSession)
- #
https://docs.sqlalchemy.org/en/20/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork
- def clean_in_fork():
- _globals = globals()
- if engine := _globals.get("engine"):
- engine.dispose(close=False)
- if async_engine := _globals.get("async_engine"):
- async_engine.sync_engine.dispose(close=False)
-
- os.register_at_fork(after_in_child=clean_in_fork)
+ if not platform.system() == "Windows":
+ #
https://docs.sqlalchemy.org/en/20/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork
+ def clean_in_fork():
+ _globals = globals()
+ if engine := _globals.get("engine"):
+ engine.dispose(close=False)
+ if async_engine := _globals.get("async_engine"):
+ async_engine.sync_engine.dispose(close=False)
+
+ # Won't work on Windows
+ os.register_at_fork(after_in_child=clean_in_fork)
DEFAULT_ENGINE_ARGS = {
diff --git
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/msgraph.py
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/msgraph.py
index 241b98431f0..31d342ccc14 100644
---
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/msgraph.py
+++
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/msgraph.py
@@ -19,6 +19,7 @@ from __future__ import annotations
import warnings
from collections.abc import Sequence
+from contextlib import suppress
from copy import deepcopy
from typing import (
TYPE_CHECKING,
@@ -58,7 +59,16 @@ def execute_callable(
message: str,
) -> Any:
try:
- return func(value, **context) # type: ignore
+ with warnings.catch_warnings():
+ with suppress(AttributeError, ImportError):
+ from airflow.utils.context import ( # type:
ignore[attr-defined]
+ AirflowContextDeprecationWarning,
+ )
+
+ warnings.filterwarnings("ignore",
category=AirflowContextDeprecationWarning)
+ warnings.simplefilter("ignore", category=DeprecationWarning)
+ warnings.simplefilter("ignore", category=UserWarning)
+ return func(value, **context) # type: ignore
except TypeError:
warnings.warn(
message,
@@ -216,8 +226,6 @@ class MSGraphAsyncOperator(BaseOperator):
self.log.debug("processed response: %s", result)
- event["response"] = result
-
try:
self.trigger_next_link(
response=response,
method_name=self.execute_complete.__name__, context=context
@@ -293,7 +301,13 @@ class MSGraphAsyncOperator(BaseOperator):
def push_xcom(self, context: Any, value) -> None:
self.log.debug("do_xcom_push: %s", self.do_xcom_push)
if self.do_xcom_push:
- self.log.info("Pushing XCom with key '%s': %s", self.key, value)
+ self.log.info(
+ "Pushing XCom with task_id '%s' and dag_id '%s' and key '%s':
%s",
+ self.task_id,
+ self.dag_id,
+ self.key,
+ value,
+ )
self.xcom_push(context=context, key=self.key, value=value)
@staticmethod
@@ -316,7 +330,10 @@ class MSGraphAsyncOperator(BaseOperator):
def trigger_next_link(self, response, method_name: str, context: Context)
-> None:
if isinstance(response, dict):
try:
- url, query_parameters = self.pagination_function(self,
response, **dict(context.items())) # type: ignore
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore",
category=DeprecationWarning)
+ warnings.filterwarnings("ignore", category=UserWarning)
+ url, query_parameters = self.pagination_function(self,
response, **context) # type: ignore
except TypeError:
warnings.warn(
"pagination_function signature has changed, context
parameter should be a kwargs argument!",
diff --git
a/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_msgraph.py
b/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_msgraph.py
index 458171ec6f7..e33c71f39f1 100644
---
a/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_msgraph.py
+++
b/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_msgraph.py
@@ -18,9 +18,10 @@ from __future__ import annotations
import json
import locale
+import warnings
from base64 import b64encode
from os.path import dirname
-from typing import TYPE_CHECKING, Any
+from typing import Any
import pytest
@@ -36,12 +37,11 @@ from tests_common.test_utils.version_compat import
AIRFLOW_V_2_10_PLUS
from unit.microsoft.azure.base import Base
from unit.microsoft.azure.test_utils import mock_json_response, mock_response
-if TYPE_CHECKING:
- try:
- from airflow.sdk.definitions.context import Context
- except ImportError:
- # TODO: Remove once provider drops support for Airflow 2
- from airflow.utils.context import Context
+try:
+ from airflow.sdk.definitions.context import Context
+except ImportError:
+ # TODO: Remove once provider drops support for Airflow 2
+ from airflow.utils.context import Context
class TestMSGraphAsyncOperator(Base):
@@ -336,17 +336,21 @@ class TestMSGraphAsyncOperator(Base):
execute_callable(
lambda context, response: response,
"response",
- {"execution_date": timezone.utcnow()},
+ Context({"execution_date": timezone.utcnow()}),
"result_processor signature has changed, result parameter
should be defined before context!",
)
== "response"
)
- assert (
- execute_callable(
- lambda response, **context: response,
- "response",
- {"execution_date": timezone.utcnow()},
- "result_processor signature has changed, result parameter
should be defined before context!",
+
+ with warnings.catch_warnings(record=True) as recorded_warnings:
+ warnings.simplefilter("error") # Treat warnings as errors
+ assert (
+ execute_callable(
+ lambda response, **context: response,
+ "response",
+ Context({"execution_date": timezone.utcnow()}),
+ "result_processor signature has changed, result parameter
should be defined before context!",
+ )
+ == "response"
)
- == "response"
- )
+ assert len(recorded_warnings) == 0