DaveT1991 commented on issue #64971: URL: https://github.com/apache/airflow/issues/64971#issuecomment-4225499032
### Description

Generated with Claude; I wanted to open an issue to have some discussion on this proposed fix before opening a PR.

When a scheduler dies in the window between sending a task to Celery and processing the resulting `QUEUED` executor event, `external_executor_id` is never written to the `task_instance` table. When a new scheduler takes over and calls `adopt_or_reset_orphaned_tasks`, `CeleryExecutor.try_adopt_task_instances` cannot adopt the task (it requires a non-None `external_executor_id` to construct the `AsyncResult` for the Celery state lookup), so the task is reset and re-queued instead.

### Use case/motivation

### Root cause

`external_executor_id` is only set as a side effect of `handle_executor_events` when it processes a `QUEUED` or `RUNNING` state event:

```python
# scheduler_job_runner.py
if state in (TaskInstanceState.QUEUED, TaskInstanceState.RUNNING):
    ti.external_executor_id = info
```

The Celery task id (`result.task_id`) is available immediately after `apply_async()` returns in `_send_workloads()`, but it is only stored in the in-memory `event_buffer`:

```python
# celery_executor.py
self.event_buffer[key] = (TaskInstanceState.QUEUED, result.task_id)
```

If the scheduler process dies before the next `handle_executor_events` cycle flushes that value to the DB, the task id is lost. The new scheduler's executor starts with an empty `workloads` dict and an empty event buffer, so there is no way to recover it.

`try_adopt_task_instances` explicitly skips tasks without a task id (with a comment acknowledging this limitation):

```python
# celery_executor.py
for ti in tis:
    if ti.external_executor_id is not None:
        celery_tasks[ti.external_executor_id] = (AsyncResult(ti.external_executor_id), ti)
    else:
        not_adopted_tis.append(ti)  # cannot adopt: no Celery task_id to look up
```

### Observed behavior

A task instance in `running` state owned by a dead scheduler job is reset instead of adopted, triggering a duplicate execution.
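The adoption gate quoted above from `try_adopt_task_instances` can be simulated with plain Python to show why a missing `external_executor_id` forces a reset. This is a minimal sketch using a stand-in `TaskInstance` dataclass, not Airflow's actual model:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class TaskInstance:
    # Stand-in for Airflow's TaskInstance; only the field the adoption
    # gate inspects is modeled here.
    task_id: str
    external_executor_id: Optional[str] = None

def partition_for_adoption(tis):
    """Mirror of the try_adopt_task_instances gate: only instances with a
    persisted Celery task id can be adopted; the rest are reset."""
    adoptable, not_adopted = [], []
    for ti in tis:
        (adoptable if ti.external_executor_id is not None else not_adopted).append(ti)
    return adoptable, not_adopted

tis = [
    TaskInstance("ok", external_executor_id="celery-uuid-1"),  # event flushed before crash
    TaskInstance("orphan"),                                    # crash hit the race window
]
adoptable, reset = partition_for_adoption(tis)
print([t.task_id for t in adoptable])  # ['ok']
print([t.task_id for t in reset])      # ['orphan']
```

In the real executor the adoptable branch also builds an `AsyncResult` per task id so the new scheduler can poll Celery state; the partition above is the part the race breaks.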
The scheduler logs show:

```
Reset the following 1 orphaned TaskInstances:
	<TaskInstance: dag.task scheduled__... [running]>
```

with no prior "Setting external_executor_id" log for the task's first try.

### Expected behavior

The Celery task id should survive a scheduler restart. The new scheduler should be able to adopt the in-flight task and poll its Celery state rather than resetting it.

### Proposed fix

Persist `external_executor_id` directly to the database in `_send_workloads()` immediately after a successful `apply_async()`, rather than waiting for it to be written as a side effect of event processing:

```python
# celery_executor.py, in _send_workloads()
elif result is not None:
    result.backend = cached_celery_backend
    self.running.add(key)
    self.workloads[key] = result
    self.event_buffer[key] = (TaskInstanceState.QUEUED, result.task_id)
    # Persist the Celery task_id immediately so it survives a scheduler restart.
    # Currently it is only written as a side effect of handle_executor_events,
    # which creates a window where a crash leaves external_executor_id as None
    # and prevents adoption by a new scheduler.
    self._persist_external_executor_id(key, result.task_id, session)
```

This eliminates the race window between task submission and event processing. The `event_buffer` path can remain as-is for the normal (no-crash) flow; the DB write is an additional safety net.

### Notes

* The existing code comment in `try_adopt_task_instances` already acknowledges this race condition as a known limitation. This fix resolves it.
* A session would need to be threaded into `_send_workloads`, or the write issued via a short-lived session, depending on how the call chain is structured.
* The `handle_executor_events` path that sets `external_executor_id` from `QUEUED`/`RUNNING` events should be kept as a reconciliation path for any case where the DB write is missed.
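The `_persist_external_executor_id` helper named in the proposal does not exist yet; one possible shape for it is sketched below against an in-memory `sqlite3` stand-in for the `task_instance` table. In Airflow the write would go through the SQLAlchemy session, and the real `TaskInstanceKey` also carries `map_index` and `try_number`; the raw SQL and three-field key here are illustrative assumptions:

```python
import sqlite3

def persist_external_executor_id(conn, key, celery_task_id):
    """Write the Celery task id the moment apply_async() succeeds, so it
    survives a scheduler crash before the next handle_executor_events cycle.
    `key` stands in for Airflow's TaskInstanceKey (reduced here to
    dag_id, task_id, run_id for the sketch)."""
    dag_id, task_id, run_id = key
    conn.execute(
        "UPDATE task_instance SET external_executor_id = ? "
        "WHERE dag_id = ? AND task_id = ? AND run_id = ?",
        (celery_task_id, dag_id, task_id, run_id),
    )
    conn.commit()  # must be durable before the send loop moves on

# Minimal stand-in for the task_instance table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance ("
    "dag_id TEXT, task_id TEXT, run_id TEXT, external_executor_id TEXT)"
)
conn.execute("INSERT INTO task_instance VALUES ('dag', 'task', 'run', NULL)")

persist_external_executor_id(conn, ("dag", "task", "run"), "celery-uuid-123")
print(conn.execute("SELECT external_executor_id FROM task_instance").fetchone()[0])
# celery-uuid-123
```

The immediate commit is the point of the design: it narrows the crash window to the single statement rather than the whole event-processing cycle, at the cost of one extra DB round trip per submitted task.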
### Related issues

[#37784](https://github.com/apache/airflow/pull/37784)

### Are you willing to submit a PR?

* [x] Yes I am willing to submit a PR!

### Code of Conduct

* [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

Any updates about my submission?

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
