SamWheating commented on a change in pull request #21399:
URL: https://github.com/apache/airflow/pull/21399#discussion_r801881889
##########
File path: airflow/dag_processing/manager.py
##########
@@ -503,6 +507,40 @@ def start(self):
         return self._run_parsing_loop()
+    @provide_session
+    def _deactivate_stale_dags(self, session=None):
+        now = timezone.utcnow()
+        elapsed_time_since_refresh = (now - self.last_deactivate_stale_dags_time).total_seconds()
+        if elapsed_time_since_refresh > self.deactivate_stale_dags_interval:
+            last_parsed = {
+                fp: self.get_last_finish_time(fp) for fp in self.file_paths if self.get_last_finish_time(fp)
+            }
+            to_deactivate = set()
+            dags_parsed = (
+                session.query(DagModel.dag_id, DagModel.fileloc, DagModel.last_parsed_time)
+                .filter(DagModel.is_active)
+                .all()
+            )
+            for dag in dags_parsed:
+                if (
+                    dag.fileloc in last_parsed
+                    and (dag.last_parsed_time + timedelta(seconds=self._processor_timeout))
Review comment:
So there's actually a reason for this.
We're comparing the parse time reported by the processor manager to the `last_parsed_time` stored in the DAG table, but these two values are recorded independently:
`DagModel.last_parsed_time` is set here, when the DAG is written to the DB:
https://github.com/apache/airflow/blob/960f573615b5357677c10bd9f7ec11811a0355c6/airflow/models/dag.py#L2427
whereas `DagParsingStat.last_finish_time` is set when the file processor finishes:
https://github.com/apache/airflow/blob/dbe723da95143f6d33e5d2594bc2017c4164e687/airflow/dag_processing/manager.py#L915
Because of this, `DagParsingStat.last_finish_time` will always be _slightly_ later than `DagModel.last_parsed_time` (typically on the order of milliseconds). So to be certain that a file was processed more recently than the DAG was last observed, we can't compare the two timestamps directly and instead have to do something like:
```
DagParsingStat.last_finish_time > (SOME_BUFFER + DagModel.last_parsed_time)
```
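To make the skew concrete, here is a minimal sketch with made-up timestamps (not values from a real run). Within a single parse, the DAG row is written a few milliseconds before the processor reports finishing, so a naive comparison flags a DAG that is still present in its file, while the buffered comparison does not. The helper names `naive_is_stale` and `buffered_is_stale` and the 50-second buffer are illustrative assumptions, not Airflow code.

```python
from datetime import datetime, timedelta, timezone


def naive_is_stale(last_finish_time: datetime, last_parsed_time: datetime) -> bool:
    # Hypothetical helper: treats the DAG as gone whenever the file finished
    # parsing after the DAG row was written.
    return last_finish_time > last_parsed_time


def buffered_is_stale(
    last_finish_time: datetime, last_parsed_time: datetime, buffer: timedelta
) -> bool:
    # Hypothetical helper: only treats the DAG as gone if the file finished
    # parsing more than `buffer` after the DAG row was last written.
    return last_finish_time > last_parsed_time + buffer


# One parse run: the DAG row is written, then the processor finishes ~5 ms later.
dag_written = datetime(2022, 2, 8, 12, 0, 0, tzinfo=timezone.utc)
processor_finished = dag_written + timedelta(milliseconds=5)

print(naive_is_stale(processor_finished, dag_written))  # True: a false positive
print(buffered_is_stale(processor_finished, dag_written, timedelta(seconds=50)))  # False
```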
I chose `processor_timeout` as the buffer because it is the absolute upper bound on the difference between `DagParsingStat.last_finish_time` and `DagModel.last_parsed_time`. This means we favour false negatives (not deactivating a DAG which is actually gone) over false positives (incorrectly deactivating a DAG because the file processor was blocked for a few seconds after updating the DB).
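The selection logic in the diff can be sketched without a database as below. This is a hedged approximation: `DagRow` is a hypothetical stand-in for the `(dag_id, fileloc, last_parsed_time)` columns queried from `DagModel`, `find_stale_dags` is a hypothetical function name, and since the diff excerpt is cut off mid-condition, the sketch assumes the buffered `last_parsed_time` is compared against the file's last finish time.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Iterable, NamedTuple, Optional, Set


class DagRow(NamedTuple):
    # Hypothetical stand-in for the columns queried from DagModel in the diff.
    dag_id: str
    fileloc: str
    last_parsed_time: datetime


def find_stale_dags(
    active_dags: Iterable[DagRow],
    last_finish_times: Dict[str, Optional[datetime]],
    processor_timeout: float,
) -> Set[str]:
    # Keep only files the manager has actually finished parsing, as the diff does.
    last_parsed = {fp: t for fp, t in last_finish_times.items() if t}
    buffer = timedelta(seconds=processor_timeout)
    to_deactivate = set()
    for dag in active_dags:
        # Stale only if the file finished parsing more than `buffer` after the
        # DAG row was last written, i.e. the file was re-parsed without this DAG.
        if dag.fileloc in last_parsed and dag.last_parsed_time + buffer < last_parsed[dag.fileloc]:
            to_deactivate.add(dag.dag_id)
    return to_deactivate


t0 = datetime(2022, 2, 8, 12, 0, 0, tzinfo=timezone.utc)
dags = [
    # Written in the latest parse of a.py; the processor then blocked for 3 seconds.
    DagRow("still_here", "/dags/a.py", t0),
    # Last written 10 minutes before the latest parse of b.py finished.
    DagRow("removed", "/dags/b.py", t0 - timedelta(minutes=10)),
    # c.py has no recorded finish time yet, so its DAG is left alone.
    DagRow("not_yet_parsed", "/dags/c.py", t0 - timedelta(hours=1)),
]
finish_times = {
    "/dags/a.py": t0 + timedelta(seconds=3),
    "/dags/b.py": t0,
    "/dags/c.py": None,
}
print(find_stale_dags(dags, finish_times, processor_timeout=50))  # {'removed'}
```

With `processor_timeout=0` the same data would also flag `still_here`, which is exactly the false positive the buffer is meant to prevent.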
Let me know what you think. From my testing in `breeze`, this approach appears to work reliably, but it also adds a lot of complexity.
--
This is an automated message from the Apache Git Service.