Yuvraj-Dhepe opened a new issue, #48694: URL: https://github.com/apache/airflow/issues/48694
### What do you see as an issue?

Hello Team, thank you for an amazing tool. I am finding the trigger rules confusing in my case. I wasn't sure whether to ask this in Discussions because of my time constraints, so I thought I would ask it in a docs report instead.

Docs link: `https://airflow.apache.org/docs/apache-airflow/1.10.9/concepts.html#trigger-rules`

I am trying to implement an external approval functionality, where my approval task waits for the parent task to complete and then runs itself. The problem is that the trigger rules are not behaving as I expect, so I want to ask about that behavior.

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.exceptions import AirflowSkipException
from airflow.models import TaskInstance
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.session import provide_session

# NOTE: Use pendulum for timezones.
from pendulum import datetime as pud
from sqlalchemy.orm.session import Session


class ApprovalSensor(BaseSensorOperator):
    """Sensor that waits for a parent task to reach a specific state before proceeding."""

    def __init__(self, parent_task_id: str, expected_states: list, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.parent_task_id = parent_task_id
        # List of states that allow this task to continue
        self.expected_states = expected_states

    @provide_session
    def poke(self, context, session: Session = None):
        dag_run = context["dag_run"]
        ti = (
            session.query(TaskInstance)
            .filter(
                TaskInstance.dag_id == dag_run.dag_id,
                TaskInstance.task_id == self.parent_task_id,
                TaskInstance.execution_date == dag_run.execution_date,
            )
            .first()
        )
        if ti:
            self.log.info(f"Parent Task {self.parent_task_id} is in state: {ti.state}")
            if ti.state in self.expected_states:
                return True  # Proceed if the task has reached the expected state
            return False  # Keep waiting
        self.log.warning(f"Parent Task {self.parent_task_id} not found.")
        return False


# Function to be executed as an Airflow task (currently unused; see my_task below)
def my_python_task(**kwargs):
    print(f"Executing Python Task At {datetime.now()}")
    return "Task completed successfully!"


def my_task():
    raise AirflowSkipException("Skipping task because the condition was met.")


# Define default arguments for the DAG
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    # NOTE: Update to the current date, else a DAG run is triggered
    # immediately because of the schedule param in DAG
    "start_date": pud(2024, 3, 20, tz="UTC"),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
    "retry_delay": timedelta(minutes=5),
}

# Define the DAG
with DAG(
    dag_id="skip_sample_dag",
    default_args=default_args,
    description="A simple Airflow DAG",
    schedule="@daily",  # Runs daily
    catchup=False,
    tags=["D1"],
) as dag:
    # Start task (Empty Operator)
    start = EmptyOperator(task_id="start")

    # Python Operator Task (raises AirflowSkipException, so it ends up skipped)
    python_task = PythonOperator(
        task_id="python_task",
        python_callable=my_task,
    )

    # Sensor waits for the parent task to finish (success or failed)
    approval_task = ApprovalSensor(
        task_id="wait_for_parent_task",
        parent_task_id="python_task",
        expected_states=["success", "failed"],
        poke_interval=10,  # Check every 10 seconds
        timeout=60,  # Maximum wait time (60 seconds)
        trigger_rule="all_done",
    )

    # End task (Empty Operator)
    end = EmptyOperator(task_id="end")

    # Define task dependencies
    start >> python_task >> approval_task >> end
```

The following scenarios occur with different trigger rules:

1) `all_success`/`all_failed`: The skipped state propagates to all downstream tasks after `python_task`. NOTE: This is fine and in line with the documentation.
2) `one_failed`/`one_success`: The skipped state propagates to all downstream tasks after `python_task`. NOTE: I am doubting this one: if `python_task` never reached a success or failed state, why should downstream tasks be marked skipped?
3) `none_failed`/`all_done`/`dummy`: NOTE: These are the only trigger rules that wait for `python_task` to finish and then run `approval_task` in poke mode.
4) `none_skipped`: The skipped state propagates to all downstream tasks after `python_task`.
NOTE: I expected this one to wait for `python_task` to finish, but again the skipped state cascades through the downstream tasks, and I am wondering why. Could you please help me with this, team?

### Solving the problem

I am not sure of the solution.

### Anything else

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
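The four scenarios above can be reproduced with a plain-Python model of trigger-rule evaluation. This is a simplified sketch for illustration only, not Airflow's actual scheduler code: it only covers the terminal evaluation for a single upstream task (in real Airflow, `one_success`/`one_failed` can fire before all upstream tasks finish), and the rule semantics are taken from their descriptions in the trigger-rules docs:

```python
# Illustrative model of how trigger rules react to a skipped upstream task.
# NOT Airflow's actual implementation; it mirrors the documented semantics
# of each rule once every upstream task has finished.

FINISHED = {"success", "failed", "skipped"}


def evaluate(rule: str, upstream_states: list[str]) -> str:
    """Return 'run' or 'skip' for a task whose upstream tasks have all finished."""
    assert all(s in FINISHED for s in upstream_states)
    if rule in ("all_done", "dummy", "always"):
        return "run"  # runs regardless of upstream states
    if rule == "none_failed":
        return "run" if "failed" not in upstream_states else "skip"
    if rule == "none_skipped":
        return "run" if "skipped" not in upstream_states else "skip"
    if rule == "all_success":
        return "run" if all(s == "success" for s in upstream_states) else "skip"
    if rule == "all_failed":
        return "run" if all(s == "failed" for s in upstream_states) else "skip"
    if rule == "one_success":
        return "run" if "success" in upstream_states else "skip"
    if rule == "one_failed":
        return "run" if "failed" in upstream_states else "skip"
    raise ValueError(f"unknown rule: {rule}")


# python_task raises AirflowSkipException, so its terminal state is "skipped":
print(evaluate("all_done", ["skipped"]))      # run  (scenario 3)
print(evaluate("none_failed", ["skipped"]))   # run  (scenario 3)
print(evaluate("one_success", ["skipped"]))   # skip (scenario 2)
print(evaluate("none_skipped", ["skipped"]))  # skip (scenario 4)
```

Under this model, `one_success` skips because a skipped parent never supplies the required success, and `none_skipped` skips by definition as soon as any upstream task is skipped; only `all_done`, `none_failed`, and `dummy` tolerate a skipped parent, which matches the observations above.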
