sam-dumont opened a new pull request, #63266:
URL: https://github.com/apache/airflow/pull/63266

    <!-- SPDX-License-Identifier: Apache-2.0
         https://www.apache.org/licenses/LICENSE-2.0 -->
   
   `ti_skip_downstream()` issues an UPDATE filtered by `(dag_id, run_id, 
task_id, map_index)` without a state guard. When a BranchOperator on one 
scheduler decides to skip downstream tasks, the UPDATE can overwrite a task 
already RUNNING on a worker. The worker's next heartbeat returns 409 with 
`current_state: skipped`, killing the task mid-execution.
   
   This is a companion fix to #60330, which guards `schedule_tis()` against the 
same class of race condition. Different code path (Execution API routes vs 
`dagrun.py`), same root cause: unguarded bulk UPDATEs on TI state.
   
   **Production data (12 days, 5 schedulers, ~500 concurrent workers)**
   
   We deployed both fixes as monkey patches on our prod cluster and monitored 
409 heartbeat errors via CloudWatch:
   
   ```
   Before any fix       14-169 errors/day
   After schedule_tis   3-4/day (all current_state: skipped)
   After both fixes     0 errors for 18+ hours
   ```
   
   | Metric | Before fixes | After schedule_tis only | After both fixes |
   |--------|-------------|------------------------|-----------------|
   | Total 409s/day | 14-169 | 3-4 | **0** |
   | `current_state: scheduled` | present | **0** | 0 |
   | `current_state: failed` | 47/day | **0** | 0 |
   | `current_state: skipped` | 8/day | **2-5/day** | **0** |
   
   **Fix**
   
   Add `skippable_state_clause` to the UPDATE's WHERE clause:
   
   ```python
   skippable_state_clause = or_(
       TI.state.is_(None),
       TI.state.not_in([RUNNING, SUCCESS, FAILED]),
   )
   ```
   
   The `or_(IS NULL, NOT IN)` pattern handles SQL NULL semantics: `NULL NOT IN 
(...)` evaluates to NULL (falsy), so tasks with `state=None` need an explicit 
`IS NULL` check to remain skippable.
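
The NULL behaviour described above can be reproduced outside Airflow. The following is a minimal sketch using Python's built-in `sqlite3` module; the `ti` table and its two columns are hypothetical stand-ins for the real task instance table, and SQLite is assumed to follow the same three-valued logic for `NOT IN` as the production database.

```python
import sqlite3

# Hypothetical, simplified stand-in for the task instance table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ti (task_id TEXT PRIMARY KEY, state TEXT)")
conn.executemany(
    "INSERT INTO ti VALUES (?, ?)",
    [("no_state", None), ("queued", "queued"), ("running", "running")],
)

protected = ("running", "success", "failed")


def matching(where):
    """Return the task_ids selected by the given WHERE fragment."""
    rows = conn.execute(
        f"SELECT task_id FROM ti WHERE {where} ORDER BY task_id", protected
    )
    return [row[0] for row in rows]


# NOT IN alone: the NULL-state row evaluates to NULL and is filtered out.
without_null_check = matching("state NOT IN (?, ?, ?)")

# Explicit IS NULL branch: the NULL-state row stays skippable.
with_null_check = matching("(state IS NULL OR state NOT IN (?, ?, ?))")

print(without_null_check)
print(with_null_check)
```

With the `NOT IN` filter alone, the row whose state is NULL drops out of the result, which is why the clause needs the extra `IS NULL` branch.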
   
   QUEUED is intentionally NOT guarded: a QUEUED task hasn't started executing 
yet, so the BranchOperator's decision should take priority. The worker pod will 
get a benign 409 on `PATCH /run` and exit cleanly. Blocking QUEUED would cause 
a semantic error where the wrong branch executes.
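
As an illustration of the intended outcome per state, here is a small sketch that contrasts an unguarded bulk UPDATE with a guarded one. It is not the Airflow implementation: the `ti` table, the `skip_downstream` helper, and its `guarded` flag are hypothetical, and plain `sqlite3` stands in for SQLAlchemy and the real schema.

```python
import sqlite3

# States the guard protects from being overwritten, as listed in the fix.
PROTECTED = ("running", "success", "failed")


def make_db():
    """Build a hypothetical, simplified task instance table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE ti (task_id TEXT PRIMARY KEY, state TEXT)")
    conn.executemany(
        "INSERT INTO ti VALUES (?, ?)",
        [
            ("t_none", None),
            ("t_queued", "queued"),
            ("t_running", "running"),
            ("t_success", "success"),
            ("t_failed", "failed"),
        ],
    )
    return conn


def skip_downstream(conn, task_ids, guarded=True):
    """Mark tasks as skipped; return the number of rows actually updated."""
    id_marks = ", ".join("?" for _ in task_ids)
    sql = f"UPDATE ti SET state = 'skipped' WHERE task_id IN ({id_marks})"
    params = list(task_ids)
    if guarded:
        state_marks = ", ".join("?" for _ in PROTECTED)
        sql += f" AND (state IS NULL OR state NOT IN ({state_marks}))"
        params.extend(PROTECTED)
    return conn.execute(sql, params).rowcount


def states(conn):
    """Return a task_id -> state mapping for inspection."""
    return dict(conn.execute("SELECT task_id, state FROM ti"))


all_tasks = ["t_none", "t_queued", "t_running", "t_success", "t_failed"]

unguarded_db = make_db()
unguarded_count = skip_downstream(unguarded_db, all_tasks, guarded=False)
unguarded_states = states(unguarded_db)

guarded_db = make_db()
guarded_count = skip_downstream(guarded_db, all_tasks, guarded=True)
guarded_states = states(guarded_db)
```

In this sketch the unguarded UPDATE rewrites all five rows, including the RUNNING one, while the guarded UPDATE touches only the NULL-state and QUEUED rows.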
   
   **Tests**
   
   5 regression tests in `TestTISkipDownstreamRaceCondition`:
   - RUNNING / SUCCESS / FAILED tasks protected from overwrite (parametrized)
   - QUEUED task correctly skipped (BranchOperator decision wins over queue)
   - None-state task still correctly skipped (happy path)
   
   related: #59378
   
   related: #60330
   
   related: #57618
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   - [X] Yes — Claude Code (claude-opus-4-6)
   
   Generated-by: Claude Code (claude-opus-4-6) following [the 
guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)
   

