jscheffl commented on code in PR #66285:
URL: https://github.com/apache/airflow/pull/66285#discussion_r3367157630
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -981,21 +981,32 @@ def get_previous_dagrun(
dag_run: DagRun, state: DagRunState | None = None, session: Session =
NEW_SESSION
) -> DagRun | None:
"""
- Return the previous DagRun, if there is one.
+ Return the previous DagRun, if there is one (AIP-39).
:param dag_run: the dag run
:param session: SQLAlchemy ORM Session
:param state: the dag run state
"""
- if not dag_run or dag_run.logical_date is None:
+ if not dag_run:
return None
- filters = [
- DagRun.dag_id == dag_run.dag_id,
- DagRun.logical_date < dag_run.logical_date,
- ]
+
+ filters = [DagRun.dag_id == dag_run.dag_id]
if state is not None:
filters.append(DagRun.state == state)
- return
session.scalar(select(DagRun).where(*filters).order_by(DagRun.logical_date.desc()).limit(1))
+
+ # Use (run_after, id) to correctly order runs when run_after values are equal
+ # (e.g. two manual runs triggered at the same time, or a scheduled run whose
+ # run_after equals the next run's run_after). id is a monotonically-increasing
+ # surrogate key so it gives a stable, deterministic tiebreak.
+ filters.append(
+ or_(
+ DagRun.run_after < dag_run.run_after,
+ and_(DagRun.run_after == dag_run.run_after, DagRun.id <
dag_run.id),
Review Comment:
Why not check via `and_(DagRun.run_after <= dag_run.run_after, DagRun.id < dag_run.id)`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]