nailo2c opened a new pull request, #58543:
URL: https://github.com/apache/airflow/pull/58543
Relates: #58056

# Why

Use case: #58056

# How

### 1. Implement the `AssetAndTimeSchedule` class

Almost the same as [AssetOrTimeSchedule](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/timetables/assets.py#L36), except that this class inherits from `Timetable` and delegates `run_id` generation to the wrapped timetable.

### 2. Assign `AssetAndTimeSchedule` to the `non_asset_dags` set

Modify [DagModel.dags_needing_dagruns](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/dag.py#L587) so that `AssetAndTimeSchedule` is never assigned to `triggered_date_by_dag`.

### 3. Add starting logic for `AssetAndTimeSchedule` in the scheduler

Update the DAG run state from `QUEUED` to `RUNNING` when both the timetable and the asset conditions are met.

# What

Here are my test DAGs:

`upstream_asset_producer`

```python
import pendulum

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sdk.definitions.asset import Asset

my_asset = Asset("/my/example/asset")

with DAG(
    dag_id="upstream_asset_producer",
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    schedule="0 * * * *",
    catchup=False,
    tags=["example", "upstream"],
) as upstream_dag:
    BashOperator(
        task_id="produce_asset",
        bash_command="echo 'Asset produced!'",
        outlets=[my_asset],
    )
```

`downstream_asset_and_time_consumer`

```python
import pendulum

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.sdk.definitions.asset import Asset
from airflow.timetables.assets import AssetAndTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable

my_asset = Asset("/my/example/asset")

with DAG(
    dag_id="downstream_asset_and_time_consumer",
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    schedule=AssetAndTimeSchedule(
        timetable=CronTriggerTimetable("4 * * * *", timezone="UTC"),
        assets=[my_asset],
    ),
    catchup=False,
    max_active_runs=1,
    tags=["example", "downstream", "asset-and-time"],
) as downstream_dag:
    BashOperator(
        task_id="consume_asset",
        bash_command="echo 'Asset condition and time condition were both met!'",
    )
```

From the screenshots, the downstream DAG runs as expected: it runs only after both the timetable and the assets are ready.

1. Upstream dag *(screenshot)*
2. Asset production time *(screenshot)*
3. Downstream, `AssetAndTimeSchedule` *(screenshot)*

# Discussion

I added the following logic in `dag.py` to guarantee that only `AssetTriggeredTimetable` can be added to `triggered_date_by_dag`:

```python
if not isinstance(timetable, AssetTriggeredTimetable):
    del adrq_by_dag[dag_id]
    continue
```

Given that guard, should I remove the extra guard logic in `SchedulerJobRunner._create_dag_runs_asset_triggered`?
https://github.com/apache/airflow/blob/9e7b36e5052c2bf8bfb499d7652349926a1f0fc1/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L1800-L1805

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
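The delegation described in step 1 can be illustrated with a small, framework-free sketch. The class and method names below are illustrative stand-ins, not Airflow's actual `Timetable` API; the point is only that the AND-composition wrapper forwards `run_id` generation to the wrapped timetable instead of producing asset-style run ids:

```python
from dataclasses import dataclass, field

@dataclass
class CronStub:
    """Stand-in for a wrapped timetable such as CronTriggerTimetable."""
    expression: str = "4 * * * *"

    def generate_run_id(self, *, logical_date: str) -> str:
        # The wrapped timetable owns run_id generation.
        return f"scheduled__{logical_date}"

@dataclass
class AssetAndTimeSketch:
    """Illustrative AND-composition wrapper: it tracks the asset condition
    separately and delegates run_id generation to the wrapped timetable."""
    timetable: CronStub
    assets: list = field(default_factory=list)

    def generate_run_id(self, *, logical_date: str) -> str:
        # Delegate rather than inventing an asset-triggered run_id format.
        return self.timetable.generate_run_id(logical_date=logical_date)

schedule = AssetAndTimeSketch(CronStub(), assets=["/my/example/asset"])
print(schedule.generate_run_id(logical_date="2025-01-01T00:04:00+00:00"))
# scheduled__2025-01-01T00:04:00+00:00
```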
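Steps 2 and 3 together amount to two moves: keep AND-scheduled DAGs out of the asset-only run-creation path, and promote their queued runs once the asset side is also satisfied. A toy model of that flow, using stub classes rather than the real scheduler code (all names here are hypothetical):

```python
from enum import Enum

class RunState(Enum):
    QUEUED = "queued"
    RUNNING = "running"

class AssetTriggeredStub: ...   # stand-in for AssetTriggeredTimetable
class AssetAndTimeStub: ...     # stand-in for AssetAndTimeSchedule

def keep_asset_only(adrq_by_dag: dict, timetable_by_dag: dict) -> dict:
    # Mirrors the guard from step 2: only purely asset-triggered DAGs keep
    # an entry, so an AND-scheduled DAG never lands in triggered_date_by_dag.
    return {
        dag_id: adrq
        for dag_id, adrq in adrq_by_dag.items()
        if isinstance(timetable_by_dag[dag_id], AssetTriggeredStub)
    }

def maybe_start(state: RunState, assets_ready: bool) -> RunState:
    # Step 3 in miniature: a run created by the time component waits in
    # QUEUED and flips to RUNNING only once the asset condition holds too.
    if state is RunState.QUEUED and assets_ready:
        return RunState.RUNNING
    return state

adrq = {"asset_dag": object(), "and_dag": object()}
timetables = {"asset_dag": AssetTriggeredStub(), "and_dag": AssetAndTimeStub()}
print(sorted(keep_asset_only(adrq, timetables)))        # ['asset_dag']
print(maybe_start(RunState.QUEUED, assets_ready=True))  # RunState.RUNNING
```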
