This is an automated email from the ASF dual-hosted git repository.
phanikumv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 354e633ddd Add calendar view to react (#37909)
354e633ddd is described below
commit 354e633ddd7da5fb8310ef6bd1c5b9e5e8763848
Author: Brent Bovenzi <[email protected]>
AuthorDate: Wed Mar 6 21:09:06 2024 -0500
Add calendar view to react (#37909)
---
airflow/www/static/js/api/index.ts | 2 +
airflow/www/static/js/api/useCalendarData.ts | 48 +++++
airflow/www/static/js/dag/details/dag/Calendar.tsx | 195 +++++++++++++++++++++
airflow/www/static/js/dag/details/index.tsx | 17 ++
airflow/www/templates/airflow/dag.html | 1 +
airflow/www/views.py | 98 +++++++++++
6 files changed, 361 insertions(+)
diff --git a/airflow/www/static/js/api/index.ts b/airflow/www/static/js/api/index.ts
index 4ab90292a4..b04da5259a 100644
--- a/airflow/www/static/js/api/index.ts
+++ b/airflow/www/static/js/api/index.ts
@@ -51,6 +51,7 @@ import useDagRuns from "./useDagRuns";
import useHistoricalMetricsData from "./useHistoricalMetricsData";
import { useTaskXcomEntry, useTaskXcomCollection } from "./useTaskXcom";
import useEventLogs from "./useEventLogs";
+import useCalendarData from "./useCalendarData";
axios.interceptors.request.use((config) => {
config.paramsSerializer = {
@@ -98,4 +99,5 @@ export {
useTaskXcomEntry,
useTaskXcomCollection,
useEventLogs,
+ useCalendarData,
};
diff --git a/airflow/www/static/js/api/useCalendarData.ts b/airflow/www/static/js/api/useCalendarData.ts
new file mode 100644
index 0000000000..e2a7171e8a
--- /dev/null
+++ b/airflow/www/static/js/api/useCalendarData.ts
@@ -0,0 +1,48 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { useQuery } from "react-query";
+import axios, { AxiosResponse } from "axios";
+
+import { getMetaValue } from "src/utils";
+
+// Used both as the key passed to getMetaValue and as the query-string
+// parameter name sent with the calendar request.
+const DAG_ID_PARAM = "dag_id";
+
+// Both values are resolved once, when this module is first evaluated.
+// NOTE(review): getMetaValue presumably reads <meta name="..."> tags rendered
+// into the page (dag.html adds a "calendar_data_url" meta tag) - confirm.
+const dagId = getMetaValue(DAG_ID_PARAM);
+const calendarDataUrl = getMetaValue("calendar_data_url");
+
+/**
+ * One (date, state) bucket of the calendar payload.
+ *
+ * The server emits one entry per day and run state, plus entries with
+ * state "planned" for runs it projects from the DAG's schedule.
+ */
+interface DagState {
+ // Number of runs in this bucket.
+ count: number;
+ // Calendar day; the server serializes it as an ISO date string.
+ date: string;
+ // DAG run state, or "planned" for projected runs.
+ state: string;
+}
+
+/**
+ * Shape of the calendar response as consumed by the UI.
+ *
+ * NOTE(review): the endpoint serializes this list under the key "dag_states";
+ * presumably a shared axios response interceptor camel-cases keys before the
+ * data reaches callers - confirm.
+ */
+interface CalendarData {
+ dagStates: DagState[];
+}
+
+/**
+ * React Query hook that fetches the calendar data for the DAG of the
+ * current page.
+ *
+ * The request is a GET to `calendarDataUrl` with the DAG id sent as the
+ * `dag_id` query parameter; the result is cached under the
+ * `["calendarData"]` query key.
+ */
+const useCalendarData = () => {
+  const fetchCalendarData = async () => {
+    const response = await axios.get<AxiosResponse, CalendarData>(
+      calendarDataUrl,
+      { params: { [DAG_ID_PARAM]: dagId } }
+    );
+    return response;
+  };
+
+  return useQuery(["calendarData"], fetchCalendarData);
+};
+
+export default useCalendarData;
diff --git a/airflow/www/static/js/dag/details/dag/Calendar.tsx b/airflow/www/static/js/dag/details/dag/Calendar.tsx
new file mode 100644
index 0000000000..105c202995
--- /dev/null
+++ b/airflow/www/static/js/dag/details/dag/Calendar.tsx
@@ -0,0 +1,195 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* global moment */
+
+import React from "react";
+import type { EChartsOption } from "echarts";
+import { Spinner } from "@chakra-ui/react";
+
+import ReactECharts from "src/components/ReactECharts";
+import { useCalendarData } from "src/api";
+import useFilters from "src/dag/useFilters";
+
+/**
+ * Calendar tab of the DAG details panel.
+ *
+ * Renders one ECharts calendar per year covered by the data:
+ *  - a heatmap series whose value is the percentage of runs on that day that
+ *    are `success` or `running`, out of `failed + success + running`;
+ *  - a scatter series marking days that only have "planned" runs.
+ *
+ * Clicking a cell passes that cell's date to `onBaseDateChange`.
+ *
+ * Returns a spinner while loading and `null` when there is nothing to draw.
+ */
+const Calendar = () => {
+  const { onBaseDateChange } = useFilters();
+  const { data: calendarData, isLoading } = useCalendarData();
+
+  if (isLoading) return <Spinner />;
+  if (!calendarData) return null;
+
+  const { dagStates } = calendarData;
+
+  // A DAG that has never run produces an empty list. Without this guard the
+  // `dagStates[0].date` lookup below throws and takes the whole tab down.
+  if (!dagStates?.length) return null;
+
+  // Entries are expected in ascending date order, so the first and last
+  // entries bound the range to display.
+  const startDate = dagStates[0].date;
+  const endDate = dagStates[dagStates.length - 1].date;
+  // @ts-ignore
+  const startYear = moment(startDate).year();
+  // @ts-ignore
+  const endYear = moment(endDate).year();
+
+  const calendarOption: EChartsOption["calendar"] = [];
+  const seriesOption: EChartsOption["series"] = [];
+
+  // date -> { [state]: count } for real runs, and separately for planned runs.
+  const flatDates: Record<string, any> = {};
+  const plannedDates: Record<string, any> = {};
+  dagStates.forEach((ds) => {
+    if (ds.state !== "planned") {
+      flatDates[ds.date] = {
+        ...flatDates[ds.date],
+        [ds.state]: ds.count,
+      };
+    } else {
+      plannedDates[ds.date] = {
+        [ds.state]: ds.count,
+      };
+    }
+  });
+
+  // [date, percent] pairs feeding the heatmap. Only failed, success and
+  // running runs take part in the ratio.
+  const proportions = Object.keys(flatDates).map((key) => {
+    const date = key;
+    const states = flatDates[key];
+    const total =
+      (states.failed || 0) + (states.success || 0) + (states.running || 0);
+    const percent = ((states.success || 0) + (states.running || 0)) / total;
+    return [date, Math.round(percent * 100)];
+  });
+
+  // We need to split the data into multiple years of calendars
+  if (startYear !== endYear) {
+    for (let y = startYear; y <= endYear; y += 1) {
+      const index = y - startYear;
+      const yearStartDate = y === startYear ? startDate : `${y}-01-01`;
+      const yearEndDate = `${y}-12-31`;
+      calendarOption.push({
+        left: 100,
+        // Stack the yearly calendars vertically.
+        top: index * 150 + 20,
+        range: [yearStartDate, yearEndDate],
+        cellSize: 15,
+      });
+      seriesOption.push({
+        calendarIndex: index,
+        type: "heatmap",
+        coordinateSystem: "calendar",
+        data: proportions.filter(
+          (p) => typeof p[0] === "string" && p[0].startsWith(y.toString())
+        ),
+      });
+      seriesOption.push({
+        calendarIndex: index,
+        type: "scatter",
+        coordinateSystem: "calendar",
+        symbolSize: 4,
+        data: dagStates
+          .filter(
+            (ds) => ds.date.startsWith(y.toString()) && ds.state === "planned"
+          )
+          .map((ds) => [ds.date, ds.count]),
+      });
+    }
+  } else {
+    calendarOption.push({
+      top: 20,
+      left: 100,
+      range: [startDate, `${endYear}-12-31`],
+      cellSize: 15,
+    });
+    seriesOption.push({
+      type: "heatmap",
+      coordinateSystem: "calendar",
+      data: proportions,
+    });
+    seriesOption.push({
+      type: "scatter",
+      coordinateSystem: "calendar",
+      // Same fixed marker size as the multi-year branch.
+      symbolSize: 4,
+      data: dagStates
+        .filter((ds) => ds.state === "planned")
+        .map((ds) => [ds.date, ds.count]),
+    });
+  }
+
+  // Each visualMap below must target only its own kind of series.
+  const scatterIndexes: number[] = [];
+  const heatmapIndexes: number[] = [];
+
+  seriesOption.forEach((s, i) => {
+    if (s.type === "heatmap") heatmapIndexes.push(i);
+    else if (s.type === "scatter") scatterIndexes.push(i);
+  });
+
+  const option: EChartsOption = {
+    tooltip: {
+      formatter: (p: any) => {
+        const date = p.data[0];
+        const states = flatDates[date];
+        // Scatter points carry the planned count themselves; heatmap cells
+        // look it up by date.
+        const plannedCount =
+          p.componentSubType === "scatter"
+            ? p.data[1]
+            : plannedDates[date]?.planned || 0;
+        // @ts-ignore
+        const formattedDate = moment(date).format("ddd YYYY-MM-DD");
+
+        return `
+          <strong>${formattedDate}</strong> <br>
+          ${plannedCount ? `Planned ${plannedCount} <br>` : ""}
+          ${states?.failed ? `Failed ${states.failed} <br>` : ""}
+          ${states?.running ? `Running ${states.running} <br>` : ""}
+          ${states?.success ? `Success ${states.success} <br>` : ""}
+        `;
+      },
+    },
+    visualMap: [
+      {
+        min: 0,
+        max: 100,
+        text: ["% Success", "Failed"],
+        calculable: true,
+        orient: "vertical",
+        left: "0",
+        top: "0",
+        seriesIndex: heatmapIndexes,
+        inRange: {
+          // NOTE(review): stateColors is not imported in the visible part of
+          // this file; presumably a page-level global like moment - confirm.
+          color: [
+            stateColors.failed,
+            stateColors.up_for_retry,
+            stateColors.success,
+          ],
+        },
+      },
+      {
+        seriesIndex: scatterIndexes,
+        inRange: {
+          color: "gray",
+          opacity: 0.6,
+        },
+      },
+    ],
+    calendar: calendarOption,
+    series: seriesOption,
+  };
+
+  const events = {
+    click(p: any) {
+      onBaseDateChange(p.data[0]);
+    },
+  };
+
+  return <ReactECharts option={option} events={events} />;
+};
+
+export default Calendar;
diff --git a/airflow/www/static/js/dag/details/index.tsx b/airflow/www/static/js/dag/details/index.tsx
index 0c3315e4dc..6ef60e0120 100644
--- a/airflow/www/static/js/dag/details/index.tsx
+++ b/airflow/www/static/js/dag/details/index.tsx
@@ -43,6 +43,7 @@ import {
MdSyncAlt,
MdHourglassBottom,
MdPlagiarism,
+ MdEvent,
} from "react-icons/md";
import { BiBracket } from "react-icons/bi";
import URLSearchParamsWrapper from "src/utils/URLSearchParamWrapper";
@@ -66,6 +67,7 @@ import XcomCollection from "./taskInstance/Xcom";
import TaskDetails from "./task";
import AuditLog from "./AuditLog";
import RunDuration from "./dag/RunDuration";
+import Calendar from "./dag/Calendar";
const dagId = getMetaValue("dag_id")!;
@@ -92,6 +94,7 @@ const tabToIndex = (tab?: string) => {
case "run_duration":
return 5;
case "xcom":
+ case "calendar":
return 6;
case "details":
default:
@@ -129,6 +132,7 @@ const indexToTab = (
if (!runId && !taskId) return "run_duration";
return undefined;
case 6:
+ if (!runId && !taskId) return "calendar";
if (isTaskInstance) return "xcom";
return undefined;
default:
@@ -323,6 +327,14 @@ const Details = ({
</Text>
</Tab>
)}
+ {isDag && (
+ <Tab>
+ <MdEvent size={16} />
+ <Text as="strong" ml={1}>
+ Calendar
+ </Text>
+ </Tab>
+ )}
{isTaskInstance && (
<Tab>
<MdReorder size={16} />
@@ -418,6 +430,11 @@ const Details = ({
<RunDuration />
</TabPanel>
)}
+ {isDag && (
+ <TabPanel height="100%" width="100%">
+ <Calendar />
+ </TabPanel>
+ )}
{isTaskInstance && run && (
<TabPanel
pt={mapIndex !== undefined ? "0px" : undefined}
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index 939809ce81..c69251e71c 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -56,6 +56,7 @@
<meta name="confirm_url" content="{{ url_for('Airflow.confirm') }}">
<meta name="grid_data_url" content="{{ url_for('Airflow.grid_data') }}">
<meta name="graph_data_url" content="{{ url_for('Airflow.graph_data') }}">
+ <meta name="calendar_data_url" content="{{ url_for('Airflow.calendar_data') }}">
<meta name="next_run_datasets_url" content="{{ url_for('Airflow.next_run_datasets', dag_id=dag.dag_id) }}">
<meta name="grid_url" content="{{ url_for('Airflow.grid', dag_id=dag.dag_id) }}">
<meta name="datasets_url" content="{{ url_for('Airflow.datasets') }}">
diff --git a/airflow/www/views.py b/airflow/www/views.py
index f94e9624d3..5cffd28aaa 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -116,6 +116,7 @@ from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS
from airflow.timetables._cron import CronMixin
from airflow.timetables.base import DataInterval, TimeRestriction
+from airflow.timetables.simple import ContinuousTimetable
from airflow.utils import json as utils_json, timezone, yaml
from airflow.utils.airflow_flask_app import get_airflow_app
from airflow.utils.dag_edges import dag_edges
@@ -2940,6 +2941,103 @@ class Airflow(AirflowBaseView):
dag_model=dag_model,
)
+    @expose("/object/calendar_data")
+    @auth.has_access_dag("GET", DagAccessEntity.RUN)
+    @gzipped
+    @provide_session
+    def calendar_data(self, session: Session = NEW_SESSION):
+        """Get DAG runs as calendar.
+
+        Reads ``dag_id`` from the query string. Responds with a 404 and an
+        ``error`` message when the DAG cannot be found.
+
+        Otherwise responds with JSON of the form
+        ``{"dag_states": [{"date": ..., "state": ..., "count": ...}, ...]}``
+        holding one entry per (day, run state) in ascending date order. When
+        the last row carries a data interval and the timetable is not
+        continuous, entries with state ``"planned"`` are appended for runs the
+        schedule would still produce before the year of that interval's end
+        is over.
+        """
+        dag_id = request.args.get("dag_id")
+        dag = get_airflow_app().dag_bag.get_dag(dag_id, session=session)
+        if not dag:
+            return {"error": f"can't find dag {dag_id}"}, 404
+
+        # One row per (day, state); the max data interval of each group is
+        # kept so the schedule can be projected forward from the last row.
+        dag_states = session.execute(
+            select(
+                func.date(DagRun.execution_date).label("date"),
+                DagRun.state,
+                func.max(DagRun.data_interval_start).label("data_interval_start"),
+                func.max(DagRun.data_interval_end).label("data_interval_end"),
+                func.count("*").label("count"),
+            )
+            .where(DagRun.dag_id == dag.dag_id)
+            .group_by(func.date(DagRun.execution_date), DagRun.state)
+            .order_by(func.date(DagRun.execution_date).asc())
+        ).all()
+
+        data_dag_states = [
+            {
+                # DATE() in SQLite and MySQL behave differently:
+                # SQLite returns a string, MySQL returns a date.
+                "date": dr.date if isinstance(dr.date, str) else dr.date.isoformat(),
+                "state": dr.state,
+                "count": dr.count,
+            }
+            for dr in dag_states
+        ]
+
+        # Interpret the schedule and show planned dag runs in calendar
+        if (
+            dag_states
+            and dag_states[-1].data_interval_start
+            and dag_states[-1].data_interval_end
+            and not isinstance(dag.timetable, ContinuousTimetable)
+        ):
+            last_automated_data_interval = DataInterval(
+                timezone.coerce_datetime(dag_states[-1].data_interval_start),
+                timezone.coerce_datetime(dag_states[-1].data_interval_end),
+            )
+
+            # Planned runs are only projected until the end of this year.
+            year = last_automated_data_interval.end.year
+            restriction = TimeRestriction(dag.start_date, dag.end_date, False)
+            dates: dict[datetime.date, int] = collections.Counter()
+
+            if isinstance(dag.timetable, CronMixin):
+                # Optimized calendar generation for timetables based on a cron expression.
+                dates_iter: Iterator[datetime.datetime | None] = croniter(
+                    dag.timetable._expression,
+                    start_time=last_automated_data_interval.end,
+                    ret_type=datetime.datetime,
+                )
+                for dt in dates_iter:
+                    if dt is None:
+                        break
+                    if dt.year != year:
+                        break
+                    if dag.end_date and dt > dag.end_date:
+                        break
+                    dates[dt.date()] += 1
+            else:
+                # Generic path: repeatedly ask the timetable for the next run.
+                prev_logical_date = DateTime.min
+                while True:
+                    curr_info = dag.timetable.next_dagrun_info(
+                        last_automated_data_interval=last_automated_data_interval,
+                        restriction=restriction,
+                    )
+                    if curr_info is None:
+                        break  # Reached the end.
+                    if curr_info.logical_date <= prev_logical_date:
+                        break  # We're not progressing. Maybe a malformed timetable? Give up.
+                    if curr_info.logical_date.year != year:
+                        break  # Crossed the year boundary.
+                    last_automated_data_interval = curr_info.data_interval
+                    dates[curr_info.logical_date.date()] += 1
+                    prev_logical_date = curr_info.logical_date
+
+            data_dag_states.extend(
+                {"date": date.isoformat(), "state": "planned", "count": count}
+                for (date, count) in dates.items()
+            )
+
+        data = {
+            "dag_states": data_dag_states,
+        }
+
+        return (
+            htmlsafe_json_dumps(data, separators=(",", ":"), dumps=flask.json.dumps),
+            {"Content-Type": "application/json; charset=utf-8"},
+        )
+
@expose("/graph")
def legacy_graph(self):
"""Redirect from url param."""