atul-astronomer opened a new issue, #48933:
URL: https://github.com/apache/airflow/issues/48933

   ### Apache Airflow version
   
   main (development)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   Clearing a dagrun from a previous DAG version fails when the DAG includes 
skip-based dependencies.
   The same DAG passes on v1, but after v2 is created, if the user clears the 
v1 dagrun and the skip-based dependency is not met, the dagrun is marked as 
failed.
   For the exact steps, please refer to the reproduction steps below.
   
   
![Image](https://github.com/user-attachments/assets/60310856-7d2c-4cb0-8e2f-9d7691d7628b)
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
   1. Create a dagrun for the code below (v1). The code skips some tasks 
depending on the coin flip outcome. Run it multiple times to get both the 
skipped and non-skipped scenarios.
    
   ```python
   from airflow.providers.standard.operators.python import ShortCircuitOperator
   from airflow.providers.standard.operators.empty import EmptyOperator
   from airflow.decorators import task, dag
   from datetime import datetime
   from random import randint
   
   
   @task
   def flip_coin():
       if randint(0, 1):
           return "Tails"
       else:
           return "Heads"
   
   
   def decide(coin_result):
       print(coin_result)
       if "Heads" in coin_result:
           print("skip")
           return True
       else:
           print("dont_skip")
           return False
   
   
   def do_thing(msg):
       "Creates a task with {msg} for a task_id that prints {msg}"
   
       @task(task_id=msg)
       def f(msg):
           print(msg)
   
       return f(msg)
   
   
   @dag(
       start_date=datetime(year=1970, month=1, day=1),
       schedule=None,
       tags=["short_circuit", "core"]
   )
   def short_circuit_sometimes_skip():
       decision = flip_coin()
       done = EmptyOperator(task_id="join")
       (
           decision
           >> ShortCircuitOperator(
               task_id="decide", op_args=[decision], python_callable=decide
           )
           >> do_thing("skip_on_heads")
           >> done
       )
       decision >> do_thing("run_always") >> done >> do_thing("also_run_always")
   
   
   the_dag = short_circuit_sometimes_skip()
   
   ``` 
   
   2. Reverse the skipping logic to create v2. Run it multiple times to get 
both the skipped and non-skipped scenarios.
   
   ```python
   from airflow.providers.standard.operators.python import ShortCircuitOperator
   from airflow.providers.standard.operators.empty import EmptyOperator
   from airflow.decorators import task, dag
   from datetime import datetime
   from random import randint
   
   
   @task
   def flip_coin():
       if randint(0, 1):
           return "Heads"
       else:
           return "Tails"
   
   
   def decide(coin_result):
       print(coin_result)
       if "Tails" in coin_result:
           print("skip")
           return True
       else:
           print("dont_skip")
           return False
   
   
   def do_thing(msg):
       "Creates a task with {msg} for a task_id that prints {msg}"
   
       @task(task_id=msg)
       def f(msg):
           print(msg)
   
       return f(msg)
   
   
   @dag(
       start_date=datetime(year=1970, month=1, day=1),
       schedule=None,
       tags=["short_circuit", "core"]
   )
   def short_circuit_sometimes_skip():
       decision = flip_coin()
       done = EmptyOperator(task_id="join")
       (
           decision
           >> ShortCircuitOperator(
               task_id="decide", op_args=[decision], python_callable=decide
           )
           >> do_thing("skip_on_tails")
           >> done
       )
       decision >> do_thing("run_always") >> done >> do_thing("also_run_always")
   
   
   the_dag = short_circuit_sometimes_skip()
   
   ``` 
   
   3. Now go to the first dagrun, which is on v1, and clear it. Notice that 
when the skip decision is true, the dagrun is marked as passed, but when the 
skip decision is false, the dagrun fails.
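
To make the difference between the two versions easier to see, the decision logic can be modelled in plain Python without Airflow. This is only a sketch: `decide_v1`, `decide_v2` and `downstream_state` are hypothetical helper names, and it assumes the documented `ShortCircuitOperator` behavior, where downstream tasks proceed on a truthy return value and are skipped on a falsy one. Under that assumption the return value of `decide`, rather than the printed `skip`/`dont_skip` label, determines which branch is skipped.

```python
# Plain-Python model of the short-circuit decision in the two DAG versions.
# No Airflow import is needed; all helper names here are hypothetical.


def decide_v1(coin_result: str) -> bool:
    """Mirror of decide() in v1: returns True for Heads."""
    return "Heads" in coin_result


def decide_v2(coin_result: str) -> bool:
    """Mirror of decide() in v2: returns True for Tails."""
    return "Tails" in coin_result


def downstream_state(decision: bool) -> str:
    # Assumption: ShortCircuitOperator lets downstream tasks proceed on a
    # truthy return value and skips them on a falsy one.
    return "runs" if decision else "skipped"


for coin in ("Heads", "Tails"):
    print(
        f"{coin}: v1 skip_on_heads {downstream_state(decide_v1(coin))}, "
        f"v2 skip_on_tails {downstream_state(decide_v2(coin))}"
    )
```

The same coin result takes opposite branches in v1 and v2, and the guarded task id changes from `skip_on_heads` to `skip_on_tails`, which is the situation the cleared v1 dagrun has to handle.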
   
   ### Operating System
   
   Linux
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

