potiuk commented on a change in pull request #10956:
URL: https://github.com/apache/airflow/pull/10956#discussion_r493478012
##########
File path: airflow/models/dag.py
##########
@@ -1941,6 +2087,36 @@ def deactivate_deleted_dags(cls, alive_dag_filelocs: List[str], session=None):
            session.rollback()
            raise
+    @classmethod
+    def dags_needing_dagruns(cls, session: Session):
+        """
+        Return (and lock) a list of Dag objects that are due to create a new DagRun This will return a
+        resultset of rows that is row-level-locked with a "SELECT ... FOR UPDATE" query, you should ensure
+        that any scheduling decisions are made in a single transaction -- as soon as the transaction is
+        committed it will be unlocked.
+        """
+
+        # TODO[HA]: Bake this query, it is run _A lot_
+        # TODO[HA]: Make this limit a tunable. We limit so that _one_ scheduler
+        # doesn't try to do all the creation of dag runs
+        return session.query(cls).filter(
+            cls.is_paused.is_(False),
+            cls.is_active.is_(True),
+            cls.next_dagrun_create_after <= func.now(),
+        ).order_by(
+            cls.next_dagrun_create_after
+        ).limit(10).with_for_update(of=cls, **skip_locked(session=session))
Review comment:
This limit is a bit out of the blue and I think it's not good that we have it hard-coded here. While I understand why we want it (we want to repeat scheduler loops more frequently and only process the DAGs in "batches"), it would be great to do two things:
1) extract it as a meaningful constant with an explanation of why 10 is a good value
2) or - better - calculate it based on some criteria, if this value should depend on something (see the sketch below for one option)
Also we need to check (and understand) what happens if we increase this value. Do we risk higher memory consumption? Or deadlocks? Or any other risks? What are we trying to prevent here?
I understand this is an empirical value, but I think we should justify it.
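For illustration, a minimal sketch of options 1)/2) above, mirroring how `DEFAULT_DAGRUNS_TO_EXAMINE` is read from the config in `dagrun.py`. The option name `max_dagruns_to_create_per_loop` is hypothetical and not part of this PR:
```python
from airflow.configuration import conf as airflow_conf

# Hypothetical [scheduler] option -- shown only to illustrate replacing the
# hard-coded 10 with a documented, tunable value (same pattern as
# DEFAULT_DAGRUNS_TO_EXAMINE in airflow/models/dagrun.py).
MAX_DAGRUNS_TO_CREATE_PER_LOOP = airflow_conf.getint(
    'scheduler', 'max_dagruns_to_create_per_loop', fallback=10
)

# The query in dags_needing_dagruns() would then end with:
#     .limit(MAX_DAGRUNS_TO_CREATE_PER_LOOP)
#     .with_for_update(of=cls, **skip_locked(session=session))
```
That way this 10 and the 20 discussed below would at least follow the same, documented pattern.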
##########
File path: airflow/models/dagrun.py
##########
@@ -75,6 +80,8 @@ class DagRun(Base, LoggingMixin):
        backref=backref('dag_run', uselist=False),
    )
+    DEFAULT_DAGRUNS_TO_EXAMINE = airflow_conf.getint('scheduler', 'max_dagruns_per_query', fallback=20)
Review comment:
Same here. The value 20 is a bit magical and we need to know where it comes from and what the consequences of changing it are. Ideally, some test results showing what can happen if we decrease it would be great. It is also a bit strange that this 20 is not hard-coded, while the 10 in the "next_dagrun" query is. What's the reasoning behind one being hard-coded and the other being configurable?
##########
File path: airflow/models/dagrun.py
##########
@@ -494,12 +542,13 @@ def verify_integrity(self, session: Session = None):
            session.add(ti)
        try:
-            session.commit()
+            session.flush()
        except IntegrityError as err:
            self.log.info(str(err))
            self.log.info('Hit IntegrityError while creating the TIs for '
                          f'{dag.dag_id} - {self.execution_date}.')
            self.log.info('Doing session rollback.')
+            # TODO[HA]: We probaly need to savepoint this so we can keep the transaction alive.
Review comment:
On a side note: should we rename verify_integrity to a different name? It seems that this method not only verifies the integrity of the dagrun and the connected task instances, but also creates task instances where they are missing. Maybe "synchronize_task_instances" or something like that (a possible shape is sketched below).
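Purely as an illustration of the rename idea (the name `synchronize_task_instances` comes from the suggestion above; the deprecation shim is my assumption, not something this PR does):
```python
import warnings


class DagRun:  # simplified stand-in for airflow.models.dagrun.DagRun
    def synchronize_task_instances(self, session=None):
        """Verify existing task instances and create the missing ones."""
        # ... the body of the current verify_integrity() would move here ...

    def verify_integrity(self, session=None):
        """Deprecated alias kept so existing callers keep working."""
        warnings.warn(
            "verify_integrity() is deprecated, use synchronize_task_instances()",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.synchronize_task_instances(session=session)
```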
##########
File path: airflow/models/dagrun.py
##########
@@ -344,6 +393,9 @@ def update_state(self, session: Session = None) -> List[TI]:
        leaf_task_ids = {t.task_id for t in dag.leaves}
        leaf_tis = [ti for ti in tis if ti.task_id in leaf_task_ids]
+        # TODO[ha]: These callbacks shouldn't run in the scheduler loop - check if Kamil changed this to run
Review comment:
We should indeed move it outside of the scheduler loop to a DAG processor, and possibly send it to DagFileProcessor like Kamil did. This should actually help a lot if this all happens within the uncommitted transaction. Those callbacks are potentially problematic.
##########
File path: airflow/models/dagrun.py
##########
@@ -313,14 +360,16 @@ def update_state(self, session: Session = None) -> List[TI]:
        :rtype ready_tis: list[airflow.models.TaskInstance]
        """
+        start_dttm = timezone.utcnow()
Review comment:
Should we change this method to also update the state of the tasks? I think it is only used in two places (scheduler_job and backfill_job), and an "update_state" method that returns tasks sounds a bit weird. I'd rather think about updating the task instances as well when update_state is run, so that it is all encapsulated within one "update_state" method.
##########
File path: airflow/models/dag.py
##########
@@ -1941,6 +2087,36 @@ def deactivate_deleted_dags(cls, alive_dag_filelocs: List[str], session=None):
            session.rollback()
            raise
+    @classmethod
+    def dags_needing_dagruns(cls, session: Session):
+        """
+        Return (and lock) a list of Dag objects that are due to create a new DagRun This will return a
+        resultset of rows that is row-level-locked with a "SELECT ... FOR UPDATE" query, you should ensure
+        that any scheduling decisions are made in a single transaction -- as soon as the transaction is
+        committed it will be unlocked.
+        """
+
+        # TODO[HA]: Bake this query, it is run _A lot_
+        # TODO[HA]: Make this limit a tunable. We limit so that _one_ scheduler
+        # doesn't try to do all the creation of dag runs
+        return session.query(cls).filter(
+            cls.is_paused.is_(False),
+            cls.is_active.is_(True),
+            cls.next_dagrun_create_after <= func.now(),
Review comment:
What happens if next_dagrun_create_after is None (because it is set to None in update_dag_next_dagrun())? Will those DAGs be skipped? Is there any other place where those are selected?
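For what it's worth, a standalone snippet (not part of the PR; `DagStub` is a made-up table just for this check) illustrating the standard SQL behaviour: a NULL next_dagrun_create_after never satisfies the `<=` filter, so such rows are silently skipped by this query:
```python
from datetime import datetime

from sqlalchemy import Column, DateTime, Integer, create_engine, func
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class DagStub(Base):
    """Minimal stand-in for the dag table, only for this illustration."""
    __tablename__ = 'dag_stub'
    dag_id = Column(Integer, primary_key=True)
    next_dagrun_create_after = Column(DateTime, nullable=True)


engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

session.add_all([
    DagStub(dag_id=1, next_dagrun_create_after=datetime(2020, 1, 1)),
    DagStub(dag_id=2, next_dagrun_create_after=None),  # like after update_dag_next_dagrun()
])
session.flush()

due = session.query(DagStub).filter(
    DagStub.next_dagrun_create_after <= func.now()
).all()

# "NULL <= now()" evaluates to NULL (not true), so dag_id=2 is skipped.
print([d.dag_id for d in due])  # -> [1]
```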
##########
File path: airflow/models/dagrun.py
##########
@@ -494,12 +542,13 @@ def verify_integrity(self, session: Session = None):
session.add(ti)
try:
- session.commit()
+ session.flush()
Review comment:
What's the reason for the IntegrityError here? Is the IntegrityError still valid if we do not commit to the database? I think flush is not likely to generate an IntegrityError at all.
Should we start caring about the isolation level of the transactions we have? Will it work when we have different isolation levels set?
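As a side note on the `TODO[HA]` savepoint idea in the hunk above, a minimal sketch of the pattern using SQLAlchemy's `Session.begin_nested()` (SAVEPOINT); the helper name is hypothetical and this is not what the PR implements:
```python
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session


def add_new_tis_with_savepoint(session: Session, new_tis) -> None:
    """Sketch: add TIs under a SAVEPOINT so a duplicate-key IntegrityError
    only rolls back the savepoint, keeping the enclosing transaction (and
    its SELECT ... FOR UPDATE row locks) alive."""
    try:
        with session.begin_nested():  # emits SAVEPOINT
            for ti in new_tis:
                session.add(ti)
            # flush() emits the INSERTs, so a duplicate key surfaces here
            # as IntegrityError even without a commit.
            session.flush()
    except IntegrityError:
        # Only the savepoint was rolled back; the caller's transaction is
        # still open and can commit its other scheduling decisions.
        pass
```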
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]