ashb commented on code in PR #25935:
URL: https://github.com/apache/airflow/pull/25935#discussion_r960647211


##########
airflow/jobs/scheduler_job.py:
##########
@@ -1483,9 +1497,35 @@ def _find_zombies(self, session: Session) -> None:
         for ti, file_loc in zombies:
             request = TaskCallbackRequest(
                 full_filepath=file_loc,
+                dag_directory=ti.dag_model.dag_directory,
                 simple_task_instance=SimpleTaskInstance.from_ti(ti),
                 msg=f"Detected {ti} as zombie",
             )
             self.log.error("Detected zombie job: %s", request)
             self.executor.send_callback(request)
             Stats.incr('zombies_killed')
+
+    @provide_session
+    def _cleanup_stale_dags(self, session: Session) -> None:
+        """
+        Find all dags that were not updated by Dag Processor recently and mark 
them as inactive.
+
+        In case one of DagProcessors is stopped (in case there are multiple of 
them
+        for different dag folders), it's dags are never marked as inactive.
+        Also remove dags from SerializedDag table.
+        Executed on schedule only if [scheduler]standalone_dag_processor is 
True.
+        """
+        self.log.debug("Checking dags not parsed within last %s seconds.", 
self._stalled_dags_update_timeout)
+        limit_lpt = timezone.utcnow() - 
timedelta(seconds=self._stalled_dags_update_timeout)
+        stale_dags = (
+            session.query(DagModel).filter(DagModel.is_active, 
DagModel.last_parsed_time < limit_lpt).all()
+        )
+        if not stale_dags:
+            self.log.debug("Not stale dags found.")
+            return
+
+        self.log.warning("Found (%d) stales dags not parsed after %s.", 
len(stale_dags), limit_lpt)
+        for dag in stale_dags:
+            dag.is_active = False
+            session.merge(dag)
+            SerializedDagModel.remove_dag(dag_id=dag.dag_id, session=session)

Review Comment:
   ```suggestion
               SerializedDagModel.remove_dag(dag_id=dag.dag_id, session=session)
           session.flush()
   ```



##########
airflow/jobs/scheduler_job.py:
##########
@@ -1483,9 +1497,35 @@ def _find_zombies(self, session: Session) -> None:
         for ti, file_loc in zombies:
             request = TaskCallbackRequest(
                 full_filepath=file_loc,
+                dag_directory=ti.dag_model.dag_directory,
                 simple_task_instance=SimpleTaskInstance.from_ti(ti),
                 msg=f"Detected {ti} as zombie",
             )
             self.log.error("Detected zombie job: %s", request)
             self.executor.send_callback(request)
             Stats.incr('zombies_killed')
+
+    @provide_session
+    def _cleanup_stale_dags(self, session: Session) -> None:
+        """
+        Find all dags that were not updated by Dag Processor recently and mark 
them as inactive.
+
+        In case one of DagProcessors is stopped (in case there are multiple of 
them
+        for different dag folders), it's dags are never marked as inactive.
+        Also remove dags from SerializedDag table.
+        Executed on schedule only if [scheduler]standalone_dag_processor is 
True.
+        """
+        self.log.debug("Checking dags not parsed within last %s seconds.", 
self._stalled_dags_update_timeout)
+        limit_lpt = timezone.utcnow() - 
timedelta(seconds=self._stalled_dags_update_timeout)
+        stale_dags = (
+            session.query(DagModel).filter(DagModel.is_active, 
DagModel.last_parsed_time < limit_lpt).all()
+        )
+        if not stale_dags:
+            self.log.debug("Not stale dags found.")
+            return
+
+        self.log.warning("Found (%d) stales dags not parsed after %s.", 
len(stale_dags), limit_lpt)
+        for dag in stale_dags:
+            dag.is_active = False
+            session.merge(dag)

Review Comment:
   ```suggestion
   ```
   
   Not needed: this object is already attached to the session, so SQLAlchemy 
(SQLA) already tracks changes to it.



##########
tests/models/test_dagbag.py:
##########
@@ -815,9 +815,9 @@ def test_sync_to_db_is_retried(self, mock_bulk_write_to_db, 
mock_s10n_write_dag,
         # Test that 3 attempts were made to run 'DAG.bulk_write_to_db' 
successfully
         mock_bulk_write_to_db.assert_has_calls(
             [
-                mock.call(mock.ANY, session=mock.ANY),
-                mock.call(mock.ANY, session=mock.ANY),
-                mock.call(mock.ANY, session=mock.ANY),
+                mock.call(mock.ANY, dag_directory=mock.ANY, session=mock.ANY),
+                mock.call(mock.ANY, dag_directory=mock.ANY, session=mock.ANY),
+                mock.call(mock.ANY, dag_directory=mock.ANY, session=mock.ANY),

Review Comment:
   In this case, I think the expected calls should be this instead?
   ```suggestion
                   mock.call(mock.ANY, dag_directory=None, session=mock.ANY),
                   mock.call(mock.ANY, dag_directory=None, session=mock.ANY),
                   mock.call(mock.ANY, dag_directory=None, session=mock.ANY),
   ```



##########
airflow/jobs/scheduler_job.py:
##########
@@ -1483,9 +1497,35 @@ def _find_zombies(self, session: Session) -> None:
         for ti, file_loc in zombies:
             request = TaskCallbackRequest(
                 full_filepath=file_loc,
+                dag_directory=ti.dag_model.dag_directory,
                 simple_task_instance=SimpleTaskInstance.from_ti(ti),
                 msg=f"Detected {ti} as zombie",
             )
             self.log.error("Detected zombie job: %s", request)
             self.executor.send_callback(request)
             Stats.incr('zombies_killed')
+
+    @provide_session
+    def _cleanup_stale_dags(self, session: Session) -> None:

Review Comment:
   ```suggestion
       def _cleanup_stale_dags(self, session: Session = NEW_SESSION) -> None:
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to