nailo2c commented on code in PR #58543:
URL: https://github.com/apache/airflow/pull/58543#discussion_r3066924929


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2245,6 +2247,62 @@ def _update_state(dag: SerializedDAG, dag_run: DagRun):
                         dag_run.run_id,
                     )
                     continue
+            # For AssetAndTimeSchedule, defer starting until all required 
assets are queued.
+            if isinstance(dag.timetable, AssetAndTimeSchedule):
+                # Reuse dagrun_timeout to fail runs that wait in QUEUED for 
assets for too long.
+                if (
+                    dag.dagrun_timeout
+                    and dag_run.queued_at
+                    and dag_run.queued_at < timezone.utcnow() - 
dag.dagrun_timeout
+                ):

Review Comment:
   Great catch, this logic/scenario is worth to handling.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to