This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-10-test by this push:
     new 510b968533e Fix duplication of Task tries in the UI (#43891) (#43950)
510b968533e is described below

commit 510b968533e4201151de9eebd95ca770a1f2636f
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Wed Nov 13 09:48:24 2024 +0100

    Fix duplication of Task tries in the UI (#43891) (#43950)
    
    It was observed that the TI tries endpoint sometimes returns duplicate
    TaskInstances. I have observed this to happen when the TI is in the
    up_for_retry state.
    
    When the TI is in the up_for_retry state, the previous try has already been
    recorded in TI history, but the TI's try_number has not yet been incremented,
    so the same try would be returned from both TaskInstanceHistory and
    TaskInstance. We must therefore exclude this already-recorded TI from the
    task instance tries endpoint. We can identify it because its state is
    up_for_retry, so we filter out TIs in the up_for_retry state when querying
    for the task instance tries.
    
    Closes: #41765
    (cherry picked from commit 4bc1257df4bf1f7391ad8bca3b10d294b2d92e7a)
---
 .../api_connexion/endpoints/task_instance_endpoint.py   |  7 ++++++-
 .../endpoints/test_task_instance_endpoint.py            | 17 +++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py 
b/airflow/api_connexion/endpoints/task_instance_endpoint.py
index a79af61f69b..2eb63260e34 100644
--- a/airflow/api_connexion/endpoints/task_instance_endpoint.py
+++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py
@@ -840,7 +840,12 @@ def get_task_instance_tries(
         )
         return query
 
-    task_instances = session.scalars(_query(TIH)).all() + 
session.scalars(_query(TI)).all()
+    # Exclude TaskInstance with state UP_FOR_RETRY since they have been 
recorded in TaskInstanceHistory
+    tis = session.scalars(
+        _query(TI).where(or_(TI.state != TaskInstanceState.UP_FOR_RETRY, 
TI.state.is_(None)))
+    ).all()
+
+    task_instances = session.scalars(_query(TIH)).all() + tis
     return task_instance_history_collection_schema.dump(
         TaskInstanceHistoryCollection(task_instances=task_instances, 
total_entries=len(task_instances))
     )
diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py 
b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
index 330b69c3867..bf81584caf7 100644
--- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py
+++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
@@ -3013,6 +3013,23 @@ class TestGetTaskInstanceTries(TestTaskInstanceEndpoint):
         assert response.json["total_entries"] == 2  # The task instance and 
its history
         assert len(response.json["task_instances"]) == 2
 
+    def test_ti_in_retry_state_not_returned(self, session):
+        self.create_task_instances(
+            session=session, task_instances=[{"state": State.SUCCESS}], 
with_ti_history=True
+        )
+        ti = session.query(TaskInstance).one()
+        ti.state = State.UP_FOR_RETRY
+        session.merge(ti)
+        session.commit()
+
+        response = self.client.get(
+            
"/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries",
+            environ_overrides={"REMOTE_USER": "test"},
+        )
+        assert response.status_code == 200
+        assert response.json["total_entries"] == 1
+        assert len(response.json["task_instances"]) == 1
+
     def test_mapped_task_should_respond_200(self, session):
         tis = self.create_task_instances(session, task_instances=[{"state": 
State.FAILED}])
         old_ti = tis[0]

Reply via email to