This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new 53c05a0e80c [v3-1-test] Fix task retries executing wrong method after 
deferred state (#56731) (#56737)
53c05a0e80c is described below

commit 53c05a0e80c581c24ecbfb7f9c3a4dc35d70fd2d
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Oct 16 20:00:51 2025 +0100

    [v3-1-test] Fix task retries executing wrong method after deferred state 
(#56731) (#56737)
---
 .../execution_api/routes/task_instances.py         |  2 +-
 .../versions/head/test_task_instances.py           | 44 ++++++++++++++++++++++
 2 files changed, 45 insertions(+), 1 deletion(-)

diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index f2f8fc8e115..67af09eeb3c 100644
--- 
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++ 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -437,7 +437,7 @@ def _create_ti_state_update_query_and_update_state(
         ti = session.get(TI, ti_id_str)
         updated_state = ti_patch_payload.state
         query = TI.duration_expression_update(ti_patch_payload.end_date, 
query, session.bind)
-        query = query.values(state=updated_state)
+        query = query.values(state=updated_state, next_method=None, 
next_kwargs=None)
 
         if updated_state == TerminalTIState.FAILED:
             # This is the only case needs extra handling for 
TITerminalStatePayload
diff --git 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
index 66ce18e2d35..899bf3ba82d 100644
--- 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
+++ 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
@@ -1095,6 +1095,50 @@ class TestTIUpdateState:
         assert tih.task_instance_id
         assert tih.task_instance_id != ti.id
 
+    @pytest.mark.parametrize(
+        "target_state",
+        [
+            State.UP_FOR_RETRY,
+            TerminalTIState.FAILED,
+            TerminalTIState.SUCCESS,
+        ],
+    )
+    def test_ti_update_state_clears_deferred_fields(
+        self, client, session, create_task_instance, target_state
+    ):
+        """Test that next_method and next_kwargs are cleared when 
transitioning to terminal/retry states."""
+        ti = create_task_instance(
+            task_id="test_ti_update_state_clears_deferred_fields",
+            state=State.RUNNING,
+        )
+        # Simulate a task that resumed from deferred state with 
next_method/next_kwargs set
+        ti.next_method = "execute_complete"
+        ti.next_kwargs = {"event": "test_event", "data": "test_data"}
+        session.commit()
+
+        response = client.patch(
+            f"/execution/task-instances/{ti.id}/state",
+            json={
+                "state": target_state,
+                "end_date": DEFAULT_END_DATE.isoformat(),
+            },
+        )
+
+        assert response.status_code == 204
+
+        if target_state == State.UP_FOR_RETRY:
+            # Retry creates a new TI ID, so we need to fetch by unique key
+            ti = session.scalar(
+                select(TaskInstance).filter_by(task_id=ti.task_id, 
run_id=ti.run_id, dag_id=ti.dag_id)
+            )
+        else:
+            session.expire_all()
+            ti = session.get(TaskInstance, ti.id)
+
+        assert ti.state == target_state
+        assert ti.next_method is None
+        assert ti.next_kwargs is None
+
     def test_ti_update_state_to_failed_table_check(self, client, session, 
create_task_instance):
         # we just want to fail in this test, no need to retry
         ti = create_task_instance(

Reply via email to