This is an automated email from the ASF dual-hosted git repository.
rahulvats pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 891c7fbb6a8 Fix deferred task resume failure when worker is older than server (#64598)
891c7fbb6a8 is described below
commit 891c7fbb6a8e27db8340d318e7097a31a1396bef
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu Apr 2 09:09:28 2026 +0100
Fix deferred task resume failure when worker is older than server (#64598)
Fix deferred task resume failure when worker is older than server (#64598)
---
airflow-core/.pre-commit-config.yaml | 1 +
.../execution_api/versions/v2026_04_06.py | 28 +++++
airflow-core/src/airflow/models/trigger.py | 8 +-
.../versions/v2026_04_06/test_task_instances.py | 127 +++++++++++++++++++++
4 files changed, 161 insertions(+), 3 deletions(-)
diff --git a/airflow-core/.pre-commit-config.yaml b/airflow-core/.pre-commit-config.yaml
index 121b51d4e8b..5d2e3e7fe2b 100644
--- a/airflow-core/.pre-commit-config.yaml
+++ b/airflow-core/.pre-commit-config.yaml
@@ -313,6 +313,7 @@ repos:
^src/airflow/api_fastapi/core_api/services/ui/task_group.py$|
^src/airflow/api_fastapi/execution_api/routes/hitl\.py$|
^src/airflow/api_fastapi/execution_api/routes/task_instances\.py$|
+ ^src/airflow/api_fastapi/execution_api/versions/v2026_04_06\.py$|
^src/airflow/api_fastapi/logging/decorators\.py$|
^src/airflow/assets/evaluation\.py$|
^src/airflow/assets/manager\.py$|
diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_06.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_06.py
index 85ec1f2a608..59e671f0a24 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_06.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_06.py
@@ -118,6 +118,34 @@ class ModifyDeferredTaskKwargsToJsonValue(VersionChange):
schema(TIDeferredStatePayload).field("next_kwargs").had(type=dict[str, Any]),
)
+ @convert_response_to_previous_version_for(TIRunContext) # type: ignore[arg-type]
+ def convert_next_kwargs_to_base_serialization(response: ResponseInfo) -> None: # type: ignore[misc]
+ """
+ Convert next_kwargs from SDK serde format to BaseSerialization format for old workers.
+
+ Old workers (task-sdk < 1.2) only know BaseSerialization.deserialize(), which requires
+ dicts wrapped as {"__type": "dict", "__var": {...}}. SDK serde produces plain dicts that
+ BaseSerialization cannot parse, causing KeyError on __var.
+
+ We must deserialize SDK serde first to recover native Python objects (datetime,
+ timedelta, etc.), then re-serialize with BaseSerialization so old workers get
+ proper typed values instead of raw {"__classname__": ...} dicts.
+ """
+ next_kwargs = response.body.get("next_kwargs")
+ if next_kwargs is None:
+ return
+
+ from airflow.sdk.serde import deserialize
+ from airflow.serialization.serialized_objects import BaseSerialization
+
+ try:
+ plain = deserialize(next_kwargs)
+ except (ImportError, KeyError, AttributeError, TypeError):
+ # Already in BaseSerialization format (rolling upgrade, old data in DB)
+ return
+
+ response.body["next_kwargs"] = BaseSerialization.serialize(plain)
+
class RemoveUpstreamMapIndexesField(VersionChange):
"""Remove upstream_map_indexes field from TIRunContext - now computed by Task SDK."""
diff --git a/airflow-core/src/airflow/models/trigger.py b/airflow-core/src/airflow/models/trigger.py
index da78eede343..d2c0fde3c89 100644
--- a/airflow-core/src/airflow/models/trigger.py
+++ b/airflow-core/src/airflow/models/trigger.py
@@ -477,13 +477,15 @@ def handle_event_submit(event: TriggerEvent, *, task_instance: TaskInstance, ses
next_kwargs = BaseSerialization.deserialize(next_kwargs_raw)
- # Add event to the plain dict, then serialize everything together. This ensures that the event is properly
- # nested inside __var__ in the final serde serialized structure.
+ # Add event to the plain dict, then serialize everything together so nested
+ # non-primitive values get proper serde encoding.
if TYPE_CHECKING:
assert isinstance(next_kwargs, dict)
next_kwargs["event"] = event.payload
- # re-serialize the entire dict using serde to ensure consistent structure
+ # Re-serialize using serde. The Execution API version converter
+ # (ModifyDeferredTaskKwargsToJsonValue) handles converting this to
+ # BaseSerialization format when serving old workers.
task_instance.next_kwargs = serialize(next_kwargs)
# Remove ourselves as its trigger
diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_04_06/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_04_06/test_task_instances.py
index 8117ac6b69c..a914ac6c6e5 100644
--- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_04_06/test_task_instances.py
+++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_04_06/test_task_instances.py
@@ -20,6 +20,7 @@ from __future__ import annotations
import pytest
from airflow._shared.timezones import timezone
+from airflow.serialization.serialized_objects import BaseSerialization
from airflow.utils.state import DagRunState, State
from tests_common.test_utils.db import clear_db_runs
@@ -125,3 +126,129 @@ class TestDagRunStartDateNullableBackwardCompat:
assert response.status_code == 200
assert dag_run["start_date"] is not None, "start_date should not be None when DagRun has started"
assert dag_run["start_date"] == TIMESTAMP.isoformat().replace("+00:00", "Z")
+
+
+class TestNextKwargsBackwardCompat:
+ """Old workers only know BaseSerialization.deserialize -- SDK serde plain dicts cause KeyError."""
+
+ @pytest.fixture(autouse=True)
+ def _freeze_time(self, time_machine):
+ time_machine.move_to(TIMESTAMP_STR, tick=False)
+
+ def setup_method(self):
+ clear_db_runs()
+
+ def teardown_method(self):
+ clear_db_runs()
+
+ def test_old_version_gets_base_serialization_format(self, old_ver_client, session, create_task_instance):
+ """Old API version receives next_kwargs wrapped in __type/__var so BaseSerialization can parse it."""
+ ti = create_task_instance(
+ task_id="test_next_kwargs_compat",
+ state=State.QUEUED,
+ session=session,
+ start_date=TIMESTAMP,
+ )
+ # Store SDK serde format (plain dict) in DB -- this is what trigger.py handle_event_submit produces
+ ti.next_method = "execute_complete"
+ ti.next_kwargs = {"cheesecake": True, "event": "payload"}
+ session.commit()
+
+ response = old_ver_client.patch(f"/execution/task-instances/{ti.id}/run", json=RUN_PATCH_BODY)
+
+ assert response.status_code == 200
+ next_kwargs = response.json()["next_kwargs"]
+ # Old workers call BaseSerialization.deserialize on this -- verify it works
+ result = BaseSerialization.deserialize(next_kwargs)
+ assert result == {"cheesecake": True, "event": "payload"}
+
+ def test_old_version_deserializes_complex_types(self, old_ver_client, session, create_task_instance):
+ """Non-primitive values (datetime) must round-trip through serde -> BaseSerialization correctly."""
+ from airflow.sdk.serde import serialize as serde_serialize
+
+ original = {"event": TIMESTAMP, "simple": True}
+ # Store SDK serde format with a datetime -- this is what handle_event_submit produces
+ # when the trigger payload contains a datetime (e.g. DateTimeSensorAsync)
+ serde_encoded = serde_serialize(original)
+
+ ti = create_task_instance(
+ task_id="test_next_kwargs_datetime",
+ state=State.QUEUED,
+ session=session,
+ start_date=TIMESTAMP,
+ )
+ ti.next_method = "execute_complete"
+ ti.next_kwargs = serde_encoded
+ session.commit()
+
+ response = old_ver_client.patch(f"/execution/task-instances/{ti.id}/run", json=RUN_PATCH_BODY)
+
+ assert response.status_code == 200
+ next_kwargs = response.json()["next_kwargs"]
+ result = BaseSerialization.deserialize(next_kwargs)
+ assert result["simple"] is True
+ # datetime must come back as a datetime, not a {"__classname__": ...} dict
+ assert result["event"] == TIMESTAMP
+
+ def test_old_version_handles_already_base_serialization_in_db(
+ self, old_ver_client, session, create_task_instance
+ ):
+ """Rolling upgrade: DB still has BaseSerialization format from old handle_event_submit."""
+ ti = create_task_instance(
+ task_id="test_next_kwargs_already_base",
+ state=State.QUEUED,
+ session=session,
+ start_date=TIMESTAMP,
+ )
+ ti.next_method = "execute_complete"
+ # Pre-upgrade data: BaseSerialization format already in DB
+ ti.next_kwargs = BaseSerialization.serialize({"cheesecake": True, "event": "payload"})
+ session.commit()
+
+ response = old_ver_client.patch(f"/execution/task-instances/{ti.id}/run", json=RUN_PATCH_BODY)
+
+ assert response.status_code == 200
+ next_kwargs = response.json()["next_kwargs"]
+ # Should still be parseable by old workers
+ result = BaseSerialization.deserialize(next_kwargs)
+ assert result == {"cheesecake": True, "event": "payload"}
+
+ def test_old_version_handles_submit_failure_plain_dict(
+ self, old_ver_client, session, create_task_instance
+ ):
+ """submit_failure and scheduler timeout write raw plain dicts -- converter must handle those too."""
+ ti = create_task_instance(
+ task_id="test_next_kwargs_failure",
+ state=State.QUEUED,
+ session=session,
+ start_date=TIMESTAMP,
+ )
+ ti.next_method = "__fail__"
+ # This is what submit_failure / scheduler timeout writes -- plain dict, no wrapping
+ ti.next_kwargs = {"error": "Trigger timeout"}
+ session.commit()
+
+ response = old_ver_client.patch(f"/execution/task-instances/{ti.id}/run", json=RUN_PATCH_BODY)
+
+ assert response.status_code == 200
+ next_kwargs = response.json()["next_kwargs"]
+ result = BaseSerialization.deserialize(next_kwargs)
+ assert result == {"error": "Trigger timeout"}
+
+ def test_head_version_returns_raw_serde_format(self, client, session, create_task_instance):
+ """Head API version returns next_kwargs as-is (SDK serde format)."""
+ ti = create_task_instance(
+ task_id="test_next_kwargs_head",
+ state=State.QUEUED,
+ session=session,
+ start_date=TIMESTAMP,
+ )
+ ti.next_method = "execute_complete"
+ ti.next_kwargs = {"cheesecake": True, "event": "payload"}
+ session.commit()
+
+ response = client.patch(f"/execution/task-instances/{ti.id}/run", json=RUN_PATCH_BODY)
+
+ assert response.status_code == 200
+ # Head version gets the plain dict directly -- no BaseSerialization wrapping
+ assert response.json()["next_kwargs"] == {"cheesecake": True, "event": "payload"}