dabla opened a new issue, #49437:
URL: https://github.com/apache/airflow/issues/49437

   ### Apache Airflow version
   
   2.10.5
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   When running multiple Airflow schedulers in combination with a distributed Postgres-compatible database like YugabyteDB, the `_set_state` method of `TaskInstance` can fail to update the task state correctly. This leads to synchronization issues in task state management, potentially causing scheduling anomalies or duplicated task runs.
   
   This issue does not occur when:
   
   - a single scheduler is used with YugabyteDB, or
   - multiple schedulers are used with a traditional single-node PostgreSQL instance.
   
   The root cause is likely YugabyteDB's distributed nature and how it handles concurrent conditional updates (such as `WHERE state != <state>`), which can behave subtly differently from standard PostgreSQL under race conditions.
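The conditional update works as a compare-and-set: the `WHERE state != <target>` filter lets only the first caller that performs a given transition change the row, and the affected-row count tells each caller whether it "won". The sketch below shows that single-node behavior using Python's built-in `sqlite3`; the two-column `ti` table and the `try_set_state` helper are hypothetical stand-ins, not Airflow's schema or API, and the sketch says nothing about how YugabyteDB behaves.

```python
import sqlite3

# Hypothetical, minimal table standing in for task_instance (not Airflow's schema).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ti (task_id TEXT PRIMARY KEY, state TEXT)")
conn.execute("INSERT INTO ti VALUES ('extract', 'scheduled')")
conn.commit()


def try_set_state(conn: sqlite3.Connection, task_id: str, new_state: str) -> bool:
    """Hypothetical helper: move a row to new_state, True only if this call changed it."""
    cur = conn.execute(
        "UPDATE ti SET state = ? WHERE task_id = ? AND state != ?",
        (new_state, task_id, new_state),
    )
    conn.commit()
    return cur.rowcount > 0


first = try_set_state(conn, "extract", "queued")   # scheduler A
second = try_set_state(conn, "extract", "queued")  # scheduler B, same transition
print(first, second)  # True False
```

The second call matches zero rows because the row already holds the target state, so only one caller reports a change.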
   
   Tested scenario matrix:
   
   | Scenario (database)                          | Multiple schedulers | Works? |
   |----------------------------------------------|---------------------|--------|
   | Standard PostgreSQL (single node)            | ✅ Yes              | ✅     |
   | YugabyteDB (distributed Postgres-compatible) | ✅ Yes              | ❌     |
   | YugabyteDB (distributed Postgres-compatible) | ❌ No               | ✅     |
   
   
   ### What you think should happen instead?
   
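   The `_set_state` refactor below performs the state change as a single conditional `UPDATE` and returns whether a row was actually changed, which ensures consistent and reliable task-state updates across all setups.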
   
   The improved method:
   
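   ```python
   # Proposed replacement for TaskInstance._set_state in
   # airflow/models/taskinstance.py (Airflow 2.10.x). Assumes that module's
   # existing imports: logging, sqlalchemy's or_ and update, airflow.utils.timezone,
   # State/TaskInstanceState from airflow.utils.state, and internal_api_call.
   @staticmethod
   @internal_api_call
   def _set_state(
       ti: TaskInstance | TaskInstancePydantic, state: TaskInstanceState, session: Session
   ) -> bool:
       logging.info("Setting task state for %s to %s", ti, state)
       current_time = timezone.utcnow()
       start_date = ti.start_date or current_time

       # Decide on end_date/duration from the *target* state; ti.state still
       # holds the previous value at this point.
       if state in State.finished or state == TaskInstanceState.UP_FOR_RETRY:
           end_date = ti.end_date or current_time
           duration = (end_date - start_date).total_seconds()
       else:
           end_date = None
           duration = None

       # One conditional UPDATE: if a concurrent caller already applied the same
       # transition, this one matches zero rows. NULL states are matched
       # explicitly because `state != :state` is NULL (not TRUE) for them.
       result = session.execute(
           update(TaskInstance)
           .where(
               TaskInstance.task_id == ti.task_id,
               TaskInstance.dag_id == ti.dag_id,
               TaskInstance.run_id == ti.run_id,
               TaskInstance.map_index == ti.map_index,
               or_(TaskInstance.state.is_(None), TaskInstance.state != state),
           )
           .values(
               state=state,
               start_date=start_date,
               end_date=end_date,
               duration=duration,
           )
       )

       changed = result.rowcount > 0
       if changed:
           # Mirror the new values on the in-memory object for callers that read it.
           ti.state = state
           ti.start_date = start_date
           ti.end_date = end_date
           ti.duration = duration
       return changed
   ```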
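The NULL case deserves attention in this `WHERE` clause: under SQL's three-valued logic, `state != 'scheduled'` evaluates to NULL rather than TRUE when the stored state is NULL, and a newly created task instance can have a NULL state, so a plain `!=` filter skips such rows. The sketch below shows the difference using Python's built-in `sqlite3` and a hypothetical two-column `ti` table; PostgreSQL follows the same standard NULL semantics. In SQLAlchemy, a NULL-safe filter can be written as `or_(TaskInstance.state.is_(None), TaskInstance.state != state)`, or as `TaskInstance.state.is_distinct_from(state)` where the backend supports it.

```python
import sqlite3

# Hypothetical table: one task instance that has no state yet.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ti (task_id TEXT PRIMARY KEY, state TEXT)")
conn.execute("INSERT INTO ti VALUES ('load', NULL)")

# NULL != 'scheduled' evaluates to NULL, not TRUE, so the row is skipped.
plain = conn.execute(
    "UPDATE ti SET state = ? WHERE task_id = ? AND state != ?",
    ("scheduled", "load", "scheduled"),
).rowcount

# Matching NULL explicitly lets the update apply.
null_safe = conn.execute(
    "UPDATE ti SET state = ? WHERE task_id = ? AND (state IS NULL OR state != ?)",
    ("scheduled", "load", "scheduled"),
).rowcount

print(plain, null_safe)  # 0 1
```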
   
   ### How to reproduce
   
   Use a distributed Postgres-compatible database such as YugabyteDB together with multiple schedulers. A single scheduler won't trigger the deadlock, because there is no concurrency in that case.
   
   ### Operating System
   
   RockyLinux 9.5
   
   ### Versions of Apache Airflow Providers
   
   Latest stable versions
   
   ### Deployment
   
   Other 3rd-party Helm chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.