Yuvraj-Dhepe opened a new issue, #48694:
URL: https://github.com/apache/airflow/issues/48694

   ### What do you see as an issue?
   
   Hello team,
   Thank you for an amazing tool.
   
   I am running into an issue with trigger rules in my use case. I wasn't sure whether to raise it in Discussions, so because of time constraints I am asking here as a docs report.
   
   
   Docs Link:
   `https://airflow.apache.org/docs/apache-airflow/1.10.9/concepts.html#trigger-rules`
   
   I am trying to implement an external approval workflow, where my approval task waits for its parent task to complete and then runs. The problem is that the trigger rules are not behaving as I expect, so I would like to ask about them.
   
   ```python
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   from airflow.operators.empty import EmptyOperator
   from airflow.exceptions import AirflowSkipException
   
   from datetime import datetime, timedelta
   
   # NOTE: Use pendulum for timezone-aware start dates.
   from pendulum import datetime as pud
   
   from airflow.sensors.base import BaseSensorOperator
   from airflow.utils.session import provide_session
   from airflow.models import TaskInstance
   from sqlalchemy.orm.session import Session
   
   
   class ApprovalSensor(BaseSensorOperator):
       """
       Sensor that waits for a parent task to reach a specific state before 
proceeding.
       """
   
       def __init__(self, parent_task_id: str, expected_states: list, *args, 
**kwargs):
           super().__init__(*args, **kwargs)
           self.parent_task_id = parent_task_id
           self.expected_states = (
               expected_states  # List of states that allow this task to 
continue
           )
   
       @provide_session
       def poke(self, context, session: Session = None):
           dag_run = context["dag_run"]
           ti = (
               session.query(TaskInstance)
               .filter(
                   TaskInstance.dag_id == dag_run.dag_id,
                   TaskInstance.task_id == self.parent_task_id,
                   TaskInstance.execution_date == dag_run.execution_date,
               )
               .first()
           )
   
           if ti:
               self.log.info(f"Parent Task {self.parent_task_id} is in state: 
{ti.state}")
               if ti.state in self.expected_states:
                   return True  # Proceed if the task has reached the expected 
state
               else:
                   return False  # Keep waiting
           else:
               self.log.warning(f"Parent Task {self.parent_task_id} not found.")
               return False
   
   
   # Function to be executed as an Airflow task (defined for reference; this
   # DAG's python_task uses my_task below instead)
   def my_python_task(**kwargs):
       print(f"Executing Python Task At {datetime.now()}")
       return "Task completed successfully!"
   
   
   def my_task():
       # Raising AirflowSkipException ends this task in the "skipped" state.
       raise AirflowSkipException("Skipping task because the condition was met.")
   
   
   # Define default arguments for the DAG
   default_args = {
       "owner": "airflow",
       "depends_on_past": False,
       # NOTE: Update to the current date, otherwise a run is triggered
       # immediately because of the schedule param in the DAG.
       "start_date": pud(2024, 3, 20, tz="UTC"),
       "email_on_failure": False,
       "email_on_retry": False,
       "retries": 0,
       "retry_delay": timedelta(minutes=5),
   }
   
   # Define the DAG
   with DAG(
       dag_id="skip_sample_dag",
       default_args=default_args,
       description="A simple Airflow DAG",
       schedule="@daily",  # Runs daily
       catchup=False,
       tags=["D1"],
   ) as dag:
       # Start task (Empty Operator)
       start = EmptyOperator(task_id="start")
   
       # Python task; my_task raises AirflowSkipException, so it always ends skipped
       python_task = PythonOperator(
           task_id="python_task",
           python_callable=my_task,
       )
   
       # Sensor waits for parent task to complete (success or failed)
       approval_task = ApprovalSensor(
           task_id="wait_for_parent_task",
           parent_task_id="python_task",
           expected_states=[
               "success",
               "failed",
           ],  # The sensor waits until the parent task finishes
           poke_interval=10,  # Check every 10 seconds
           timeout=60,  # Maximum wait time (60 seconds)
           trigger_rule="all_done",
       )
   
       # End task (Empty Operator)
       end = EmptyOperator(task_id="end")
   
       # Define task dependencies
       start >> python_task >> approval_task >> end
   
   ```
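   A side note on the sensor code itself: I am on Airflow 2.x, and as far as I understand, task instances there are keyed by `run_id` rather than `execution_date`, so a variant of `poke()` that filters on `run_id` might be more robust. This is just a sketch of the same method (imports and attributes are as in the DAG file above), not a claim about the cause of the behaviour below:
   
   ```python
   @provide_session
   def poke(self, context, session: Session = None):
       dag_run = context["dag_run"]
       # On Airflow 2.2+ the task_instance table is keyed by run_id, so
       # filtering on run_id avoids relying on the execution_date proxy.
       ti = (
           session.query(TaskInstance)
           .filter(
               TaskInstance.dag_id == dag_run.dag_id,
               TaskInstance.task_id == self.parent_task_id,
               TaskInstance.run_id == dag_run.run_id,
           )
           .first()
       )
       if ti:
           self.log.info(f"Parent Task {self.parent_task_id} is in state: {ti.state}")
           return ti.state in self.expected_states
       self.log.warning(f"Parent Task {self.parent_task_id} not found.")
       return False
   ```
   
   If I read the DagRun model correctly, `dag_run.get_task_instance(self.parent_task_id, session=session)` would be an equivalent shortcut for the query.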
   The following scenarios occur with different trigger rules (a minimal sketch reproducing the cascade follows this list):
   
   1) all_success / all_failed: the skipped state propagates to all downstream tasks after python_task.
   NOTE: This is fine and in line with the documentation.
   
   2) one_failed / one_success: the skipped state propagates to all downstream tasks after python_task.
   NOTE: If python_task ends in neither a success nor a failed state, why should the downstream tasks end up skipped?
   
   3) none_failed / all_done / dummy:
   NOTE: These are the only trigger rules with which approval_task waits for python_task to finish and then runs in poke mode.
   
   4) none_skipped: the skipped state propagates to all downstream tasks after python_task.
   NOTE: I expected this one to wait for python_task to finish, but again the skipped state cascades through the downstream tasks, and I am wondering why.
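   For reference, the minimal sketch below (dag and task ids are mine, for illustration only) condenses the behaviour I am asking about: under the default all_success rule a downstream task of a skipped task is itself skipped, while none_failed tolerates skipped upstreams and lets the task run.
   
   ```python
   from pendulum import datetime as pud
   
   from airflow import DAG
   from airflow.exceptions import AirflowSkipException
   from airflow.operators.empty import EmptyOperator
   from airflow.operators.python import PythonOperator
   from airflow.utils.trigger_rule import TriggerRule
   
   
   def skip_me():
       # Ends this task in the "skipped" state, mirroring my_task above.
       raise AirflowSkipException("forcing a skipped state")
   
   
   with DAG(
       dag_id="trigger_rule_skip_demo",
       start_date=pud(2024, 3, 20, tz="UTC"),
       schedule=None,
       catchup=False,
   ) as dag:
       skipper = PythonOperator(task_id="skipper", python_callable=skip_me)
   
       # Default trigger rule (all_success): the skipped upstream propagates,
       # so this task also ends up skipped.
       skipped_too = EmptyOperator(task_id="skipped_too")
   
       # none_failed tolerates skipped upstreams, so this task runs.
       runs_anyway = EmptyOperator(
           task_id="runs_anyway",
           trigger_rule=TriggerRule.NONE_FAILED,
       )
   
       skipper >> [skipped_too, runs_anyway]
   ```
   
   As I understand it, none_skipped can never be satisfied once any upstream is skipped, which would explain scenario 4 above, but I would appreciate confirmation.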
   
   
   Could you please help me with this, team?
   
   
   
   ### Solving the problem
   
   I am not sure of the solution.
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

