GitHub user ronak-sirwani created a discussion: Deferred TI's next_method and 
next_kwargs not cleared on retries in airflow 3.x

I have been exploring the new Task SDK and Deferrable Operator lifecycle in 
Airflow 3.0. I’ve observed a behavior regarding task state during retries that 
I wanted to humbly raise for discussion. It appears that next_method and 
next_kwargs are persisting across attempts, which seems to differ from the 
established behavior in Airflow 2.x.

#### Observation:
When a Deferrable Operator fails inside its resumption callback and enters the `up_for_retry` state, the subsequent attempt (try 2) incorrectly persists the `next_method` and `next_kwargs` from the previous failed attempt (try 1).
Instead of starting fresh at the standard `execute()` entry point, the Worker immediately invokes the resumption callback via `next_method(**next_kwargs)` using the stale metadata. The task therefore either processes "stale/zombie" events intended for the previous attempt or fails prematurely, because the initial setup logic in `execute()` was bypassed entirely.

- Airflow 2.x: This looks like a regression of a known issue addressed in #18146. That fix ensured that when a task was reset for a retry, the `next_method` and `next_kwargs` columns in the `task_instance` table were explicitly nullified.
- Airflow 3.x: With the transition to the new Internal API and Task SDK (e.g., Issue #47373), this reset logic appears to be missing, and subsequent attempts skip `execute()` entirely.
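
For context, here is a minimal, framework-free sketch of the reset behavior the 2.x fix established. The `FakeTaskInstance` class and `reset_for_retry` helper are stand-ins invented for illustration (field names follow the 2.x `TaskInstance` model); this is not Airflow's actual code:

```python
# Illustration only: a stand-in TaskInstance showing the reset that the
# 2.x fix (#18146) performed when a deferred task was rescheduled for retry.
class FakeTaskInstance:
    def __init__(self):
        # Deferral metadata left over from a failed resumption (try 1).
        self.next_method = "next"
        self.next_kwargs = {"execute_try_number": 1}


def reset_for_retry(ti):
    """Clear deferral metadata so the next attempt starts at execute()."""
    ti.next_method = None
    ti.next_kwargs = None


ti = FakeTaskInstance()
reset_for_retry(ti)
assert ti.next_method is None and ti.next_kwargs is None
```

In 3.x, it is exactly this nullification step that I could not find an equivalent of in the new code path.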

#### How to reproduce:
```python
from datetime import datetime, timedelta

from airflow.exceptions import AirflowException
from airflow.sdk import DAG, BaseOperator
from airflow.triggers.testing import SuccessTrigger


class RetryOperator(BaseOperator):
    def execute(self, context):
        ti = context["ti"]
        has_next_method = bool(ti._ti_context_from_server.next_method)
        try_number = ti.try_number
        self.log.info(
            f"In `execute`: has_next_method: {has_next_method}, try_number: {try_number}"
        )

        self.defer(
            trigger=SuccessTrigger(),
            method_name="next",
            kwargs={"execute_try_number": try_number},
        )

    def next(self, context, execute_try_number, event=None):
        self.log.info("In next!")
        ti = context["ti"]
        has_next_method = bool(ti._ti_context_from_server.next_method)
        try_number = ti.try_number
        self.log.info(
            f"In `next`: has_next_method: {has_next_method}, "
            f"try_number: {try_number}, execute_try_number: {execute_try_number}"
        )

        if try_number == 1:
            # Force a retry
            raise AirflowException("Force a retry")
        # Did we run `execute`?
        if execute_try_number != try_number:
            raise AirflowException("`execute` wasn't run during retry!")
        return None  # Success!


with DAG(
    "triggerer_retry", schedule=None, start_date=datetime(2021, 9, 13)
) as dag:
    RetryOperator(task_id="retry", retries=1, retry_delay=timedelta(seconds=15))
```

#### My Current Workaround:
To mitigate this in my custom operators, I’ve had to implement a "Single-Entry" pattern to force idempotency:
- Always setting `method_name="execute"` in the `self.defer()` call.
- Passing a unique `attempt_id` (from `ti.try_number`) into the Trigger.
- Manually validating the `attempt_id` inside `execute()` to detect if the event is fresh or stale.

While this works, it adds significant boilerplate and complexity to every custom operator.
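
To make the pattern concrete, here is a framework-free sketch of the control flow: a single `execute()` entry point that tags the trigger payload with the current attempt and rejects events carrying a mismatched `attempt_id`. The names (`attempt_id`, `StaleEventError`, the `("defer", ...)` return convention) are this example's own, and Airflow classes are replaced with plain Python so the logic is runnable on its own:

```python
# Sketch of the "Single-Entry" workaround: fresh starts and resumptions
# both land in execute(), and resumptions are only trusted when the
# event's attempt_id matches the current try_number.
class StaleEventError(Exception):
    pass


def execute(try_number, event=None):
    """Single entry point for both the fresh start and the resumption."""
    if event is None:
        # Fresh start: defer, tagging the trigger payload with this attempt.
        return ("defer", {"attempt_id": try_number})
    if event.get("attempt_id") != try_number:
        # Event was produced by a previous attempt: reject it as stale.
        raise StaleEventError(f"stale event from try {event['attempt_id']}")
    return ("done", event)


# Try 2 starts fresh and defers with its own attempt id:
action, payload = execute(try_number=2)
assert action == "defer" and payload == {"attempt_id": 2}
# A resumption carrying try 1's payload is detected and rejected:
try:
    execute(try_number=2, event={"attempt_id": 1})
except StaleEventError:
    pass
# A resumption with a matching attempt id succeeds:
assert execute(try_number=2, event={"attempt_id": 2})[0] == "done"
```

The validation step is what guards against the zombie resumption; the `method_name="execute"` part simply guarantees there is only one entry point to guard.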

I understand the TaskInstance lifecycle has been significantly refactored for 
Airflow 3.0. I wanted to ask:
- Is this indeed an issue? Is it intended for `next_method` to persist across retries in the new architecture, or should the 2.x reset behavior be preserved?
- Does this need a fix in Core/SDK? I’ve looked at the main branch but couldn't find where the nullification of these fields is handled in the new Internal API.

GitHub link: https://github.com/apache/airflow/discussions/62342
