potiuk commented on a change in pull request #10956:
URL: https://github.com/apache/airflow/pull/10956#discussion_r493478012



##########
File path: airflow/models/dag.py
##########
@@ -1941,6 +2087,36 @@ def deactivate_deleted_dags(cls, alive_dag_filelocs: 
List[str], session=None):
             session.rollback()
             raise
 
+    @classmethod
+    def dags_needing_dagruns(cls, session: Session):
+        """
+        Return (and lock) a list of Dag objects that are due to create a new 
DagRun This will return a
+        resultset of rows  that is row-level-locked with a "SELECT ... FOR 
UPDATE" query, you should ensure
+        that any scheduling decisions are made in a single transaction -- as 
soon as the transaction is
+        committed it will be unlocked.
+        """
+
+        # TODO[HA]: Bake this query, it is run _A lot_
+        # TODO[HA]: Make this limit a tunable. We limit so that _one_ scheduler
+        # doesn't try to do all the creation of dag runs
+        return session.query(cls).filter(
+            cls.is_paused.is_(False),
+            cls.is_active.is_(True),
+            cls.next_dagrun_create_after <= func.now(),
+        ).order_by(
+            cls.next_dagrun_create_after
+        ).limit(10).with_for_update(of=cls, **skip_locked(session=session))

Review comment:
       This limit is a bit out of a blue and I think it's not good that we have 
it hard-coded here. While I understand why we want to have it (we want to 
repeat scheduler loops more frequently and only process them in "batches" - It 
would be great to dwo two things:
   
   1) to extract it as a meaningful constant with an explanation why 10 is a 
good value
   2) or - better - to calculate it based on some criteria if this value should 
depend on something
   
   Also we need to check (and understand) what happens if we are increasing 
this value. Do we risk higher memory consumptions? Or deadlocks ? or any other 
risks? What are we trying to prevent here?  
   
   I understand this is an empirical value, but I think we should justify this 
value.

##########
File path: airflow/models/dagrun.py
##########
@@ -75,6 +80,8 @@ class DagRun(Base, LoggingMixin):
         backref=backref('dag_run', uselist=False),
     )
 
+    DEFAULT_DAGRUNS_TO_EXAMINE = airflow_conf.getint('scheduler', 
'max_dagruns_per_query', fallback=20)

Review comment:
       Same here. The 20 value is a bit magical and we need to know where it 
comes from and what are the consequences of changing the values. Ideally some 
test results would be great to show what can happen if we decrease it. It is 
also a bit strange that his 20 here is not hard-coded, where the 10 in the 
"next_dagrun" is. What's the reasoning behind one being hard-coded and the 
other being configurable. 

##########
File path: airflow/models/dagrun.py
##########
@@ -494,12 +542,13 @@ def verify_integrity(self, session: Session = None):
                 session.add(ti)
 
         try:
-            session.commit()
+            session.flush()
         except IntegrityError as err:
             self.log.info(str(err))
             self.log.info('Hit IntegrityError while creating the TIs for '
                           f'{dag.dag_id} - {self.execution_date}.')
             self.log.info('Doing session rollback.')
+            # TODO[HA]: We probaly need to savepoint this so we can keep the 
transaction alive.

Review comment:
       On a side not. Should we rename verify_integrity to some different name 
? Seems that this method not only verify the integrity of the dagrun and 
connected task instances but it also creates task instances were they are 
missing. Maybe "synchronize_task_instances" or something like that.

##########
File path: airflow/models/dagrun.py
##########
@@ -344,6 +393,9 @@ def update_state(self, session: Session = None) -> List[TI]:
         leaf_task_ids = {t.task_id for t in dag.leaves}
         leaf_tis = [ti for ti in tis if ti.task_id in leaf_task_ids]
 
+        # TODO[ha]: These callbacks shouldn't run in the scheduler loop - 
check if Kamil changed this to run

Review comment:
       We should indeed move it outside of the scheduler loop to a DAG 
processor. And possibly  send it to DagFileProcessor like Kamil did. 
   
   This shoud actually help a lot if this is all within the uncommitted 
transaction. Those callbacks are potentialy problematic

##########
File path: airflow/models/dagrun.py
##########
@@ -313,14 +360,16 @@ def update_state(self, session: Session = None) -> 
List[TI]:
         :rtype ready_tis: list[airflow.models.TaskInstance]
         """
 
+        start_dttm = timezone.utcnow()

Review comment:
       Should we change this method to also update state of the tasks? I think 
it is only used in two places (schedule_job and backfill_job) and the 
"update_state" method returning tasks sounds a bit weird. I'd rather think 
about update the task instances as well when update_state is run so that it is 
all encapsulated within one "update_state" method.

##########
File path: airflow/models/dag.py
##########
@@ -1941,6 +2087,36 @@ def deactivate_deleted_dags(cls, alive_dag_filelocs: 
List[str], session=None):
             session.rollback()
             raise
 
+    @classmethod
+    def dags_needing_dagruns(cls, session: Session):
+        """
+        Return (and lock) a list of Dag objects that are due to create a new 
DagRun This will return a
+        resultset of rows  that is row-level-locked with a "SELECT ... FOR 
UPDATE" query, you should ensure
+        that any scheduling decisions are made in a single transaction -- as 
soon as the transaction is
+        committed it will be unlocked.
+        """
+
+        # TODO[HA]: Bake this query, it is run _A lot_
+        # TODO[HA]: Make this limit a tunable. We limit so that _one_ scheduler
+        # doesn't try to do all the creation of dag runs
+        return session.query(cls).filter(
+            cls.is_paused.is_(False),
+            cls.is_active.is_(True),
+            cls.next_dagrun_create_after <= func.now(),

Review comment:
       What happens if next _dagrun_create_after is None (because it is set to 
None in update_dag_next_dagrun(). Will they be skipped? Is there any other 
place where those are selected ?

##########
File path: airflow/models/dagrun.py
##########
@@ -494,12 +542,13 @@ def verify_integrity(self, session: Session = None):
                 session.add(ti)
 
         try:
-            session.commit()
+            session.flush()

Review comment:
       What's the reason for the integration error here? Is the Integrity error 
here still valid if we do not commit to the database? I think flush is not 
likely to generate Integrity Error at all.
   
   Shoudl we start caring about isolation level of the transactions we have ? 
Will it work when we have different isolation levels set ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to