blag commented on code in PR #27828:
URL: https://github.com/apache/airflow/pull/27828#discussion_r1032045393


##########
airflow/jobs/scheduler_job.py:
##########
@@ -1574,3 +1585,33 @@ def _cleanup_stale_dags(self, session: Session = 
NEW_SESSION) -> None:
             dag.is_active = False
             SerializedDagModel.remove_dag(dag_id=dag.dag_id, session=session)
         session.flush()
+
+    @provide_session
+    def _orphan_unreferenced_datasets(self, session: Session = NEW_SESSION) -> 
None:
+        """
+        Detects datasets that are no longer referenced in any DAG schedule 
parameters or task outlets and
+        sets the dataset is_orphaned flags to True
+        """
+        orphaned_dataset_query = (
+            session.query(DatasetModel)
+            .join(
+                DagScheduleDatasetReference,
+                DagScheduleDatasetReference.dataset_id == DatasetModel.id,
+                isouter=True,
+            )
+            .join(
+                TaskOutletDatasetReference,
+                TaskOutletDatasetReference.dataset_id == DatasetModel.id,
+                isouter=True,
+            )
+            .group_by(DatasetModel.id)
+            .having(
+                and_(
+                    func.count(DagScheduleDatasetReference.dag_id) == 0,
+                    func.count(TaskOutletDatasetReference.dag_id) == 0,
+                )
+            )
+        )
+        for dataset in orphaned_dataset_query:
+            self.log.info("Orphaning unreferenced dataset '%s'", dataset.uri)
+            dataset.is_orphaned = True

Review Comment:
   The `group_by` expression might interfere, but I'll try it, thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to