This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new dbe14c31d5 Add running and failed status tab for DAGs on the UI (#30429)
dbe14c31d5 is described below
commit dbe14c31d52a345aa82e050cc0a91ee60d9ee567
Author: PApostol <[email protected]>
AuthorDate: Mon May 22 17:05:44 2023 +0100
Add running and failed status tab for DAGs on the UI (#30429)
* Add running status tab for DAGs on the UI
* Attempt to better position Running tab
* Add FAILED tab, update tests & docs
* Switch header to flex
---------
Co-authored-by: Brent Bovenzi <[email protected]>
---
airflow/www/static/css/dags.css | 3 ++
airflow/www/templates/airflow/dags.html | 58 +++++++++++++++------------
airflow/www/views.py | 43 ++++++++++++++++++++
docs/apache-airflow/core-concepts/dag-run.rst | 4 ++
tests/www/views/test_views_home.py | 18 ++++++++-
5 files changed, 100 insertions(+), 26 deletions(-)
diff --git a/airflow/www/static/css/dags.css b/airflow/www/static/css/dags.css
index 19f1c5ef9f..36cfea78b8 100644
--- a/airflow/www/static/css/dags.css
+++ b/airflow/www/static/css/dags.css
@@ -24,6 +24,9 @@
}
.dags-table-header {
+ display: flex;
+ flex-wrap: wrap;
+ justify-content: space-between;
margin: 0;
padding-top: 16px;
}
diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html
index b7e7d64f72..99e5980881 100644
--- a/airflow/www/templates/airflow/dags.html
+++ b/airflow/www/templates/airflow/dags.html
@@ -130,27 +130,35 @@
{% block content %}
<h2>{{ page_title }}</h2>
<div id="main_content">
- <div class="row dags-table-header">
- <div class="col-sm-4 col-md-4 no-x-padding">
- <div class="form-group btn-group">
- <a
- href="{{ url_for('Airflow.index', status='all',
search=request.args.get('search', None), tags=request.args.getlist('tags',
None)) }}"
- class="btn {{ 'btn-primary' if status_filter == 'all' else
'btn-default' }}"
- title="Show active and paused DAGs">All <span class="badge">{{
"{:,}".format(status_count_all) }}</span></a>
- <a
- href="{{ url_for('Airflow.index', status='active',
search=request.args.get('search', None), tags=request.args.getlist('tags',
None)) }}"
- class="btn {{ 'btn-primary' if status_filter == 'active' else
'btn-default' }}"
- title="Show only active DAGs">Active <span class="badge">{{
"{:,}".format(status_count_active) }}</span></a>
- <a
- href="{{ url_for('Airflow.index', status='paused',
search=request.args.get('search', None), tags=request.args.getlist('tags',
None)) }}"
- class="btn {{ 'btn-primary' if status_filter == 'paused' else
'btn-default' }}"
- title="Show only paused DAGs">Paused <span class="badge">{{
"{:,}".format(status_count_paused) }}</span></a>
- </div>
+ <div class="dags-table-header">
+ <div class="form-group btn-group">
+ <a
+ href="{{ url_for('Airflow.index', status='all',
search=request.args.get('search', None), tags=request.args.getlist('tags',
None)) }}"
+ class="btn {{ 'btn-primary' if status_filter == 'all' else
'btn-default' }}"
+ title="Show active and paused DAGs">All <span class="badge">{{
"{:,}".format(status_count_all) }}</span></a>
+ <a
+ href="{{ url_for('Airflow.index', status='active',
search=request.args.get('search', None), tags=request.args.getlist('tags',
None)) }}"
+ class="btn {{ 'btn-primary' if status_filter == 'active' else
'btn-default' }}"
+ title="Show only active DAGs">Active <span class="badge">{{
"{:,}".format(status_count_active) }}</span></a>
+ <a
+ href="{{ url_for('Airflow.index', status='paused',
search=request.args.get('search', None), tags=request.args.getlist('tags',
None)) }}"
+ class="btn {{ 'btn-primary' if status_filter == 'paused' else
'btn-default' }}"
+ title="Show only paused DAGs">Paused <span class="badge">{{
"{:,}".format(status_count_paused) }}</span></a>
</div>
- <div class="col-sm-2 col-md-2">
- <form id="tags_form" style="width: 100%; text-align: left;">
- <div class="form-group search-input" style="width:100%;">
- <select multiple name="tags" id="tags_filter"
class="select2-drop-mask" style="width: 100%;">
+ <div class="form-group btn-group p-2">
+ <a
+ href="{{ url_for('Airflow.index', status='running',
search=request.args.get('search', None), tags=request.args.getlist('tags',
None)) }}"
+ class="btn {{ 'btn-primary' if status_filter == 'running' else
'btn-default' }}"
+ title="Show currently running DAG runs">Running <span
class="badge">{{ "{:,}".format(status_count_running) }}</span></a>
+ <a
+ href="{{ url_for('Airflow.index', status='failed',
search=request.args.get('search', None), tags=request.args.getlist('tags',
None)) }}"
+ class="btn {{ 'btn-primary' if status_filter == 'failed' else
'btn-default' }}"
+ title="Show DAGs with failed latest DAG run">Failed <span
class="badge">{{ "{:,}".format(status_count_failed) }}</span></a>
+ </div>
+ <div style="min-width: 200px; padding: 0 10px;">
+ <form id="tags_form">
+ <div class="form-group search-input">
+ <select multiple name="tags" id="tags_filter"
class="select2-drop-mask">
{% for tag in tags %}
<option value="{{ tag.name }}" {% if tag.selected %}selected{%
endif %}>{{ tag.name }}</option>
{% endfor %}
@@ -163,13 +171,13 @@
</div>
</form>
</div>
- <div class="col-sm-2 col-md-2 col-md-offset-1 no-x-padding">
+ <div style="min-width: 300px; padding: 0 10px;">
<form id="search_form">
- <div class="form-group search-input" style="width: 100%;">
+ <div class="form-group search-input">
<label for="dag_query" class="sr-only">Search DAGs</label>
<input type="search" id="dag_query" class="typeahead form-control
search-input__input"
- data-provide="typeahead" style="width:100%;" value="{{
search_query }}" autocomplete="off"
- placeholder="Search DAGs">
+ data-provide="typeahead" value="{{ search_query }}"
autocomplete="off"
+ placeholder="Search DAGs">
{% if search_query %}
<button type="reset" aria-label="Clear DAG Search Term"
class="btn btn-default btn-sm material-icons
search-input__clear-btn">cancel
@@ -178,7 +186,7 @@
</div>
</form>
</div>
- <div class="refresh-actions col-sm-2 col-md-2 no-x-padding">
+ <div class="refresh-actions">
{{ loading_dots(id='loading-dots', classes='refresh-loading') }}
<label class="switch-label">
<input class="switch-input" id="auto_refresh" type="checkbox">
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 85e1a223d3..83a63ca9e8 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -756,13 +756,48 @@ class Airflow(AirflowBaseView):
active_dags = dags_query.filter(~DagModel.is_paused)
paused_dags = dags_query.filter(DagModel.is_paused)
+ # find DAGs which have a RUNNING DagRun
+ running_dags = dags_query.join(DagRun, DagModel.dag_id ==
DagRun.dag_id).filter(
+ DagRun.state == State.RUNNING
+ )
+
+ # find DAGs for which the latest DagRun is FAILED
+ subq_all = (
+ session.query(DagRun.dag_id,
func.max(DagRun.start_date).label("start_date"))
+ .group_by(DagRun.dag_id)
+ .subquery()
+ )
+ subq_failed = (
+ session.query(DagRun.dag_id,
func.max(DagRun.start_date).label("start_date"))
+ .filter(DagRun.state == State.FAILED)
+ .group_by(DagRun.dag_id)
+ .subquery()
+ )
+ subq_join = (
+ session.query(subq_all.c.dag_id, subq_all.c.start_date)
+ .join(
+ subq_failed,
+ and_(
+ subq_all.c.dag_id == subq_failed.c.dag_id,
+ subq_all.c.start_date == subq_failed.c.start_date,
+ ),
+ )
+ .subquery()
+ )
+ failed_dags = dags_query.join(subq_join, DagModel.dag_id ==
subq_join.c.dag_id)
+
is_paused_count = dict(
all_dags.with_entities(DagModel.is_paused,
func.count(DagModel.dag_id))
.group_by(DagModel.is_paused)
.all()
)
+
status_count_active = is_paused_count.get(False, 0)
status_count_paused = is_paused_count.get(True, 0)
+
+ status_count_running = running_dags.count()
+ status_count_failed = failed_dags.count()
+
all_dags_count = status_count_active + status_count_paused
if arg_status_filter == "active":
current_dags = active_dags
@@ -770,6 +805,12 @@ class Airflow(AirflowBaseView):
elif arg_status_filter == "paused":
current_dags = paused_dags
num_of_all_dags = status_count_paused
+ elif arg_status_filter == "running":
+ current_dags = running_dags
+ num_of_all_dags = status_count_running
+ elif arg_status_filter == "failed":
+ current_dags = failed_dags
+ num_of_all_dags = status_count_failed
else:
current_dags = all_dags
num_of_all_dags = all_dags_count
@@ -931,6 +972,8 @@ class Airflow(AirflowBaseView):
status_count_all=all_dags_count,
status_count_active=status_count_active,
status_count_paused=status_count_paused,
+ status_count_running=status_count_running,
+ status_count_failed=status_count_failed,
tags_filter=arg_tags_filter,
sorting_key=arg_sorting_key,
sorting_direction=arg_sorting_direction,
diff --git a/docs/apache-airflow/core-concepts/dag-run.rst b/docs/apache-airflow/core-concepts/dag-run.rst
index a5b6e8634c..391fe958ff 100644
--- a/docs/apache-airflow/core-concepts/dag-run.rst
+++ b/docs/apache-airflow/core-concepts/dag-run.rst
@@ -40,6 +40,10 @@ There are two possible terminal states for the DAG Run:
Be careful if some of your tasks have defined some specific :ref:`trigger
rule <concepts:trigger-rules>`.
These can lead to some unexpected behavior, e.g. if you have a leaf task
with trigger rule `"all_done"`, it will be executed regardless of the states of
the rest of the tasks and if it will succeed, then the whole DAG Run will also
be marked as ``success``, even if something failed in the middle.
+*Added in Airflow 2.7*
+
+DAGs that have a currently running DAG run can be shown on the UI dashboard in the "Running" tab. Similarly, DAGs whose latest DAG run is marked as failed can be found on the "Failed" tab.
+
Cron Presets
''''''''''''
diff --git a/tests/www/views/test_views_home.py b/tests/www/views/test_views_home.py
index 045e34b002..8525149899 100644
--- a/tests/www/views/test_views_home.py
+++ b/tests/www/views/test_views_home.py
@@ -81,6 +81,12 @@ def test_home_status_filter_cookie(admin_client):
admin_client.get("home?status=paused", follow_redirects=True)
assert "paused" == flask.session[FILTER_STATUS_COOKIE]
+ admin_client.get("home?status=running", follow_redirects=True)
+ assert "running" == flask.session[FILTER_STATUS_COOKIE]
+
+ admin_client.get("home?status=failed", follow_redirects=True)
+ assert "failed" == flask.session[FILTER_STATUS_COOKIE]
+
admin_client.get("home?status=all", follow_redirects=True)
assert "all" == flask.session[FILTER_STATUS_COOKIE]
@@ -230,7 +236,17 @@ def test_home_importerrors(broken_dags, user_client):
check_content_in_response(f"/{dag_id}.py", resp)
-@pytest.mark.parametrize("page", ["home", "home?status=active", "home?status=paused", "home?status=all"])
+@pytest.mark.parametrize(
+ "page",
+ [
+ "home",
+ "home?status=all",
+ "home?status=active",
+ "home?status=paused",
+ "home?status=running",
+ "home?status=failed",
+ ],
+)
def test_home_importerrors_filtered_singledag_user(broken_dags_with_read_perm,
client_single_dag, page):
# Users that can only see certain DAGs get a filtered list of import errors
resp = client_single_dag.get(page, follow_redirects=True)