This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new dbe14c31d5 Add running and failed status tab for DAGs on the UI 
(#30429)
dbe14c31d5 is described below

commit dbe14c31d52a345aa82e050cc0a91ee60d9ee567
Author: PApostol <[email protected]>
AuthorDate: Mon May 22 17:05:44 2023 +0100

    Add running and failed status tab for DAGs on the UI (#30429)
    
    * Add running status tab for DAGs on the UI
    
    * Attempt to better position Running tab
    
    * Add FAILED tab, update tests & docs
    
    * Switch header to flex
    
    ---------
    
    Co-authored-by: Brent Bovenzi <[email protected]>
---
 airflow/www/static/css/dags.css               |  3 ++
 airflow/www/templates/airflow/dags.html       | 58 +++++++++++++++------------
 airflow/www/views.py                          | 43 ++++++++++++++++++++
 docs/apache-airflow/core-concepts/dag-run.rst |  4 ++
 tests/www/views/test_views_home.py            | 18 ++++++++-
 5 files changed, 100 insertions(+), 26 deletions(-)

diff --git a/airflow/www/static/css/dags.css b/airflow/www/static/css/dags.css
index 19f1c5ef9f..36cfea78b8 100644
--- a/airflow/www/static/css/dags.css
+++ b/airflow/www/static/css/dags.css
@@ -24,6 +24,9 @@
 }
 
 .dags-table-header {
+  display: flex;
+  flex-wrap: wrap;
+  justify-content: space-between;
   margin: 0;
   padding-top: 16px;
 }
diff --git a/airflow/www/templates/airflow/dags.html 
b/airflow/www/templates/airflow/dags.html
index b7e7d64f72..99e5980881 100644
--- a/airflow/www/templates/airflow/dags.html
+++ b/airflow/www/templates/airflow/dags.html
@@ -130,27 +130,35 @@
 {% block content %}
   <h2>{{ page_title }}</h2>
   <div id="main_content">
-    <div class="row dags-table-header">
-      <div class="col-sm-4 col-md-4 no-x-padding">
-        <div class="form-group btn-group">
-          <a
-            href="{{ url_for('Airflow.index', status='all', 
search=request.args.get('search', None), tags=request.args.getlist('tags', 
None)) }}"
-            class="btn {{ 'btn-primary' if status_filter == 'all' else 
'btn-default' }}"
-            title="Show active and paused DAGs">All <span class="badge">{{ 
"{:,}".format(status_count_all) }}</span></a>
-          <a
-            href="{{ url_for('Airflow.index', status='active', 
search=request.args.get('search', None), tags=request.args.getlist('tags', 
None)) }}"
-            class="btn {{ 'btn-primary' if status_filter == 'active' else 
'btn-default' }}"
-            title="Show only active DAGs">Active <span class="badge">{{ 
"{:,}".format(status_count_active) }}</span></a>
-          <a
-            href="{{ url_for('Airflow.index', status='paused', 
search=request.args.get('search', None), tags=request.args.getlist('tags', 
None)) }}"
-            class="btn {{ 'btn-primary' if status_filter == 'paused' else 
'btn-default' }}"
-            title="Show only paused DAGs">Paused <span class="badge">{{ 
"{:,}".format(status_count_paused) }}</span></a>
-        </div>
+    <div class="dags-table-header">
+      <div class="form-group btn-group">
+        <a
+          href="{{ url_for('Airflow.index', status='all', 
search=request.args.get('search', None), tags=request.args.getlist('tags', 
None)) }}"
+          class="btn {{ 'btn-primary' if status_filter == 'all' else 
'btn-default' }}"
+          title="Show active and paused DAGs">All <span class="badge">{{ 
"{:,}".format(status_count_all) }}</span></a>
+        <a
+          href="{{ url_for('Airflow.index', status='active', 
search=request.args.get('search', None), tags=request.args.getlist('tags', 
None)) }}"
+          class="btn {{ 'btn-primary' if status_filter == 'active' else 
'btn-default' }}"
+          title="Show only active DAGs">Active <span class="badge">{{ 
"{:,}".format(status_count_active) }}</span></a>
+        <a
+          href="{{ url_for('Airflow.index', status='paused', 
search=request.args.get('search', None), tags=request.args.getlist('tags', 
None)) }}"
+          class="btn {{ 'btn-primary' if status_filter == 'paused' else 
'btn-default' }}"
+          title="Show only paused DAGs">Paused <span class="badge">{{ 
"{:,}".format(status_count_paused) }}</span></a>
       </div>
