This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e0e937421c Setup ui rest api (#41798)
e0e937421c is described below

commit e0e937421c639dff02d31f96aca19614140b37e4
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Thu Aug 29 19:08:56 2024 +0200

    Setup ui rest api (#41798)
    
    * init code and basic view
    
    * add more modularization
    
    * factorize dag bag instantiation code
---
 .../init_dagbag.py => api_ui/__init__.py}          | 18 -----
 .../extensions/init_dagbag.py => api_ui/app.py}    | 28 +++++---
 .../extensions/init_dagbag.py => api_ui/main.py}   | 22 +++---
 .../init_dagbag.py => api_ui/views/__init__.py}    | 18 -----
 airflow/api_ui/views/datasets.py                   | 82 ++++++++++++++++++++++
 airflow/www/extensions/init_dagbag.py              | 12 ++--
 6 files changed, 118 insertions(+), 62 deletions(-)

diff --git a/airflow/www/extensions/init_dagbag.py b/airflow/api_ui/__init__.py
similarity index 63%
copy from airflow/www/extensions/init_dagbag.py
copy to airflow/api_ui/__init__.py
index 1cab2bff6b..13a83393a9 100644
--- a/airflow/www/extensions/init_dagbag.py
+++ b/airflow/api_ui/__init__.py
@@ -14,21 +14,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from __future__ import annotations
-
-import os
-
-from airflow.models import DagBag
-from airflow.settings import DAGS_FOLDER
-
-
-def init_dagbag(app):
-    """
-    Create global DagBag for webserver and API.
-
-    To access it use ``flask.current_app.dag_bag``.
-    """
-    if os.environ.get("SKIP_DAGS_PARSING") == "True":
-        app.dag_bag = DagBag(os.devnull, include_examples=False)
-    else:
-        app.dag_bag = DagBag(DAGS_FOLDER, read_dags_from_db=True)
diff --git a/airflow/www/extensions/init_dagbag.py b/airflow/api_ui/app.py
similarity index 58%
copy from airflow/www/extensions/init_dagbag.py
copy to airflow/api_ui/app.py
index 1cab2bff6b..2307e0eea3 100644
--- a/airflow/www/extensions/init_dagbag.py
+++ b/airflow/api_ui/app.py
@@ -16,19 +16,27 @@
 # under the License.
 from __future__ import annotations
 
-import os
+from fastapi import FastAPI
 
-from airflow.models import DagBag
-from airflow.settings import DAGS_FOLDER
+from airflow.www.extensions.init_dagbag import get_dag_bag
 
 
-def init_dagbag(app):
+def init_dag_bag(app: FastAPI) -> None:
     """
-    Create global DagBag for webserver and API.
+    Create global DagBag for the FastAPI application.
 
-    To access it use ``flask.current_app.dag_bag``.
+    To access it use ``request.app.state.dag_bag``.
     """
-    if os.environ.get("SKIP_DAGS_PARSING") == "True":
-        app.dag_bag = DagBag(os.devnull, include_examples=False)
-    else:
-        app.dag_bag = DagBag(DAGS_FOLDER, read_dags_from_db=True)
+    app.state.dag_bag = get_dag_bag()
+
+
+def create_app() -> FastAPI:
+    app = FastAPI(
+        description="Internal Rest API for the UI frontend. It is subject to breaking change "
+        "depending on the need of the frontend. Users should not rely on this API but use the "
+        "public API instead."
+    )
+
+    init_dag_bag(app)
+
+    return app
diff --git a/airflow/www/extensions/init_dagbag.py b/airflow/api_ui/main.py
similarity index 66%
copy from airflow/www/extensions/init_dagbag.py
copy to airflow/api_ui/main.py
index 1cab2bff6b..05798974a8 100644
--- a/airflow/www/extensions/init_dagbag.py
+++ b/airflow/api_ui/main.py
@@ -14,21 +14,19 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
 from __future__ import annotations
 
-import os
+from fastapi import APIRouter
+
+from airflow.api_ui.app import create_app
+from airflow.api_ui.views.datasets import dataset_router
+
+app = create_app()
 
-from airflow.models import DagBag
-from airflow.settings import DAGS_FOLDER
+root_router = APIRouter(prefix="/ui")
 
+root_router.include_router(dataset_router)
 
