This is an automated email from the ASF dual-hosted git repository. utkarsharma pushed a commit to branch sync_2-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 7b678c3b38db2dad79794666f78c137ea6d2855c Author: Ephraim Anierobi <[email protected]> AuthorDate: Wed Nov 13 09:48:24 2024 +0100 Fix duplication of Task tries in the UI (#43891) (#43950) It was observed that the TI tries endpoint sometimes returns a duplicate TaskInstance. I have observed this to happen when the TI is in the up_for_retry state. When the TI is in the up_for_retry state, we have already recorded the previous try in TI history and the TI try_number has not yet been incremented, so we must exclude this already-recorded TI from the task instance tries endpoint. We can identify such a TI because its state is up_for_retry, so we filter out TIs in the up_for_retry state when querying for the task instance tries. Closes: #41765 (cherry picked from commit 4bc1257df4bf1f7391ad8bca3b10d294b2d92e7a) --- .../api_connexion/endpoints/task_instance_endpoint.py | 7 ++++++- .../endpoints/test_task_instance_endpoint.py | 17 +++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py index a79af61f69b..2eb63260e34 100644 --- a/airflow/api_connexion/endpoints/task_instance_endpoint.py +++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py @@ -840,7 +840,12 @@ def get_task_instance_tries( ) return query - task_instances = session.scalars(_query(TIH)).all() + session.scalars(_query(TI)).all() + # Exclude TaskInstance with state UP_FOR_RETRY since they have been recorded in TaskInstanceHistory + tis = session.scalars( + _query(TI).where(or_(TI.state != TaskInstanceState.UP_FOR_RETRY, TI.state.is_(None))) + ).all() + + task_instances = session.scalars(_query(TIH)).all() + tis return task_instance_history_collection_schema.dump( TaskInstanceHistoryCollection(task_instances=task_instances, total_entries=len(task_instances)) ) diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py 
b/tests/api_connexion/endpoints/test_task_instance_endpoint.py index 330b69c3867..bf81584caf7 100644 --- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py +++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py @@ -3013,6 +3013,23 @@ class TestGetTaskInstanceTries(TestTaskInstanceEndpoint): assert response.json["total_entries"] == 2 # The task instance and its history assert len(response.json["task_instances"]) == 2 + def test_ti_in_retry_state_not_returned(self, session): + self.create_task_instances( + session=session, task_instances=[{"state": State.SUCCESS}], with_ti_history=True + ) + ti = session.query(TaskInstance).one() + ti.state = State.UP_FOR_RETRY + session.merge(ti) + session.commit() + + response = self.client.get( + "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 200 + assert response.json["total_entries"] == 1 + assert len(response.json["task_instances"]) == 1 + def test_mapped_task_should_respond_200(self, session): tis = self.create_task_instances(session, task_instances=[{"state": State.FAILED}]) old_ti = tis[0]
