This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 35b616333c Add TaskFail entries to Gantt chart (#37918)
35b616333c is described below
commit 35b616333ce7358ae4ba62a03a032debdea2a19a
Author: Brent Bovenzi <[email protected]>
AuthorDate: Thu Mar 7 17:09:28 2024 -0500
Add TaskFail entries to Gantt chart (#37918)
* Add TaskFail entries to Gantt chart
* Fix some autorefresh
---
airflow/www/static/js/api/index.ts | 2 +
airflow/www/static/js/api/useTaskFails.ts | 67 ++++++++++++++++
airflow/www/static/js/dag/details/gantt/Row.tsx | 41 ++++++++--
.../www/static/js/dag/details/gantt/TaskFail.tsx | 91 ++++++++++++++++++++++
.../static/js/dag/details/task/TaskDuration.tsx | 6 +-
airflow/www/templates/airflow/dag.html | 1 +
airflow/www/views.py | 25 ++++++
7 files changed, 226 insertions(+), 7 deletions(-)
diff --git a/airflow/www/static/js/api/index.ts b/airflow/www/static/js/api/index.ts
index b04da5259a..4f883b4825 100644
--- a/airflow/www/static/js/api/index.ts
+++ b/airflow/www/static/js/api/index.ts
@@ -52,6 +52,7 @@ import useHistoricalMetricsData from
"./useHistoricalMetricsData";
import { useTaskXcomEntry, useTaskXcomCollection } from "./useTaskXcom";
import useEventLogs from "./useEventLogs";
import useCalendarData from "./useCalendarData";
+import useTaskFails from "./useTaskFails";
axios.interceptors.request.use((config) => {
config.paramsSerializer = {
@@ -100,4 +101,5 @@ export {
useTaskXcomCollection,
useEventLogs,
useCalendarData,
+ useTaskFails,
};
diff --git a/airflow/www/static/js/api/useTaskFails.ts b/airflow/www/static/js/api/useTaskFails.ts
new file mode 100644
index 0000000000..f256db8f1f
--- /dev/null
+++ b/airflow/www/static/js/api/useTaskFails.ts
@@ -0,0 +1,67 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { useQuery } from "react-query";
+import axios, { AxiosResponse } from "axios";
+
+import { getMetaValue } from "src/utils";
+import { useAutoRefresh } from "src/context/autorefresh";
+
+const DAG_ID_PARAM = "dag_id"; // used both as the meta-value name read below and as the query-string key
+const RUN_ID_PARAM = "run_id"; // query-string key for the optional run filter
+const TASK_ID_PARAM = "task_id"; // query-string key for the optional task filter
+
+const dagId = getMetaValue(DAG_ID_PARAM); // resolved once at module load, not per hook call
+const taskFailsUrl = getMetaValue("task_fails_url"); // name matches the <meta name="task_fails_url"> tag this commit adds to dag.html
+
+export interface TaskFail { // one task-fail record as consumed by the UI; NOTE(review): the view returns snake_case keys — presumably converted to camelCase elsewhere; confirm
+ runId: string;
+ taskId: string;
+ mapIndex?: number;
+ startDate?: string; // NOTE(review): presumably an ISO datetime string; confirm against the JSON the view emits
+ endDate?: string;
+}
+
+interface Props { // arguments accepted by the useTaskFails hook
+ runId?: string; // omit to leave the run filter unset
+ taskId?: string; // omit to leave the task filter unset
+ enabled?: boolean; // forwarded to the query's `enabled` option
+}
+
+const useTaskFails = ({ runId, taskId, enabled = true }: Props) => { // react-query hook: GETs taskFailsUrl for the module-level dagId, passing runId/taskId as params
+ const { isRefreshOn } = useAutoRefresh();
+
+ return useQuery(
+ ["taskFails", runId, taskId], // query key varies with the run/task filters
+ async () => {
+ const params = {
+ [DAG_ID_PARAM]: dagId, // dag id is the module-level meta value, not a hook argument
+ [RUN_ID_PARAM]: runId,
+ [TASK_ID_PARAM]: taskId,
+ };
+ return axios.get<AxiosResponse, TaskFail[]>(taskFailsUrl, { params }); // NOTE(review): typed as resolving to TaskFail[] rather than an AxiosResponse — presumably a response interceptor unwraps the payload; confirm
+ },
+ {
+ enabled, // when false the request is not issued; defaults to true
+ refetchInterval: isRefreshOn && (autoRefreshInterval || 1) * 1000, // evaluates to false when auto-refresh is off; NOTE(review): autoRefreshInterval is not declared or imported in this file — presumably a global expressed in seconds; confirm
+ }
+ );
+};
+
+export default useTaskFails;
diff --git a/airflow/www/static/js/dag/details/gantt/Row.tsx b/airflow/www/static/js/dag/details/gantt/Row.tsx
index 1526e1713f..fbee17e139 100644
--- a/airflow/www/static/js/dag/details/gantt/Row.tsx
+++ b/airflow/www/static/js/dag/details/gantt/Row.tsx
@@ -19,13 +19,17 @@
import React from "react";
import { Box, Tooltip, Flex } from "@chakra-ui/react";
+
import useSelection from "src/dag/useSelection";
import { getDuration } from "src/datetime_utils";
-import { SimpleStatus } from "src/dag/StatusBox";
+import { SimpleStatus, boxSize } from "src/dag/StatusBox";
import { useContainerRef } from "src/context/containerRef";
import { hoverDelay } from "src/utils";
import type { Task } from "src/types";
+import { useTaskFails } from "src/api";
+
import GanttTooltip from "./GanttTooltip";
+import TaskFail from "./TaskFail";
interface Props {
ganttWidth?: number;
@@ -59,6 +63,12 @@ const Row = ({
: true);
const isOpen = openGroupIds.includes(task.id || "");
+ const { data: taskFails } = useTaskFails({
+ taskId: task.id || undefined,
+ runId: runId || undefined,
+ enabled: !!(instance?.tryNumber && instance?.tryNumber > 1) && !!task.id,
// Only try to look up task fails if it even has a try number > 1
+ });
+
// Calculate durations in ms
const taskDuration = getDuration(instance?.startDate, instance?.endDate);
const queuedDuration = hasValidQueuedDttm
@@ -84,12 +94,14 @@ const Row = ({
return (
<div>
<Box
- py="4px"
borderBottomWidth={1}
borderBottomColor={!!task.children && isOpen ? "gray.400" : "gray.200"}
bg={isSelected ? "blue.100" : "inherit"}
+ position="relative"
+ width={ganttWidth}
+ height={`${boxSize + 9}px`}
>
- {instance ? (
+ {instance && (
<Tooltip
label={<GanttTooltip task={task} instance={instance} />}
hasArrow
@@ -99,9 +111,11 @@ const Row = ({
>
<Flex
width={`${width + queuedWidth}px`}
+ position="absolute"
cursor="pointer"
pointerEvents="auto"
- marginLeft={`${offsetMargin}px`}
+ top="4px"
+ left={`${offsetMargin}px`}
onClick={() => {
onSelect({
runId: instance.runId,
@@ -129,9 +143,24 @@ const Row = ({
/>
</Flex>
</Tooltip>
- ) : (
- <Box height="10px" />
)}
+ {/* Only show fails before the most recent task instance */}
+ {(taskFails || [])
+ .filter(
+ (tf) =>
+ tf.startDate !== instance?.startDate &&
+ // @ts-ignore
+ moment(tf.startDate).isAfter(ganttStartDate)
+ )
+ .map((taskFail) => (
+ <TaskFail
+ key={`${taskFail.taskId}-${taskFail.startDate}`}
+ taskFail={taskFail}
+ ganttStartDate={ganttStartDate}
+ ganttWidth={ganttWidth}
+ runDuration={runDuration}
+ />
+ ))}
</Box>
{isOpen &&
!!task.children &&
diff --git a/airflow/www/static/js/dag/details/gantt/TaskFail.tsx b/airflow/www/static/js/dag/details/gantt/TaskFail.tsx
new file mode 100644
index 0000000000..1d217d3bfb
--- /dev/null
+++ b/airflow/www/static/js/dag/details/gantt/TaskFail.tsx
@@ -0,0 +1,91 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import React from "react";
+import { Box, Tooltip, Text } from "@chakra-ui/react";
+
+import { getDuration } from "src/datetime_utils";
+import { SimpleStatus } from "src/dag/StatusBox";
+import { useContainerRef } from "src/context/containerRef";
+import { hoverDelay } from "src/utils";
+import Time from "src/components/Time";
+
+import type { TaskFail as TaskFailType } from "src/api/useTaskFails";
+
+interface Props { // inputs for one failed-attempt bar on a Gantt row
+ taskFail: TaskFailType; // the record to draw; its startDate/endDate drive position and width
+ runDuration: number; // divisor used to scale durations to the chart; NOTE(review): presumably in the same unit getDuration returns — confirm
+ ganttWidth: number; // chart width; multiplied by duration fractions to produce px values
+ ganttStartDate?: string | null; // left edge of the chart's time range; offsets are measured from it
+}
+
+const TaskFail = ({ // renders one absolutely-positioned "failed" bar with a tooltip for a single TaskFail record
+ taskFail,
+ runDuration,
+ ganttWidth,
+ ganttStartDate,
+}: Props) => {
+ const containerRef = useContainerRef(); // passed to the tooltip's portalProps below
+
+ const duration = getDuration(taskFail?.startDate, taskFail?.endDate); // length of the failed attempt
+ const percent = duration / runDuration; // share of the run taken by this attempt; NOTE(review): no guard for runDuration === 0
+ const failWidth = ganttWidth * percent; // bar width, emitted below as `${failWidth}px`
+
+ const startOffset = getDuration(ganttStartDate, taskFail?.startDate); // time from the chart start to this attempt's start
+ const offsetLeft = (startOffset / runDuration) * ganttWidth; // horizontal position, emitted below as `left`
+
+ return (
+ <Tooltip
+ label={
+ <Box>
+ <Text mb={2}>Task Fail</Text>
+ {taskFail?.startDate && (
+ <Text>
+ Start: <Time dateTime={taskFail?.startDate} />
+ </Text>
+ )}
+ {taskFail?.endDate && (
+ <Text>
+ End: <Time dateTime={taskFail?.endDate} />
+ </Text>
+ )}
+ <Text mt={2} fontSize="sm">
+ Can only show previous Task Fails, other tries are not yet saved.
+ </Text>
+ </Box>
+ }
+ hasArrow
+ portalProps={{ containerRef }}
+ placement="top"
+ openDelay={hoverDelay}
+ top="4px" // NOTE(review): the inner Box also sets top="4px"; confirm this prop is intended on the Tooltip as well
+ >
+ <Box
+ position="absolute"
+ left={`${offsetLeft}px`}
+ cursor="pointer"
+ top="4px"
+ >
+ <SimpleStatus state="failed" width={`${failWidth}px`} />
+ </Box>
+ </Tooltip>
+ );
+};
+
+export default TaskFail;
diff --git a/airflow/www/static/js/dag/details/task/TaskDuration.tsx b/airflow/www/static/js/dag/details/task/TaskDuration.tsx
index 4b7eed9f3d..27cd8b33ec 100644
--- a/airflow/www/static/js/dag/details/task/TaskDuration.tsx
+++ b/airflow/www/static/js/dag/details/task/TaskDuration.tsx
@@ -22,7 +22,7 @@
import React from "react";
import useSelection from "src/dag/useSelection";
-import { useGridData } from "src/api";
+import { useGridData, useTaskFails } from "src/api";
import { startCase } from "lodash";
import { getDuration, formatDateTime, defaultFormat } from
"src/datetime_utils";
import ReactECharts, { ReactEChartsProps } from "src/components/ReactECharts";
@@ -45,6 +45,10 @@ const TaskDuration = () => {
onSelect,
} = useSelection();
+ const { data: taskFails } = useTaskFails({ taskId: taskId || undefined });
+
+ console.log(taskFails);
+
const {
data: { dagRuns, groups, ordering },
} = useGridData();
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index c69251e71c..6f5a187f61 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -57,6 +57,7 @@
<meta name="grid_data_url" content="{{ url_for('Airflow.grid_data') }}">
<meta name="graph_data_url" content="{{ url_for('Airflow.graph_data') }}">
<meta name="calendar_data_url" content="{{ url_for('Airflow.calendar_data')
}}">
+ <meta name="task_fails_url" content="{{ url_for('Airflow.task_fails') }}">
<meta name="next_run_datasets_url" content="{{
url_for('Airflow.next_run_datasets', dag_id=dag.dag_id) }}">
<meta name="grid_url" content="{{ url_for('Airflow.grid', dag_id=dag.dag_id)
}}">
<meta name="datasets_url" content="{{ url_for('Airflow.datasets') }}">
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 5cffd28aaa..743947cdd3 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -3505,6 +3505,31 @@ class Airflow(AirflowBaseView):
{"Content-Type": "application/json; charset=utf-8"},
)
+ @expose("/object/task_fails")
+ @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
+ @provide_session
+ def task_fails(self, session):
+ """Return TaskFail rows for the requested dag_id as JSON, optionally narrowed by run_id and task_id."""
+ dag_id = request.args.get("dag_id")  # always applied as a filter; NOTE(review): not checked for None before use
+ task_id = request.args.get("task_id")  # optional filter
+ run_id = request.args.get("run_id")  # optional filter
+
+ query = select(
+ TaskFail.task_id, TaskFail.run_id, TaskFail.map_index,
TaskFail.start_date, TaskFail.end_date
+ ).where(TaskFail.dag_id == dag_id)
+
+ if run_id:  # add the run filter only when a non-empty run_id was supplied
+ query = query.where(TaskFail.run_id == run_id)
+ if task_id:  # add the task filter only when a non-empty task_id was supplied
+ query = query.where(TaskFail.task_id == task_id)
+
+ task_fails = [dict(tf) for tf in session.execute(query).all()]  # one dict per result row, built from the five selected columns
+
+ return (
+ htmlsafe_json_dumps(task_fails, separators=(",", ":"),
dumps=flask.json.dumps),
+ {"Content-Type": "application/json; charset=utf-8"},
+ )
+
@expose("/object/task_instances")
@auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
def task_instances(self):