matthieucx opened a new issue, #55146:
URL: https://github.com/apache/airflow/issues/55146

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.10.3
   
   ### What happened?
   
   (Because of my work setup, I tested the DAG on the latest Airflow version through a local deployment and the CLI. It still did not work, but I cannot guarantee that I did not make a mistake in the process.)
   
   I am part of a data engineering team. We use Airflow to orchestrate our data ingestion pipelines. At a high level, they consist of:
   
   - An ingestion task group that moves data from a source system to our S3 storage
   - Validation tests
   - Processing jobs that clean up and prepare the data for use by downstream consumers
   - Validation tests on the processed data
   
   The processing jobs run in Spark. To avoid the Spark overhead when there is nothing to process, I have been looking at using Airflow's skipping mechanisms to skip the rest of the pipeline if there is no data to be ingested.
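The skipping idea can be sketched in plain Python. This is a toy model, not Airflow code: the task ids, `has_new_data`, and `short_circuit` are hypothetical, and the function only mimics the documented default of `ShortCircuitOperator` (`ignore_downstream_trigger_rules=True`), where a falsy callable result skips every downstream task.

```python
from typing import Callable, Dict, List

# Hypothetical task ids standing in for the pipeline stages listed above.
PIPELINE = ["ingest", "validate_raw", "spark_process", "validate_processed"]


def has_new_data(pending_files: List[str]) -> bool:
    # Hypothetical check: continue only if the source system has data to ingest.
    return len(pending_files) > 0


def short_circuit(downstream: List[str], should_continue: Callable[[], bool]) -> Dict[str, str]:
    """Toy model (not Airflow code) of a ShortCircuitOperator in front of `downstream`.

    Mimics the documented default ignore_downstream_trigger_rules=True:
    a falsy result skips every downstream task, a truthy one lets them run.
    """
    state = "scheduled" if should_continue() else "skipped"
    return {task_id: state for task_id in downstream}


# No pending files, so in this model the whole Spark-backed chain is skipped.
print(short_circuit(PIPELINE, lambda: has_new_data([])))
```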
   
   However, we need the processing jobs to run sequentially, so we use `depends_on_past`. I then stumbled upon [the PR](https://github.com/apache/airflow/pull/27710) that introduced `wait_for_past_depends_before_skipping`, which seemed to let me skip tasks while still preserving the dependency on past runs. However, the flag does not seem to have any effect.
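The decision the flag is supposed to change can be modeled in a few lines. This is a toy sketch of the documented behaviour as I read it, not Airflow's scheduler code: the function names and the string states are made up for illustration. The only inputs taken from the docs are that `depends_on_past` is satisfied when the previous instance succeeded or was skipped, and that with `wait_for_past_depends_before_skipping=True` a task that should be skipped stays in the `None` state, waiting for the previous run's task.

```python
from typing import Optional

# Previous-run states that satisfy depends_on_past, per the BaseOperator docs.
PAST_OK = {"success", "skipped"}


def past_depends_met(previous_state: Optional[str]) -> bool:
    # None models "no previous task instance": the first run may proceed.
    return previous_state is None or previous_state in PAST_OK


def state_when_upstream_skips(
    previous_state: Optional[str], wait_for_past_depends_before_skipping: bool
) -> str:
    """Toy model of the documented behaviour, not Airflow's scheduler code.

    Returns the state a depends_on_past=True task should end up in when an
    upstream ShortCircuitOperator asks for it to be skipped.
    """
    if wait_for_past_depends_before_skipping and not past_depends_met(previous_state):
        # Flag on and the previous instance neither succeeded nor was skipped:
        # keep the None state and wait.
        return "none"
    # Flag off, or the past dependency is already met: skip right away.
    return "skipped"


for flag in (True, False):
    print(flag, state_when_upstream_skips("failed", flag))
```

With the previous instance `failed`, the model returns `"none"` when the flag is on and `"skipped"` when it is off; the first case is the waiting behaviour I expected from the flag.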
   
   ### What you think should happen instead?
   
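   The flag should behave as described in the original PR and in the docs: when a task with `depends_on_past=True` is about to be skipped, it should stay in the `None` state until its instance in the previous run has succeeded or been skipped, instead of being skipped immediately.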
   
   ### How to reproduce
   
   ```python
   from airflow import DAG
   from airflow.models import Param
   from airflow.operators.empty import EmptyOperator
   from airflow.operators.python import ShortCircuitOperator, get_current_context
   
   
   def check_continue():
       ctx = get_current_context()
       skip_downstream = ctx["params"]["skip"]
       return not skip_downstream
   
   
   with DAG(
       dag_id="test_airflow_skip",
       schedule=None,
       params={
           "skip": Param(default=False, type="boolean"),
       },
   ) as dag:
       depends_on_past = EmptyOperator(
           task_id="depends_on_past",
           depends_on_past=True,
           wait_for_past_depends_before_skipping=True,
       )
       skip = ShortCircuitOperator(
           task_id="skip",
           python_callable=check_continue,
       )
   
       skip >> depends_on_past
   ```
   
   1. Launch a run with the `skip` parameter enabled.
   2. Manually set the state of the skipped `depends_on_past` task to failed.
   3. Launch a run without skipping.
   
   ### Operating System
   
   Ubuntu 22.04.5 LTS
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

