pdellarciprete commented on issue #57618:
URL: https://github.com/apache/airflow/issues/57618#issuecomment-3629305213

   Hello @ephraimbuddy,
   
   Not sure if you’ve had time to dig into this yet, but I spent some time analyzing the code, and the issue may indeed stem from a race between schedulers during the `None → SCHEDULED` transition inside `DagRun.schedule_tis()`. In particular, this race seems to let a second scheduler increment `try_number` again for a task instance that has already been scheduled.
   
   **Relevant code:**  
   
https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/dagrun.py#L2080-L2097
   
   ```python
   for id_chunk in schedulable_ti_ids_chunks:
       result = session.execute(
           update(TI)
           .where(TI.id.in_(id_chunk))
           .values(
               state=TaskInstanceState.SCHEDULED,
               scheduled_dttm=timezone.utcnow(),
               try_number=case(
                   (
                       or_(TI.state.is_(None), TI.state != TaskInstanceState.UP_FOR_RESCHEDULE),
                       TI.try_number + 1,
                   ),
                   else_=TI.try_number,
               ),
           )
           .execution_options(synchronize_session=False)
       )
       count += getattr(result, "rowcount", 0)
   ```
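The key detail is which states the `CASE` treats as "increment". Below is a plain-Python model of that predicate. It is a sketch, not Airflow code: `next_try_number` and `sql_not_equal` are hypothetical helpers, states are written as the lowercase strings Airflow stores (with `None` standing in for a NULL state), and SQL's three-valued logic is emulated by returning `None` for an unknown comparison.

```python
from typing import Optional

# Hypothetical string constants mirroring the stored TaskInstanceState values
UP_FOR_RESCHEDULE = "up_for_reschedule"
SCHEDULED = "scheduled"


def sql_not_equal(a: Optional[str], b: str) -> Optional[bool]:
    """Emulate SQL `a != b`: comparing NULL with anything yields NULL (unknown)."""
    if a is None:
        return None
    return a != b


def next_try_number(state: Optional[str], try_number: int) -> int:
    """Model of: CASE WHEN state IS NULL OR state != 'up_for_reschedule'
    THEN try_number + 1 ELSE try_number END."""
    # A CASE branch fires only when its condition is true; unknown counts as false
    condition = state is None or sql_not_equal(state, UP_FOR_RESCHEDULE) is True
    return try_number + 1 if condition else try_number


print(next_try_number(None, 0))               # 1: fresh task instance
print(next_try_number(UP_FOR_RESCHEDULE, 1))  # 1: a reschedule keeps the same try
print(next_try_number(SCHEDULED, 1))          # 2: an already-SCHEDULED row is bumped again
```

The `state IS NULL` branch is needed because `NULL != 'up_for_reschedule'` is unknown rather than true; the flip side is that nothing in the predicate distinguishes a row that another scheduler has just moved to `SCHEDULED`.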
   
   Here is the race as I understand it:
   
   1. **Scheduler A** wins the race and executes the `UPDATE`, setting `state = SCHEDULED` and incrementing `try_number` from `0` to `1`.
   2. **Scheduler B** then runs the same `UPDATE` on the same row. At this point `TI.state` is already `SCHEDULED`, but the `WHERE` clause only filters on `TI.id`, so the row still matches, and the `or_` condition in the `CASE` still evaluates to `True` (since `SCHEDULED != UP_FOR_RESCHEDULE`). As a result, Scheduler B increments `try_number` to `2`.
   3. Both schedulers then move on to `_critical_section_enqueue_task_instances()`, and each believes it owns a runnable task instance (A with try 1, B with try 2).
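The sequence above can be replayed against an in-memory SQLite database. This is a simplified sketch under stated assumptions: the `ti` table and its columns are hypothetical stand-ins for Airflow's task instance table, and the two "schedulers" are just two consecutive `UPDATE` statements. That approximates what happens once the database serializes two writes to the same row: under PostgreSQL's default READ COMMITTED isolation, for example, B's `UPDATE` waits for A's row lock and, once A commits, is applied to the row as A left it.

```python
import sqlite3

# Unguarded statement, modelled on the CASE in DagRun.schedule_tis()
UNGUARDED_UPDATE = """
UPDATE ti
SET state = 'scheduled',
    try_number = CASE
        WHEN state IS NULL OR state != 'up_for_reschedule' THEN try_number + 1
        ELSE try_number
    END
WHERE id IN (?)
"""


def run_race() -> tuple[int, int, int]:
    """Return (rows updated by A, rows updated by B, final try_number)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE ti (id INTEGER PRIMARY KEY, state TEXT, try_number INTEGER)")
    # A fresh task instance: NULL state, try_number 0
    conn.execute("INSERT INTO ti (id, state, try_number) VALUES (1, NULL, 0)")

    # Scheduler A wins the race
    rows_a = conn.execute(UNGUARDED_UPDATE, (1,)).rowcount
    # Scheduler B applies the same UPDATE to the row A has just scheduled
    rows_b = conn.execute(UNGUARDED_UPDATE, (1,)).rowcount

    (try_number,) = conn.execute("SELECT try_number FROM ti WHERE id = 1").fetchone()
    conn.close()
    return rows_a, rows_b, try_number


print(run_race())  # (1, 1, 2): both statements matched the row and the try was bumped twice
```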
   
   One possible mitigation is to exclude rows that are already `SCHEDULED` in the `WHERE` clause, so that the second `UPDATE` matches no rows at all, for example:
   
   ```python
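   # IS DISTINCT FROM is NULL-safe: rows whose state is NULL still match,
   # whereas a plain `TI.state != ...` would filter them out
   .where(TI.state.is_distinct_from(TaskInstanceState.SCHEDULED))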
   ```
   
   Or, as an allowlist of the states that are expected to transition to `SCHEDULED`:
   
   ```python
   .where(
       or_(
           TI.state.is_(None),
           TI.state == TaskInstanceState.UP_FOR_RETRY,  # retries are scheduled here too
           TI.state == TaskInstanceState.UP_FOR_RESCHEDULE,
       )
   )
   ```
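Either guard turns the `UPDATE` into a compare-and-set: only the first statement matches the row, and the second one reports a `rowcount` of 0. The sketch below reuses a hypothetical SQLite `ti` table to show the effect. SQLite's `IS NOT` stands in for a NULL-safe "not equal" such as SQLAlchemy's `is_distinct_from()`, and the allowlist includes `up_for_retry` on the assumption that retried tasks also pass through this code path.

```python
import sqlite3

# Same CASE as before; a guard is appended to the WHERE clause
GUARDED_UPDATE = """
UPDATE ti
SET state = 'scheduled',
    try_number = CASE
        WHEN state IS NULL OR state != 'up_for_reschedule' THEN try_number + 1
        ELSE try_number
    END
WHERE id = ? AND ({guard})
"""

# Fixed SQL fragments (not user input), so str.format is safe here
GUARDS = {
    # Denylist: in SQLite, NULL IS NOT 'scheduled' is true, like IS DISTINCT FROM
    "denylist": "state IS NOT 'scheduled'",
    # Allowlist: only states assumed to move into SCHEDULED
    "allowlist": "state IS NULL OR state IN ('up_for_retry', 'up_for_reschedule')",
}


def run_guarded_race(guard: str) -> tuple[int, int, int]:
    """Return (rows updated by A, rows updated by B, final try_number)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE ti (id INTEGER PRIMARY KEY, state TEXT, try_number INTEGER)")
    conn.execute("INSERT INTO ti (id, state, try_number) VALUES (1, NULL, 0)")
    sql = GUARDED_UPDATE.format(guard=GUARDS[guard])

    rows_a = conn.execute(sql, (1,)).rowcount  # Scheduler A: the row matches
    rows_b = conn.execute(sql, (1,)).rowcount  # Scheduler B: the guard filters it out

    (try_number,) = conn.execute("SELECT try_number FROM ti WHERE id = 1").fetchone()
    conn.close()
    return rows_a, rows_b, try_number


for name in GUARDS:
    print(name, run_guarded_race(name))  # denylist (1, 0, 1), then allowlist (1, 0, 1)
```

Putting the guard in the `WHERE` clause rather than in the `CASE` matters under real concurrency: PostgreSQL documents that, under READ COMMITTED, a blocked `UPDATE` re-evaluates its `WHERE` condition against the row version committed by the other transaction, so B's statement should skip the row instead of bumping it.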
   
   Happy to hear your thoughts; I might be overlooking something subtle here.
   

