ashb commented on a change in pull request #10956:
URL: https://github.com/apache/airflow/pull/10956#discussion_r494807294



##########
File path: airflow/models/dagrun.py
##########
@@ -75,6 +80,8 @@ class DagRun(Base, LoggingMixin):
         backref=backref('dag_run', uselist=False),
     )
 
+    DEFAULT_DAGRUNS_TO_EXAMINE = airflow_conf.getint('scheduler', 'max_dagruns_per_query', fallback=20)

Review comment:
      Numbers picked entirely arbitrarily, and it seemed to perform well in tests.
   
   The other number is still hard-coded because I haven't addressed the TODO comment I already left to pull it out.
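
For context, a minimal sketch of what the `getint` call above does, assuming the stock `airflow.configuration` module (which the diff imports under the alias `airflow_conf`): the value comes from `[scheduler] max_dagruns_per_query` in `airflow.cfg`, or from the matching environment variable, with 20 as the fallback.

```python
# Minimal sketch, assuming the standard airflow.configuration module.
from airflow.configuration import conf

# Reads [scheduler] max_dagruns_per_query from airflow.cfg (or the
# AIRFLOW__SCHEDULER__MAX_DAGRUNS_PER_QUERY environment variable);
# returns the fallback of 20 when the option is not set anywhere.
batch_size = conf.getint('scheduler', 'max_dagruns_per_query', fallback=20)
print(batch_size)
```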

##########
File path: airflow/models/dag.py
##########
@@ -1941,6 +2087,36 @@ def deactivate_deleted_dags(cls, alive_dag_filelocs: List[str], session=None):
             session.rollback()
             raise
 
+    @classmethod
+    def dags_needing_dagruns(cls, session: Session):
+        """
+        Return (and lock) a list of Dag objects that are due to create a new DagRun.
+
+        This will return a resultset of rows that is row-level-locked with a "SELECT ... FOR UPDATE"
+        query, you should ensure that any scheduling decisions are made in a single transaction --
+        as soon as the transaction is committed it will be unlocked.
+        """
+
+        # TODO[HA]: Bake this query, it is run _A lot_
+        # TODO[HA]: Make this limit a tunable. We limit so that _one_ scheduler
+        # doesn't try to do all the creation of dag runs
+        return session.query(cls).filter(
+            cls.is_paused.is_(False),
+            cls.is_active.is_(True),
+            cls.next_dagrun_create_after <= func.now(),
+        ).order_by(
+            cls.next_dagrun_create_after
+        ).limit(10).with_for_update(of=cls, **skip_locked(session=session))

Review comment:
       ```python
                    # TODO[HA]: Make this limit a tunable. We limit so that _one_ scheduler
                    # doesn't try to do all the creation of dag runs
   ```
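
A hedged sketch of what "make this limit a tunable" could look like, mirroring the `DEFAULT_DAGRUNS_TO_EXAMINE` pattern from dagrun.py above; the option name `max_dagruns_to_create_per_loop` is an illustration here, not something this diff defines:

```python
# Sketch only -- the option name below is hypothetical at this point in the PR.
from airflow.configuration import conf

MAX_DAGRUNS_TO_CREATE_PER_LOOP = conf.getint(
    'scheduler', 'max_dagruns_to_create_per_loop', fallback=10
)

# dags_needing_dagruns would then end with:
#     .limit(MAX_DAGRUNS_TO_CREATE_PER_LOOP).with_for_update(...)
```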

##########
File path: airflow/models/dagrun.py
##########
@@ -494,12 +542,13 @@ def verify_integrity(self, session: Session = None):
                 session.add(ti)
 
         try:
-            session.commit()
+            session.flush()

Review comment:
      Not sure yet -- https://github.com/apache/airflow/pull/10136 was where this was added only recently.

##########
File path: airflow/models/dag.py
##########
@@ -1941,6 +2087,36 @@ def deactivate_deleted_dags(cls, alive_dag_filelocs: List[str], session=None):
             session.rollback()
             raise
 
+    @classmethod
+    def dags_needing_dagruns(cls, session: Session):
+        """
+        Return (and lock) a list of Dag objects that are due to create a new DagRun.
+
+        This will return a resultset of rows that is row-level-locked with a "SELECT ... FOR UPDATE"
+        query, you should ensure that any scheduling decisions are made in a single transaction --
+        as soon as the transaction is committed it will be unlocked.
+        """
+
+        # TODO[HA]: Bake this query, it is run _A lot_
+        # TODO[HA]: Make this limit a tunable. We limit so that _one_ scheduler
+        # doesn't try to do all the creation of dag runs
+        return session.query(cls).filter(
+            cls.is_paused.is_(False),
+            cls.is_active.is_(True),
+            cls.next_dagrun_create_after <= func.now(),

Review comment:
      Yes, None in `next_dagrun_create_after` is designed to be skipped - for instance for `@once` https://github.com/apache/airflow/pull/10956/files#diff-e5cbc8f771ec50ccb79ad8505f6f5697R482-R485:
   
   ```python
           if (self.schedule_interval == "@once" and date_last_automated_dagrun) or \
                   self.schedule_interval is None:
               # Manual trigger, or already created the run for @once, can short circuit
               return (None, None)
   ```
   
   or `max_active_runs` https://github.com/apache/airflow/pull/10956/files#diff-e5cbc8f771ec50ccb79ad8505f6f5697R1758-R1765:
   
   ```python
               active_runs_of_dag = num_active_runs.get(dag.dag_id, 0)
               if dag.max_active_runs and active_runs_of_dag >= dag.max_active_runs:
                   # Since this happens every time the dag is parsed it would be quite spammy at info
                   log.debug(
                       "DAG %s is at (or above) max_active_runs (%d of %d), not creating any more runs",
                       dag.dag_id, active_runs_of_dag, dag.max_active_runs
                   )
                   orm_dag.next_dagrun_create_after = None
   ```
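
The reason a NULL `next_dagrun_create_after` needs no explicit `IS NOT NULL` filter is that SQL comparisons against NULL are never true, so the `<=` predicate already drops those rows. A self-contained sketch demonstrating this (toy table and names, SQLAlchemy 1.4+, not Airflow's actual models):

```python
# Shows that `col <= now()` silently excludes NULL rows: NULL <= anything
# evaluates to NULL in SQL, which WHERE treats as not-true.
from datetime import datetime, timedelta

from sqlalchemy import Column, DateTime, Integer, create_engine, func
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class DagSketch(Base):  # hypothetical stand-in for the real DagModel
    __tablename__ = 'dag_sketch'
    id = Column(Integer, primary_key=True)
    next_dagrun_create_after = Column(DateTime, nullable=True)

engine = create_engine('sqlite://')
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([
        DagSketch(id=1, next_dagrun_create_after=datetime.utcnow() - timedelta(hours=1)),
        DagSketch(id=2, next_dagrun_create_after=None),  # e.g. @once that already ran
    ])
    session.commit()
    due = session.query(DagSketch).filter(
        DagSketch.next_dagrun_create_after <= func.now()
    ).all()
    print([d.id for d in due])  # [1] -- the NULL row never matches
```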

##########
File path: airflow/jobs/backfill_job.py
##########
@@ -629,6 +629,7 @@ def _per_task_process(task, key, ti, session=None):  # pylint: disable=too-many-
             _dag_runs = ti_status.active_runs[:]
             for run in _dag_runs:
                 run.update_state(session=session)
+                session.merge(run)

Review comment:
      Done now. All changes removed from this file.

##########
File path: airflow/models/dag.py
##########
@@ -1941,6 +2087,36 @@ def deactivate_deleted_dags(cls, alive_dag_filelocs: List[str], session=None):
             session.rollback()
             raise
 
+    @classmethod
+    def dags_needing_dagruns(cls, session: Session):
+        """
+        Return (and lock) a list of Dag objects that are due to create a new DagRun.
+
+        This will return a resultset of rows that is row-level-locked with a "SELECT ... FOR UPDATE"
+        query, you should ensure that any scheduling decisions are made in a single transaction --
+        as soon as the transaction is committed it will be unlocked.
+        """
+
+        # TODO[HA]: Bake this query, it is run _A lot_
+        # TODO[HA]: Make this limit a tunable. We limit so that _one_ scheduler
+        # doesn't try to do all the creation of dag runs
+        return session.query(cls).filter(
+            cls.is_paused.is_(False),
+            cls.is_active.is_(True),
+            cls.next_dagrun_create_after <= func.now(),
+        ).order_by(
+            cls.next_dagrun_create_after
+        ).limit(10).with_for_update(of=cls, **skip_locked(session=session))

Review comment:
      The main reason for limiting this is twofold:
   
   1. So that when running multiple schedulers, one scheduler doesn't try to do _all_ the work of creating dag runs.
   2. When running with a single scheduler, to make sure that the scheduler doesn't get "stuck" just creating dag runs instead of scheduling some tasks.
   
   A concrete sketch of this locking behaviour follows below.
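
Concretely, a self-contained toy sketch (placeholder connection URL, not the PR's code) of how `LIMIT` plus `FOR UPDATE SKIP LOCKED` splits rows between two concurrent sessions. It needs a database that supports `SKIP LOCKED`, e.g. PostgreSQL 9.5+ or MySQL 8; the PR's `skip_locked()` helper appears to pass the flag only on backends that support it.

```python
from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class DagSketch(Base):  # toy stand-in for the dag table
    __tablename__ = 'dag_sketch'
    id = Column(Integer, primary_key=True)

engine = create_engine('postgresql://user:pass@localhost/sketch')  # placeholder URL
Base.metadata.create_all(engine)

with Session(engine) as setup:
    setup.add_all([DagSketch(id=i) for i in range(20)])
    setup.commit()

with Session(engine) as s1, Session(engine) as s2:
    # "Scheduler 1" locks the first 10 unlocked rows...
    first = (s1.query(DagSketch).order_by(DagSketch.id)
             .limit(10).with_for_update(skip_locked=True).all())
    # ...so "scheduler 2" skips them and gets the next 10 instead of blocking.
    second = (s2.query(DagSketch).order_by(DagSketch.id)
              .limit(10).with_for_update(skip_locked=True).all())
    print([d.id for d in first])   # ids 0-9
    print([d.id for d in second])  # ids 10-19
# Row locks are released when each session's transaction ends.
```

The `limit(10)` caps each scheduler's batch (point 2), and `SKIP LOCKED` keeps a second scheduler from either blocking on or duplicating the first one's rows (point 1).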

##########
File path: airflow/models/dagrun.py
##########
@@ -313,14 +360,16 @@ def update_state(self, session: Session = None) -> List[TI]:
         :rtype ready_tis: list[airflow.models.TaskInstance]
         """
 
+        start_dttm = timezone.utcnow()

Review comment:
       Already had that thought :) 
   
https://github.com/apache/airflow/blob/b28d67332bb052441bc2ffd00030d620aac381c8/airflow/jobs/scheduler_job.py#L1565-L1567
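
The linked scheduler_job.py lines show the timing instrumentation this feeds into; a hedged sketch of that pattern (the metric name here is illustrative, not necessarily what the PR emits):

```python
# Sketch of the wall-clock timing pattern, assuming airflow.stats.Stats.
from airflow.stats import Stats
from airflow.utils import timezone

start_dttm = timezone.utcnow()
# ... make the scheduling decisions for this dag run ...
Stats.timing('dagrun.example.update_state.duration', timezone.utcnow() - start_dttm)
```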

##########
File path: airflow/models/dagrun.py
##########
@@ -494,12 +542,13 @@ def verify_integrity(self, session: Session = None):
                 session.add(ti)
 
         try:
-            session.commit()
+            session.flush()
         except IntegrityError as err:
             self.log.info(str(err))
             self.log.info('Hit IntegrityError while creating the TIs for '
                           f'{dag.dag_id} - {self.execution_date}.')
             self.log.info('Doing session rollback.')
+            # TODO[HA]: We probably need to savepoint this so we can keep the transaction alive.

Review comment:
       Already had that thought too :) 
   
https://github.com/apache/airflow/blob/b28d67332bb052441bc2ffd00030d620aac381c8/airflow/jobs/scheduler_job.py#L1565-L1566
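
For what the savepoint TODO is getting at: wrapping the flush in `Session.begin_nested()` emits a SAVEPOINT, so an `IntegrityError` rolls back only to the savepoint while the outer transaction -- and the row locks it holds -- stays alive. A self-contained sketch of the idea (toy table; SQLite here for brevity, though a real deployment would be Postgres or MySQL):

```python
from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class TISketch(Base):  # toy stand-in for the task_instance table
    __tablename__ = 'ti_sketch'
    id = Column(Integer, primary_key=True)

engine = create_engine('sqlite://')
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(TISketch(id=1))
    session.flush()  # a transaction is now open (pysqlite needs this before SAVEPOINT)
    try:
        with session.begin_nested():     # emits SAVEPOINT
            session.add(TISketch(id=1))  # duplicate primary key
            session.flush()
    except IntegrityError:
        pass  # rolled back to the savepoint only; the outer transaction survives
    session.commit()  # still valid -- no locks were dropped mid-scheduling
    print(session.query(TISketch).count())  # 1
```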




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

