ecodina commented on issue #34207:
URL: https://github.com/apache/airflow/issues/34207#issuecomment-3733395925

   I've just tried the deferrable mode in Airflow 3.1.5 with `apache-airflow-providers-standard` 1.10.0, and it seems to work.
   
   "Parent" Dag: has a `BashOperator` that sleeps for 10 minutes.
   
   "Sensor" Dag: has 2 `ExternalTaskSensor` tasks; one waits for the full Dag, the other waits for a single task.
   
   Without setting `execution_timeout` on the `ExternalTaskSensor`:
   
   - It waits for the full Dag
   - It waits for the `BashOperator` task.
   
   Setting `execution_timeout=timedelta(minutes=1)`:
   
   - Waiting for the full Dag is cancelled after 1 minute
   - Waiting for the `BashOperator` task is cancelled after 1 minute
   
   I believe we can now close this issue.
   
   @suman-himanshu are you still experiencing issues? Are you using the versions above?
   
   <details>
   
   <summary>Parent Dag</summary>
   
   ```python
   from airflow.providers.standard.operators.bash import BashOperator
   from airflow.sdk import DAG
   from datetime import datetime
   
   
   default_args = {
       'owner': 'test_user',
       'start_date': datetime(2024, 1, 1),
       'weight_rule': 'downstream',
   }
   
   with DAG(
       dag_id='sleeper_pipeline',
       default_args=default_args,
       description='Sleeper Pipeline',
       tags=['default'],
   ) as dag:
   
       task_a = BashOperator(
           task_id='task_a',
           bash_command='sleep 600',
       )
   ```
   
   </details>
   
   
   <details>
   
   <summary>Sensor Dag</summary>
   
   ```python
   from airflow.providers.standard.sensors.external_task import ExternalTaskSensor
   from airflow.sdk import DAG
   from airflow.timetables.interval import CronDataIntervalTimetable
   from datetime import datetime, timedelta
   
   
   
   default_args = {
       'owner': 'test_user',
       'start_date': datetime(2024, 1, 1),
       'weight_rule': 'downstream',
   }
   
   with DAG(
       dag_id='sensor_pipeline',
       default_args=default_args,
       description='Sensor Pipeline',
       catchup=False,
       tags=['default'],
   ) as dag:
   
   
   
       task_a = ExternalTaskSensor(
           task_id='task_a',
           external_dag_id='sleeper_pipeline',
           execution_date_fn=your_function,  # Add your function
           deferrable=True,
           execution_timeout=timedelta(minutes=1),  # Remove if you don't want a timeout
       )
   
       task_b = ExternalTaskSensor(
           task_id='task_b',
           external_dag_id='sleeper_pipeline',
           external_task_ids=['task_a'],
           priority_weight=0,
           execution_date_fn=your_function,  # Add your function
           deferrable=True,
           execution_timeout=timedelta(minutes=1),  # Remove if you don't want a timeout
       )
   ```
   
   </details>
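
The Sensor Dag above leaves `execution_date_fn=your_function` as a placeholder. A minimal sketch of such a callable follows, assuming the sensor passes the current run's logical date as the first positional argument and that the parent run to wait for is a fixed offset away. `PARENT_OFFSET` is a hypothetical name and value, not something from the original test setup.

```python
from datetime import datetime, timedelta

# Hypothetical: how far the parent Dag's logical date lags behind the sensor Dag's.
# timedelta(0) means "wait for the parent run with the same logical date".
PARENT_OFFSET = timedelta(0)


def your_function(logical_date, **kwargs):
    """Return the logical date of the parent Dag run the sensor should wait for."""
    # Extra keyword arguments (for example, the task context) are accepted and ignored.
    return logical_date - PARENT_OFFSET
```

If the two Dags are triggered with different logical dates, the offset (or the whole function) would need to be adapted to however the runs are actually related.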
   

