This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-10-test by this push:
new 510b968533e Fix duplication of Task tries in the UI (#43891) (#43950)
510b968533e is described below
commit 510b968533e4201151de9eebd95ca770a1f2636f
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Wed Nov 13 09:48:24 2024 +0100
Fix duplication of Task tries in the UI (#43891) (#43950)
It was observed that there are moments where the TI tries endpoint
returns duplicate TaskInstances. I have observed this to happen when the
TI is in the up_for_retry state.
When the TI is in the up_for_retry state, we have already recorded the
previous try in TI history, and the TI try_number has not yet been
incremented at this time, so we must exclude this recorded TI from the
task instance tries endpoint. We can identify the TI because its state is
up_for_retry, so we filter out TIs in the up_for_retry state when
querying for the task instance tries.
Closes: #41765
(cherry picked from commit 4bc1257df4bf1f7391ad8bca3b10d294b2d92e7a)
---
.../api_connexion/endpoints/task_instance_endpoint.py | 7 ++++++-
.../endpoints/test_task_instance_endpoint.py | 17 +++++++++++++++++
2 files changed, 23 insertions(+), 1 deletion(-)
diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py
b/airflow/api_connexion/endpoints/task_instance_endpoint.py
index a79af61f69b..2eb63260e34 100644
--- a/airflow/api_connexion/endpoints/task_instance_endpoint.py
+++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py
@@ -840,7 +840,12 @@ def get_task_instance_tries(
)
return query
- task_instances = session.scalars(_query(TIH)).all() + session.scalars(_query(TI)).all()
+ # Exclude TaskInstance with state UP_FOR_RETRY since they have been recorded in TaskInstanceHistory
+ tis = session.scalars(
+ _query(TI).where(or_(TI.state != TaskInstanceState.UP_FOR_RETRY,
TI.state.is_(None)))
+ ).all()
+
+ task_instances = session.scalars(_query(TIH)).all() + tis
return task_instance_history_collection_schema.dump(
TaskInstanceHistoryCollection(task_instances=task_instances,
total_entries=len(task_instances))
)
diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py
b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
index 330b69c3867..bf81584caf7 100644
--- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py
+++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
@@ -3013,6 +3013,23 @@ class TestGetTaskInstanceTries(TestTaskInstanceEndpoint):
assert response.json["total_entries"] == 2 # The task instance and
its history
assert len(response.json["task_instances"]) == 2
+ def test_ti_in_retry_state_not_returned(self, session):
+ self.create_task_instances(
+ session=session, task_instances=[{"state": State.SUCCESS}],
with_ti_history=True
+ )
+ ti = session.query(TaskInstance).one()
+ ti.state = State.UP_FOR_RETRY
+ session.merge(ti)
+ session.commit()
+
+ response = self.client.get(
+
"/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries",
+ environ_overrides={"REMOTE_USER": "test"},
+ )
+ assert response.status_code == 200
+ assert response.json["total_entries"] == 1
+ assert len(response.json["task_instances"]) == 1
+
def test_mapped_task_should_respond_200(self, session):
tis = self.create_task_instances(session, task_instances=[{"state":
State.FAILED}])
old_ti = tis[0]