seanmuth opened a new issue, #64971:
URL: https://github.com/apache/airflow/issues/64971
### Description
Generated with Claude; I wanted to open an issue for discussion of this proposed fix before opening a PR.
When a scheduler dies in the window between sending a task to Celery and processing the resulting `QUEUED` executor event, `external_executor_id` is never written to the `task_instance` table. When a new scheduler takes over and calls `adopt_or_reset_orphaned_tasks`, `CeleryExecutor.try_adopt_task_instances` cannot adopt the task (it requires a non-None `external_executor_id` to construct the `AsyncResult` for the Celery state lookup), so the task is reset and re-queued instead.
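The failure mode can be sketched as a toy model (names simplified; this is not Airflow code): the Celery task id lives only in the scheduler's in-memory event buffer until the next event-processing cycle, so a crash in that window loses it.

```python
# Toy model of the race window; not Airflow code. Plain dicts stand in for
# the metadata DB and the executor's event_buffer.
db = {"ti_1": {"external_executor_id": None}}  # persistent task_instance row
event_buffer = {}                              # in-memory, lost on crash

def send_to_celery(key):
    celery_task_id = "celery-abc123"  # what apply_async() would return
    # The id is buffered in memory only; nothing is written to the DB yet.
    event_buffer[key] = ("queued", celery_task_id)

def handle_executor_events():
    # Only here does the id reach the DB -- the race window is everything
    # between send_to_celery() and this call.
    for key, (_state, task_id) in event_buffer.items():
        db[key]["external_executor_id"] = task_id

send_to_celery("ti_1")
event_buffer.clear()  # scheduler dies before the next event cycle
handle_executor_events()
print(db["ti_1"]["external_executor_id"])  # still None: adoption impossible
```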
### Use case/motivation
### Root cause
`external_executor_id` is only set as a side effect of `handle_executor_events` when it processes a `QUEUED` or `RUNNING` state event:

```python
# scheduler_job_runner.py
if state in (TaskInstanceState.QUEUED, TaskInstanceState.RUNNING):
    ti.external_executor_id = info
```
The Celery task id (`result.task_id`) is available immediately after `apply_async()` returns in `_send_workloads()`, but it is only stored in the in-memory `event_buffer`:

```python
# celery_executor.py
self.event_buffer[key] = (TaskInstanceState.QUEUED, result.task_id)
```
If the scheduler process dies before the next `handle_executor_events` cycle flushes that value to the DB, the task id is lost. The new scheduler's executor starts with an empty `workloads` dict and event buffer, so there is no way to recover it.

`try_adopt_task_instances` explicitly skips tasks without an `external_executor_id` (with a comment acknowledging this limitation):
```python
# celery_executor.py
for ti in tis:
    if ti.external_executor_id is not None:
        celery_tasks[ti.external_executor_id] = (AsyncResult(ti.external_executor_id), ti)
    else:
        not_adopted_tis.append(ti)  # cannot adopt — no Celery task_id to look up
```
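The partition above can be modeled with a stub in place of Celery's `AsyncResult` (the stub and the dict-based task instances are illustrative only, not Airflow or Celery code):

```python
# Stub standing in for celery.result.AsyncResult; illustrative only.
class AsyncResult:
    def __init__(self, task_id):
        self.task_id = task_id

def partition_for_adoption(tis):
    """Split task instances into adoptable (Celery id known) and not."""
    celery_tasks, not_adopted_tis = {}, []
    for ti in tis:
        ex_id = ti["external_executor_id"]
        if ex_id is not None:
            celery_tasks[ex_id] = (AsyncResult(ex_id), ti)
        else:
            not_adopted_tis.append(ti)  # no Celery task_id to look up
    return celery_tasks, not_adopted_tis

tis = [
    {"task_id": "a", "external_executor_id": "celery-1"},
    {"task_id": "b", "external_executor_id": None},  # died in the window
]
adopted, reset = partition_for_adoption(tis)
print(list(adopted), [ti["task_id"] for ti in reset])  # ['celery-1'] ['b']
```

Any task that falls into the second bucket is reset and re-run, which is exactly the duplicate execution described below.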
### Observed behavior
A task instance in `running` state owned by a dead scheduler job is reset instead of adopted, triggering a duplicate execution. The scheduler logs show:
```
Reset the following 1 orphaned TaskInstances:
<TaskInstance: dag.task scheduled__... [running]>
```
with no prior "Setting external_executor_id" log for the task's first try.
### Expected behavior
The Celery task_id should survive a scheduler restart. The new scheduler
should be able to adopt the in-flight task and poll its Celery state rather
than resetting it.
### Proposed fix
Persist `external_executor_id` directly to the database in `_send_workloads()` immediately after a successful `apply_async()`, rather than waiting for it to be written as a side effect of event processing:
```python
# celery_executor.py — _send_workloads()
elif result is not None:
    result.backend = cached_celery_backend
    self.running.add(key)
    self.workloads[key] = result
    self.event_buffer[key] = (TaskInstanceState.QUEUED, result.task_id)
    # Persist the Celery task_id immediately so it survives a scheduler restart.
    # Currently this is only written as a side effect of handle_executor_events,
    # which creates a window where a crash leaves external_executor_id as None
    # and prevents adoption by a new scheduler.
    self._persist_external_executor_id(key, result.task_id, session)
```
This eliminates the race window between task submission and event processing. The `event_buffer` path can remain as-is for the normal (no-crash) flow; the DB write is an additional safety net.
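A minimal sketch of what `_persist_external_executor_id` might look like, using stdlib `sqlite3` against a toy `task_instance` table to stand in for Airflow's metadata DB. The helper name comes from the proposal above; everything else here (signature, column set, session handling) is an assumption, not existing Airflow code:

```python
import sqlite3

def persist_external_executor_id(conn, dag_id, task_id, run_id, celery_task_id):
    # Write the Celery task id as soon as apply_async() returns, instead of
    # waiting for handle_executor_events to flush the event buffer.
    conn.execute(
        "UPDATE task_instance SET external_executor_id = ? "
        "WHERE dag_id = ? AND task_id = ? AND run_id = ?",
        (celery_task_id, dag_id, task_id, run_id),
    )
    conn.commit()

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance "
    "(dag_id TEXT, task_id TEXT, run_id TEXT, external_executor_id TEXT)"
)
conn.execute("INSERT INTO task_instance VALUES ('dag', 'task', 'run', NULL)")
persist_external_executor_id(conn, "dag", "task", "run", "celery-abc123")
row = conn.execute("SELECT external_executor_id FROM task_instance").fetchone()
print(row[0])  # celery-abc123
```

In Airflow itself this would presumably be an UPDATE on the real `task_instance` table through a SQLAlchemy session, so the write stays consistent with the scheduler's other bookkeeping.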
### Notes
- The existing code comment in `try_adopt_task_instances` already acknowledges this race condition as a known limitation. This fix resolves it.
- A session would need to be threaded into `_send_workloads`, or the write issued via a short-lived session, depending on how the call chain is structured.
- The `handle_executor_events` path that sets `external_executor_id` from `QUEUED`/`RUNNING` events should be kept as a reconciliation path for any case where the DB write is missed.
### Related issues
https://github.com/apache/airflow/pull/37784
### Are you willing to submit a PR?
- [x] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)