ephraimbuddy commented on a change in pull request #20349:
URL: https://github.com/apache/airflow/pull/20349#discussion_r770880331



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -316,6 +316,15 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
 
         pool_to_task_instances: DefaultDict[str, List[models.Pool]] = defaultdict(list)
         for task_instance in task_instances_to_examine:
+            # If the dag is no longer in the dagbag, don't bother

Review comment:
      This is rare, but it can happen.
   To reproduce it: remove the dag file of a running dag, mark its task instances as failed, then delete the dag from the UI. You'll observe the scheduler crash-looping. The dag must have `max_active_tasks_per_dag` set.
   
   As for the change, it doesn't stop a running task from completing; it only logs an error that the dag is missing.
   I could actually emit the error only at the concurrency check, because that's where we access the dagbag to look up the task, which can cause the scheduler to crash with a NoneType attribute error.
   
   And sorry for the misleading comment at the concurrency point; the task would still get there and print the error log.
   
   I'm considering only keeping the log there. WDYT?
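
   To make the failure mode and the guard concrete, here is a minimal, self-contained sketch. It is not the actual scheduler code: `FakeDag`, `FakeTI` and `select_queueable` are simplified stand-ins made up for illustration, assuming the relevant step is roughly "look the dag up in the dagbag, then dereference it for the per-dag concurrency limit".

```python
import logging
from typing import Dict, List, Optional

log = logging.getLogger(__name__)


class FakeDag:
    """Simplified stand-in for a serialized DAG with a per-dag task limit."""

    def __init__(self, dag_id: str, max_active_tasks: int) -> None:
        self.dag_id = dag_id
        self.max_active_tasks = max_active_tasks


class FakeTI:
    """Simplified stand-in for a TaskInstance waiting to be queued."""

    def __init__(self, dag_id: str, task_id: str) -> None:
        self.dag_id = dag_id
        self.task_id = task_id


def select_queueable(tis: List[FakeTI], dagbag: Dict[str, FakeDag]) -> List[FakeTI]:
    """Mimic the concurrency check: log and skip TIs whose dag is gone."""
    queueable: List[FakeTI] = []
    for ti in tis:
        dag: Optional[FakeDag] = dagbag.get(ti.dag_id)
        if dag is None:
            # Without this guard, dereferencing `dag` below raises
            # AttributeError ('NoneType' object has no attribute ...)
            # and the scheduling loop crashes over and over.
            log.error("DAG %s was deleted; skipping %s", ti.dag_id, ti.task_id)
            continue
        # The real code compares running task counts against this limit;
        # the lookup itself is the part that needs a non-None dag object.
        _limit = dag.max_active_tasks
        queueable.append(ti)
    return queueable


if __name__ == "__main__":
    logging.basicConfig(level=logging.ERROR)
    bag = {"dag_a": FakeDag("dag_a", max_active_tasks=16)}
    tis = [FakeTI("dag_a", "t1"), FakeTI("deleted_dag", "t1")]
    print([ti.dag_id for ti in select_queueable(tis, bag)])  # -> ['dag_a']
```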




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

