This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c8336ec63b6 Add ordering to AssetEvent query in SchedulerJobRunner
(#52231)
c8336ec63b6 is described below
commit c8336ec63b664879e23d4eec6bbda10b43b9fd5b
Author: Stanley Law <[email protected]>
AuthorDate: Wed Aug 13 21:06:33 2025 +0800
Add ordering to AssetEvent query in SchedulerJobRunner (#52231)
closes: #52230
This change is adding ordering for selecting asset events to maintain the
ordering.
---
airflow-core/src/airflow/jobs/scheduler_job_runner.py | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index d57d191c68c..85d743a1230 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1584,7 +1584,8 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
)
asset_events = session.scalars(
- select(AssetEvent).where(
+ select(AssetEvent)
+ .where(
or_(
AssetEvent.asset_id.in_(
select(DagScheduleAssetReference.asset_id).where(
@@ -1600,6 +1601,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
AssetEvent.timestamp <= triggered_date,
AssetEvent.timestamp >
func.coalesce(cte.c.previous_dag_run_run_after, date.min),
)
+ .order_by(AssetEvent.timestamp.asc(), AssetEvent.id.asc())
).all()
dag_run = dag.create_dagrun(