ReadytoRocc opened a new issue #19120:
URL: https://github.com/apache/airflow/issues/19120


   ### Apache Airflow version
   
   2.2.0 (latest released)
   
   ### Operating System
   
   Container-Optimized OS with Containerd (cos_containerd) - GKE
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
   I went to clear a successful task instance that is using a Deferred 
operator, and noticed it immediately went into a state of successful. The 
Operator is designed to run a trigger that will wait 1 minute from 
approximately task start. Upon investigation, I did not see the following in 
the second run's task log: `{taskinstance.py:1332} INFO - Pausing task as 
DEFERRED.`.
   
   ### What you expected to happen
   
   I would expect the task to once again go into a state of deferred for 
approx. 1 minute and then succeed.
   
   ### How to reproduce
   
   Run the following DAG. Once a task instance is successful, clear it, and see 
the task fail. This confirms `ti.next_method` was not cleared, as `execute` did 
not rerun and reset `execute_try_number`. `execute` did not rerun, as 
`ti.next_method` was not cleared.
   
   ```
   from datetime import datetime
   
   from airflow import DAG
   from airflow.exceptions import AirflowException
   from airflow.models import BaseOperator
   from airflow.triggers.testing import SuccessTrigger
   
   
   class RetryOperator(BaseOperator):
       def execute(self, context):
           ti = context["ti"]
           next_method = ti.next_method
           try_number = ti.try_number
   
           self.log.info(
               f"In `execute`: try_number: {try_number}, next_method 
{next_method}."
           )
   
           self.defer(
               trigger=SuccessTrigger(),
               method_name="next",
               kwargs={"execute_try_number": try_number},
           )
   
       def next(self, context, execute_try_number, event=None):
           ti = context["ti"]
           next_method = ti.next_method
           try_number = ti.try_number
   
           self.log.info(
               f"In `next`: try_number: {try_number}, next_method 
{next_method}, execute_try_number: {execute_try_number}."
           )
   
           if execute_try_number != try_number:
               raise AirflowException("`execute` wasn't run during clear!")
   
           return None  # Success!
   
   
   with DAG(
       "triggerer_clear", schedule_interval=None, start_date=datetime(2021, 10, 
20)
   ) as dag:
       RetryOperator(task_id="clear")
   ```
   
   ### Anything else
   
   I believe this is due to `ti.next_method` (and `ti.next_method_kwargs`) not 
being cleared after a task has completed. A similar issue was raised in 
https://github.com/apache/airflow/issues/18146.
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to