mykola-shyshov opened a new pull request, #58741:
URL: https://github.com/apache/airflow/pull/58741

   # fix: Prevent duplicate task execution on scheduler crash (Celery executor)
   
   Fixes #58570
   
   ## Problem
   
   Tasks could execute twice when the scheduler crashed after a task was queued 
to Celery but before the `external_executor_id` (Celery task ID) was persisted 
to the database. This race condition occurred because:
   
   1. Worker calls ti_run endpoint: state → RUNNING (`external_executor_id` 
still NULL)
   2. Scheduler processes events in next cycle: sets `external_executor_id`
   3. **CRASH WINDOW**: If the scheduler crashes between steps 1 and 2, the task is left in the RUNNING state with a NULL `external_executor_id`
   4. On restart, `adopt_or_reset_orphaned_tasks()` cannot adopt the task (there is no executor ID to query Celery with), so it resets the task (see the sketch after this list)
   5. Task gets requeued while original execution is still running
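   
   To make step 4 concrete, here is a deliberately simplified, hypothetical sketch 
of the adopt-or-reset decision; it is not the actual Airflow code, only an 
illustration of why a RUNNING task instance with a NULL `external_executor_id` 
has to be reset (and therefore requeued) even though the original Celery task 
may still be running:
   
   ```python
   # Hypothetical illustration (not the real Airflow implementation): a RUNNING
   # task instance without an external_executor_id cannot be matched to any
   # Celery task, so the scheduler has no choice but to reset it.
   from dataclasses import dataclass
   from typing import Optional
   
   
   @dataclass
   class OrphanedTI:
       task_id: str
       state: str                            # e.g. "running"
       external_executor_id: Optional[str]   # Celery task ID, or None
   
   
   def adopt_or_reset(ti: OrphanedTI, known_celery_ids: set) -> str:
       """Return 'adopt' if the TI can be re-attached to its Celery task, else 'reset'."""
       if ti.external_executor_id and ti.external_executor_id in known_celery_ids:
           return "adopt"   # scheduler keeps tracking the still-running task
       return "reset"       # requeued while the original execution may still run
   
   
   # The crash window leaves exactly this shape in the database:
   orphan = OrphanedTI(task_id="my_task", state="running", external_executor_id=None)
   assert adopt_or_reset(orphan, known_celery_ids={"celery-abc-123"}) == "reset"
   ```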
   
   ## Solution
   
   Pass the Celery task ID from the worker through the entire call chain to the 
ti_run API endpoint, so `external_executor_id` is set **atomically** with the 
state transition to RUNNING in a single database transaction.
   
   **Call chain:**
   Celery Worker (has celery_task_id)
     → supervise(external_executor_id=celery_task_id)
     → ActivitySubprocess.start()
     → _on_child_started()
     → client.task_instances.start()
     → ti_run API endpoint
     → DB UPDATE: state=RUNNING, external_executor_id=celery_task_id (ATOMIC)
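   
   The final step of the chain is the important one. Below is a minimal, 
self-contained sketch of what "atomic" means here, using SQLAlchemy Core with an 
in-memory SQLite table as a stand-in for the metadata database; the table and 
column layout are assumptions for illustration, not the endpoint's actual code:
   
   ```python
   # Illustrative sketch (not the actual ti_run endpoint): the state transition
   # and the external_executor_id are written in one UPDATE inside one
   # transaction, so a crash can never leave state=RUNNING with a NULL executor ID.
   import sqlalchemy as sa
   
   metadata = sa.MetaData()
   # Minimal stand-in for the task_instance table; column names are assumptions.
   task_instance = sa.Table(
       "task_instance", metadata,
       sa.Column("id", sa.String, primary_key=True),
       sa.Column("state", sa.String),
       sa.Column("external_executor_id", sa.String, nullable=True),
   )
   
   engine = sa.create_engine("sqlite:///:memory:")
   metadata.create_all(engine)
   
   with engine.begin() as conn:
       conn.execute(sa.insert(task_instance).values(id="ti-1", state="queued"))
   
   def mark_running(conn, ti_id: str, celery_task_id: str) -> None:
       """Single UPDATE: state and executor ID change together or not at all."""
       conn.execute(
           sa.update(task_instance)
           .where(task_instance.c.id == ti_id)
           .values(state="running", external_executor_id=celery_task_id)
       )
   
   with engine.begin() as conn:   # one transaction around the whole transition
       mark_running(conn, "ti-1", celery_task_id="celery-abc-123")
   ```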
   
   ## Changes
   
   - **task-sdk**: Add an `external_executor_id` parameter to `TIEnterRunningPayload`, the API client, and the supervisor chain (see the payload sketch after this list)
   - **airflow-core**: Update ti_run endpoint to save `external_executor_id` 
atomically with state change
   - **celery provider**: Pass `celery_task_id` from worker to `supervise()`
   - **scheduler**: Fix event buffer processing so it does not overwrite a worker-provided `external_executor_id`
   - **scheduler**: Clear `external_executor_id` when resetting orphaned tasks
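   
   As a rough illustration of the task-sdk change: the new field is optional so 
that payloads from old workers stay valid. A Pydantic-style sketch follows; the 
real `TIEnterRunningPayload` in the task SDK may carry different fields, and 
everything other than `external_executor_id` here is illustrative:
   
   ```python
   # Sketch of the optional payload field (Pydantic-style); only
   # external_executor_id is the point, the surrounding fields are assumptions.
   from datetime import datetime, timezone
   from typing import Optional
   
   from pydantic import BaseModel
   
   
   class TIEnterRunningPayload(BaseModel):
       state: str = "running"
       hostname: str
       pid: int
       start_date: datetime
       # Optional so that old workers which do not send it remain compatible
       # (the scheduler's event buffer is the fallback in that case).
       external_executor_id: Optional[str] = None
   
   
   payload = TIEnterRunningPayload(
       hostname="worker-1",
       pid=4242,
       start_date=datetime.now(timezone.utc),
       external_executor_id="celery-abc-123",
   )
   ```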
   
   ## Impact
   
   ✅ **Eliminates the race condition** that caused duplicate task execution on the Celery executor
   ✅ **Enables proper task adoption** after scheduler crashes
   ✅ **Backward compatible**: the field is optional (defaults to None), and the event buffer acts as a fallback for old workers
   
   ## Notes on Other Executors
   
   ### Celery Executor
   ✅ **Fixed by this PR** - The worker receives its task ID from the Celery broker and can pass it to ti_run
   
   ### Other Executors (ECS, Lambda, Batch, etc.)
   ⚠️ **Not addressed by this PR** - These executors face a different challenge:
   - The scheduler knows the external ID (ECS task ARN, Lambda invocation ID, etc.)
   - The worker (running inside the container) does not know its own external ID
   
   **Recommendation for future work**:
   ⚠️ Implement a pre-allocation approach in which the scheduler generates and stores the external ID before starting the task, or provides it to the worker via an environment variable (sketched below).
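   
   A purely illustrative sketch of that pre-allocation idea, with made-up names 
(including the environment variable and the helpers); it is not a proposed API, 
only the shape of "persist the ID first, then start the container":
   
   ```python
   # Illustrative pre-allocation sketch for executors where only the scheduler
   # knows the external ID (ECS/Lambda/Batch): generate and persist the ID
   # before launch, then hand it to the container via an environment variable.
   import os
   import uuid
   
   
   def launch_task(store: dict, ti_id: str, start_container) -> str:
       """`store` and `start_container` are hypothetical stand-ins for the
       metadata DB and the executor's run API."""
       external_id = f"pre-{uuid.uuid4()}"   # allocated before the task starts
       store[ti_id] = external_id            # persisted first -> always adoptable
       start_container(env={"AIRFLOW_EXTERNAL_EXECUTOR_ID": external_id})
       return external_id
   
   
   def worker_side_external_id():
       # Inside the container the worker simply reads the pre-allocated ID back.
       return os.environ.get("AIRFLOW_EXTERNAL_EXECUTOR_ID")
   
   
   # Example wiring with an in-memory "DB" and a no-op launcher:
   db = {}
   launch_task(db, "ti-1", start_container=lambda env: None)
   ```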
   
   ## Potential Remaining Race Conditions
   
   While this PR eliminates the primary race condition, there are edge cases 
that may require additional hardening:
   
   ### Event Buffer Race (Timing-Dependent)
   **Scenario**: The scheduler's event buffer processing runs before the worker's ti_run transaction commits:
   - T1: Scheduler loads TI (external_executor_id=NULL)
   - T2: Worker calls ti_run (in progress)
   - T3: Scheduler checks external_executor_id → NULL, sets from event buffer
   - T4: Worker commits (external_executor_id set)
   
   **Impact**: Both writers set the same value, so there is no duplicate execution, but the final write is timing-dependent.
   
   **Current behavior**: The event buffer acts as a fallback; this is the expected path for tasks still in the QUEUED state.
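   
   A small sketch of that fallback rule, with made-up names; the point is only 
that the scheduler writes the event-buffer value when the column is still NULL, 
so both writers can only ever converge on the same Celery task ID:
   
   ```python
   # Illustrative fallback rule (names are made up): the scheduler only fills
   # external_executor_id from the event buffer when the worker has not set it
   # yet, so a worker-provided value is never overwritten.
   from typing import Optional
   
   
   def apply_event_buffer(current_value: Optional[str], buffered_value: str) -> str:
       if current_value is None:     # old worker, or ti_run has not committed yet
           return buffered_value     # fallback: take the ID the executor reported
       return current_value          # worker-provided ID wins
   
   
   assert apply_event_buffer(None, "celery-abc-123") == "celery-abc-123"
   assert apply_event_buffer("celery-abc-123", "celery-abc-123") == "celery-abc-123"
   ```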
   

