This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new aa36f045d73 Move determine_kwargs and KeywordParameters to SDK 
DecoratedOperator (#62746)
aa36f045d73 is described below

commit aa36f045d732e392dd7f43b1606855649044cf55
Author: Kaxil Naik <[email protected]>
AuthorDate: Tue Mar 3 02:32:12 2026 +0000

    Move determine_kwargs and KeywordParameters to SDK DecoratedOperator 
(#62746)
    
    Move `KeywordParameters` class and `determine_kwargs()` from
    `airflow.utils.operator_helpers` to 
`task-sdk/src/airflow/sdk/bases/decorator.py`,
    next to `DecoratedOperator` which owns `python_callable` and `op_args`.
    
    Add a `determine_kwargs()` instance method on `DecoratedOperator` (mirrors
    the existing pattern on `PythonOperator`).
    
    All 6 decorated operators now import the standalone `determine_kwargs` from
    `common.compat.sdk` instead of `airflow.utils.operator_helpers`. Using the
    compat layer (rather than the instance method) handles the case where a 
newer
    provider runs against an older Task SDK whose `DecoratedOperator` lacks the
    method. The compat fallback resolves to `airflow.utils.operator_helpers` on
    older Airflow versions.
    
    Other changes:
    - `operator_helpers.py` re-exports via `add_deprecated_classes` so existing
      callers get a deprecation warning pointing to the SDK location
    - `PythonOperator` imports `KeywordParameters` from compat SDK
    - Affected provider pyproject.toml files marked with `# use next version`
      for the common-compat dependency
    - Cleaned up stale `serializing()` reference in `KeywordParameters` 
docstring
    - Removed redundant local `import itertools` (already at module level)
---
 airflow-core/src/airflow/utils/operator_helpers.py | 89 +++++-----------------
 providers/cncf/kubernetes/pyproject.toml           |  2 +-
 .../cncf/kubernetes/decorators/kubernetes_cmd.py   |  2 +-
 .../airflow/providers/common/ai/decorators/llm.py  |  2 +-
 .../providers/common/ai/decorators/llm_sql.py      |  2 +-
 .../src/airflow/providers/common/compat/sdk.py     |  4 +
 providers/common/sql/pyproject.toml                |  2 +-
 .../providers/common/sql/decorators/analytics.py   |  2 +-
 .../airflow/providers/common/sql/decorators/sql.py |  2 +-
 providers/standard/pyproject.toml                  |  2 +-
 .../airflow/providers/standard/decorators/bash.py  |  2 +-
 .../airflow/providers/standard/operators/python.py |  8 +-
 task-sdk/src/airflow/sdk/bases/decorator.py        | 64 ++++++++++++++++
 13 files changed, 101 insertions(+), 82 deletions(-)

diff --git a/airflow-core/src/airflow/utils/operator_helpers.py 
b/airflow-core/src/airflow/utils/operator_helpers.py
index 8b8d339b157..5de5a74eda6 100644
--- a/airflow-core/src/airflow/utils/operator_helpers.py
+++ b/airflow-core/src/airflow/utils/operator_helpers.py
@@ -17,82 +17,27 @@
 # under the License.
 from __future__ import annotations
 
-import inspect
-from collections.abc import Callable, Collection, Mapping
-from typing import Any, TypeVar
+from collections.abc import Callable
+from typing import TYPE_CHECKING, TypeVar
 
-R = TypeVar("R")
-
-
-class KeywordParameters:
-    """
-    Wrapper representing ``**kwargs`` to a callable.
-
-    The actual ``kwargs`` can be obtained by calling either ``unpacking()`` or
-    ``serializing()``. They behave almost the same and are only different if
-    the containing ``kwargs`` is an Airflow Context object, and the calling
-    function uses ``**kwargs`` in the argument list.
-
-    In this particular case, ``unpacking()`` uses ``lazy-object-proxy`` to
-    prevent the Context from emitting deprecation warnings too eagerly when 
it's
-    unpacked by ``**``. ``serializing()`` does not do this, and will allow the
-    warnings to be emitted eagerly, which is useful when you want to dump the
-    content and use it somewhere else without needing ``lazy-object-proxy``.
-    """
-
-    def __init__(self, kwargs: Mapping[str, Any]) -> None:
-        self._kwargs = kwargs
-
-    @classmethod
-    def determine(
-        cls,
-        func: Callable[..., Any],
-        args: Collection[Any],
-        kwargs: Mapping[str, Any],
-    ) -> KeywordParameters:
-        import itertools
+from airflow.utils.deprecation_tools import add_deprecated_classes
 
-        signature = inspect.signature(func)
-        has_wildcard_kwargs = any(p.kind == p.VAR_KEYWORD for p in 
signature.parameters.values())
+if TYPE_CHECKING:
+    from airflow.sdk.bases.decorator import KeywordParameters, 
determine_kwargs  # noqa: F401
 
-        for name, param in itertools.islice(signature.parameters.items(), 
len(args)):
-            # Keyword-only arguments can't be passed positionally and are not 
checked.
-            if param.kind == inspect.Parameter.KEYWORD_ONLY:
-                continue
-            if param.kind == inspect.Parameter.VAR_KEYWORD:
-                continue
+__all__ = ["make_kwargs_callable"]
 
-            # Check if args conflict with names in kwargs.
-            if name in kwargs:
-                raise ValueError(f"The key {name!r} in args is a part of 
kwargs and therefore reserved.")
-
-        if has_wildcard_kwargs:
-            # If the callable has a **kwargs argument, it's ready to accept 
all the kwargs.
-            return cls(kwargs)
-
-        # If the callable has no **kwargs argument, it only wants the 
arguments it requested.
-        filtered_kwargs = {key: kwargs[key] for key in signature.parameters if 
key in kwargs}
-        return cls(filtered_kwargs)
-
-    def unpacking(self) -> Mapping[str, Any]:
-        """Dump the kwargs mapping to unpack with ``**`` in a function call."""
-        return self._kwargs
-
-
-def determine_kwargs(
-    func: Callable[..., Any],
-    args: Collection[Any],
-    kwargs: Mapping[str, Any],
-) -> Mapping[str, Any]:
-    """
-    Inspect the signature of a callable to determine which kwargs need to be 
passed to the callable.
+R = TypeVar("R")
 
-    :param func: The callable that you want to invoke
-    :param args: The positional arguments that need to be passed to the 
callable, so we know how many to skip.
-    :param kwargs: The keyword arguments that need to be filtered before 
passing to the callable.
-    :return: A dictionary which contains the keyword arguments that are 
compatible with the callable.
-    """
-    return KeywordParameters.determine(func, args, kwargs).unpacking()
+add_deprecated_classes(
+    {
+        __name__: {
+            "KeywordParameters": 
"airflow.sdk.bases.decorator.KeywordParameters",
+            "determine_kwargs": "airflow.sdk.bases.decorator.determine_kwargs",
+        },
+    },
+    package=__name__,
+)
 
 
 def make_kwargs_callable(func: Callable[..., R]) -> Callable[..., R]:
@@ -104,6 +49,8 @@ def make_kwargs_callable(func: Callable[..., R]) -> 
Callable[..., R]:
     """
     import functools
 
+    from airflow.sdk.bases.decorator import determine_kwargs
+
     @functools.wraps(func)
     def kwargs_func(*args, **kwargs):
         kwargs = determine_kwargs(func, args, kwargs)
diff --git a/providers/cncf/kubernetes/pyproject.toml 
b/providers/cncf/kubernetes/pyproject.toml
index bbefa2830f1..da26bb1c01a 100644
--- a/providers/cncf/kubernetes/pyproject.toml
+++ b/providers/cncf/kubernetes/pyproject.toml
@@ -60,7 +60,7 @@ requires-python = ">=3.10"
 dependencies = [
     "aiofiles>=23.2.0",
     "apache-airflow>=2.11.0",
-    "apache-airflow-providers-common-compat>=1.13.0",
+    "apache-airflow-providers-common-compat>=1.13.0",  # use next version
     "asgiref>=3.5.2",
     # TODO(potiuk): We should bump cryptography to >=46.0.0 when 
sqlalchemy>=2.0 is required
     "cryptography>=41.0.0,<46.0.0",
diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py
index 3e710ce48e3..fb15f5097bc 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py
@@ -25,9 +25,9 @@ from airflow.providers.common.compat.sdk import (
     DecoratedOperator,
     TaskDecorator,
     context_merge,
+    determine_kwargs,
     task_decorator_factory,
 )
-from airflow.utils.operator_helpers import determine_kwargs
 
 if TYPE_CHECKING:
     from airflow.sdk import Context
diff --git 
a/providers/common/ai/src/airflow/providers/common/ai/decorators/llm.py 
b/providers/common/ai/src/airflow/providers/common/ai/decorators/llm.py
index 04c194aed09..f21bcd343c0 100644
--- a/providers/common/ai/src/airflow/providers/common/ai/decorators/llm.py
+++ b/providers/common/ai/src/airflow/providers/common/ai/decorators/llm.py
@@ -33,10 +33,10 @@ from airflow.providers.common.compat.sdk import (
     DecoratedOperator,
     TaskDecorator,
     context_merge,
+    determine_kwargs,
     task_decorator_factory,
 )
 from airflow.sdk.definitions._internal.types import SET_DURING_EXECUTION
-from airflow.utils.operator_helpers import determine_kwargs
 
 if TYPE_CHECKING:
     from airflow.sdk import Context
diff --git 
a/providers/common/ai/src/airflow/providers/common/ai/decorators/llm_sql.py 
b/providers/common/ai/src/airflow/providers/common/ai/decorators/llm_sql.py
index 25fa57b5d14..d0ebb1a9bb0 100644
--- a/providers/common/ai/src/airflow/providers/common/ai/decorators/llm_sql.py
+++ b/providers/common/ai/src/airflow/providers/common/ai/decorators/llm_sql.py
@@ -32,10 +32,10 @@ from airflow.providers.common.compat.sdk import (
     DecoratedOperator,
     TaskDecorator,
     context_merge,
+    determine_kwargs,
     task_decorator_factory,
 )
 from airflow.sdk.definitions._internal.types import SET_DURING_EXECUTION
-from airflow.utils.operator_helpers import determine_kwargs
 
 if TYPE_CHECKING:
     from airflow.sdk import Context
diff --git a/providers/common/compat/src/airflow/providers/common/compat/sdk.py 
b/providers/common/compat/src/airflow/providers/common/compat/sdk.py
index e444085e0d8..bcd28c4799f 100644
--- a/providers/common/compat/src/airflow/providers/common/compat/sdk.py
+++ b/providers/common/compat/src/airflow/providers/common/compat/sdk.py
@@ -73,7 +73,9 @@ if TYPE_CHECKING:
     from airflow.sdk.bases.decorator import (
         DecoratedMappedOperator as DecoratedMappedOperator,
         DecoratedOperator as DecoratedOperator,
+        KeywordParameters as KeywordParameters,
         TaskDecorator as TaskDecorator,
+        determine_kwargs as determine_kwargs,
         get_unique_task_id as get_unique_task_id,
         task_decorator_factory as task_decorator_factory,
     )
@@ -167,6 +169,8 @@ _IMPORT_MAP: dict[str, str | tuple[str, ...]] = {
     "TaskDecorator": ("airflow.sdk.bases.decorator", "airflow.decorators"),
     "task_decorator_factory": ("airflow.sdk.bases.decorator", 
"airflow.decorators.base"),
     "get_unique_task_id": ("airflow.sdk.bases.decorator", 
"airflow.decorators.base"),
+    "KeywordParameters": ("airflow.sdk.bases.decorator", 
"airflow.utils.operator_helpers"),
+    "determine_kwargs": ("airflow.sdk.bases.decorator", 
"airflow.utils.operator_helpers"),
     # 
============================================================================
     # Models
     # 
============================================================================
diff --git a/providers/common/sql/pyproject.toml 
b/providers/common/sql/pyproject.toml
index 0b5cd1d0f86..cd9484e49ea 100644
--- a/providers/common/sql/pyproject.toml
+++ b/providers/common/sql/pyproject.toml
@@ -59,7 +59,7 @@ requires-python = ">=3.10"
 # After you modify the dependencies, and rebuild your Breeze CI image with 
``breeze ci-image build``
 dependencies = [
     "apache-airflow>=2.11.0",
-    "apache-airflow-providers-common-compat>=1.12.0",
+    "apache-airflow-providers-common-compat>=1.12.0",  # use next version
     "sqlparse>=0.5.1",
     "more-itertools>=9.0.0",
     # The methodtools dependency is necessary since the introduction of 
dialects:
diff --git 
a/providers/common/sql/src/airflow/providers/common/sql/decorators/analytics.py 
b/providers/common/sql/src/airflow/providers/common/sql/decorators/analytics.py
index 29fb66d872a..1c297ce8254 100644
--- 
a/providers/common/sql/src/airflow/providers/common/sql/decorators/analytics.py
+++ 
b/providers/common/sql/src/airflow/providers/common/sql/decorators/analytics.py
@@ -25,10 +25,10 @@ from airflow.providers.common.compat.sdk import (
     DecoratedOperator,
     TaskDecorator,
     context_merge,
+    determine_kwargs,
     task_decorator_factory,
 )
 from airflow.providers.common.sql.operators.analytics import AnalyticsOperator
-from airflow.utils.operator_helpers import determine_kwargs
 
 if AIRFLOW_V_3_0_PLUS:
     from airflow.sdk.definitions._internal.types import SET_DURING_EXECUTION
diff --git 
a/providers/common/sql/src/airflow/providers/common/sql/decorators/sql.py 
b/providers/common/sql/src/airflow/providers/common/sql/decorators/sql.py
index 46eac567a6c..72f9b48da7f 100644
--- a/providers/common/sql/src/airflow/providers/common/sql/decorators/sql.py
+++ b/providers/common/sql/src/airflow/providers/common/sql/decorators/sql.py
@@ -25,10 +25,10 @@ from airflow.providers.common.compat.sdk import (
     DecoratedOperator,
     TaskDecorator,
     context_merge,
+    determine_kwargs,
     task_decorator_factory,
 )
 from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
-from airflow.utils.operator_helpers import determine_kwargs
 
 if AIRFLOW_V_3_0_PLUS:
     from airflow.sdk.definitions._internal.types import SET_DURING_EXECUTION
diff --git a/providers/standard/pyproject.toml 
b/providers/standard/pyproject.toml
index 86d14eb3e4b..d6d9df1972e 100644
--- a/providers/standard/pyproject.toml
+++ b/providers/standard/pyproject.toml
@@ -59,7 +59,7 @@ requires-python = ">=3.10"
 # After you modify the dependencies, and rebuild your Breeze CI image with 
``breeze ci-image build``
 dependencies = [
     "apache-airflow>=2.11.0",
-    "apache-airflow-providers-common-compat>=1.13.0",
+    "apache-airflow-providers-common-compat>=1.13.0",  # use next version
 ]
 
 # The optional dependencies should be modified in place in the generated file
diff --git 
a/providers/standard/src/airflow/providers/standard/decorators/bash.py 
b/providers/standard/src/airflow/providers/standard/decorators/bash.py
index e415223fc01..169a8dea58a 100644
--- a/providers/standard/src/airflow/providers/standard/decorators/bash.py
+++ b/providers/standard/src/airflow/providers/standard/decorators/bash.py
@@ -25,11 +25,11 @@ from airflow.providers.common.compat.sdk import (
     DecoratedOperator,
     TaskDecorator,
     context_merge,
+    determine_kwargs,
     task_decorator_factory,
 )
 from airflow.providers.standard.operators.bash import BashOperator
 from airflow.sdk.definitions._internal.types import SET_DURING_EXECUTION
-from airflow.utils.operator_helpers import determine_kwargs
 
 if TYPE_CHECKING:
     from airflow.providers.common.compat.sdk import Context
diff --git 
a/providers/standard/src/airflow/providers/standard/operators/python.py 
b/providers/standard/src/airflow/providers/standard/operators/python.py
index 7e6bbd166a6..cba2c1f31dd 100644
--- a/providers/standard/src/airflow/providers/standard/operators/python.py
+++ b/providers/standard/src/airflow/providers/standard/operators/python.py
@@ -47,7 +47,12 @@ from airflow.exceptions import (
     DeserializingResultError,
 )
 from airflow.models.variable import Variable
-from airflow.providers.common.compat.sdk import AirflowException, 
AirflowSkipException, context_merge
+from airflow.providers.common.compat.sdk import (
+    AirflowException,
+    AirflowSkipException,
+    KeywordParameters,
+    context_merge,
+)
 from airflow.providers.common.compat.standard.operators import (
     BaseAsyncOperator,
     is_async_callable,
@@ -61,7 +66,6 @@ from airflow.providers.standard.utils.python_virtualenv 
import (
 from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, 
AIRFLOW_V_3_2_PLUS
 from airflow.utils import hashlib_wrapper
 from airflow.utils.file import get_unique_dag_module_name
-from airflow.utils.operator_helpers import KeywordParameters
 
 if AIRFLOW_V_3_0_PLUS:
     from airflow.providers.standard.operators.branch import BaseBranchOperator
diff --git a/task-sdk/src/airflow/sdk/bases/decorator.py 
b/task-sdk/src/airflow/sdk/bases/decorator.py
index 131750ae8c9..53565c531a6 100644
--- a/task-sdk/src/airflow/sdk/bases/decorator.py
+++ b/task-sdk/src/airflow/sdk/bases/decorator.py
@@ -192,6 +192,67 @@ def is_async_callable(func):
     return False
 
 
+class KeywordParameters:
+    """
+    Wrapper representing ``**kwargs`` to a callable.
+
+    The actual ``kwargs`` can be obtained by calling ``unpacking()``, which
+    returns the mapping suitable for unpacking with ``**`` in a function call.
+    """
+
+    def __init__(self, kwargs: Mapping[str, Any]) -> None:
+        self._kwargs = kwargs
+
+    @classmethod
+    def determine(
+        cls,
+        func: Callable[..., Any],
+        args: Collection[Any],
+        kwargs: Mapping[str, Any],
+    ) -> KeywordParameters:
+        signature = inspect.signature(func)
+        has_wildcard_kwargs = any(p.kind == p.VAR_KEYWORD for p in 
signature.parameters.values())
+
+        for name, param in itertools.islice(signature.parameters.items(), 
len(args)):
+            # Keyword-only arguments can't be passed positionally and are not 
checked.
+            if param.kind == inspect.Parameter.KEYWORD_ONLY:
+                continue
+            if param.kind == inspect.Parameter.VAR_KEYWORD:
+                continue
+
+            # Check if args conflict with names in kwargs.
+            if name in kwargs:
+                raise ValueError(f"The key {name!r} in args is a part of 
kwargs and therefore reserved.")
+
+        if has_wildcard_kwargs:
+            # If the callable has a **kwargs argument, it's ready to accept 
all the kwargs.
+            return cls(kwargs)
+
+        # If the callable has no **kwargs argument, it only wants the 
arguments it requested.
+        filtered_kwargs = {key: kwargs[key] for key in signature.parameters if 
key in kwargs}
+        return cls(filtered_kwargs)
+
+    def unpacking(self) -> Mapping[str, Any]:
+        """Dump the kwargs mapping to unpack with ``**`` in a function call."""
+        return self._kwargs
+
+
+def determine_kwargs(
+    func: Callable[..., Any],
+    args: Collection[Any],
+    kwargs: Mapping[str, Any],
+) -> Mapping[str, Any]:
+    """
+    Inspect the signature of a callable to determine which kwargs need to be 
passed to the callable.
+
+    :param func: The callable that you want to invoke
+    :param args: The positional arguments that need to be passed to the 
callable, so we know how many to skip.
+    :param kwargs: The keyword arguments that need to be filtered before 
passing to the callable.
+    :return: A dictionary which contains the keyword arguments that are 
compatible with the callable.
+    """
+    return KeywordParameters.determine(func, args, kwargs).unpacking()
+
+
 class DecoratedOperator(BaseOperator):
     """
     Wraps a Python callable and captures args/kwargs when called for execution.
@@ -331,6 +392,9 @@ class DecoratedOperator(BaseOperator):
         kwargs["op_kwargs"] = op_kwargs
         return args, kwargs
 
+    def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, 
Any]:
+        return KeywordParameters.determine(self.python_callable, self.op_args, 
context).unpacking()
+
     def get_python_source(self):
         raw_source = inspect.getsource(self.python_callable)
         raw_source_lines = [line for line in raw_source.splitlines() if not 
line.strip().startswith("#")]

Reply via email to