uranusjr commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r643243739
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -901,6 +901,33 @@ def __get_concurrency_maps(
            task_map[(dag_id, task_id)] = count
        return dag_map, task_map
+    def _remove_tis_with_missing_dag(self, task_instances, session=None):
+        """
+        Fail task instances and the corresponding DagRun if the dag can't be found in
+        the dags folder but exists in the SerializedDag table.
+        Return task instances that exist in the SerializedDag table as well as the dags folder.
+        If the dag can't be found in the DagBag, just return the task instance. This is common in
+        unit tests where subdir is os.devnull.
+        """
+        tis = []
Review comment:
This is the list of TaskInstances that *will* be queued, right? It could
use a better variable name (and a `:returns:` line in the docstring).
I would also consider turning this into an iterator or even a filter function
instead, to get rid of this local list, so that at the call site we can do
```python
task_instances_to_examine = list(
    self._filter_task_instances_to_examine(
        task_instances_to_examine, session=session,
    )  # This is a generator that yields each ti.
)
```
or
```python
task_instances_to_examine = [
    ti for ti in task_instances_to_examine
    if self._mark_task_instance_should_be_examined(ti, session=session)
]
```
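
For illustration only, here is a minimal, self-contained sketch of the two shapes suggested above. `SchedulerSketch`, `FakeTI`, `_filter_task_instances_to_examine`, and `_should_examine` are hypothetical stand-ins, not the actual Airflow scheduler code; the point is only the generator-vs-predicate pattern.
```python
from typing import Iterator, List, Set


class FakeTI:
    """Stand-in for airflow.models.TaskInstance with just the fields we need."""

    def __init__(self, dag_id: str, task_id: str) -> None:
        self.dag_id = dag_id
        self.task_id = task_id


class SchedulerSketch:
    def __init__(self, known_dag_ids: Set[str]) -> None:
        # Stand-in for "the dag can be found in the dags folder".
        self.known_dag_ids = known_dag_ids

    def _filter_task_instances_to_examine(
        self, task_instances: List[FakeTI]
    ) -> Iterator[FakeTI]:
        """Yield only the task instances whose dag can still be found."""
        for ti in task_instances:
            if ti.dag_id in self.known_dag_ids:
                yield ti
            # The real method would fail the TI and its DagRun here instead.

    def _should_examine(self, ti: FakeTI) -> bool:
        """Predicate variant: True if the task instance should be kept."""
        return ti.dag_id in self.known_dag_ids


scheduler = SchedulerSketch(known_dag_ids={"present_dag"})
tis = [FakeTI("present_dag", "t1"), FakeTI("missing_dag", "t2")]

# Generator style: the call site materialises the filtered list.
to_examine = list(scheduler._filter_task_instances_to_examine(tis))

# Predicate style: the call site keeps the list comprehension.
to_examine = [ti for ti in tis if scheduler._should_examine(ti)]

assert [ti.task_id for ti in to_examine] == ["t1"]
```
Either shape removes the intermediate `tis` list from the helper: the generator keeps the "fail the TI and DagRun" side effect inside the helper, while the predicate pushes list construction entirely to the caller.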