-def init_dagbag(app):
-    """
-    Create global DagBag for webserver and API.
 
-    To access it use ``flask.current_app.dag_bag``.
-    """
-    if os.environ.get("SKIP_DAGS_PARSING") == "True":
-        app.dag_bag = DagBag(os.devnull, include_examples=False)
-    else:
-        app.dag_bag = DagBag(DAGS_FOLDER, read_dags_from_db=True)
+app.include_router(root_router)
diff --git a/airflow/www/extensions/init_dagbag.py b/airflow/api_ui/views/__init__.py
similarity index 63%
copy from airflow/www/extensions/init_dagbag.py
copy to airflow/api_ui/views/__init__.py
index 1cab2bff6b..13a83393a9 100644
--- a/airflow/www/extensions/init_dagbag.py
+++ b/airflow/api_ui/views/__init__.py
@@ -14,21 +14,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from __future__ import annotations
-
-import os
-
-from airflow.models import DagBag
-from airflow.settings import DAGS_FOLDER
-
-
-def init_dagbag(app):
-    """
-    Create global DagBag for webserver and API.
-
-    To access it use ``flask.current_app.dag_bag``.
-    """
-    if os.environ.get("SKIP_DAGS_PARSING") == "True":
-        app.dag_bag = DagBag(os.devnull, include_examples=False)
-    else:
-        app.dag_bag = DagBag(DAGS_FOLDER, read_dags_from_db=True)
diff --git a/airflow/api_ui/views/datasets.py b/airflow/api_ui/views/datasets.py
new file mode 100644
index 0000000000..fe7f2e901f
--- /dev/null
+++ b/airflow/api_ui/views/datasets.py
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from fastapi import APIRouter, HTTPException, Request
+from sqlalchemy import and_, func, select
+
+from airflow.models import DagModel
+from airflow.models.dataset import DagScheduleDatasetReference, DatasetDagRunQueue, DatasetEvent, DatasetModel
+from airflow.utils.session import create_session
+
+dataset_router = APIRouter(tags=["Dataset"])
+
+
+# Ultimately we want async routes, with async sqlalchemy session / context manager.
+# Additional effort to make airflow utility code async, not handled for now and most likely part of the AIP-70
+@dataset_router.get("/next_run_datasets/{dag_id}")
+def next_run_datasets(dag_id: str, request: Request) -> dict:
+    dag = request.app.state.dag_bag.get_dag(dag_id)
+
+    if not dag:
+        raise HTTPException(404, f"can't find dag {dag_id}")
+
+    with create_session() as session:
+        dag_model = DagModel.get_dagmodel(dag_id, session=session)
+
+        if dag_model is None:
+            raise HTTPException(404, f"can't find associated dag_model {dag_id}")
+
+        latest_run = dag_model.get_last_dagrun(session=session)
+
+        events = [
+            dict(info._mapping)
+            for info in session.execute(
+                select(
+                    DatasetModel.id,
+                    DatasetModel.uri,
+                    func.max(DatasetEvent.timestamp).label("lastUpdate"),
+                )
+                .join(DagScheduleDatasetReference, DagScheduleDatasetReference.dataset_id == DatasetModel.id)
+                .join(
+                    DatasetDagRunQueue,
+                    and_(
+                        DatasetDagRunQueue.dataset_id == DatasetModel.id,
+                        DatasetDagRunQueue.target_dag_id == DagScheduleDatasetReference.dag_id,
+                    ),
+                    isouter=True,
+                )
+                .join(
+                    DatasetEvent,
+                    and_(
+                        DatasetEvent.dataset_id == DatasetModel.id,
+                        (
+                            DatasetEvent.timestamp >= latest_run.execution_date
+                            if latest_run and latest_run.execution_date
+                            else True
+                        ),
+                    ),
+                    isouter=True,
+                )
+                .where(DagScheduleDatasetReference.dag_id == dag_id, ~DatasetModel.is_orphaned)
+                .group_by(DatasetModel.id, DatasetModel.uri)
+                .order_by(DatasetModel.uri)
+            )
+        ]
+        data = {"dataset_expression": dag_model.dataset_expression, "events": events}
+        return data
diff --git a/airflow/www/extensions/init_dagbag.py b/airflow/www/extensions/init_dagbag.py
index 1cab2bff6b..fc23a2f7e0 100644
--- a/airflow/www/extensions/init_dagbag.py
+++ b/airflow/www/extensions/init_dagbag.py
@@ -22,13 +22,17 @@ from airflow.models import DagBag
 from airflow.settings import DAGS_FOLDER
 
 
+def get_dag_bag() -> DagBag:
+    """Instantiate the appropriate DagBag based on the ``SKIP_DAGS_PARSING`` environment variable."""
+    if os.environ.get("SKIP_DAGS_PARSING") == "True":
+        return DagBag(os.devnull, include_examples=False)
+    return DagBag(DAGS_FOLDER, read_dags_from_db=True)
+
+
 def init_dagbag(app):
     """
     Create global DagBag for webserver and API.
 
     To access it use ``flask.current_app.dag_bag``.
     """
-    if os.environ.get("SKIP_DAGS_PARSING") == "True":
-        app.dag_bag = DagBag(os.devnull, include_examples=False)
-    else:
-        app.dag_bag = DagBag(DAGS_FOLDER, read_dags_from_db=True)
+    app.dag_bag = get_dag_bag()

Reply via email to