SamWheating commented on a change in pull request #21399:
URL: https://github.com/apache/airflow/pull/21399#discussion_r801881889



##########
File path: airflow/dag_processing/manager.py
##########
@@ -503,6 +507,40 @@ def start(self):
 
         return self._run_parsing_loop()
 
+    @provide_session
+    def _deactivate_stale_dags(self, session=None):
+        now = timezone.utcnow()
+        elapsed_time_since_refresh = (now - self.last_deactivate_stale_dags_time).total_seconds()
+        if elapsed_time_since_refresh > self.deactivate_stale_dags_interval:
+            last_parsed = {
+                fp: self.get_last_finish_time(fp) for fp in self.file_paths if self.get_last_finish_time(fp)
+            }
+            to_deactivate = set()
+            dags_parsed = (
+                session.query(DagModel.dag_id, DagModel.fileloc, DagModel.last_parsed_time)
+                .filter(DagModel.is_active)
+                .all()
+            )
+            for dag in dags_parsed:
+                if (
+                    dag.fileloc in last_parsed
+                    and (dag.last_parsed_time + timedelta(seconds=self._processor_timeout))

Review comment:
       So there's actually a reason for this. 
   
   We're comparing the parse time reported by the processor manager to the 
`last_parsed_time` stored in the DAG table, but these two values are recorded 
independently:
   
   `DagModel.last_parsed_time` is set here, when the DAG is written to the 
DB:
   
https://github.com/apache/airflow/blob/960f573615b5357677c10bd9f7ec11811a0355c6/airflow/models/dag.py#L2427
   
   whereas `DagParsingStat.last_finish_time` is set when the file 
processor finishes:
   
https://github.com/apache/airflow/blob/dbe723da95143f6d33e5d2594bc2017c4164e687/airflow/dag_processing/manager.py#L915
   
   Because of this, `DagParsingStat.last_finish_time` will always be 
_slightly_ later than `DagModel.last_parsed_time` (typically on the order of 
milliseconds), even when both come from the same parsing run. So to be certain 
that the file was processed more recently than the DAG was last observed, we 
can't compare the two timestamps directly and instead have to do something like:
   
   ```
   DagParsingStat.last_finish_time > (SOME_BUFFER + DagModel.last_parsed_time)
   ```
   
   I chose to use the `processor_timeout` here because it represents the 
absolute upper bound on the difference between 
`DagParsingStat.last_finish_time` and `DagModel.last_parsed_time`, so we 
favour false negatives (not deactivating a DAG which is actually gone) over 
false positives (incorrectly deactivating a DAG because the file processor was 
blocking for a few seconds after updating the DB).
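   
   To illustrate the comparison, here is a minimal standalone sketch. It is 
not the code from this PR: `is_stale`, the timestamps, and the 50-second 
timeout are all made up for the example.

```python
from datetime import datetime, timedelta, timezone


def is_stale(last_finish_time, last_parsed_time, buffer_seconds):
    """Hypothetical helper: True only if the file finished parsing later than
    the DAG row was written by more than the buffer."""
    return last_finish_time > last_parsed_time + timedelta(seconds=buffer_seconds)


# Illustrative timestamps, not taken from a real run.
db_write = datetime(2022, 2, 8, 12, 0, 0, tzinfo=timezone.utc)
# Same parsing run: the processor finishes a few milliseconds after the DB write.
same_run_finish = db_write + timedelta(milliseconds=5)
# A later parsing run of the same file that no longer wrote this DAG.
later_run_finish = db_write + timedelta(seconds=120)

processor_timeout = 50  # assumed value, in seconds

# Direct comparison (no buffer): the DAG looks stale although it was just parsed.
naive = is_stale(same_run_finish, db_write, 0)
# Buffered comparison: the same-run skew is absorbed by the buffer.
buffered_same_run = is_stale(same_run_finish, db_write, processor_timeout)
# A later run that did not refresh the DAG row is still detected.
buffered_later_run = is_stale(later_run_finish, db_write, processor_timeout)

print(naive, buffered_same_run, buffered_later_run)
```

   The trade-off is that a DAG removed from its file is only detected once a 
parse of that file finishes more than the buffer after the DAG's last DB write.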
   
   Let me know what you think - from my testing in `breeze` this approach 
appears to work reliably, but it also adds a lot of complexity.



