DataCerealz opened a new issue, #59925:
URL: https://github.com/apache/airflow/issues/59925

   ### Apache Airflow version
   
   Other Airflow 3 version (please specify below)
   
   ### If "Other Airflow 3 version" selected, which one?
   
   3.1.1
   
   ### What happened?
   
   We have two tasks wired like this: `[live_values_loaded, backfill_values_loaded] >> EmptyOperator(trigger_rule="all_done_min_one_success")`. However, the `EmptyOperator` gets skipped when one of the two upstream tasks is skipped and the other one succeeds.
   
   ### What you think should happen instead?
   
   The [Airflow 3.1 release notes](https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html#new-trigger-rule-all-done-min-one-success) say: `ALL_DONE_MIN_ONE_SUCCESS: This rule triggers when all upstream tasks are done (success, failed, or skipped) and at least one has succeeded, filling a gap between existing trigger rules for complex workflow patterns.`
   
   This is why we expect that if either of the two tasks `[live_values_loaded, backfill_values_loaded]` is skipped but the other one succeeds, our `EmptyOperator` should run.
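
   For reference, here is our reading of the rule as a small, self-contained check (this is our interpretation of the release-note wording, not Airflow's actual implementation):

   ```python
   def all_done_min_one_success(upstream_states: list[str]) -> bool:
       """Our reading of the documented rule: every upstream task is done,
       and at least one of them finished with success."""
       done = {"success", "failed", "skipped"}
       return all(state in done for state in upstream_states) and "success" in upstream_states


   # Our case: one upstream skipped, the other succeeded -> the rule should fire.
   assert all_done_min_one_success(["skipped", "success"])
   ```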
   
   ### How to reproduce
   
   We have a TaskFlow task `load_values` used like this:
   
   ```python
   # ...
   live_values_loaded = load_values.override(task_id='live_values')()
   backfill_values_loaded = load_values.override(task_id='backfill_values')()
   
   get_live_values >> live_values_loaded
   get_backfill_values >> backfill_values_loaded
   ```
   
   If there are no backfill values to be loaded, `backfill_values` gets skipped.
   If there are no new live values yet, `live_values` gets skipped.
   In both cases the respective upstream task skips itself, and the downstream load task is skipped with it (a simplified sketch of such an upstream gate follows).
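
   For context, the upstream gates look roughly like this. This is a simplified sketch: `fetch_new_live_rows` is a stand-in for our real query against the source system.

   ```python
   from airflow.exceptions import AirflowSkipException
   from airflow.sdk import task


   def fetch_new_live_rows() -> list:
       # Illustrative stand-in; the real task queries our database for new rows.
       return []


   @task
   def get_live_values() -> list:
       new_rows = fetch_new_live_rows()
       if not new_rows:
           # This skip cascades to live_values, whose default trigger rule is all_success.
           raise AirflowSkipException("no new live values yet")
       return new_rows
   ```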
   
   We need to emit an asset event only if any new values were loaded, no matter whether they are live values or backfill values.
   
   So we added this:
   
   ```python
   [live_values_loaded, backfill_values_loaded] >> EmptyOperator(
       task_id="emit_asset_updates",
       trigger_rule="all_done_min_one_success",
       outlets=[Asset("tigerdata://db/table/new_data")],
   )
   ```
   
   This should be enough to reproduce it:
   ```python
   from airflow.exceptions import AirflowSkipException
   from airflow.providers.standard.operators.empty import EmptyOperator
   from airflow.sdk import Asset, dag, task


   @task
   def load_values(skip: bool):
       if skip:
           raise AirflowSkipException()


   @dag
   def sample_dag():
       live_values_loaded = load_values.override(task_id="live_values")(skip=True)
       backfill_values_loaded = load_values.override(task_id="backfill_values")(skip=False)

       [live_values_loaded, backfill_values_loaded] >> EmptyOperator(
           task_id="emit_asset_updates",
           trigger_rule="all_done_min_one_success",
           outlets=[Asset("tigerdata://db/table/new_data")],
       )


   sample_dag()
   ```
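
   With this DAG, `live_values` ends up skipped and `backfill_values` succeeds, so by the documented rule `emit_asset_updates` should run; instead it is skipped and no asset event is emitted.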
   
   ### Operating System
   
   Astronomer Runtime 3.1-3
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   Astronomer Runtime 3.1-3
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

