DaveT1991 commented on issue #64971:
URL: https://github.com/apache/airflow/issues/64971#issuecomment-4225499032

   > ### Description
   > Generated with Claude; I wanted to open an issue for some discussion on this proposed fix before opening a PR.
   > 
   > When a scheduler dies in the window between sending a task to Celery and 
processing the resulting `QUEUED` executor event, `external_executor_id` is 
never written to the `task_instance` table. When a new scheduler takes over and 
calls `adopt_or_reset_orphaned_tasks`, 
`CeleryExecutor.try_adopt_task_instances` cannot adopt the task (it requires a 
non-None `external_executor_id` to construct the AsyncResult for Celery state 
lookup), so the task is reset and re-queued instead.
   > 
   > ### Use case/motivation
   > ### Root cause
   > `external_executor_id` is only set as a side effect of `handle_executor_events` when it processes a `QUEUED` or `RUNNING` state event:
   > 
   > ```python
   > # scheduler_job_runner.py
   > if state in (TaskInstanceState.QUEUED, TaskInstanceState.RUNNING):
   >     ti.external_executor_id = info
   > ```
   > 
   > The Celery task_id (`result.task_id`) is available immediately after `apply_async()` returns in `_send_workloads()`, but it is only stored in the in-memory `event_buffer`:
   > 
   > ```python
   > # celery_executor.py
   > self.event_buffer[key] = (TaskInstanceState.QUEUED, result.task_id)
   > ```
   > 
   > If the scheduler process dies before the next `handle_executor_events` cycle flushes that value to the DB, the task_id is lost. The new scheduler's executor starts with an empty `workloads` dict and an empty event buffer, so there is no way to recover it.
   > 
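
The failure mode above can be sketched as a minimal, self-contained simulation (plain Python with in-memory stand-ins, not actual Airflow code):

```python
# Stand-ins for the task_instance DB row and the executor's event buffer.
task_instance = {"external_executor_id": None}  # DB row is never updated
event_buffer = {}

# Scheduler A: apply_async() returns; the Celery task_id lands only in the
# in-memory buffer, not in the database.
event_buffer[("dag", "task")] = ("queued", "celery-task-id-abc")

# Scheduler A crashes before handle_executor_events flushes the buffer.
event_buffer.clear()

# Scheduler B takes over: adoption requires a non-None external_executor_id,
# which was never written to the DB.
can_adopt = task_instance["external_executor_id"] is not None
print(can_adopt)  # False -> the task is reset and re-queued
```
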
   > `try_adopt_task_instances` explicitly skips task instances without an `external_executor_id` (with a comment acknowledging this limitation):
   > 
   > ```python
   > # celery_executor.py
   > for ti in tis:
   >     if ti.external_executor_id is not None:
   >         celery_tasks[ti.external_executor_id] = (AsyncResult(ti.external_executor_id), ti)
   >     else:
   >         not_adopted_tis.append(ti)  # cannot adopt: no Celery task_id to look up
   > ```
   > 
   > ### Observed behavior
   > A task instance in running state owned by a dead scheduler job is reset 
instead of adopted, triggering a duplicate execution. The scheduler logs show:
   > 
   > ```
   >   Reset the following 1 orphaned TaskInstances:
   >       <TaskInstance: dag.task scheduled__... [running]>
   > ```
   > 
   > with no prior "Setting external_executor_id" log for the task's first try.
   > 
   > ### Expected behavior
   > The Celery task_id should survive a scheduler restart. The new scheduler 
should be able to adopt the in-flight task and poll its Celery state rather 
than resetting it.
   > 
   > ### Proposed fix
   > Persist `external_executor_id` directly to the database in `_send_workloads()` immediately after a successful `apply_async()`, rather than waiting for it to be written as a side effect of event processing:
   > 
   > ```python
   > # celery_executor.py, in _send_workloads()
   > elif result is not None:
   >     result.backend = cached_celery_backend
   >     self.running.add(key)
   >     self.workloads[key] = result
   >     self.event_buffer[key] = (TaskInstanceState.QUEUED, result.task_id)
   >     # Persist the Celery task_id immediately so it survives a scheduler restart.
   >     # Currently it is only written as a side effect of handle_executor_events,
   >     # which creates a window where a crash leaves external_executor_id as None
   >     # and prevents adoption by a new scheduler.
   >     self._persist_external_executor_id(key, result.task_id, session)
   > ```
   > 
   > This eliminates the race window between task submission and event processing. The `event_buffer` path can remain as-is for the normal (no-crash) flow; the DB write is an additional safety net.
   > 
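
For concreteness, here is a rough sketch of what the hypothetical `_persist_external_executor_id` helper could do, using stdlib `sqlite3` as a stand-in for the metadata DB. The helper name, key shape, and schema here are assumptions for illustration, not existing Airflow API; a real implementation would issue the UPDATE on the `task_instance` table through a SQLAlchemy session.

```python
import sqlite3

def persist_external_executor_id(conn, key, external_id):
    """Hypothetical helper: write the Celery task_id to the task_instance
    row as soon as apply_async() succeeds, so it survives a scheduler crash."""
    dag_id, task_id, run_id, map_index = key
    conn.execute(
        "UPDATE task_instance SET external_executor_id = ? "
        "WHERE dag_id = ? AND task_id = ? AND run_id = ? AND map_index = ?",
        (external_id, dag_id, task_id, run_id, map_index),
    )
    conn.commit()

# Demo against an in-memory stand-in table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, run_id TEXT, "
    "map_index INTEGER, external_executor_id TEXT)"
)
conn.execute("INSERT INTO task_instance VALUES ('d', 't', 'r', -1, NULL)")
persist_external_executor_id(conn, ("d", "t", "r", -1), "celery-task-id-abc")
row = conn.execute("SELECT external_executor_id FROM task_instance").fetchone()
print(row[0])  # celery-task-id-abc
```
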
   > ### Notes
   > * The existing code comment in `try_adopt_task_instances` already acknowledges this race condition as a known limitation. This fix resolves it.
   > * A session would need to be threaded into `_send_workloads`, or the write issued via a short-lived session, depending on how the call chain is structured.
   > * The `handle_executor_events` path that sets `external_executor_id` from QUEUED/RUNNING events should be kept as a reconciliation path for any case where the DB write is missed.
   > 
   > ### Related issues
   > [#37784](https://github.com/apache/airflow/pull/37784)
   > 
   > ### Are you willing to submit a PR?
   > * [x]  Yes I am willing to submit a PR!
   > 
   > ### Code of Conduct
   > * [x]  I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   
   Any updates on my submission?

