nailo2c commented on code in PR #58543:
URL: https://github.com/apache/airflow/pull/58543#discussion_r3419753934


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2582,6 +2584,55 @@ def _update_state(dag: SerializedDAG, dag_run: DagRun):
                         dag_run.run_id,
                     )
                     continue
+            # For AssetAndTimeSchedule, defer starting until all required assets are queued.
+            # Only gate scheduled runs; manual and backfill runs should start immediately.
+            if isinstance(dag.timetable, AssetAndTimeSchedule) and dag_run.run_type == DagRunType.SCHEDULED:

Review Comment:
   + Old approach: when the schedule time was reached, the scheduler first created a placeholder `QUEUED` DagRun, then waited for asset events in a later scheduler loop.
   
   + New approach: the scheduler creates a scheduled DagRun only when both conditions hold, namely the schedule time is due and the required asset condition is satisfied. If assets are missing, no DagRun is created and `next_dagrun_create_after` stays on the pending slot, so the same slot can be retried later without placeholder DagRuns.
   
   I feel the new approach is much cleaner and needs no hack to handle the timeout issue. The gating point moves from after DagRun creation to before it.
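   To make the "gate before creation" flow concrete, here is a minimal, self-contained sketch. It is not Airflow's scheduler code: `DagState`, `scheduler_tick`, `required_assets`, `queued_assets`, and `created_runs` are hypothetical simplifications, and only `next_dagrun_create_after` mirrors a name from the comment above. Clearing the queued assets after a run is created is also an assumption made for the illustration.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timedelta


@dataclass
class DagState:
    """Hypothetical, simplified stand-in for one DAG's scheduling state."""

    next_dagrun_create_after: datetime  # the pending schedule slot
    interval: timedelta  # hypothetical fixed schedule interval
    required_assets: frozenset[str]  # hypothetical asset condition (all must be queued)
    queued_assets: set[str] = field(default_factory=set)
    created_runs: list[datetime] = field(default_factory=list)


def scheduler_tick(dag: DagState, now: datetime) -> bool:
    """Create a scheduled run only if the slot is due AND assets are ready.

    Returns True if a run was created on this tick. No placeholder run is
    ever created: the gate is evaluated before creation, not after.
    """
    slot = dag.next_dagrun_create_after
    if now < slot:
        return False  # Schedule time not reached yet.
    if not dag.required_assets <= dag.queued_assets:
        # Assets missing: create nothing and leave next_dagrun_create_after
        # on the pending slot, so a later tick retries the same slot.
        return False
    dag.created_runs.append(slot)
    dag.queued_assets.clear()  # Assumption: consume the queued asset events.
    dag.next_dagrun_create_after = slot + dag.interval  # Advance only on creation.
    return True
```

   In this sketch the slot only advances when a run is actually created, so a tick that finds missing assets leaves no trace beyond the unchanged `next_dagrun_create_after`. For example, with `required_assets={"a", "b"}`, ticks at or after the slot return `False` until both `"a"` and `"b"` are queued, and the first tick after that creates exactly one run for the original slot.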


