This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2272ea246a AIP-64: Add UI endpoint for task instance history (#40221)
2272ea246a is described below
commit 2272ea246a89cea315f118389279da2e75454707
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Thu Jun 13 20:58:00 2024 +0100
AIP-64: Add UI endpoint for task instance history (#40221)
* AIP-64: Add UI endpoint for task instance history
This adds a UI endpoint for task instance history.
Co-authored-by: Jed Cunningham <[email protected]>
Co-Authored-By: dstandish <[email protected]>
Co-Authored-By: Brent Bovenzi <[email protected]>
* fixup! AIP-64: Add UI endpoint for task instance history
---------
Co-authored-by: Jed Cunningham <[email protected]>
Co-authored-by: dstandish <[email protected]>
Co-authored-by: Brent Bovenzi <[email protected]>
---
airflow/www/views.py | 35 +++++++++++++++++++++++++
tests/www/views/test_views_ti_history.py | 44 ++++++++++++++++++++++++++++++++
2 files changed, 79 insertions(+)
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 0a25a7b25d..8311050bff 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -110,6 +110,7 @@ from airflow.models.dataset import DagScheduleDatasetReference, DatasetDagRunQue
from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import TaskInstance, TaskInstanceNote
+from airflow.models.taskinstancehistory import TaskInstanceHistory as TIHistory
from airflow.plugins_manager import PLUGINS_ATTRIBUTES_TO_DUMP
from airflow.providers_manager import ProvidersManager
from airflow.security import permissions
@@ -3610,6 +3611,40 @@ class Airflow(AirflowBaseView):
{"Content-Type": "application/json; charset=utf-8"},
)
+    @expose("/object/task_instance_history")
+    @provide_session
+    @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
+    def ti_history(self, session: Session = NEW_SESSION):
+        """
+        Return the stored history rows of one task instance as a JSON array.
+
+        Rows are selected by the ``dag_id``, ``task_id``, ``run_id`` and
+        ``map_index`` query-string arguments (``map_index`` falls back to -1
+        when it is not supplied) and are sorted by ascending ``try_number``.
+
+        :param session: database session used to run the query
+        :return: tuple of the JSON-encoded payload and the response headers
+        """
+        args = request.args
+        criteria = (
+            TIHistory.dag_id == args.get("dag_id"),
+            TIHistory.task_id == args.get("task_id"),
+            TIHistory.run_id == args.get("run_id"),
+            TIHistory.map_index == args.get("map_index", -1, type=int),
+        )
+        history_rows = (
+            session.query(TIHistory).filter(*criteria).order_by(TIHistory.try_number.asc()).all()
+        )
+
+        # The exported fields are the column names of the TaskInstance table,
+        # read off each history row.
+        # NOTE(review): this relies on every TaskInstance column also being an
+        # attribute of TIHistory; a missing one would raise AttributeError.
+        column_names = TaskInstance.__table__.columns.keys()
+
+        payload = []
+        for row in history_rows:
+            record = {name: getattr(row, name) for name in column_names}
+            # Two columns are published under different key names; popping and
+            # re-inserting places them last in each record.
+            record["dag_run_id"] = record.pop("run_id")
+            record["queued_when"] = record.pop("queued_dttm")
+            payload.append(record)
+
+        return (
+            htmlsafe_json_dumps(payload, separators=(",", ":"), cls=utils_json.WebEncoder),
+            {"Content-Type": "application/json; charset=utf-8"},
+        )
+
@expose("/robots.txt")
@action_logging
def robots(self):
diff --git a/tests/www/views/test_views_ti_history.py b/tests/www/views/test_views_ti_history.py
new file mode 100644
index 0000000000..cc7a933e94
--- /dev/null
+++ b/tests/www/views/test_views_ti_history.py
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow.models.taskinstancehistory import TaskInstanceHistory
+from tests.test_utils.db import clear_db_runs
+
+pytestmark = pytest.mark.db_test
+
+
+class TestTIHistoryEndpoint:
+    """Common base class for the task-instance-history endpoint tests."""
+
+    @pytest.fixture(autouse=True)
+    def cleanup(self):
+        """Call ``clear_db_runs`` before and after every test that uses this class."""
+        clear_db_runs()
+        # Everything after the yield runs as the fixture's teardown.
+        yield
+        clear_db_runs()
+
+
+class TestGetTIHistory(TestTIHistoryEndpoint):
+    """Tests for GET requests against ``/object/task_instance_history``."""
+
+    def test_should_respond_200(self, admin_client, create_task_instance, session):
+        """One recorded history row yields HTTP 200 and a one-element JSON body."""
+        task_instance = create_task_instance(dag_id="test_dag", task_id="test_task", run_id="test_run_id")
+        TaskInstanceHistory.record_ti(task_instance)
+        assert session.query(TaskInstanceHistory).count() == 1
+
+        # map_index is left out on purpose, so the endpoint uses its default.
+        url = "/object/task_instance_history?dag_id=test_dag&task_id=test_task&run_id=test_run_id"
+        resp = admin_client.get(url)
+
+        assert resp.status_code == 200
+        assert len(resp.json) == 1