This is an automated email from the ASF dual-hosted git repository.
rahulvats pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5c6c34243b9 Order of task arguments in task definition causing error
when parsing DAG (#62174)
5c6c34243b9 is described below
commit 5c6c34243b9a72782b1049a1bd4b64dcc13ab4ff
Author: Software Developer <[email protected]>
AuthorDate: Mon Mar 23 14:30:50 2026 +0100
Order of task arguments in task definition causing error when parsing DAG
(#62174)
* *fix the issue when -> order of task arguments in task definition causing
error when parsing DAG
* *add more related unit tests.
* *apply PR remarks. unify a bit of naming.
* *reorganize tests.
* *fix for tests.
* *address PR remarks.
* *address PR remarks.
* *address PR remarks.
* *address PR remarks.
* *address PR remarks.
* *address PR remarks.
---
task-sdk/src/airflow/sdk/bases/decorator.py | 38 ++++++
task-sdk/tests/task_sdk/bases/test_decorator.py | 147 +++++++++++++++++++++++-
2 files changed, 184 insertions(+), 1 deletion(-)
diff --git a/task-sdk/src/airflow/sdk/bases/decorator.py
b/task-sdk/src/airflow/sdk/bases/decorator.py
index 28119ced429..2e98f826c7e 100644
--- a/task-sdk/src/airflow/sdk/bases/decorator.py
+++ b/task-sdk/src/airflow/sdk/bases/decorator.py
@@ -313,6 +313,33 @@ class DecoratedOperator(BaseOperator):
param.replace(default=None) if param.name in KNOWN_CONTEXT_KEYS
else param
for param in signature.parameters.values()
]
+
+ # Python requires that positional parameters with defaults don't
precede those without.
+ # This only applies to POSITIONAL_ONLY and POSITIONAL_OR_KEYWORD
parameters — *args,
+ # **kwargs, and keyword-only parameters follow different rules.
+ positional_kinds = (inspect.Parameter.POSITIONAL_ONLY,
inspect.Parameter.POSITIONAL_OR_KEYWORD)
+ positional = [(i, p) for i, p in enumerate(parameters) if p.kind in
positional_kinds]
+ first_default_idx = next((i for i, p in positional if p.default !=
inspect.Parameter.empty), None)
+
+ # Names of non-context-key params that receive an injected None
default purely to satisfy
+ # Python's ordering constraint. These params are still semantically
required and must be
+ # explicitly provided via op_args/op_kwargs — we verify this below
after bind().
+ injected_for_ordering: set[str] = set()
+ if first_default_idx is not None:
+ new_parameters = []
+ for i, param in enumerate(parameters):
+ if (
+ i > first_default_idx
+ and param.kind in positional_kinds
+ and param.default == inspect.Parameter.empty
+ ):
+ new_parameters.append(param.replace(default=None))
+ if param.name not in KNOWN_CONTEXT_KEYS:
+ injected_for_ordering.add(param.name)
+ else:
+ new_parameters.append(param)
+ parameters = new_parameters
+
try:
signature = signature.replace(parameters=parameters)
except ValueError as err:
@@ -342,6 +369,17 @@ class DecoratedOperator(BaseOperator):
else:
signature.bind(*op_args, **op_kwargs)
+ # Params in injected_for_ordering are semantically required even
though they received a
+ # None default to satisfy Python's ordering constraint. Verify they
are actually provided.
+ if injected_for_ordering and not
kwargs.get("_airflow_mapped_validation_only"):
+ positional_param_names = [p.name for p in parameters if p.kind in
positional_kinds]
+ covered_by_pos = set(positional_param_names[: len(op_args)])
+ provided = covered_by_pos | set(op_kwargs.keys())
+ missing = injected_for_ordering - provided
+ if missing:
+ missing_str = ", ".join(sorted(missing))
+ raise TypeError(f"missing required argument(s): {missing_str}")
+
self.op_args = op_args
self.op_kwargs = op_kwargs
super().__init__(task_id=task_id, **kwargs_to_upstream, **kwargs)
diff --git a/task-sdk/tests/task_sdk/bases/test_decorator.py
b/task-sdk/tests/task_sdk/bases/test_decorator.py
index 913e4c6e5f0..6021f6f0aad 100644
--- a/task-sdk/tests/task_sdk/bases/test_decorator.py
+++ b/task-sdk/tests/task_sdk/bases/test_decorator.py
@@ -25,7 +25,7 @@ from pathlib import Path
import pytest
from airflow.sdk import task
-from airflow.sdk.bases.decorator import DecoratedOperator, is_async_callable
+from airflow.sdk.bases.decorator import KNOWN_CONTEXT_KEYS, DecoratedOperator,
is_async_callable
RAW_CODE = """
from airflow.sdk import task
@@ -69,6 +69,151 @@ class TestBaseDecorator:
assert cleaned.lstrip().splitlines()[0].startswith("def a_task")
+class DummyDecoratedOperator(DecoratedOperator):
+ custom_operator_name = "@task.dummy"
+
+ def execute(self, context):
+ return self.python_callable(*self.op_args, **self.op_kwargs)
+
+
+class TestDefaultFillingLogic:
+ @pytest.mark.parametrize(
+ ("func", "kwargs", "args"),
+ [
+ pytest.param(
+ lambda: 42,
+ {},
+ [],
+ id="no_params",
+ ),
+ pytest.param(
+ lambda x, y=99: (x, y),
+ {"x": 1},
+ [],
+ id="param_after_first_default_is_given_none",
+ ),
+ pytest.param(
+ lambda a, b=5, c=None: (a, b, c),
+ {"a": 1},
+ [],
+ id="all_params_after_first_default_already_have_defaults",
+ ),
+ pytest.param(
+ lambda a, b, c=99: (a, b, c),
+ {},
+ [1, 2],
+ id="single_trailing_optional",
+ ),
+ ],
+ )
+ def test_construction_succeeds(self, func, kwargs, args):
+ op = make_op(func, op_kwargs=kwargs, op_args=args)
+ assert op is not None
+
+ def test_construction_succeeds_with_context_key_params(self):
+ def foo(ds, my_data):
+ return my_data
+
+ assert make_op(foo, op_args=[None, None]) is not None
+
+ def test_context_key_default_none_does_not_raise(self):
+ ctx_key = next(iter(sorted(KNOWN_CONTEXT_KEYS)))
+ f = _make_func(f"def dummy_task(x, {ctx_key}=None): return x")
+ assert make_op(f, op_kwargs={"x": 1}) is not None
+
+ def test_context_key_with_non_none_default_raises(self):
+ ctx_key = next(iter(sorted(KNOWN_CONTEXT_KEYS)))
+ f = _make_func(f"def dummy_task(x, {ctx_key}='bad_default'): return x")
+ with pytest.raises(ValueError, match="can't have a default other than
None"):
+ make_op(f, op_kwargs={"x": 1})
+
+ @pytest.mark.parametrize(
+ ("func_src", "op_kwargs"),
+ [
+ pytest.param(
+ "def dummy_task({ctx0}, x, y=10): return (x, y)",
+ {"x": 1},
+ id="context_key_before_first_default_shifts_boundary",
+ ),
+ pytest.param(
+ "def dummy_task(x, y=5, {ctx0}=None): return (x, y)",
+ {"x": 1},
+ id="context_key_after_regular_default",
+ ),
+ pytest.param(
+ "def dummy_task(a, {ctx0}=None, b=7, {ctx1}=None): return (a,
b)",
+ {"a": 1},
+ id="multiple_context_keys_mixed_with_regular_defaults",
+ ),
+ pytest.param(
+ "def dummy_task({ctx0}, x, y=10): return (x, y)",
+ {"x": 42},
+
id="required_param_between_context_key_and_regular_default_gets_none",
+ ),
+ pytest.param(
+ "def dummy_task({ctx0}=None, {ctx1}=None, {ctx2}=None): return
True",
+ {},
+ id="context_key_only_signature",
+ ),
+ ],
+ )
+ def test_context_key_construction_succeeds(self, func_src, op_kwargs):
+ """All context-key signature shapes must construct without raising."""
+
+ ctx_keys = sorted(KNOWN_CONTEXT_KEYS)
+ src = func_src.format(
+ ctx0=ctx_keys[0],
+ ctx1=ctx_keys[1] if len(ctx_keys) > 1 else ctx_keys[0],
+ ctx2=ctx_keys[2] if len(ctx_keys) > 2 else ctx_keys[0],
+ )
+ op = make_op(_make_func(src), op_kwargs=op_kwargs)
+ assert op is not None
+
+ def test_non_context_param_after_context_key_gets_none_injected(self):
+ ctx_key = next(iter(sorted(KNOWN_CONTEXT_KEYS)))
+ f = _make_func(f"def dummy_task({ctx_key}, a): ...")
+ assert make_op(f, op_kwargs={"a": "2024-01-01"}) is not None
+
+ def test_non_context_param_after_context_key_without_value_raises(self):
+ ctx_key = next(iter(sorted(KNOWN_CONTEXT_KEYS)))
+ f = _make_func(f"def dummy_task({ctx_key}, a): ...")
+ with pytest.raises(TypeError, match="missing required argument"):
+ make_op(f)
+
+ def test_bind_validation_fails_for_missing_required_args(self):
+ """Truly required args with no supplied value must still cause a bind
failure."""
+
+ def dummy_task(required_arg):
+ return required_arg
+
+ with pytest.raises(TypeError):
+ make_op(dummy_task)
+
+ def test_variadic_and_keyword_only_params_are_not_assigned_defaults(self):
+ """Construction succeeds when variadic and keyword-only params are
present."""
+
+ def dummy_task(a, b=1, *args, kw_required, **kwargs):
+ return a, b, args, kw_required, kwargs
+
+ assert make_op(dummy_task, op_kwargs={"a": 1, "kw_required": "x"}) is
not None
+
+
+def make_op(func, op_args=None, op_kwargs=None, **kwargs):
+ return DummyDecoratedOperator(
+ task_id="test_task",
+ python_callable=func,
+ op_args=op_args or [],
+ op_kwargs=op_kwargs or {},
+ **kwargs,
+ )
+
+
+def _make_func(src: str):
+ ns: dict = {}
+ exec(src, ns)
+ return next(v for v in ns.values() if callable(v))
+
+
def simple_decorator(fn):
@functools.wraps(fn)
def wrapper(*args, **kwargs):