This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c8336ec63b6 Add ordering to AssetEvent query in SchedulerJobRunner 
(#52231)
c8336ec63b6 is described below

commit c8336ec63b664879e23d4eec6bbda10b43b9fd5b
Author: Stanley Law <[email protected]>
AuthorDate: Wed Aug 13 21:06:33 2025 +0800

    Add ordering to AssetEvent query in SchedulerJobRunner (#52231)
    
    closes: #52230
    
    This change adds an explicit ordering (timestamp, then id, both ascending)
    to the AssetEvent query so that the selected asset events are returned in a
    consistent chronological order.
---
 airflow-core/src/airflow/jobs/scheduler_job_runner.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index d57d191c68c..85d743a1230 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1584,7 +1584,8 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             )
 
             asset_events = session.scalars(
-                select(AssetEvent).where(
+                select(AssetEvent)
+                .where(
                     or_(
                         AssetEvent.asset_id.in_(
                             select(DagScheduleAssetReference.asset_id).where(
@@ -1600,6 +1601,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                     AssetEvent.timestamp <= triggered_date,
                     AssetEvent.timestamp > 
func.coalesce(cte.c.previous_dag_run_run_after, date.min),
                 )
+                .order_by(AssetEvent.timestamp.asc(), AssetEvent.id.asc())
             ).all()
 
             dag_run = dag.create_dagrun(

Reply via email to