This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e0e937421c Setup ui rest api (#41798)
e0e937421c is described below
commit e0e937421c639dff02d31f96aca19614140b37e4
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Thu Aug 29 19:08:56 2024 +0200
Setup ui rest api (#41798)
* init code and basic view
* add more modularization
* factorize dag bag instantiation code
---
.../init_dagbag.py => api_ui/__init__.py} | 18 -----
.../extensions/init_dagbag.py => api_ui/app.py} | 28 +++++---
.../extensions/init_dagbag.py => api_ui/main.py} | 22 +++---
.../init_dagbag.py => api_ui/views/__init__.py} | 18 -----
airflow/api_ui/views/datasets.py | 82 ++++++++++++++++++++++
airflow/www/extensions/init_dagbag.py | 12 ++--
6 files changed, 118 insertions(+), 62 deletions(-)
diff --git a/airflow/www/extensions/init_dagbag.py b/airflow/api_ui/__init__.py
similarity index 63%
copy from airflow/www/extensions/init_dagbag.py
copy to airflow/api_ui/__init__.py
index 1cab2bff6b..13a83393a9 100644
--- a/airflow/www/extensions/init_dagbag.py
+++ b/airflow/api_ui/__init__.py
@@ -14,21 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-from __future__ import annotations
-
-import os
-
-from airflow.models import DagBag
-from airflow.settings import DAGS_FOLDER
-
-
-def init_dagbag(app):
- """
- Create global DagBag for webserver and API.
-
- To access it use ``flask.current_app.dag_bag``.
- """
- if os.environ.get("SKIP_DAGS_PARSING") == "True":
- app.dag_bag = DagBag(os.devnull, include_examples=False)
- else:
- app.dag_bag = DagBag(DAGS_FOLDER, read_dags_from_db=True)
diff --git a/airflow/www/extensions/init_dagbag.py b/airflow/api_ui/app.py
similarity index 58%
copy from airflow/www/extensions/init_dagbag.py
copy to airflow/api_ui/app.py
index 1cab2bff6b..2307e0eea3 100644
--- a/airflow/www/extensions/init_dagbag.py
+++ b/airflow/api_ui/app.py
@@ -16,19 +16,27 @@
# under the License.
from __future__ import annotations
-import os
+from fastapi import FastAPI
-from airflow.models import DagBag
-from airflow.settings import DAGS_FOLDER
+from airflow.www.extensions.init_dagbag import get_dag_bag
-def init_dagbag(app):
+def init_dag_bag(app: FastAPI) -> None:
"""
- Create global DagBag for webserver and API.
+ Create global DagBag for the FastAPI application.
- To access it use ``flask.current_app.dag_bag``.
+ To access it use ``request.app.state.dag_bag``.
"""
- if os.environ.get("SKIP_DAGS_PARSING") == "True":
- app.dag_bag = DagBag(os.devnull, include_examples=False)
- else:
- app.dag_bag = DagBag(DAGS_FOLDER, read_dags_from_db=True)
+ app.state.dag_bag = get_dag_bag()
+
+
+def create_app() -> FastAPI:
+ app = FastAPI(
+ description="Internal Rest API for the UI frontend. It is subject to breaking change "
+ "depending on the need of the frontend. Users should not rely on this API but use the "
+ "public API instead."
+ )
+
+ init_dag_bag(app)
+
+ return app
diff --git a/airflow/www/extensions/init_dagbag.py b/airflow/api_ui/main.py
similarity index 66%
copy from airflow/www/extensions/init_dagbag.py
copy to airflow/api_ui/main.py
index 1cab2bff6b..05798974a8 100644
--- a/airflow/www/extensions/init_dagbag.py
+++ b/airflow/api_ui/main.py
@@ -14,21 +14,19 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
from __future__ import annotations
-import os
+from fastapi import APIRouter
+
+from airflow.api_ui.app import create_app
+from airflow.api_ui.views.datasets import dataset_router
+
+app = create_app()
-from airflow.models import DagBag
-from airflow.settings import DAGS_FOLDER
+root_router = APIRouter(prefix="/ui")
+root_router.include_router(dataset_router)
-def init_dagbag(app):
- """
- Create global DagBag for webserver and API.
- To access it use ``flask.current_app.dag_bag``.
- """
- if os.environ.get("SKIP_DAGS_PARSING") == "True":
- app.dag_bag = DagBag(os.devnull, include_examples=False)
- else:
- app.dag_bag = DagBag(DAGS_FOLDER, read_dags_from_db=True)
+app.include_router(root_router)
diff --git a/airflow/www/extensions/init_dagbag.py b/airflow/api_ui/views/__init__.py
similarity index 63%
copy from airflow/www/extensions/init_dagbag.py
copy to airflow/api_ui/views/__init__.py
index 1cab2bff6b..13a83393a9 100644
--- a/airflow/www/extensions/init_dagbag.py
+++ b/airflow/api_ui/views/__init__.py
@@ -14,21 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-from __future__ import annotations
-
-import os
-
-from airflow.models import DagBag
-from airflow.settings import DAGS_FOLDER
-
-
-def init_dagbag(app):
- """
- Create global DagBag for webserver and API.
-
- To access it use ``flask.current_app.dag_bag``.
- """
- if os.environ.get("SKIP_DAGS_PARSING") == "True":
- app.dag_bag = DagBag(os.devnull, include_examples=False)
- else:
- app.dag_bag = DagBag(DAGS_FOLDER, read_dags_from_db=True)
diff --git a/airflow/api_ui/views/datasets.py b/airflow/api_ui/views/datasets.py
new file mode 100644
index 0000000000..fe7f2e901f
--- /dev/null
+++ b/airflow/api_ui/views/datasets.py
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from fastapi import APIRouter, HTTPException, Request
+from sqlalchemy import and_, func, select
+
+from airflow.models import DagModel
+from airflow.models.dataset import DagScheduleDatasetReference, DatasetDagRunQueue, DatasetEvent, DatasetModel
+from airflow.utils.session import create_session
+
+dataset_router = APIRouter(tags=["Dataset"])
+
+
+# Ultimately we want async routes, with async sqlalchemy session / context manager.
+# Additional effort to make airflow utility code async, not handled for now and most likely part of the AIP-70
+@dataset_router.get("/next_run_datasets/{dag_id}")
+def next_run_datasets(dag_id: str, request: Request) -> dict:
+ dag = request.app.state.dag_bag.get_dag(dag_id)
+
+ if not dag:
+ raise HTTPException(404, f"can't find dag {dag_id}")
+
+ with create_session() as session:
+ dag_model = DagModel.get_dagmodel(dag_id, session=session)
+
+ if dag_model is None:
+ raise HTTPException(404, f"can't find associated dag_model {dag_id}")
+
+ latest_run = dag_model.get_last_dagrun(session=session)
+
+ events = [
+ dict(info._mapping)
+ for info in session.execute(
+ select(
+ DatasetModel.id,
+ DatasetModel.uri,
+ func.max(DatasetEvent.timestamp).label("lastUpdate"),
+ )
+ .join(DagScheduleDatasetReference, DagScheduleDatasetReference.dataset_id == DatasetModel.id)
+ .join(
+ DatasetDagRunQueue,
+ and_(
+ DatasetDagRunQueue.dataset_id == DatasetModel.id,
+ DatasetDagRunQueue.target_dag_id == DagScheduleDatasetReference.dag_id,
+ ),
+ isouter=True,
+ )
+ .join(
+ DatasetEvent,
+ and_(
+ DatasetEvent.dataset_id == DatasetModel.id,
+ (
+ DatasetEvent.timestamp >= latest_run.execution_date
+ if latest_run and latest_run.execution_date
+ else True
+ ),
+ ),
+ isouter=True,
+ )
+ .where(DagScheduleDatasetReference.dag_id == dag_id, ~DatasetModel.is_orphaned)
+ .group_by(DatasetModel.id, DatasetModel.uri)
+ .order_by(DatasetModel.uri)
+ )
+ ]
+ data = {"dataset_expression": dag_model.dataset_expression, "events": events}
+ return data
diff --git a/airflow/www/extensions/init_dagbag.py b/airflow/www/extensions/init_dagbag.py
index 1cab2bff6b..fc23a2f7e0 100644
--- a/airflow/www/extensions/init_dagbag.py
+++ b/airflow/www/extensions/init_dagbag.py
@@ -22,13 +22,17 @@ from airflow.models import DagBag
from airflow.settings import DAGS_FOLDER
+def get_dag_bag() -> DagBag:
+ """Instantiate the appropriate DagBag based on the ``SKIP_DAGS_PARSING`` environment variable."""
+ if os.environ.get("SKIP_DAGS_PARSING") == "True":
+ return DagBag(os.devnull, include_examples=False)
+ return DagBag(DAGS_FOLDER, read_dags_from_db=True)
+
+
def init_dagbag(app):
"""
Create global DagBag for webserver and API.
To access it use ``flask.current_app.dag_bag``.
"""
- if os.environ.get("SKIP_DAGS_PARSING") == "True":
- app.dag_bag = DagBag(os.devnull, include_examples=False)
- else:
- app.dag_bag = DagBag(DAGS_FOLDER, read_dags_from_db=True)
+ app.dag_bag = get_dag_bag()