-      <div class="col-sm-2 col-md-2">
-        <form id="tags_form" style="width: 100%; text-align: left;">
-          <div class="form-group search-input" style="width:100%;">
-            <select multiple name="tags" id="tags_filter" 
class="select2-drop-mask" style="width: 100%;">
+      <div class="form-group btn-group p-2">
+        <a
+          href="{{ url_for('Airflow.index', status='running', 
search=request.args.get('search', None), tags=request.args.getlist('tags', 
None)) }}"
+          class="btn {{ 'btn-primary' if status_filter == 'running' else 
'btn-default' }}"
+          title="Show currently running DAG runs">Running <span 
class="badge">{{ "{:,}".format(status_count_running) }}</span></a>
+        <a
+          href="{{ url_for('Airflow.index', status='failed', 
search=request.args.get('search', None), tags=request.args.getlist('tags', 
None)) }}"
+          class="btn {{ 'btn-primary' if status_filter == 'failed' else 
'btn-default' }}"
+          title="Show DAGs with failed latest DAG run">Failed <span 
class="badge">{{ "{:,}".format(status_count_failed) }}</span></a>
+      </div>
+      <div style="min-width: 200px; padding: 0 10px;">
+        <form id="tags_form">
+          <div class="form-group search-input">
+            <select multiple name="tags" id="tags_filter" 
class="select2-drop-mask">
               {% for tag in tags %}
                 <option value="{{ tag.name }}" {% if tag.selected %}selected{% 
endif %}>{{ tag.name }}</option>
               {% endfor %}
@@ -163,13 +171,13 @@
           </div>
         </form>
       </div>
-      <div class="col-sm-2 col-md-2 col-md-offset-1 no-x-padding">
+      <div style="min-width: 300px; padding: 0 10px;">
         <form id="search_form">
-          <div class="form-group search-input" style="width: 100%;">
+          <div class="form-group search-input">
             <label for="dag_query" class="sr-only">Search DAGs</label>
             <input type="search" id="dag_query" class="typeahead form-control 
search-input__input"
-                   data-provide="typeahead" style="width:100%;" value="{{ 
search_query }}" autocomplete="off"
-                   placeholder="Search DAGs">
+                    data-provide="typeahead" value="{{ search_query }}" 
autocomplete="off"
+                    placeholder="Search DAGs">
             {% if search_query %}
               <button type="reset" aria-label="Clear DAG Search Term"
                       class="btn btn-default btn-sm material-icons 
search-input__clear-btn">cancel
@@ -178,7 +186,7 @@
           </div>
         </form>
       </div>
-      <div class="refresh-actions col-sm-2 col-md-2 no-x-padding">
+      <div class="refresh-actions">
         {{ loading_dots(id='loading-dots', classes='refresh-loading') }}
         <label class="switch-label">
           <input class="switch-input" id="auto_refresh" type="checkbox">
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 85e1a223d3..83a63ca9e8 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -756,13 +756,48 @@ class Airflow(AirflowBaseView):
             active_dags = dags_query.filter(~DagModel.is_paused)
             paused_dags = dags_query.filter(DagModel.is_paused)
 
+            # find DAGs which have a RUNNING DagRun
+            running_dags = dags_query.join(DagRun, DagModel.dag_id == 
DagRun.dag_id).filter(
+                DagRun.state == State.RUNNING
+            )
+
+            # find DAGs for which the latest DagRun is FAILED
+            subq_all = (
+                session.query(DagRun.dag_id, 
func.max(DagRun.start_date).label("start_date"))
+                .group_by(DagRun.dag_id)
+                .subquery()
+            )
+            subq_failed = (
+                session.query(DagRun.dag_id, 
func.max(DagRun.start_date).label("start_date"))
+                .filter(DagRun.state == State.FAILED)
+                .group_by(DagRun.dag_id)
+                .subquery()
+            )
+            subq_join = (
+                session.query(subq_all.c.dag_id, subq_all.c.start_date)
+                .join(
+                    subq_failed,
+                    and_(
+                        subq_all.c.dag_id == subq_failed.c.dag_id,
+                        subq_all.c.start_date == subq_failed.c.start_date,
+                    ),
+                )
+                .subquery()
+            )
+            failed_dags = dags_query.join(subq_join, DagModel.dag_id == 
subq_join.c.dag_id)
+
             is_paused_count = dict(
                 all_dags.with_entities(DagModel.is_paused, 
func.count(DagModel.dag_id))
                 .group_by(DagModel.is_paused)
                 .all()
             )
