dungdm93 opened a new issue #20580:
URL: https://github.com/apache/airflow/issues/20580


   ### Apache Airflow version
   
   2.2.3 (latest released)
   
   ### What happened
   
   Hello, I am quite excited about the new [Deferrable ("Async") Operators in AIP-40](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177050929) and am adopting them by developing a new async version of `ExternalTaskSensor`. While doing so, I found that Deferrable ("Async") Operators are not canceled when the DAG is marked failed/success.
   ![Screenshot from 2021-12-30 14-07-44](https://user-images.githubusercontent.com/6848311/147730232-90a6a5eb-3b6d-4270-9090-57c65240fe19.png)
   When I mark the DAG as failed, `wait_task` is canceled (it changes to the `failed` state), but `wait_task_async` remains in the `deferred` state and the triggerer keeps poking.
   ![Screenshot from 2021-12-30 14-08-06](https://user-images.githubusercontent.com/6848311/147730385-9c3e8c13-6b21-4bee-b85a-3d064ec1cde5.png)
   
![image](https://user-images.githubusercontent.com/6848311/147730395-124f8705-adf7-4f1c-832a-15fd3826446c.png)
   
   
   
   ### What you expected to happen
   
   Deferrable ("Async") Operators should be canceled as sync version of 
operators
   
   ### How to reproduce
   
   Test DAG:
   ```python
   from datetime import datetime, timedelta
   
   from airflow import DAG
   from airflow.operators.bash import BashOperator
   from airflow.sensors.external_task import ExternalTaskSensor
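   # ExternalTaskSensorAsync is my own deferrable variant (not shipped with Airflow);
   # see the sketch after this DAG.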
   from airflow.sensors.external_task_async import ExternalTaskSensorAsync
   
   with DAG(
       'tutorial_async_sensor',
       default_args={
           'depends_on_past': False,
           'email': ['[email protected]'],
           'email_on_failure': False,
           'email_on_retry': False,
           'retries': 1,
           'retry_delay': timedelta(minutes=5),
       },
       description='A simple tutorial DAG using external sensor async',
       schedule_interval=timedelta(days=1),
       start_date=datetime(2021, 1, 1),
       catchup=False,
       tags=['example'],
   ) as dag:
       t1 = ExternalTaskSensorAsync(
           task_id='wait_task_async',
           external_dag_id="tutorial",
           external_task_id="sleep",
           execution_delta=timedelta(hours=1),
           poke_interval=5.0
       )
   
       t2 = ExternalTaskSensor(
           task_id='wait_task',
           external_dag_id="tutorial",
           external_task_id="sleep",
           execution_delta=timedelta(hours=1),
           poke_interval=5.0
       )
   
       t3 = BashOperator(
           task_id='echo',
           depends_on_past=False,
           bash_command='echo Hello world',
           retries=3,
       )
       [t1, t2] >> t3
   ```
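   
   For reference, `ExternalTaskSensorAsync` above is the custom sensor I am developing, not something that ships with Airflow. A minimal sketch of what it does is below; the trigger classpath, the trigger's polling logic, and `execution_delta` handling are placeholders, the real implementation queries the external task instance's state:
   ```python
   import asyncio
   from typing import Any, Dict, Tuple
   
   from airflow.sensors.base import BaseSensorOperator
   from airflow.triggers.base import BaseTrigger, TriggerEvent
   
   
   class ExternalTaskTrigger(BaseTrigger):
       """Runs in the triggerer and polls for the external task (placeholder logic)."""
   
       def __init__(self, external_dag_id: str, external_task_id: str, poke_interval: float = 60.0):
           super().__init__()
           self.external_dag_id = external_dag_id
           self.external_task_id = external_task_id
           self.poke_interval = poke_interval
   
       def serialize(self) -> Tuple[str, Dict[str, Any]]:
           # The classpath is a placeholder; it must point at wherever the trigger is importable from.
           return (
               "my_plugins.triggers.ExternalTaskTrigger",
               {
                   "external_dag_id": self.external_dag_id,
                   "external_task_id": self.external_task_id,
                   "poke_interval": self.poke_interval,
               },
           )
   
       async def run(self):
           while True:
               # The real trigger checks the external task instance's state here
               # (e.g. a DB query wrapped in sync_to_async); omitted for brevity.
               external_task_done = False
               if external_task_done:
                   yield TriggerEvent({"status": "success"})
                   return
               await asyncio.sleep(self.poke_interval)
   
   
   class ExternalTaskSensorAsync(BaseSensorOperator):
       """Defers immediately, freeing the worker slot while the triggerer polls."""
   
       def __init__(self, *, external_dag_id, external_task_id=None, execution_delta=None, **kwargs):
           super().__init__(**kwargs)
           self.external_dag_id = external_dag_id
           self.external_task_id = external_task_id
           self.execution_delta = execution_delta  # not used in this sketch
   
       def execute(self, context):
           # Hand polling off to the triggerer process; the task moves to the `deferred` state.
           self.defer(
               trigger=ExternalTaskTrigger(
                   external_dag_id=self.external_dag_id,
                   external_task_id=self.external_task_id,
                   poke_interval=self.poke_interval,
               ),
               method_name="execute_complete",
           )
   
       def execute_complete(self, context, event=None):
           # Called back on a worker once the trigger fires; nothing more to do.
           return None
   ```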
   
   ### Operating System
   
   Ubuntu 20.04
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   run `airflow standalone`
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

