nailo2c opened a new pull request, #58543:
URL: https://github.com/apache/airflow/pull/58543

   Relates: #58056
   
   # Why
   
   Use case: #58056
   
   # How
   
   ### 1. Implement `AssetAndTimeSchedule` class
   
   Largely mirrors [AssetOrTimeSchedule](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/timetables/assets.py#L36), except that this class inherits from `Timetable` and delegates `run_id` generation to the wrapped timetable.
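   
   For illustration, here is a minimal, self-contained sketch of that delegation pattern. The class and method names below are hypothetical stand-ins, not the real Airflow types:
   
   ```python
   from dataclasses import dataclass, field
   
   class FakeTimetable:
       """Stand-in for airflow.timetables.base.Timetable (illustration only)."""
       def generate_run_id(self, logical_date: str) -> str:
           return f"scheduled__{logical_date}"
   
   @dataclass
   class AssetAndTimeScheduleSketch:
       """Wraps a timetable plus asset conditions; run_id generation is
       delegated to the wrapped timetable rather than handled here."""
       timetable: FakeTimetable
       assets: list = field(default_factory=list)
   
       def generate_run_id(self, logical_date: str) -> str:
           # Delegation: the wrapped timetable decides the run_id format.
           return self.timetable.generate_run_id(logical_date)
   
   sched = AssetAndTimeScheduleSketch(FakeTimetable(), ["/my/example/asset"])
   print(sched.generate_run_id("2025-01-01"))  # scheduled__2025-01-01
   ```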
   
   ### 2. Assign `AssetAndTimeSchedule` to the `non_asset_dags` set
   
   Modifying [DagModel.dags_needing_dagruns](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/dag.py#L587) to prevent `AssetAndTimeSchedule` from being assigned to `triggered_date_by_dag`.
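   
   A plain-Python illustration of that partitioning (the function name and class stubs are hypothetical, not the actual `dags_needing_dagruns` code):
   
   ```python
   class Timetable: ...
   class AssetTriggeredTimetable(Timetable): ...
   class AssetAndTimeSchedule(Timetable): ...  # inherits Timetable, per this PR
   
   def split_dags(timetables_by_dag: dict) -> tuple[set, set]:
       """Partition DAG ids so that only purely asset-triggered DAGs feed
       triggered_date_by_dag; everything else lands in non_asset_dags."""
       asset_dags, non_asset_dags = set(), set()
       for dag_id, tt in timetables_by_dag.items():
           if isinstance(tt, AssetTriggeredTimetable):
               asset_dags.add(dag_id)
           else:
               non_asset_dags.add(dag_id)
       return asset_dags, non_asset_dags
   
   asset_dags, non_asset_dags = split_dags({
       "pure_asset": AssetTriggeredTimetable(),
       "mixed": AssetAndTimeSchedule(),  # goes to non_asset_dags
   })
   ```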
   
   ### 3. Add starting logic for `AssetAndTimeSchedule` in the scheduler.
   
   Updating the DAG run state from `QUEUED` to `RUNNING` when both the timetable and asset conditions are met.
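   
   The gating condition boils down to something like this (hypothetical helper, for illustration only):
   
   ```python
   def next_dag_run_state(timetable_due: bool, assets_ready: bool) -> str:
       """Sketch of the transition this PR adds: a queued AssetAndTimeSchedule
       run moves to RUNNING only once BOTH conditions are satisfied."""
       return "RUNNING" if timetable_due and assets_ready else "QUEUED"
   
   print(next_dag_run_state(True, False))  # QUEUED  (time due, assets not ready)
   print(next_dag_run_state(True, True))   # RUNNING (both conditions met)
   ```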
   
   # What
   
   Here are my test DAGs:
   
   `upstream_asset_producer`
   ```python
   import pendulum
   from airflow.models.dag import DAG
   from airflow.operators.bash import BashOperator
   from airflow.sdk.definitions.asset import Asset
   
   my_asset = Asset("/my/example/asset")
   
   with DAG(
       dag_id="upstream_asset_producer",
       start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
       schedule="0 * * * *",
       catchup=False,
       tags=["example", "upstream"],
   ) as upstream_dag:
       BashOperator(
           task_id="produce_asset",
           bash_command="echo 'Asset produced!'",
           outlets=[my_asset],
       )
   ```
   
   `downstream_asset_and_time_consumer`
   ```python
   import pendulum
   from airflow.models.dag import DAG
   from airflow.operators.bash import BashOperator
   from airflow.sdk.definitions.asset import Asset
   from airflow.timetables.trigger import CronTriggerTimetable
   from airflow.timetables.assets import AssetAndTimeSchedule
   
   my_asset = Asset("/my/example/asset")
   
   with DAG(
       dag_id="downstream_asset_and_time_consumer",
       start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
       schedule=AssetAndTimeSchedule(
           timetable=CronTriggerTimetable("4 * * * *", timezone="UTC"),
           assets=[my_asset],
       ),
       catchup=False,
       max_active_runs=1,
       tags=["example", "downstream", "asset-and-time"],
   ) as downstream_dag:
       BashOperator(
           task_id="consume_asset",
           bash_command="echo 'Asset condition and time condition were both met!'",
       )
   ```
   
   From the screenshots, the downstream DAG runs as expected: it runs only 
after both the timetable and the assets are ready.
   
   1. Upstream DAG  
   
      
![AssetAndTimeSchedule_example_1_upstream](https://github.com/user-attachments/assets/ed205b0e-78eb-453d-a83b-26bd6e29f42e)
   
   2. Asset production time  
   
      
![AssetAndTimeSchedule_example_2_asset](https://github.com/user-attachments/assets/3224c304-ffd2-4f60-a24e-5431692d6142)
   
   3. Downstream, `AssetAndTimeSchedule`  
   
      
![AssetAndTimeSchedule_example_3_downstream](https://github.com/user-attachments/assets/dbd840ef-8059-47d3-a9af-dfa27b8227ef)
   
   # Discussion
   
   I added the following guard in `dag.py` to guarantee that only `AssetTriggeredTimetable` can be added to `triggered_date_by_dag`:
   ```python
   if not isinstance(timetable, AssetTriggeredTimetable):
       del adrq_by_dag[dag_id]
       continue
   ```
   
   Should I remove the extra guard logic in 
`SchedulerJobRunner._create_dag_runs_asset_triggered`?
   
https://github.com/apache/airflow/blob/9e7b36e5052c2bf8bfb499d7652349926a1f0fc1/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L1800-L1805

