uranusjr commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r643243739



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -901,6 +901,33 @@ def __get_concurrency_maps(
             task_map[(dag_id, task_id)] = count
         return dag_map, task_map
 
+    def _remove_tis_with_missing_dag(self, task_instances, session=None):
+        """
+        Fail task instances and the corresponding DagRun if the dag can't be found in
+        the dags folder but exists in SerializedDag table.
+        Return task instances that exists in SerializedDag table as well as dags folder.
+        If the dag can't be found in DagBag, just return the task instance. This is common in
+        unittest where subdir is os.devnull
+        """
+        tis = []

Review comment:
   This is the list of TaskInstances that *will* be queued, right? It could use a more descriptive variable name (and a `:returns:` line in the docstring).
   
   I would also consider turning this into a generator, or even a filter function, to get rid of this local list, so at the call site we can do
   
   ```python
   task_instances_to_examine = list(
       self._filter_task_instances_to_examine(
           task_instances_to_examine, session=session,
       )  # This is a generator that yields each ti.
   )
   ```
   
   or
   
   ```python
   task_instances_to_examine = [
       ti for ti in task_instances_to_examine
       if self._mark_task_instance_should_be_examined(ti, session=session)
   ]
   ```
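
To make the two shapes concrete, here is a rough, self-contained sketch that uses stand-ins rather than the real scheduler. Only the two method names come from the snippets above; `FakeTI`, `FakeScheduler`, `known_dag_ids` and both method bodies are hypothetical, and a plain set is assumed in place of the DagBag lookup.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, Optional, Set


@dataclass
class FakeTI:
    """Hypothetical stand-in for a TaskInstance; not the Airflow class."""

    dag_id: str
    task_id: str
    state: str = "scheduled"


class FakeScheduler:
    """Hypothetical stand-in for the scheduler job, for illustration only."""

    def __init__(self, known_dag_ids: Set[str]) -> None:
        # Assumption: a plain set stands in for the DagBag lookup.
        self.known_dag_ids = known_dag_ids

    def _mark_task_instance_should_be_examined(
        self, ti: FakeTI, session: Optional[object] = None
    ) -> bool:
        """Predicate form: fail the ti if its DAG is missing, else keep it."""
        if ti.dag_id in self.known_dag_ids:
            return True
        ti.state = "failed"
        return False

    def _filter_task_instances_to_examine(
        self, task_instances: Iterable[FakeTI], session: Optional[object] = None
    ) -> Iterator[FakeTI]:
        """Generator form: yield only the tis that should still be examined."""
        for ti in task_instances:
            if self._mark_task_instance_should_be_examined(ti, session=session):
                yield ti


scheduler = FakeScheduler(known_dag_ids={"dag_a"})
tis = [FakeTI("dag_a", "t1"), FakeTI("dag_gone", "t2")]

# Call site, generator form: no intermediate list inside the helper.
kept = list(scheduler._filter_task_instances_to_examine(tis))
```

Either way the helper no longer builds and returns its own list; the caller decides whether to materialize the result.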




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

