sunank200 commented on code in PR #38369:
URL: https://github.com/apache/airflow/pull/38369#discussion_r1534154327


##########
airflow/www/views.py:
##########
@@ -1082,6 +1088,67 @@ def datasets(self):
             state_color_mapping=state_color_mapping,
         )
 
+    @expose("/dataset_dags/<int:dataset_id>")
+    @auth.has_access(
+        [
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DATASET),
+        ]
+    )
+    @action_logging
+    def retrieve_related_dags_for_dataset(self, dataset_id: int):
+        """
+        Retrieve related DAGs for a specific dataset.
+
+        :param dataset_id: The dataset id
+        """
+        with create_session() as session:
+            dataset = session.query(DatasetModel).filter_by(id=dataset_id).one_or_none()
+            if not dataset:
+                return flask.json.jsonify({"error": "Dataset not found"}), 404
+
+            consuming_dags_refs = (
+                session.query(DagScheduleDatasetReference).filter_by(dataset_id=dataset_id).all()
+            )
+            producing_tasks_refs = (
+                session.query(TaskOutletDatasetReference).filter_by(dataset_id=dataset_id).all()
+            )
+
+            # Collect all unique DAG IDs from references
+            all_dag_ids = {ref.dag_id for ref in consuming_dags_refs + producing_tasks_refs}
+
+            # Fetch is_paused status for all DAGs in one query
+            dag_status = {
+                dag.dag_id: dag.is_paused
+                for dag in session.query(DagModel).filter(DagModel.dag_id.in_(all_dag_ids)).all()
+            }
+
+            # Validate if all dag_ids found
+            missing_dags = all_dag_ids - dag_status.keys()
+            if missing_dags:
+                raise ValueError(f"Missing DAGs in DagModel: {missing_dags}")
+
+            consuming_dags = [
+                {
+                    "dag_id": dag_ref.dag_id,
+                    "is_paused": dag_status.get(dag_ref.dag_id, False),
+                    # Default to False if dag_id not found

Review Comment:
   Done: https://github.com/apache/airflow/pull/38369/commits/50baa2449ce7182e4a1b882ca761f2f812ab7f16



##########
airflow/www/views.py:
##########
@@ -1082,6 +1088,67 @@ def datasets(self):
             state_color_mapping=state_color_mapping,
         )
 
+    @expose("/dataset_dags/<int:dataset_id>")
+    @auth.has_access(
+        [
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DATASET),
+        ]
+    )
+    @action_logging
+    def retrieve_related_dags_for_dataset(self, dataset_id: int):
+        """
+        Retrieve related DAGs for a specific dataset.
+
+        :param dataset_id: The dataset id
+        """
+        with create_session() as session:
+            dataset = session.query(DatasetModel).filter_by(id=dataset_id).one_or_none()
+            if not dataset:
+                return flask.json.jsonify({"error": "Dataset not found"}), 404
+
+            consuming_dags_refs = (
+                session.query(DagScheduleDatasetReference).filter_by(dataset_id=dataset_id).all()
+            )
+            producing_tasks_refs = (
+                session.query(TaskOutletDatasetReference).filter_by(dataset_id=dataset_id).all()
+            )
+
+            # Collect all unique DAG IDs from references
+            all_dag_ids = {ref.dag_id for ref in consuming_dags_refs + producing_tasks_refs}
+
+            # Fetch is_paused status for all DAGs in one query
+            dag_status = {
+                dag.dag_id: dag.is_paused
+                for dag in session.query(DagModel).filter(DagModel.dag_id.in_(all_dag_ids)).all()
+            }
+
+            # Validate if all dag_ids found
+            missing_dags = all_dag_ids - dag_status.keys()
+            if missing_dags:
+                raise ValueError(f"Missing DAGs in DagModel: {missing_dags}")
+
+            consuming_dags = [
+                {
+                    "dag_id": dag_ref.dag_id,
+                    "is_paused": dag_status.get(dag_ref.dag_id, False),
+                    # Default to False if dag_id not found
+                }
+                for dag_ref in consuming_dags_refs
+            ]
+
+            producing_tasks = [
+                {
+                    "dag_id": task_ref.dag_id,
+                    "is_paused": dag_status.get(task_ref.dag_id, False),
+                    # Default to False if dag_id not found

Review Comment:
   Done: https://github.com/apache/airflow/pull/38369/commits/50baa2449ce7182e4a1b882ca761f2f812ab7f16



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to