This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 53c05a0e80c [v3-1-test] Fix task retries executing wrong method after
deferred state (#56731) (#56737)
53c05a0e80c is described below
commit 53c05a0e80c581c24ecbfb7f9c3a4dc35d70fd2d
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Oct 16 20:00:51 2025 +0100
[v3-1-test] Fix task retries executing wrong method after deferred state
(#56731) (#56737)
---
.../execution_api/routes/task_instances.py | 2 +-
.../versions/head/test_task_instances.py | 44 ++++++++++++++++++++++
2 files changed, 45 insertions(+), 1 deletion(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index f2f8fc8e115..67af09eeb3c 100644
---
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -437,7 +437,7 @@ def _create_ti_state_update_query_and_update_state(
ti = session.get(TI, ti_id_str)
updated_state = ti_patch_payload.state
query = TI.duration_expression_update(ti_patch_payload.end_date,
query, session.bind)
- query = query.values(state=updated_state)
+ query = query.values(state=updated_state, next_method=None,
next_kwargs=None)
if updated_state == TerminalTIState.FAILED:
# This is the only case needs extra handling for
TITerminalStatePayload
diff --git
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
index 66ce18e2d35..899bf3ba82d 100644
---
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
@@ -1095,6 +1095,50 @@ class TestTIUpdateState:
assert tih.task_instance_id
assert tih.task_instance_id != ti.id
+ @pytest.mark.parametrize(
+ "target_state",
+ [
+ State.UP_FOR_RETRY,
+ TerminalTIState.FAILED,
+ TerminalTIState.SUCCESS,
+ ],
+ )
+ def test_ti_update_state_clears_deferred_fields(
+ self, client, session, create_task_instance, target_state
+ ):
+ """Test that next_method and next_kwargs are cleared when
transitioning to terminal/retry states."""
+ ti = create_task_instance(
+ task_id="test_ti_update_state_clears_deferred_fields",
+ state=State.RUNNING,
+ )
+ # Simulate a task that resumed from deferred state with
next_method/next_kwargs set
+ ti.next_method = "execute_complete"
+ ti.next_kwargs = {"event": "test_event", "data": "test_data"}
+ session.commit()
+
+ response = client.patch(
+ f"/execution/task-instances/{ti.id}/state",
+ json={
+ "state": target_state,
+ "end_date": DEFAULT_END_DATE.isoformat(),
+ },
+ )
+
+ assert response.status_code == 204
+
+ if target_state == State.UP_FOR_RETRY:
+ # Retry creates a new TI ID, so we need to fetch by unique key
+ ti = session.scalar(
+ select(TaskInstance).filter_by(task_id=ti.task_id,
run_id=ti.run_id, dag_id=ti.dag_id)
+ )
+ else:
+ session.expire_all()
+ ti = session.get(TaskInstance, ti.id)
+
+ assert ti.state == target_state
+ assert ti.next_method is None
+ assert ti.next_kwargs is None
+
def test_ti_update_state_to_failed_table_check(self, client, session,
create_task_instance):
# we just want to fail in this test, no need to retry
ti = create_task_instance(