DataCerealz opened a new issue, #59925: URL: https://github.com/apache/airflow/issues/59925
### Apache Airflow version

Other Airflow 3 version (please specify below)

### If "Other Airflow 3 version" selected, which one?

3.1.1

### What happened?

We have two tasks wired like so: `[live_values_loaded, backfill_values_loaded] >> EmptyOperator(trigger_rule="all_done_min_one_success")`. However, the `EmptyOperator` is skipped when one of the two upstream tasks is skipped and the other one succeeds.

### What you think should happen instead?

The [Airflow 3.1 release notes](https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html#new-trigger-rule-all-done-min-one-success) say:

`ALL_DONE_MIN_ONE_SUCCESS: This rule triggers when all upstream tasks are done (success, failed, or skipped) and at least one has succeeded, filling a gap between existing trigger rules for complex workflow patterns.`

This is why we expect that if either of the two tasks `[live_values_loaded, backfill_values_loaded]` is skipped but the other one succeeds, our `EmptyOperator` should run.

### How to reproduce

We have a TaskFlow task `load_values` used like this:

```python
# ...
live_values_loaded = load_values.override(task_id='live_values')()
backfill_values_loaded = load_values.override(task_id='backfill_values')()

get_live_values >> live_values_loaded
get_backfill_values >> backfill_values_loaded
```

If there are no backfill values to be loaded, `backfill_values` gets skipped. If there are no new live values yet, `live_values` gets skipped. In other words, if the respective upstream task is skipped, the downstream load is skipped as well.

We need to emit an asset event only if any new values were loaded. It does not matter whether they are live values or backfill values.
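For reference, the expectation taken from the release-notes wording can be written as a small predicate. This is only a sketch in plain Python of the documented description, not Airflow's actual dependency-evaluation code; the function name `expected_all_done_min_one_success` and the state strings are assumptions made for illustration.

```python
from collections.abc import Iterable

# "Done" states exactly as listed in the release-notes sentence.
# Hypothetical constants for illustration, not imported from Airflow.
DONE_STATES = {"success", "failed", "skipped"}


def expected_all_done_min_one_success(upstream_states: Iterable[str]) -> bool:
    """Return True if the downstream task is expected to run under the documented rule."""
    states = list(upstream_states)
    # Every upstream task must have reached a done state ...
    all_done = all(state in DONE_STATES for state in states)
    # ... and at least one of them must have succeeded.
    return all_done and "success" in states


# The case from this report: one upstream skipped, one succeeded.
print(expected_all_done_min_one_success(["skipped", "success"]))
```

Under this reading, one skipped and one successful upstream task satisfies the rule, while two skipped upstream tasks, or one still running, do not.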
So we added this:

```python
[live_values_loaded, backfill_values_loaded] >> EmptyOperator(
    task_id="emit_asset_updates",
    trigger_rule="all_done_min_one_success",
    outlets=[
        Asset("tigerdata://db/table/new_data"),
    ],
)
```

So this should be enough to reproduce:

```python
# Imports are assumed for Airflow 3.x; they were not part of the original snippet.
from airflow.exceptions import AirflowSkipException
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import Asset, dag, task


@task
def load_values(skip: bool):
    # Simulate a load that has nothing to do by skipping the task.
    if skip:
        raise AirflowSkipException()
    else:
        return


@dag
def sample_dag():
    live_values_loaded = load_values.override(task_id='live_values')(skip=True)
    backfill_values_loaded = load_values.override(task_id='backfill_values')(skip=False)

    # Expected to run: one upstream is skipped, the other succeeds.
    [live_values_loaded, backfill_values_loaded] >> EmptyOperator(
        task_id="emit_asset_updates",
        trigger_rule="all_done_min_one_success",
        outlets=[
            Asset("tigerdata://db/table/new_data"),
        ],
    )


# Instantiate the DAG so it is registered when the file is parsed.
sample_dag()
```

### Operating System

Astronomer Runtime 3.1-3

### Versions of Apache Airflow Providers

_No response_

### Deployment

Astronomer

### Deployment details

Astronomer Runtime 3.1-3

### Anything else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
