ashb commented on code in PR #25935:
URL: https://github.com/apache/airflow/pull/25935#discussion_r963745382
##########
airflow/jobs/scheduler_job.py:
##########
@@ -1509,3 +1524,28 @@ def _generate_zombie_message_details(ti: TaskInstance):
             zombie_message_details["External Executor Id"] = ti.external_executor_id
         return zombie_message_details
+
+    @provide_session
+    def _cleanup_stale_dags(self, session: Session = NEW_SESSION) -> None:
+        """
+        Find all dags that were not updated by Dag Processor recently and mark them as inactive.
+
+        In case one of DagProcessors is stopped (in case there are multiple of them
+        for different dag folders), it's dags are never marked as inactive.
+        Also remove dags from SerializedDag table.
+        Executed on schedule only if [scheduler]standalone_dag_processor is True.
+        """
+        self.log.debug("Checking dags not parsed within last %s seconds.", self._dag_stale_not_seen_duration)
+        limit_lpt = timezone.utcnow() - timedelta(seconds=self._dag_stale_not_seen_duration)
+        stale_dags = (
+            session.query(DagModel).filter(DagModel.is_active, DagModel.last_parsed_time < limit_lpt).all()
+        )
+        if not stale_dags:
+            self.log.debug("Not stale dags found.")
+            return
+
+        self.log.warning("Found (%d) stales dags not parsed after %s.", len(stale_dags), limit_lpt)
Review Comment:
```suggestion
        self.log.info("Found (%d) stales dags not parsed after %s.", len(stale_dags), limit_lpt)
```
I don't think this is a warning -- it's nothing "wrong" that a user needs to
take action over.
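For illustration, here is a minimal, framework-free sketch of the same cutoff check, logging the result at INFO as the suggestion proposes. It is not Airflow's code: `DagRecord` and `find_stale_dags` are hypothetical names, a plain list stands in for the SQLAlchemy query on `DagModel`, and `datetime.now(timezone.utc)` stands in for Airflow's `timezone.utcnow()`.

```python
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Optional

log = logging.getLogger("stale_dag_sketch")


@dataclass
class DagRecord:
    """Hypothetical stand-in for a DagModel row (only the fields used here)."""

    dag_id: str
    is_active: bool
    last_parsed_time: datetime


def find_stale_dags(
    dags: List[DagRecord],
    stale_after_seconds: float,
    now: Optional[datetime] = None,
) -> List[DagRecord]:
    """Return active DAGs last parsed strictly before now - stale_after_seconds.

    Mirrors the diff's filter: is_active AND last_parsed_time < limit_lpt.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    limit_lpt = now - timedelta(seconds=stale_after_seconds)
    stale = [d for d in dags if d.is_active and d.last_parsed_time < limit_lpt]
    if not stale:
        log.debug("No stale DAGs found.")
        return stale
    # INFO rather than WARNING: deactivating DAGs that are no longer parsed is
    # routine housekeeping, not a problem the user has to act on.
    log.info("Found %d stale DAGs not parsed after %s.", len(stale), limit_lpt)
    return stale
```

Because the comparison is strict (`<`), a DAG parsed exactly at the cutoff is not treated as stale. In the scheduler itself this filter runs in SQL, not in Python.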
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.