Copilot commented on code in PR #58543:
URL: https://github.com/apache/airflow/pull/58543#discussion_r3066497714


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2245,6 +2247,62 @@ def _update_state(dag: SerializedDAG, dag_run: DagRun):
                         dag_run.run_id,
                     )
                     continue
+            # For AssetAndTimeSchedule, defer starting until all required 
assets are queued.
+            if isinstance(dag.timetable, AssetAndTimeSchedule):
+                # Reuse dagrun_timeout to fail runs that wait in QUEUED for 
assets for too long.
+                if (
+                    dag.dagrun_timeout
+                    and dag_run.queued_at
+                    and dag_run.queued_at < timezone.utcnow() - 
dag.dagrun_timeout
+                ):

Review Comment:
   The AssetAndTimeSchedule gating logic runs for *all* queued DagRuns of that 
DAG, including manual/backfill runs. This can cause manually triggered runs to 
remain QUEUED indefinitely (or be failed by the timeout) when no asset events 
are queued. Consider applying this deferral/timeout logic only to scheduled 
runs (e.g. gate on dag_run.run_type == DagRunType.SCHEDULED) so manual/backfill 
runs start normally.



##########
airflow-core/docs/authoring-and-scheduling/timetable.rst:
##########
@@ -290,6 +292,23 @@ Here's an example of a Dag using ``AssetOrTimeSchedule``:
         # Dag tasks go here
         pass
 
+Here's an example of a Dag using ``AssetAndTimeSchedule`` to require both the 
time-based schedule and fresh assets before a run starts:
+
+.. code-block:: python
+
+    from airflow.sdk import AssetAndTimeSchedule, CronTriggerTimetable
+
+
+    @dag(
+        schedule=AssetAndTimeSchedule(
+            timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), 
assets=(dag1_asset & dag2_asset)
+        )
+        # Additional arguments here, replace this comment with actual arguments
+    )

Review Comment:
   The new `@dag` examples are not valid Python as written: the schedule=... 
call is not followed by a comma before the next argument/comment line, so the 
decorator arguments won't parse. Please update the snippets to be syntactically 
correct (and avoid placeholder comments inside the argument list).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to