msumit commented on a change in pull request #11344:
URL: https://github.com/apache/airflow/pull/11344#discussion_r501850751
##########
File path: airflow/models/dagbag.py
##########
@@ -352,9 +352,9 @@ def _process_modules(self, filepath, mods,
file_last_changed_on_disk):
dag.fileloc = filepath
try:
dag.is_subdag = False
- self.bag_dag(dag=dag, root_dag=dag)
if isinstance(dag.normalized_schedule_interval, str):
croniter(dag.normalized_schedule_interval)
+ self.bag_dag(dag=dag, root_dag=dag)
Review comment:
@kaxil found that such a testcase already exists, so added a check to
assert the dagbag size on top of it.
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1741,6 +1730,16 @@ def _run_scheduler_loop(self) -> None:
if not is_unit_test:
time.sleep(self._processor_poll_interval)
+ # Verify that all files were processed, and if so, deactivate DAGs
that
+ # haven't been touched by the scheduler as they likely have been
+ # deleted.
+ if self.processor_agent.all_files_processed:
Review comment:
yeah sure @ashb.. will remove this code block from the PR for now, and
will try to fix it next week.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]