potiuk commented on a change in pull request #10956:
URL: https://github.com/apache/airflow/pull/10956#discussion_r493515364



##########
File path: airflow/models/dagrun.py
##########
@@ -494,12 +542,13 @@ def verify_integrity(self, session: Session = None):
                 session.add(ti)
 
         try:
-            session.commit()
+            session.flush()
         except IntegrityError as err:
             self.log.info(str(err))
             self.log.info('Hit IntegrityError while creating the TIs for '
                           f'{dag.dag_id} - {self.execution_date}.')
             self.log.info('Doing session rollback.')
+            # TODO[HA]: We probably need to savepoint this so we can keep the transaction alive.
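
For context on the TODO in the hunk above, here is a minimal sketch of the savepoint idea. It uses the standard-library sqlite3 module rather than Airflow's SQLAlchemy session, and the table, the savepoint name and the insert_task_instances helper are all hypothetical. SQLAlchemy exposes savepoints through Session.begin_nested(); whether that fits this method is left open here.

```python
import sqlite3


def insert_task_instances(conn, rows):
    """Insert rows inside a SAVEPOINT (hypothetical helper, not Airflow code).

    On a duplicate key, only the work done since the savepoint is undone;
    the surrounding transaction stays open and usable.
    """
    conn.execute("SAVEPOINT create_tis")
    try:
        conn.executemany(
            "INSERT INTO task_instance (dag_id, task_id) VALUES (?, ?)", rows
        )
    except sqlite3.IntegrityError:
        # Undo only the inserts made after the savepoint, then discard it.
        conn.execute("ROLLBACK TO SAVEPOINT create_tis")
        conn.execute("RELEASE SAVEPOINT create_tis")
        return False
    conn.execute("RELEASE SAVEPOINT create_tis")
    return True


# isolation_level=None disables sqlite3's implicit transaction handling,
# so BEGIN / COMMIT / SAVEPOINT are issued explicitly below.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute(
    "CREATE TABLE task_instance ("
    "dag_id TEXT, task_id TEXT, PRIMARY KEY (dag_id, task_id))"
)

conn.execute("BEGIN")
conn.execute("INSERT INTO task_instance VALUES ('example_dag', 'extract')")

# The second row duplicates an existing key, so the whole batch is rolled
# back to the savepoint, but the outer transaction is still alive.
ok = insert_task_instances(
    conn, [("example_dag", "load"), ("example_dag", "extract")]
)

conn.execute("INSERT INTO task_instance VALUES ('example_dag', 'report')")
conn.execute("COMMIT")

rows = conn.execute(
    "SELECT task_id FROM task_instance ORDER BY task_id"
).fetchall()
```

The work done before the savepoint and after the rollback is committed together, which is what a plain session rollback would not allow.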

Review comment:
       One side note: should we rename verify_integrity to something else?
It seems that this method not only verifies the integrity of the dagrun and
its connected task instances, but also creates task instances where they are
missing. Maybe "synchronize_task_instances" or something like that.
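
If the rename went ahead, one possible way to avoid breaking existing callers is to keep the old name as a deprecated alias. The sketch below is only an illustration: DagRunSketch and its attributes are hypothetical stand-ins, not the real DagRun model, and the method body is reduced to the "create what is missing" behaviour described above.

```python
import warnings


class DagRunSketch:
    """Hypothetical stand-in for DagRun, used only to illustrate the rename."""

    def __init__(self, expected_task_ids, existing_task_ids):
        self.expected_task_ids = list(expected_task_ids)
        self.task_ids = list(existing_task_ids)

    def synchronize_task_instances(self):
        """Create any task instances that are missing; return the ids created."""
        missing = [t for t in self.expected_task_ids if t not in self.task_ids]
        self.task_ids.extend(missing)
        return missing

    def verify_integrity(self):
        """Deprecated alias kept so existing callers continue to work."""
        warnings.warn(
            "verify_integrity is deprecated; use synchronize_task_instances",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.synchronize_task_instances()
```

Callers of the old name would get the same result plus a DeprecationWarning pointing at the new name.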