+
             status_count_active = is_paused_count.get(False, 0)
             status_count_paused = is_paused_count.get(True, 0)
+
+            status_count_running = running_dags.count()
+            status_count_failed = failed_dags.count()
+
             all_dags_count = status_count_active + status_count_paused
             if arg_status_filter == "active":
                 current_dags = active_dags
@@ -770,6 +805,12 @@ class Airflow(AirflowBaseView):
             elif arg_status_filter == "paused":
                 current_dags = paused_dags
                 num_of_all_dags = status_count_paused
+            elif arg_status_filter == "running":
+                current_dags = running_dags
+                num_of_all_dags = status_count_running
+            elif arg_status_filter == "failed":
+                current_dags = failed_dags
+                num_of_all_dags = status_count_failed
             else:
                 current_dags = all_dags
                 num_of_all_dags = all_dags_count
@@ -931,6 +972,8 @@ class Airflow(AirflowBaseView):
             status_count_all=all_dags_count,
             status_count_active=status_count_active,
             status_count_paused=status_count_paused,
+            status_count_running=status_count_running,
+            status_count_failed=status_count_failed,
             tags_filter=arg_tags_filter,
             sorting_key=arg_sorting_key,
             sorting_direction=arg_sorting_direction,
diff --git a/docs/apache-airflow/core-concepts/dag-run.rst 
b/docs/apache-airflow/core-concepts/dag-run.rst
index a5b6e8634c..391fe958ff 100644
--- a/docs/apache-airflow/core-concepts/dag-run.rst
+++ b/docs/apache-airflow/core-concepts/dag-run.rst
@@ -40,6 +40,10 @@ There are two possible terminal states for the DAG Run:
     Be careful if some of your tasks have defined some specific :ref:`trigger 
rule <concepts:trigger-rules>`.
     These can lead to some unexpected behavior, e.g. if you have a leaf task 
with trigger rule `"all_done"`, it will be executed regardless of the states of 
the rest of the tasks and if it will succeed, then the whole DAG Run will also 
be marked as ``success``, even if something failed in the middle.
 
+*Added in Airflow 2.7*
+
+DAGs that have a currently running DAG run can be shown on the UI dashboard in 
the "Running" tab. Similarly, DAGs whose latest DAG run is marked as failed can 
be found on the "Failed" tab.
+
 Cron Presets
 ''''''''''''
 
diff --git a/tests/www/views/test_views_home.py 
b/tests/www/views/test_views_home.py
index 045e34b002..8525149899 100644
--- a/tests/www/views/test_views_home.py
+++ b/tests/www/views/test_views_home.py
@@ -81,6 +81,12 @@ def test_home_status_filter_cookie(admin_client):
         admin_client.get("home?status=paused", follow_redirects=True)
         assert "paused" == flask.session[FILTER_STATUS_COOKIE]
 
+        admin_client.get("home?status=running", follow_redirects=True)
+        assert "running" == flask.session[FILTER_STATUS_COOKIE]
+
+        admin_client.get("home?status=failed", follow_redirects=True)
+        assert "failed" == flask.session[FILTER_STATUS_COOKIE]
+
         admin_client.get("home?status=all", follow_redirects=True)
         assert "all" == flask.session[FILTER_STATUS_COOKIE]
 
@@ -230,7 +236,17 @@ def test_home_importerrors(broken_dags, user_client):
         check_content_in_response(f"/{dag_id}.py", resp)
 
 
[email protected]("page", ["home", "home?status=active", 
"home?status=paused", "home?status=all"])
[email protected](
+    "page",
+    [
+        "home",
+        "home?status=all",
+        "home?status=active",
+        "home?status=paused",
+        "home?status=running",
+        "home?status=failed",
+    ],
+)
 def test_home_importerrors_filtered_singledag_user(broken_dags_with_read_perm, 
client_single_dag, page):
     # Users that can only see certain DAGs get a filtered list of import errors
     resp = client_single_dag.get(page, follow_redirects=True)

Reply via email to