atul-astronomer opened a new issue, #48933: URL: https://github.com/apache/airflow/issues/48933
### Apache Airflow version main (development) ### If "Other Airflow 2 version" selected, which one? _No response_ ### What happened? Clearing previous version dagrun is failing when includes skip based dependencies. The same dag is passing on v1 but when created v2 and user tries to clear the v1 dagrun and skip based dependency is not met, then dag is marked as failure. For clear steps, please refer to the reproducible steps.  ### What you think should happen instead? _No response_ ### How to reproduce 1. Create a dagrun for below code (v1). The below code skips some tasks when outcome is Heads. Run multiple times to get the both skipped and non-skipped scenarios. ```python from airflow.providers.standard.operators.python import ShortCircuitOperator from airflow.providers.standard.operators.empty import EmptyOperator from airflow.decorators import task, dag from datetime import datetime from textwrap import dedent from random import randint @task def flip_coin(): if randint(0, 1): return "Tails" else: return "Heads" def decide(coin_result): print(coin_result) if "Heads" in coin_result: print("skip") return True else: print("dont_skip") return False def do_thing(msg): "Creates a task with {msg} for a task_id that prints {msg}" @task(task_id=msg) def f(msg): print(msg) return f(msg) @dag( start_date=datetime(year=1970, month=1, day=1), schedule=None, tags=["short_circuit", "core"] ) def short_circuit_sometimes_skip(): decision = flip_coin() done = EmptyOperator(task_id="join") ( decision >> ShortCircuitOperator( task_id="decide", op_args=[decision], python_callable=decide ) >> do_thing("skip_on_heads") >> done ) decision >> do_thing("run_always") >> done >> do_thing("also_run_always") the_dag = short_circuit_sometimes_skip() ``` 2. Reverse the skipping logic and create v2. Run multiple times to get the both skipped and non-skipped scenarios. ```python from airflow.providers.standard.operators.python import ShortCircuitOperator from airflow.providers.standard.operators.empty import EmptyOperator from airflow.decorators import task, dag from datetime import datetime from textwrap import dedent from random import randint @task def flip_coin(): if randint(0, 1): return "Heads" else: return "Tails" def decide(coin_result): print(coin_result) if "Tails" in coin_result: print("skip") return True else: print("dont_skip") return False def do_thing(msg): "Creates a task with {msg} for a task_id that prints {msg}" @task(task_id=msg) def f(msg): print(msg) return f(msg) @dag( start_date=datetime(year=1970, month=1, day=1), schedule=None, tags=["short_circuit", "core"] ) def short_circuit_sometimes_skip(): decision = flip_coin() done = EmptyOperator(task_id="join") ( decision >> ShortCircuitOperator( task_id="decide", op_args=[decision], python_callable=decide ) >> do_thing("skip_on_tails") >> done ) decision >> do_thing("run_always") >> done >> do_thing("also_run_always") the_dag = short_circuit_sometimes_skip() ``` 3. Now go to first dagrun which is on v1 and clear the dagrun, notice when skip decision is true, dag is marked as passed but when skip decision is false then dag is failing. ### Operating System Linux ### Versions of Apache Airflow Providers _No response_ ### Deployment Other ### Deployment details _No response_ ### Anything else? _No response_ ### Are you willing to submit PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
