This is an automated email from the ASF dual-hosted git repository.
ryanahamilton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 76105c1 Auto refresh on Tree View (#15474)
76105c1 is described below
commit 76105c12fdc1b7c1aae09d678c4cbe1816f98328
Author: Brent Bovenzi <[email protected]>
AuthorDate: Fri Apr 23 11:06:29 2021 -0500
Auto refresh on Tree View (#15474)
* initial working example
* keep transitions when navigating branches
* replace missing transitions
* add shared get_tree_data function
* Update airflow/www/views.py
Co-authored-by: Ash Berlin-Taylor <[email protected]>
* Update airflow/www/views.py
Co-authored-by: Ash Berlin-Taylor <[email protected]>
* move dagId to dag.html
* make, class method, typing and linting
* resolve Ryan's comments
* keep space for loading dots
* clean up locals
* check that dataInstance and row are valid
* Update airflow/www/views.py
Co-authored-by: Ash Berlin-Taylor <[email protected]>
Co-authored-by: Ash Berlin-Taylor <[email protected]>
---
airflow/www/static/css/graph.css | 17 ---
airflow/www/static/css/main.css | 19 +++
airflow/www/static/js/graph.js | 1 +
airflow/www/static/js/tree.js | 221 ++++++++++++++++++++----------
airflow/www/templates/airflow/dag.html | 5 +
airflow/www/templates/airflow/graph.html | 1 -
airflow/www/templates/airflow/ti_log.html | 1 -
airflow/www/templates/airflow/tree.html | 16 +++
airflow/www/views.py | 155 ++++++++++++++-------
9 files changed, 294 insertions(+), 142 deletions(-)
diff --git a/airflow/www/static/css/graph.css b/airflow/www/static/css/graph.css
index 8bd3510..6259d45 100644
--- a/airflow/www/static/css/graph.css
+++ b/airflow/www/static/css/graph.css
@@ -165,20 +165,3 @@ g.node.removed rect {
background-color: #f0f0f0;
cursor: move;
}
-
-.refresh-actions {
- float: right;
- display: inline-flex;
- align-items: center;
- top: 10px;
- right: 10px;
- position: relative;
-}
-
-.refresh-actions > .switch-label {
- margin: 0 10px 0 20px;
-}
-
-.loading-dots.refresh-loading {
- display: none;
-}
diff --git a/airflow/www/static/css/main.css b/airflow/www/static/css/main.css
index e946d2a..d202788 100644
--- a/airflow/www/static/css/main.css
+++ b/airflow/www/static/css/main.css
@@ -445,3 +445,22 @@ label[for="timezone-other"],
margin-top: 8px;
overflow-x: auto;
}
+
+.refresh-actions {
+ justify-content: flex-end;
+ min-width: 225px;
+ float: right;
+ display: inline-flex;
+ align-items: center;
+ right: 10px;
+ margin-bottom: 15px;
+ position: relative;
+}
+
+.refresh-actions > .switch-label {
+ margin: 0 10px 0 20px;
+}
+
+.loading-dots.refresh-loading {
+ display: none;
+}
diff --git a/airflow/www/static/js/graph.js b/airflow/www/static/js/graph.js
index eda6c19..88feb20 100644
--- a/airflow/www/static/js/graph.js
+++ b/airflow/www/static/js/graph.js
@@ -27,6 +27,7 @@ import getMetaValue from './meta_value';
import { escapeHtml } from './main';
import tiTooltip, { taskNoInstanceTooltip } from './task_instances';
+// dagId comes from dag.html
const dagId = getMetaValue('dag_id');
const executionDate = getMetaValue('execution_date');
const arrange = getMetaValue('arrange');
diff --git a/airflow/www/static/js/tree.js b/airflow/www/static/js/tree.js
index d2578fd..95c3401 100644
--- a/airflow/www/static/js/tree.js
+++ b/airflow/www/static/js/tree.js
@@ -19,11 +19,14 @@
* under the License.
*/
-/* global treeData, document, window, $, d3, moment, call_modal_dag,
call_modal, */
+/* global treeData, document, window, $, d3, moment, call_modal_dag,
call_modal, localStorage */
import { escapeHtml } from './main';
import tiTooltip from './task_instances';
import getMetaValue from './meta_value';
+// dagId comes from dag.html
+const dagId = getMetaValue('dag_id');
+
function toDateString(ts) {
const dt = new Date(ts * 1000);
return dt.toISOString();
@@ -48,10 +51,11 @@ document.addEventListener('DOMContentLoaded', () => {
$('span.status_square').tooltip({ html: true });
// JSON.parse is faster for large payloads than an object literal
- const data = JSON.parse(treeData);
+ let data = JSON.parse(treeData);
const tree = d3.layout.tree().nodeSize([0, 25]);
- const nodes = tree.nodes(data);
+ let nodes = tree.nodes(data);
const nodeobj = {};
+ const getActiveRuns = () => data.instances.filter((run) => run.state ===
'running').length > 0;
const now = Date.now() / 1000;
const devicePixelRatio = window.devicePixelRatio || 1;
@@ -74,7 +78,6 @@ document.addEventListener('DOMContentLoaded', () => {
const barWidth = width * 0.9;
let i = 0;
- const duration = 400;
let root;
function populateTaskInstanceProperties(node) {
@@ -84,60 +87,61 @@ document.addEventListener('DOMContentLoaded', () => {
const dataInstance = data.instances[j];
const row = node.instances[j];
- if (row === null) {
- node.instances[j] = {
- task_id: node.name,
- execution_date: dataInstance.execution_date,
- };
- continue;
- }
-
- const taskInstance = {
- state: row[0],
- try_number: row[1],
- start_ts: row[2],
- duration: row[3],
- };
- node.instances[j] = taskInstance;
-
- taskInstance.task_id = node.name;
- taskInstance.operator = node.operator;
- taskInstance.execution_date = dataInstance.execution_date;
- taskInstance.external_trigger = dataInstance.external_trigger;
-
- // compute start_date and end_date if applicable
- if (taskInstance.start_ts !== null) {
- taskInstance.start_date = toDateString(taskInstance.start_ts);
- if (taskInstance.state === 'running') {
- taskInstance.duration = now - taskInstance.start_ts;
- } else if (taskInstance.duration !== null) {
- taskInstance.end_date = toDateString(taskInstance.start_ts +
taskInstance.duration);
+ // check that the dataInstance and the row are valid
+ if (dataInstance && dataInstance.execution_date) {
+ if (row && row.length) {
+ const taskInstance = {
+ state: row[0],
+ try_number: row[1],
+ start_ts: row[2],
+ duration: row[3],
+ };
+ node.instances[j] = taskInstance;
+
+ taskInstance.task_id = node.name;
+ taskInstance.operator = node.operator;
+ taskInstance.execution_date = dataInstance.execution_date;
+ taskInstance.external_trigger = dataInstance.external_trigger;
+
+ // compute start_date and end_date if applicable
+ if (taskInstance.start_ts !== null) {
+ taskInstance.start_date = toDateString(taskInstance.start_ts);
+ if (taskInstance.state === 'running') {
+ taskInstance.duration = now - taskInstance.start_ts;
+ } else if (taskInstance.duration !== null) {
+ taskInstance.end_date = toDateString(taskInstance.start_ts +
taskInstance.duration);
+ }
+ }
+ } else {
+ node.instances[j] = {
+ task_id: node.name,
+ execution_date: dataInstance.execution_date,
+ };
}
}
}
}
- for (i = 0; i < nodes.length; i += 1) {
- const node = nodes[i];
+ const renderNode = (node) => {
nodeobj[node.name] = node;
- if (node.name === '[DAG]') {
+ if (node.name !== '[DAG]') {
// skip synthetic root node since it doesn't contain actual task
instances
- continue;
- }
+ if (node.start_ts !== undefined) {
+ node.start_date = toDateString(node.start_ts);
+ }
+ if (node.end_ts !== undefined) {
+ node.end_date = toDateString(node.end_ts);
+ }
+ if (node.depends_on_past === undefined) {
+ node.depends_on_past = false;
+ }
- if (node.start_ts !== undefined) {
- node.start_date = toDateString(node.start_ts);
- }
- if (node.end_ts !== undefined) {
- node.end_date = toDateString(node.end_ts);
- }
- if (node.depends_on_past === undefined) {
- node.depends_on_past = false;
+ populateTaskInstanceProperties(node);
}
+ };
- populateTaskInstanceProperties(node);
- }
+ nodes.forEach((node) => renderNode(node));
const diagonal = d3.svg.diagonal()
.projection((d) => [d.y, d.x]);
@@ -147,7 +151,6 @@ document.addEventListener('DOMContentLoaded', () => {
.html((toolTipHtml) => toolTipHtml);
const svg = d3.select('#tree-svg')
- // .attr("width", width + margin.left + margin.right)
.append('g')
.attr('class', 'level')
.attr('transform', `translate(${margin.left},${margin.top})`);
@@ -189,31 +192,33 @@ document.addEventListener('DOMContentLoaded', () => {
.style('text-anchor', 'start')
.call(taskTip);
- function update(source) {
- // Compute the flattened node list. TODO use d3.layout.hierarchy.
- const nodes = tree.nodes(root);
+ function update(source, showTransition = true) {
+ // Compute the flattened node list. TODO use d3.layout.hierarchy.
+ const updateNodes = tree.nodes(root);
+ const duration = showTransition ? 400 : 0;
- const height = Math.max(500, nodes.length * barHeight + margin.top +
margin.bottom);
- const width = squareX
+ const height = Math.max(500, updateNodes.length * barHeight + margin.top +
margin.bottom);
+ const updateWidth = squareX
+ (numSquare * (squareSize + squareSpacing))
+ margin.left + margin.right + 50;
- d3.select('#tree-svg').transition()
+ d3.select('#tree-svg')
+ .transition()
.duration(duration)
.attr('height', height)
- .attr('width', width);
+ .attr('width', updateWidth);
d3.select(self.frameElement).transition()
.duration(duration)
.style('height', `${height}px`);
// Compute the "layout".
- nodes.forEach((n, i) => {
- n.x = i * barHeight;
+ updateNodes.forEach((n, j) => {
+ n.x = j * barHeight;
});
// Update the nodes…
const node = svg.selectAll('g.node')
- .data(nodes, (d) => d.id || (d.id = ++i));
+ .data(updateNodes, (d) => d.id || (d.id = ++i));
const nodeEnter = node.enter().append('g')
.attr('class', nodeClass)
@@ -240,12 +245,12 @@ document.addEventListener('DOMContentLoaded', () => {
}
taskTip.direction('e');
taskTip.show(tt, this);
- d3.select(this).transition()
+ d3.select(this).transition().duration(duration)
.style('stroke-width', 3);
})
.on('mouseout', function (d) {
taskTip.hide(d);
- d3.select(this).transition()
+ d3.select(this).transition().duration(duration)
.style('stroke-width', (dd) => (isDagRun(dd) ? '2' : '1'));
})
.attr('height', barHeight)
@@ -288,7 +293,6 @@ document.addEventListener('DOMContentLoaded', () => {
);
}
})
- .attr('class', (d) => `state ${d.state}`)
.attr('data-toggle', 'tooltip')
.attr('rx', (d) => (isDagRun(d) ? '5' : '1'))
.attr('ry', (d) => (isDagRun(d) ? '5' : '1'))
@@ -300,12 +304,12 @@ document.addEventListener('DOMContentLoaded', () => {
const tt = tiTooltip({ ...d, duration: d.duration ||
moment(d.end_date).diff(d.start_date, 'seconds') });
taskTip.direction('n');
taskTip.show(tt, this);
- d3.select(this).transition()
+ d3.select(this).transition().duration(duration)
.style('stroke-width', 3);
})
.on('mouseout', function (d) {
taskTip.hide(d);
- d3.select(this).transition()
+ d3.select(this).transition().duration(duration)
.style('stroke-width', (dd) => (isDagRun(dd) ? '2' : '1'));
})
.attr('x', (d, j) => (j * (squareSize + squareSpacing)))
@@ -313,20 +317,27 @@ document.addEventListener('DOMContentLoaded', () => {
.attr('width', 10)
.attr('height', 10);
+ node.selectAll('rect')
+ .data((d) => d.instances)
+ .attr('class', (d) => `state ${d.state}`);
+
// Transition nodes to their new position.
- nodeEnter.transition()
+ nodeEnter
+ .transition()
.duration(duration)
.attr('transform', (d) => `translate(${d.y},${d.x})`)
.style('opacity', 1);
- node.transition()
+ node
+ .transition()
.duration(duration)
.attr('class', nodeClass)
.attr('transform', (d) => `translate(${d.y},${d.x})`)
.style('opacity', 1);
// Transition exiting nodes to the parent's new position.
- node.exit().transition()
+ node.exit()
+ .transition()
.duration(duration)
.attr('transform', () => `translate(${source.y},${source.x})`)
.style('opacity', 1e-6)
@@ -334,7 +345,7 @@ document.addEventListener('DOMContentLoaded', () => {
// Update the links…
const link = svg.selectAll('path.link')
- .data(tree.links(nodes), (d) => d.target.id);
+ .data(tree.links(updateNodes), (d) => d.target.id);
// Enter any new links at the parent's previous position.
link.enter().insert('path', 'g')
@@ -348,12 +359,14 @@ document.addEventListener('DOMContentLoaded', () => {
.attr('d', diagonal);
// Transition links to their new position.
- link.transition()
+ link
+ .transition()
.duration(duration)
.attr('d', diagonal);
// Transition exiting nodes to the parent's new position.
- link.exit().transition()
+ link.exit()
+ .transition()
.duration(duration)
.attr('d', () => {
const o = { x: source.x, y: source.y };
@@ -362,7 +375,7 @@ document.addEventListener('DOMContentLoaded', () => {
.remove();
// Stash the old positions for transition.
- nodes.forEach((d) => {
+ updateNodes.forEach((d) => {
d.x0 = d.x;
d.y0 = d.y;
});
@@ -370,7 +383,7 @@ document.addEventListener('DOMContentLoaded', () => {
$('#loading').remove();
}
- update(root = data);
+ update(root = data, false);
function toggles(clicked) {
// Collapse nodes with the same task id
@@ -392,4 +405,70 @@ document.addEventListener('DOMContentLoaded', () => {
}
update(clicked);
}
+
+ function handleRefresh() {
+ $('#loading-dots').css('display', 'inline-block');
+ $.get(`/object/tree_data?dag_id=${dagId}`)
+ .done(
+ (runs) => {
+ const newData = {
+ ...data,
+ ...JSON.parse(runs),
+ };
+ // only rerender the graph if the instances have changed
+ if (JSON.stringify(data.instances) !==
JSON.stringify(newData.instances)) {
+ nodes = tree.nodes(newData);
+ nodes.forEach((node) => renderNode(node));
+ update(root = newData, false);
+ data = newData;
+ }
+ setTimeout(() => { $('#loading-dots').hide(); }, 500);
+ $('#error').hide();
+ },
+ ).fail((_, textStatus, err) => {
+ $('#error_msg').text(`${textStatus}: ${err}`);
+ $('#error').show();
+ setTimeout(() => { $('#loading-dots').hide(); }, 500);
+ });
+ }
+
+ let refreshInterval;
+
+ function startOrStopRefresh() {
+ if ($('#auto_refresh').is(':checked')) {
+ refreshInterval = setInterval(() => {
+ // only do a refresh if there are any active dag runs
+ if (getActiveRuns()) {
+ handleRefresh();
+ } else {
+ $('#auto_refresh').removeAttr('checked');
+ }
+ }, 3000); // run refresh every 3 seconds
+ } else {
+ clearInterval(refreshInterval);
+ }
+ }
+
+ $('#auto_refresh').change(() => {
+ if ($('#auto_refresh').is(':checked')) {
+ // Run an initial refresh before starting interval if manually turned on
+
+ handleRefresh();
+ localStorage.removeItem('disableAutoRefresh');
+ } else {
+ localStorage.setItem('disableAutoRefresh', 'true');
+ }
+ startOrStopRefresh();
+ });
+
+ function initRefresh() {
+ // default to auto-refresh if there are any active dag runs
+ if (getActiveRuns() && !localStorage.getItem('disableAutoRefresh')) {
+ $('#auto_refresh').attr('checked', true);
+ }
+ startOrStopRefresh();
+ d3.select('#refresh_button').on('click', () => handleRefresh());
+ }
+
+ initRefresh();
});
diff --git a/airflow/www/templates/airflow/dag.html
b/airflow/www/templates/airflow/dag.html
index 29f140f..f7b14a0 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -22,6 +22,11 @@
{% block page_title %}{{ dag.dag_id }} - {{ appbuilder.app_name }}{% endblock
%}
+{% block head_meta %}
+ {{ super() }}
+ <meta name="dag_id" content="{{ dag.dag_id }}">
+{% endblock %}
+
{% block head_css %}
{{ super() }}
<link rel="stylesheet" type="text/css" href="{{ url_for_asset('switch.css')
}}">
diff --git a/airflow/www/templates/airflow/graph.html
b/airflow/www/templates/airflow/graph.html
index 9ef79f3..1d3489c 100644
--- a/airflow/www/templates/airflow/graph.html
+++ b/airflow/www/templates/airflow/graph.html
@@ -24,7 +24,6 @@
{% block head_meta %}
{{ super() }}
- <meta name="dag_id" content="{{ dag.dag_id }}">
<meta name="execution_date" content="{{ execution_date }}">
<meta name="arrange" content="{{ arrange }}">
<meta name="task_instances_url" content="{{
url_for('Airflow.task_instances') }}">
diff --git a/airflow/www/templates/airflow/ti_log.html
b/airflow/www/templates/airflow/ti_log.html
index 7f75b7f..734c160 100644
--- a/airflow/www/templates/airflow/ti_log.html
+++ b/airflow/www/templates/airflow/ti_log.html
@@ -22,7 +22,6 @@
{% block head_meta %}
{{ super() }}
- <meta name="dag_id" content="{{ dag_id }}">
<meta name="task_id" content="{{ task_id }}">
<meta name="execution_date" content="{{ execution_date }}">
<meta name="logs_with_metadata_url" content="{{
url_for('Airflow.get_logs_with_metadata') }}">
diff --git a/airflow/www/templates/airflow/tree.html
b/airflow/www/templates/airflow/tree.html
index 5e8c9f4..acc68dd 100644
--- a/airflow/www/templates/airflow/tree.html
+++ b/airflow/www/templates/airflow/tree.html
@@ -19,6 +19,7 @@
{% extends "airflow/dag.html" %}
{% block page_title %}{{ dag.dag_id }} - Tree - {{ appbuilder.app_name }}{%
endblock %}
+{% from 'appbuilder/loading_dots.html' import loading_dots %}
{% block head_css %}
{{ super() }}
@@ -75,6 +76,21 @@
<hr>
<div id="svg_container">
<img id='loading' width="50" src="{{ url_for('static',
filename='loading.gif') }}">
+ <div id="error" style="display: none; margin-top: 10px;" class="alert
alert-danger" role="alert">
+ <span class="material-icons" aria-hidden="true">error</span>
+ <span id="error_msg">Oops.</span>
+ </div>
+ <div class="refresh-actions">
+ {{ loading_dots(id='loading-dots', classes='refresh-loading') }}
+ <label class="switch-label">
+ <input class="switch-input" id="auto_refresh" type="checkbox">
+ <span class="switch" aria-hidden="true"></span>
+ Auto-refresh
+ </label>
+ <button class="btn btn-default btn-sm" id="refresh_button">
+ <span class="material-icons" aria-hidden="true">refresh</span>
+ </button>
+ </div>
<svg id="tree-svg" class='tree' width="100%"></svg>
</div>
{% endblock %}
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 5441434..21d4a82 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -25,10 +25,10 @@ import socket
import sys
import traceback
from collections import defaultdict
-from datetime import datetime, timedelta
+from datetime import timedelta
from json import JSONDecodeError
from operator import itemgetter
-from typing import Dict, List, Optional, Tuple
+from typing import Iterable, List, Optional, Tuple
from urllib.parse import parse_qsl, unquote, urlencode, urlparse
import lazy_object_proxy
@@ -73,6 +73,7 @@ from flask_appbuilder.security.views import (
from flask_appbuilder.widgets import FormWidget
from flask_babel import lazy_gettext
from jinja2.utils import htmlsafe_json_dumps, pformat # type: ignore
+from pendulum.datetime import DateTime
from pygments import highlight, lexers
from pygments.formatters import HtmlFormatter # noqa pylint:
disable=no-name-in-module
from sqlalchemy import and_, desc, func, or_, union_all
@@ -92,7 +93,7 @@ from airflow.exceptions import AirflowException
from airflow.executors.executor_loader import ExecutorLoader
from airflow.jobs.base_job import BaseJob
from airflow.jobs.scheduler_job import SchedulerJob
-from airflow.models import Connection, DagModel, DagTag, Log, SlaMiss,
TaskFail, XCom, errors
+from airflow.models import DAG, Connection, DagModel, DagTag, Log, SlaMiss,
TaskFail, XCom, errors
from airflow.models.baseoperator import BaseOperator
from airflow.models.dagcode import DagCode
from airflow.models.dagrun import DagRun, DagRunType
@@ -1911,56 +1912,15 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint:
disable=too-many-public-m
State.SUCCESS,
)
- @expose('/tree')
- @auth.has_access(
- [
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_LOG),
- ]
- )
- @gzipped # pylint: disable=too-many-locals
- @action_logging # pylint: disable=too-many-locals
- def tree(self):
- """Get Dag as tree."""
- dag_id = request.args.get('dag_id')
- dag = current_app.dag_bag.get_dag(dag_id)
- if not dag:
- flash(f'DAG "{dag_id}" seems to be missing from DagBag.', "error")
- return redirect(url_for('Airflow.index'))
-
- root = request.args.get('root')
- if root:
- dag = dag.sub_dag(task_ids_or_regex=root,
include_downstream=False, include_upstream=True)
-
- base_date = request.args.get('base_date')
- num_runs = request.args.get('num_runs', type=int)
- if num_runs is None:
- num_runs = conf.getint('webserver',
'default_dag_run_display_number')
-
- if base_date:
- base_date = timezone.parse(base_date)
- else:
- base_date = dag.get_latest_execution_date() or timezone.utcnow()
-
- with create_session() as session:
- dag_runs = (
- session.query(DagRun)
- .filter(DagRun.dag_id == dag.dag_id, DagRun.execution_date <=
base_date)
- .order_by(DagRun.execution_date.desc())
- .limit(num_runs)
- .all()
- )
- dag_runs = {dr.execution_date: alchemy_to_dict(dr) for dr in dag_runs}
-
+ def _get_tree_data(self, dag_runs: Iterable[DagRun], dag: DAG, base_date:
DateTime):
+ """Returns formatted dag_runs for Tree View"""
dates = sorted(dag_runs.keys())
- max_date = max(dates) if dates else None
- min_date = min(dates) if dates else None
+ min_date = min(dag_runs, default=None)
- tis = dag.get_task_instances(start_date=min_date, end_date=base_date)
- task_instances: Dict[Tuple[str, datetime], models.TaskInstance] = {}
- for ti in tis:
- task_instances[(ti.task_id, ti.execution_date)] = ti
+ task_instances = {
+ (ti.task_id, ti.execution_date): ti
+ for ti in dag.get_task_instances(start_date=min_date,
end_date=base_date)
+ }
expanded = set()
# The default recursion traces every path so that tree view has full
@@ -2036,12 +1996,55 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint:
disable=too-many-public-m
node['extra_links'] = task.extra_links
return node
- data = {
+ return {
'name': '[DAG]',
'children': [recurse_nodes(t, set()) for t in dag.roots],
'instances': [dag_runs.get(d) or {'execution_date': d.isoformat()}
for d in dates],
}
+ @expose('/tree')
+ @auth.has_access(
+ [
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_LOG),
+ ]
+ )
+ @gzipped # pylint: disable=too-many-locals
+ @action_logging # pylint: disable=too-many-locals
+ def tree(self):
+ """Get Dag as tree."""
+ dag_id = request.args.get('dag_id')
+ dag = current_app.dag_bag.get_dag(dag_id)
+ if not dag:
+ flash(f'DAG "{dag_id}" seems to be missing from DagBag.', "error")
+ return redirect(url_for('Airflow.index'))
+
+ root = request.args.get('root')
+ if root:
+ dag = dag.sub_dag(task_ids_or_regex=root,
include_downstream=False, include_upstream=True)
+
+ num_runs = request.args.get('num_runs', type=int)
+ if num_runs is None:
+ num_runs = conf.getint('webserver',
'default_dag_run_display_number')
+
+ try:
+ base_date = timezone.parse(request.args["base_date"])
+ except (KeyError, ValueError):
+ base_date = dag.get_latest_execution_date() or timezone.utcnow()
+
+ with create_session() as session:
+ dag_runs = (
+ session.query(DagRun)
+ .filter(DagRun.dag_id == dag.dag_id, DagRun.execution_date <=
base_date)
+ .order_by(DagRun.execution_date.desc())
+ .limit(num_runs)
+ .all()
+ )
+ dag_runs = {dr.execution_date: alchemy_to_dict(dr) for dr in dag_runs}
+
+ max_date = max(dag_runs.keys(), default=None)
+
form = DateTimeWithNumRunsForm(
data={
'base_date': max_date or timezone.utcnow(),
@@ -2057,6 +2060,8 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint:
disable=too-many-public-m
else:
external_log_name = None
+ data = self._get_tree_data(dag_runs, dag, base_date)
+
# avoid spaces to reduce payload size
data = htmlsafe_json_dumps(data, separators=(',', ':'))
@@ -2676,6 +2681,52 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint:
disable=too-many-public-m
return json.dumps(task_instances, cls=utils_json.AirflowJsonEncoder)
+ @expose('/object/tree_data')
+ @auth.has_access(
+ [
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
+ ]
+ )
+ @action_logging
+ def tree_data(self):
+ """Returns tree data"""
+ dag_id = request.args.get('dag_id')
+ dag = current_app.dag_bag.get_dag(dag_id)
+
+ if not dag:
+ response = jsonify({'error': f"can't find dag {dag_id}"})
+ response.status_code = 404
+ return response
+
+ root = request.args.get('root')
+ if root:
+ dag = dag.partial_subset(task_ids_or_regex=root,
include_downstream=False, include_upstream=True)
+
+ num_runs = request.args.get('num_runs', type=int)
+ if num_runs is None:
+ num_runs = conf.getint('webserver',
'default_dag_run_display_number')
+
+ try:
+ base_date = timezone.parse(request.args["base_date"])
+ except (KeyError, ValueError):
+ base_date = dag.get_latest_execution_date() or timezone.utcnow()
+
+ with create_session() as session:
+ dag_runs = (
+ session.query(DagRun)
+ .filter(DagRun.dag_id == dag.dag_id, DagRun.execution_date <=
base_date)
+ .order_by(DagRun.execution_date.desc())
+ .limit(num_runs)
+ .all()
+ )
+ dag_runs = {dr.execution_date: alchemy_to_dict(dr) for dr in dag_runs}
+
+ tree_data = self._get_tree_data(dag_runs, dag, base_date)
+
+ # avoid spaces to reduce payload size
+ return htmlsafe_json_dumps(tree_data, separators=(',', ':'))
+
class ConfigurationView(AirflowBaseView):
"""View to show Airflow Configurations"""