GitHub user ronak-sirwani created a discussion: Deferred TI's `next_method` and
`next_kwargs` not cleared on retries in Airflow 3.x
I have been exploring the new Task SDK and Deferrable Operator lifecycle in
Airflow 3.0. I’ve observed a behavior regarding task state during retries that
I wanted to humbly raise for discussion. It appears that next_method and
next_kwargs are persisting across attempts, which seems to differ from the
established behavior in Airflow 2.x.
#### Observation:
When a Deferrable Operator fails during its trigger-resumption callback and
enters the Up For Retry state, the subsequent attempt (Try 2) incorrectly
persists the `next_method` and `next_kwargs` from the previous failed attempt
(Try 1).
Instead of starting fresh at the standard `execute()` entry point, the Worker
immediately invokes the resumption callback with the stale
`next_method(**next_kwargs)` metadata. The task then either processes
"stale/zombie" events intended for the earlier attempt or fails prematurely
because the initial setup logic in `execute()` was bypassed.
- Airflow 2.x: This looks like a regression of a known issue addressed in
#18146. That fix ensured that when a task was reset for a retry, the
`next_method` and `next_kwargs` columns on the `task_instance` table were
explicitly nullified.
- Airflow 3.x: With the transition to the new Internal API and Task SDK (e.g.,
Issue #47373), this reset logic appears to be missing, so subsequent attempts
skip `execute()` entirely.
#### How to reproduce:
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.sdk import BaseOperator
from airflow.triggers.testing import SuccessTrigger


class RetryOperator(BaseOperator):
    def execute(self, context):
        ti = context["ti"]
        has_next_method = bool(ti._ti_context_from_server.next_method)
        try_number = ti.try_number
        self.log.info(
            f"In `execute`: has_next_method: {has_next_method}, "
            f"try_number: {try_number}"
        )
        self.defer(
            trigger=SuccessTrigger(),
            method_name="next",
            kwargs={"execute_try_number": try_number},
        )

    def next(self, context, execute_try_number, event=None):
        self.log.info("In next!")
        ti = context["ti"]
        has_next_method = bool(ti._ti_context_from_server.next_method)
        try_number = ti.try_number
        self.log.info(
            f"In `next`: has_next_method: {has_next_method}, "
            f"try_number: {try_number}, execute_try_number: {execute_try_number}"
        )
        if try_number == 1:
            # Force a retry on the first attempt
            raise AirflowException("Force a retry")
        # Did we run `execute` on this attempt?
        if execute_try_number != try_number:
            raise AirflowException("`execute` wasn't run during retry!")
        return None  # Success!


with DAG(
    "triggerer_retry", schedule=None, start_date=datetime(2021, 9, 13)
) as dag:
    RetryOperator(task_id="retry", retries=1, retry_delay=timedelta(seconds=15))
```
#### My Current Workaround:
To mitigate this in my custom operators, I’ve had to implement a "Single-Entry"
pattern to force idempotency:
- Always setting `method_name="execute"` in the `self.defer()` call.
- Passing a unique `attempt_id` (from `ti.try_number`) into the Trigger.
- Manually validating the `attempt_id` inside `execute()` to detect whether the
event is fresh or stale.
While this works, it adds significant boilerplate and complexity to every
custom operator.
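The staleness check at the heart of that pattern can be simulated in pure Python (no Airflow imports; `attempt_id` and the function names here are illustrative, not real Airflow APIs):

```python
# Simulation of the "Single-Entry" idempotency check: an attempt_id is
# stamped into the trigger kwargs at defer time (from ti.try_number), and
# on resumption it is compared against the current try_number.
def resume(event, event_attempt_id, current_try_number):
    """Decide whether to process a resumed event or start fresh."""
    if event_attempt_id != current_try_number:
        # Stale event produced by an earlier attempt: re-run setup instead.
        return "restart"
    # Genuine resumption for this attempt: process the event.
    return f"processed:{event}"


print(resume("done", 1, 2))  # Try 2 received Try 1's event -> restart
print(resume("done", 2, 2))  # event belongs to this attempt -> process it
```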
I understand the TaskInstance lifecycle has been significantly refactored for
Airflow 3.0. I wanted to ask:
- Is this indeed an issue? Is it intended for next_method to persist across
retries in the new architecture, or should the 2.x reset behavior be preserved?
- Does this need a fix in Core/SDK? I’ve looked at the main branch but couldn't
find where the nullification of these fields is handled in the new Internal API.
GitHub link: https://github.com/apache/airflow/discussions/62342