ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692758448



##########
File path: airflow/models/baseoperator.py
##########
@@ -1257,24 +1257,46 @@ def get_flat_relatives(self, upstream: bool = False):
         dag: DAG = self._dag
         return list(map(lambda task_id: dag.task_dict[task_id], 
self.get_flat_relative_ids(upstream)))
 
+    @provide_session
     def run(
         self,
         start_date: Optional[datetime] = None,
         end_date: Optional[datetime] = None,
         ignore_first_depends_on_past: bool = True,
         ignore_ti_state: bool = False,
         mark_success: bool = False,
+        session: Session = None,
     ) -> None:
         """Run a set of task instances for a date range."""
         start_date = start_date or self.start_date
         end_date = end_date or self.end_date or timezone.utcnow()
 
         for info in self.dag.iter_dagrun_infos_between(start_date, end_date, 
align=False):
             ignore_depends_on_past = info.logical_date == start_date and 
ignore_first_depends_on_past
-            TaskInstance(self, info.logical_date).run(
+            try:
+                ti = TaskInstance(self, info.logical_date)
+            except DagRunNotFound:

Review comment:
       (`# Create a "fake" run…` -- it can't be a fake one, it needs to be a 
real one in the DB as the ti gets committed by `ti.run()`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to