pankajkoti commented on code in PR #38369:
URL: https://github.com/apache/airflow/pull/38369#discussion_r1533885409
##########
airflow/www/views.py:
##########
@@ -1082,6 +1088,54 @@ def datasets(self):
state_color_mapping=state_color_mapping,
)
+ @expose("/dataset_dags/<int:dataset_id>")
+ @auth.has_access(
+ [
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DATASET),
+ ]
+ )
+ @action_logging
+ def retrieve_related_dags_for_dataset(self, dataset_id: int):
+ """
+ Retrieve related DAGs for a specific dataset.
+
+ :param dataset_id: The dataset id
+ """
+ with create_session() as session:
+ dataset =
session.query(DatasetModel).filter_by(id=dataset_id).one_or_none()
+ if not dataset:
+ return flask.json.jsonify({"error": "Dataset not found"}), 404
+
+ # Fetch all consuming DAGs (downstream)
+ consuming_dags_refs = (
+
session.query(DagScheduleDatasetReference).filter_by(dataset_id=dataset_id).all()
+ )
+ consuming_dags = [
+ {
+ "dag_id": dag_ref.dag_id,
+ "is_paused":
session.query(DagModel).filter_by(dag_id=dag_ref.dag_id).one().is_paused,
Review Comment:
Wondering if we can optimise the number of DB calls here.
Instead of making a query every single time when iterating over
consuming_dag_refs, can we get pre-build the list of all the dag_ids from
consuming_dag_regs and make a single query to DagModel with all those DAG IDs
for getting there is_paused values?
##########
airflow/www/views.py:
##########
@@ -1082,6 +1088,54 @@ def datasets(self):
state_color_mapping=state_color_mapping,
)
+ @expose("/dataset_dags/<int:dataset_id>")
+ @auth.has_access(
+ [
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DATASET),
+ ]
+ )
+ @action_logging
+ def retrieve_related_dags_for_dataset(self, dataset_id: int):
+ """
+ Retrieve related DAGs for a specific dataset.
+
+ :param dataset_id: The dataset id
+ """
+ with create_session() as session:
+ dataset =
session.query(DatasetModel).filter_by(id=dataset_id).one_or_none()
+ if not dataset:
+ return flask.json.jsonify({"error": "Dataset not found"}), 404
+
+ # Fetch all consuming DAGs (downstream)
+ consuming_dags_refs = (
+
session.query(DagScheduleDatasetReference).filter_by(dataset_id=dataset_id).all()
+ )
+ consuming_dags = [
+ {
+ "dag_id": dag_ref.dag_id,
+ "is_paused":
session.query(DagModel).filter_by(dag_id=dag_ref.dag_id).one().is_paused,
+ }
+ for dag_ref in consuming_dags_refs
+ ]
+
+ # Fetch all producing tasks (upstream)
+ producing_tasks_refs = (
+
session.query(TaskOutletDatasetReference).filter_by(dataset_id=dataset_id).all()
+ )
+ producing_tasks = []
+ for task_ref in producing_tasks_refs:
+ dag =
session.query(DagModel).filter_by(dag_id=task_ref.dag_id).one()
Review Comment:
Also, wondering if we can avoid multiple DB calls here. Looks like
TaskOutletDatasetReference does contain dag_id, so it would be possible to
pre-build the list of DAG IDs and then we can make a single DB call to DAGModel
with those dag_ids to get their is_paused values.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]