ecodina commented on issue #34207:
URL: https://github.com/apache/airflow/issues/34207#issuecomment-3733395925
I've just tried the deferrable mode in Airflow 3.1.5 and
`apache-airflow-providers-standard` 1.10.0 and it seems it works.
"Parent" dag: has a `BashOperator` that sleeps for 10 minutes.
"Sensor" dag: has 2 `ExternalTaskSensor` tasks, 1 waits for the full Dag,
another waits for 1 task.
Without setting `execution_timeout` to the `ExternalTaskSensor`
- It waits for the full Dag
- It waits for the `BashOperator` task.
Setting `execution_timeout=timedelta(minutes=1)`:
- Waiting for the full Dag is cancelled after 1 minute
- Waiting for the `BashOperator` task is cancelled after 1 minute
I believe we can now close this issue.
@suman-himanshu are you still experiencing issues? Are you using above
versions?
<details>
<summary>Parent Dag</summary>
```python
from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG
from datetime import datetime
default_args = {
'owner': 'test_user',
'start_date': datetime(2024, 1, 1),
'weight_rule': 'downstream',
}
with DAG(
dag_id='sleeper_pipeline',
default_args=default_args,
description='Sleeper Pipeline',
tags=['default'],
) as dag:
task_a = BashOperator(
task_id='task_a',
bash_command='sleep 600',
)
```
</details>
<details>
<summary>Sensor Dag</summary>
```python
from airflow.providers.standard.sensors.external_task import
ExternalTaskSensor
from airflow.sdk import DAG
from airflow.timetables.interval import CronDataIntervalTimetable
from datetime import datetime, timedelta
default_args = {
'owner': 'test_user',
'start_date': datetime(2024, 1, 1),
'weight_rule': 'downstream',
}
with DAG(
dag_id='sensor_pipeline',
default_args=default_args,
description='Sensor Pipeline',
catchup=False,
tags=['default'],
) as dag:
task_a = ExternalTaskSensor(
task_id='task_a',
external_dag_id='sleeper_pipeline',
execution_date_fn=your_function, # Add your function
deferrable=True,
execution_timeout=timedelta(minutes=1), # Remove if you don't want a
timeout
)
task_b = ExternalTaskSensor(
task_id='task_b',
external_dag_id='sleeper_pipeline',
external_task_ids=['task_a'],
priority_weight=0,
execution_date_fn=your_function, # Add your function
deferrable=True,
execution_timeout=timedelta(minutes=1), # Remove if you don't want
a timeout
)
```
</details>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]