uranusjr commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r702845592



##########
File path: airflow/jobs/backfill_job.py
##########
@@ -434,15 +433,15 @@ def _process_backfill_task_instances(
             # determined deadlocked while they are actually
             # waiting for their upstream to finish
             @provide_session
-            def _per_task_process(key, ti, session=None):
+            def _per_task_process(key, ti: TaskInstance, session=None):
                 ti.refresh_from_db(lock_for_update=True, session=session)
 
                 task = self.dag.get_task(ti.task_id, include_subdags=True)
                 ti.task = task
 
-                ignore_depends_on_past = self.ignore_first_depends_on_past and ti.execution_date == (
-                    start_date or ti.start_date
-                )
+                ignore_depends_on_past = self.ignore_first_depends_on_past and ti.get_dagrun(
+                    session
+                ).execution_date == (start_date or ti.start_date)

Review comment:
       ```suggestion
                   dagrun = ti.get_dagrun(session=session)
                   ignore_depends_on_past = self.ignore_first_depends_on_past and (
                       dagrun.execution_date == (start_date or ti.start_date)
                   )
   ```
   
   Gosh that Black formatting is awful.
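
A note on splitting this expression into two statements: if the `self.ignore_first_depends_on_past and` part ends up on the `dagrun = ...` line, `dagrun` can be the bool `False`, and the following `.execution_date` access raises. A minimal sketch with stand-in names (`split_naively`, `split_safely`, and `run` are hypothetical, not Airflow code):

```python
from datetime import datetime
from types import SimpleNamespace

# Stand-in for a DagRun-like object; only the attribute used here is modeled.
run = SimpleNamespace(execution_date=datetime(2021, 9, 1))
start = datetime(2021, 9, 1)


def split_naively(flag, get_dagrun, start_date):
    # When flag is False, `dagrun` is the bool False (not a run object),
    # so the attribute access on the next line raises AttributeError.
    dagrun = flag and get_dagrun()
    return dagrun.execution_date == start_date


def split_safely(flag, get_dagrun, start_date):
    # Fetch the run unconditionally, then keep the flag in the final expression.
    dagrun = get_dagrun()
    return flag and dagrun.execution_date == start_date
```

The safe variant always fetches the run, which costs one lookup even when the flag is off; keeping the original one-line `and` chain avoids that lookup at the price of the awkward line wrap.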

##########
File path: airflow/dag_processing/processor.py
##########
@@ -387,14 +389,10 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:
             .subquery('sq')
         )
 
-        max_tis: List[TI] = (
-            session.query(TI)
-            .filter(
-                TI.dag_id == dag.dag_id,
-                TI.task_id == qry.c.task_id,
-                TI.execution_date == qry.c.max_ti,
-            )
-            .all()
+        max_tis: Iterator[TI] = session.query(TI).filter(
+            TI.dag_id == dag.dag_id,
+            TI.task_id == qry.c.task_id,
+            TI.execution_date == qry.c.max_ti,
         )

Review comment:
       ```suggestion
           max_tis: Iterator[TI] = (
               session.query(TI)
               .join(TI.dag_run)
               .filter(
                   TI.dag_id == dag.dag_id,
                   TI.task_id == qry.c.task_id,
                   DR.execution_date == qry.c.max_ti,
               )
           )
   ```
   
   I think?
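
To make the intent of the join concrete, here is a plain-SQL sketch using the standard-library `sqlite3` module. It assumes, as the suggestion implies, that the execution date is stored on the DAG run row and not on the task instance row. The two tables are hypothetical, heavily simplified stand-ins and not Airflow's real schema; `sq`, `task_id`, and `max_ti` mirror the subquery names in the hunk above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical, simplified tables: execution_date lives only on dag_run.
conn.executescript(
    """
    CREATE TABLE dag_run (dag_id TEXT, run_id TEXT, execution_date TEXT);
    CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, run_id TEXT);
    INSERT INTO dag_run VALUES ('d', 'r1', '2021-09-01');
    INSERT INTO dag_run VALUES ('d', 'r2', '2021-09-02');
    INSERT INTO task_instance VALUES ('d', 't', 'r1');
    INSERT INTO task_instance VALUES ('d', 't', 'r2');
    """
)

# The task instance must be joined to its run before it can be compared
# against the per-task maximum execution date computed in the subquery.
rows = conn.execute(
    """
    SELECT ti.task_id, dr.execution_date
    FROM task_instance AS ti
    JOIN dag_run AS dr
      ON dr.dag_id = ti.dag_id AND dr.run_id = ti.run_id
    JOIN (
        SELECT ti2.task_id AS task_id, MAX(dr2.execution_date) AS max_ti
        FROM task_instance AS ti2
        JOIN dag_run AS dr2
          ON dr2.dag_id = ti2.dag_id AND dr2.run_id = ti2.run_id
        WHERE ti2.dag_id = ?
        GROUP BY ti2.task_id
    ) AS sq
      ON sq.task_id = ti.task_id AND dr.execution_date = sq.max_ti
    WHERE ti.dag_id = ?
    """,
    ("d", "d"),
).fetchall()
```

Without the join to `dag_run`, the outer query has no execution date column to compare against `sq.max_ti`, which is why the filter has to reference the run and not the task instance.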



