ephraimbuddy commented on a change in pull request #20349:
URL: https://github.com/apache/airflow/pull/20349#discussion_r770880331
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -316,6 +316,15 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
        pool_to_task_instances: DefaultDict[str, List[models.Pool]] = defaultdict(list)
for task_instance in task_instances_to_examine:
+ # If the dag is no longer in the dagbag, don't bother
Review comment:
This can rarely happen, but it can happen.
The way to reproduce it is to remove the dag file of a running dag, mark the
task instances as failed, then delete the dag from the UI. You'll observe the
scheduler crashlooping. The dag must have `max_active_tasks_per_dag` set.
As for the change, it doesn't stop a running task from completing; it only
logs an error that the dag is missing.
I could actually emit the error only when checking concurrency, because that's
where we access the dagbag and use it to look up the task, which can cause the
scheduler to crash with a NoneType attribute error.
And sorry for the misleading comment at the concurrency check. The task would
still get there and print the error log.
I'm considering only having the log there. WDYT?
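For context, here is a minimal standalone sketch of the kind of guard being
discussed, assuming the scheduler's dagbag and the examined task instances are
available. The helper name and log wording are illustrative only; the actual
change lives inline in `_executable_task_instances_to_queued`:

```python
# Hypothetical sketch of the guard discussed above, not the actual PR diff.
import logging
from typing import Iterable, List

log = logging.getLogger(__name__)


def skip_tis_with_missing_dags(task_instances: Iterable, dagbag) -> List:
    """Return only the task instances whose DAG is still present in the dagbag."""
    kept = []
    for ti in task_instances:
        dag = dagbag.get_dag(ti.dag_id)
        if dag is None:
            # Without this check, the later max_active_tasks_per_dag
            # (concurrency) lookup dereferences None and crashes the
            # scheduler with a NoneType attribute error.
            log.error(
                "DAG '%s' for task instance %s was not found in the dagbag; skipping",
                ti.dag_id,
                ti,
            )
            continue
        kept.append(ti)
    return kept
```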