matthieucx opened a new issue, #55146: URL: https://github.com/apache/airflow/issues/55146
### Apache Airflow version

Other Airflow 2 version (please specify below)

### If "Other Airflow 2 version" selected, which one?

2.10.3

### What happened?

(Due to my work setup, I tested the DAG on the latest Airflow version through a local deployment and the CLI, and it still wasn't working, but I cannot guarantee I did not make a mistake in the process.)

I am part of a data engineering team. We use Airflow to orchestrate our data ingestion pipelines. At a high level, they are made of:

- An ingestion task group that moves data from a source system to our S3 storage
- Validation tests
- Processing jobs that clean up and prepare the data for use by downstream consumers
- Validation tests on the processed data

The processing jobs run in Spark. To avoid overhead, I have been looking at using Airflow's skipping mechanisms to skip the rest of the pipeline if there is no data to be ingested.

However, we need the processing jobs to run sequentially. For this purpose, we use `depends_on_past`.

I then stumbled upon [the PR](https://github.com/apache/airflow/pull/27710) introducing `wait_for_past_depends_before_skipping`, which seems like it would allow me to skip tasks while still preserving the dependencies on past runs.

However, the flag does not seem to have an effect.

### What you think should happen instead?

The flag should behave as advertised in the base PR and in the docs.
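As a rough illustration of that expectation, the documented semantics can be modelled as a small pure-Python function. This is a toy sketch, not Airflow code: `expected_state`, the `"waiting"` label, and the plain state strings are hypothetical names chosen for illustration, and it assumes that a past run counts as satisfied when the same task ended in `success` or `skipped`.

```python
from typing import Optional

# Assumption: depends_on_past is satisfied when the previous run's task
# instance finished as "success" or "skipped".
PAST_OK = {"success", "skipped"}


def expected_state(
    should_skip: bool,
    depends_on_past: bool,
    wait_before_skipping: bool,
    previous_state: Optional[str],
) -> str:
    """Toy model of the state a task instance is expected to end up in.

    previous_state is None when there is no previous run at all.
    "waiting" stands for "stays without a state until the past run is resolved".
    """
    past_satisfied = (
        not depends_on_past
        or previous_state is None
        or previous_state in PAST_OK
    )
    if should_skip:
        # With the flag set, the skip is deferred until the past dependency is met.
        if wait_before_skipping and not past_satisfied:
            return "waiting"
        return "skipped"
    return "scheduled" if past_satisfied else "waiting"
```

Under this model, a task that should be skipped while the previous run's task is `failed` stays `"waiting"` when the flag is set, and is `"skipped"` immediately when it is not.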
### How to reproduce

```python
from airflow import DAG
from airflow.models import Param
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import ShortCircuitOperator, get_current_context


def check_continue():
    # ShortCircuitOperator skips downstream tasks when this returns False.
    ctx = get_current_context()
    skip_downstream = ctx["params"]["skip"]
    return not skip_downstream


with DAG(
    dag_id="test_airflow_skip",
    schedule=None,
    params={
        "skip": Param(default=False, type="boolean"),
    },
) as dag:
    # Downstream task that should wait on its previous run before being skipped.
    depends_on_past = EmptyOperator(
        task_id="depends_on_past",
        depends_on_past=True,
        wait_for_past_depends_before_skipping=True,
    )
    skip = ShortCircuitOperator(
        task_id="skip",
        python_callable=check_continue,
    )
    skip >> depends_on_past
```

1. Launch a run with the `skip` parameter enabled.
2. Manually set the state of the skipped task to failed.
3. Launch a run without skipping.

### Operating System

Ubuntu 22.04.5 LTS

### Versions of Apache Airflow Providers

_No response_

### Deployment

Docker-Compose

### Deployment details

_No response_

### Anything else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
