This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1789f595f2 Add ui endpoint to get "next run datasets" info (#25454)
1789f595f2 is described below

commit 1789f595f2de8ce441bb73c01527e1f25e9809dc
Author: Jed Cunningham <[email protected]>
AuthorDate: Thu Aug 4 15:15:08 2022 -0700

    Add ui endpoint to get "next run datasets" info (#25454)
---
 airflow/www/views.py               | 36 +++++++++++++++++++++++++++++++++++-
 tests/test_utils/db.py             |  5 ++++-
 tests/www/views/test_views_grid.py | 34 +++++++++++++++++++++++++++++++++-
 3 files changed, 72 insertions(+), 3 deletions(-)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index 830af7027a..ea401ac914 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -110,7 +110,7 @@ from airflow.models.abstractoperator import AbstractOperator
 from airflow.models.dag import DAG, get_dataset_triggered_next_run_info
 from airflow.models.dagcode import DagCode
 from airflow.models.dagrun import DagRun, DagRunType
-from airflow.models.dataset import Dataset
+from airflow.models.dataset import Dataset, DatasetDagRef, DatasetDagRunQueue
 from airflow.models.operator import Operator
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import TaskInstance
@@ -3618,6 +3618,40 @@ class Airflow(AirflowBaseView):
             {'Content-Type': 'application/json; charset=utf-8'},
         )
 
+    @expose('/object/next_run_datasets/<string:dag_id>')
+    @auth.has_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG)])
+    def next_run_datasets(self, dag_id):
+        """Returns datasets necessary, and their status, for the next dag run"""
+        dag = get_airflow_app().dag_bag.get_dag(dag_id)
+        if not dag:
+            return {'error': f"can't find dag {dag_id}"}, 404
+
+        with create_session() as session:
+            data = [
+                dict(info)
+                for info in session.query(
+                    Dataset.id,
+                    Dataset.uri,
+                    DatasetDagRunQueue.created_at,
+                )
+                .join(DatasetDagRef, Dataset.id == DatasetDagRef.dataset_id)
+                .join(
+                    DatasetDagRunQueue,
+                    and_(
+                        DatasetDagRunQueue.dataset_id == DatasetDagRef.dataset_id,
+                        DatasetDagRunQueue.target_dag_id == DatasetDagRef.dag_id,
+                    ),
+                    isouter=True,
+                )
+                .filter(DatasetDagRef.dag_id == dag_id)
+                .order_by(Dataset.id)
+                .all()
+            ]
+        return (
+            htmlsafe_json_dumps(data, separators=(',', ':'), cls=utils_json.AirflowJsonEncoder),
+            {'Content-Type': 'application/json; charset=utf-8'},
+        )
+
     @expose('/robots.txt')
     @action_logging
     def robots(self):
diff --git a/tests/test_utils/db.py b/tests/test_utils/db.py
index 92629020fa..6c78c87c3a 100644
--- a/tests/test_utils/db.py
+++ b/tests/test_utils/db.py
@@ -37,7 +37,7 @@ from airflow.models import (
     errors,
 )
 from airflow.models.dagcode import DagCode
-from airflow.models.dataset import Dataset, DatasetEvent
+from airflow.models.dataset import Dataset, DatasetDagRef, DatasetDagRunQueue, DatasetEvent, DatasetTaskRef
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.security.permissions import RESOURCE_DAG_PREFIX
 from airflow.utils.db import add_default_pool_if_not_exists, create_default_connections, reflect_tables
@@ -57,6 +57,9 @@ def clear_db_datasets():
     with create_session() as session:
         session.query(DatasetEvent).delete()
         session.query(Dataset).delete()
+        session.query(DatasetDagRunQueue).delete()
+        session.query(DatasetDagRef).delete()
+        session.query(DatasetTaskRef).delete()
 
 
 def clear_db_dags():
diff --git a/tests/www/views/test_views_grid.py b/tests/www/views/test_views_grid.py
index 57d8964c5b..e6f5a698a8 100644
--- a/tests/www/views/test_views_grid.py
+++ b/tests/www/views/test_views_grid.py
@@ -25,7 +25,7 @@ from dateutil.tz import UTC
 from airflow.lineage.entities import File
 from airflow.models import DagBag
 from airflow.models.dagrun import DagRun
-from airflow.models.dataset import Dataset
+from airflow.models.dataset import Dataset, DatasetDagRunQueue
 from airflow.operators.empty import EmptyOperator
 from airflow.utils.state import DagRunState, TaskInstanceState
 from airflow.utils.task_group import TaskGroup
@@ -323,3 +323,35 @@ def test_has_outlet_dataset_flag(admin_client, dag_maker, session, app, monkeypa
             'label': None,
         },
     }
+
+
+def test_next_run_datasets(admin_client, dag_maker, session, app, monkeypatch):
+    with monkeypatch.context() as m:
+        datasets = [Dataset(id=i, uri=f's3://bucket/key/{i}') for i in [1, 2]]
+        session.add_all(datasets)
+        session.commit()
+
+        with dag_maker(dag_id=DAG_ID, schedule_on=datasets, serialized=True, session=session):
+            EmptyOperator(task_id='task1')
+
+        m.setattr(app, 'dag_bag', dag_maker.dagbag)
+
+        ddrq = DatasetDagRunQueue(
+            target_dag_id=DAG_ID, dataset_id=1, created_at=pendulum.DateTime(2022, 8, 1, tzinfo=UTC)
+        )
+        session.add(ddrq)
+        session.commit()
+
+        resp = admin_client.get(f'/object/next_run_datasets/{DAG_ID}', follow_redirects=True)
+
+    assert resp.status_code == 200, resp.json
+    assert resp.json == [
+        {'id': 1, 'uri': 's3://bucket/key/1', 'created_at': "2022-08-01T00:00:00+00:00"},
+        {'id': 2, 'uri': 's3://bucket/key/2', 'created_at': None},
+    ]
+
+
+def test_next_run_datasets_404(admin_client):
+    resp = admin_client.get('/object/next_run_datasets/missingdag', follow_redirects=True)
+    assert resp.status_code == 404, resp.json
+    assert resp.json == {'error': "can't find dag missingdag"}

Reply via email to