mykola-shyshov opened a new pull request, #58741:
URL: https://github.com/apache/airflow/pull/58741
# fix: Prevent duplicate task execution on scheduler crash (Celery executor)
Fixes #58570
## Problem
Tasks could execute twice when the scheduler crashed after a task was queued
to Celery but before the `external_executor_id` (Celery task ID) was persisted
to the database. This race condition occurred because:
1. Worker calls ti_run endpoint: state → RUNNING (`external_executor_id`
still NULL)
2. Scheduler processes events in the next cycle: sets `external_executor_id`
3. **CRASH WINDOW**: If the scheduler crashes between steps 1 and 2, the task
is left in the RUNNING state but with a NULL `external_executor_id`
4. On restart, `adopt_or_reset_orphaned_tasks()` cannot adopt the task (no
executor ID to query Celery), so it resets the task
5. Task gets requeued while original execution is still running
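The failure in step 4 can be sketched in a few lines (a deliberately simplified toy model; `adopt_or_reset` and the `TaskInstance` class here are illustrative, not the real `adopt_or_reset_orphaned_tasks()` implementation):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TaskInstance:
    """Toy stand-in for the real TI model; only the fields needed here."""
    state: str
    external_executor_id: Optional[str] = None


def adopt_or_reset(ti: TaskInstance) -> str:
    # Without an executor id there is nothing to query Celery with, so the
    # scheduler's only option is to reset (and later requeue) the task --
    # even though the original execution may still be running.
    if ti.external_executor_id is None:
        ti.state = "scheduled"  # reset -> requeue -> duplicate execution
        return "reset"
    return "adopted"  # the scheduler can keep tracking the Celery task


orphan = TaskInstance(state="running")  # the crash-window state
print(adopt_or_reset(orphan))  # reset: the duplicate-execution path
survivor = TaskInstance(state="running", external_executor_id="celery-123")
print(adopt_or_reset(survivor))  # adopted
```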
## Solution
Pass the Celery task ID from the worker through the entire call chain to the
ti_run API endpoint, so `external_executor_id` is set **atomically** with the
state transition to RUNNING in a single database transaction.
**Call chain:**
Celery Worker (has celery_task_id)
→ supervise(external_executor_id=celery_task_id)
→ ActivitySubprocess.start()
→ _on_child_started()
→ client.task_instances.start()
→ ti_run API endpoint
→ DB UPDATE: state=RUNNING, external_executor_id=celery_task_id (ATOMIC)
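A minimal sketch of how the id threads through that chain (the function names echo the chain above, but the signatures and the dict-as-database are illustrative, not the actual task-sdk API):

```python
from typing import Optional


def ti_run(ti: dict, external_executor_id: Optional[str]) -> None:
    # The crucial property: both fields change in the same step, so there
    # is no window where state is RUNNING but the executor id is missing.
    ti["state"] = "running"
    ti["external_executor_id"] = external_executor_id


def supervise(ti: dict, external_executor_id: Optional[str] = None) -> None:
    # Worker-side supervisor: forwards the Celery task id down the chain.
    ti_run(ti, external_executor_id)


ti = {"state": "queued", "external_executor_id": None}
supervise(ti, external_executor_id="celery-task-123")
print(ti)  # {'state': 'running', 'external_executor_id': 'celery-task-123'}
```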
## Changes
- **task-sdk**: Add `external_executor_id` parameter to
`TIEnterRunningPayload`, API client, and supervisor chain
- **airflow-core**: Update ti_run endpoint to save `external_executor_id`
atomically with state change
- **celery provider**: Pass `celery_task_id` from worker to `supervise()`
- **scheduler**: Fix event buffer processing so it no longer overwrites a
worker-provided `external_executor_id`
- **scheduler**: Clear `external_executor_id` when resetting orphaned tasks
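The essence of the airflow-core change can be illustrated with a single-statement update (sqlite3 stands in for the metadata DB here; the table and column names mirror the description above, not the exact Airflow schema):

```python
import sqlite3

# Because state and external_executor_id change in ONE statement inside one
# transaction, a crash can never leave a RUNNING row with a NULL executor id.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance (id TEXT, state TEXT, external_executor_id TEXT)"
)
conn.execute("INSERT INTO task_instance VALUES ('ti-1', 'queued', NULL)")
conn.execute(
    "UPDATE task_instance SET state = 'running', external_executor_id = ? "
    "WHERE id = ?",
    ("celery-task-123", "ti-1"),
)
conn.commit()
print(conn.execute(
    "SELECT state, external_executor_id FROM task_instance WHERE id = 'ti-1'"
).fetchone())
# ('running', 'celery-task-123')
```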
## Impact
✅ **Eliminates race condition** causing duplicate task execution on Celery
executor
✅ **Enables proper task adoption** after scheduler crashes
✅ **Backward compatible**: the field is optional (defaults to `None`), and the
event buffer acts as a fallback for old workers
## Notes on Other Executors
### Celery Executor
✅ **Fixed by this PR** - Worker receives task_id from Celery broker and can
pass it to ti_run
### Other Executors (ECS, Lambda, Batch, etc.)
⚠️ **Not addressed by this PR** - These executors face a different challenge:
- Scheduler knows the external ID (ECS task ARN, Lambda invocation ID, etc.)
- Worker (running inside container) does not know its own external ID
**Recommendation for future work**: implement a pre-allocation approach in
which the scheduler generates and stores the external ID before starting the
task, or provides it to the worker via an environment variable.
## Potential Remaining Race Conditions
While this PR eliminates the primary race condition, there are edge cases
that may require additional hardening:
### Event Buffer Race (Timing-Dependent)
**Scenario**: Scheduler's event buffer processing happens before worker's
ti_run commits:
- T1: Scheduler loads TI (external_executor_id=NULL)
- T2: Worker calls ti_run (in progress)
- T3: Scheduler checks external_executor_id → NULL, sets from event buffer
- T4: Worker commits (external_executor_id set)
**Impact**: Both writers set the same value, so no duplicate execution occurs,
but the ordering is timing-dependent.
**Current behavior**: The event buffer acts as a fallback; this is the
expected behavior for tasks still in the QUEUED state.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]