dabla opened a new issue, #49437:
URL: https://github.com/apache/airflow/issues/49437
### Apache Airflow version
2.10.5
### If "Other Airflow 2 version" selected, which one?
_No response_
### What happened?
When running multiple Airflow schedulers in combination with a distributed
Postgres-compatible database like YugabyteDB, the _set_state method of the
TaskInstance can fail to update the task state correctly. This leads to
synchronization issues in task state management, potentially causing scheduling
anomalies or duplicated task runs.
This issue does not occur when:
- a single scheduler is used with YugabyteDB, or
- multiple schedulers are used with a traditional single-node PostgreSQL instance.
The likely root cause is YugabyteDB's distributed nature and how it handles
concurrent conditional updates (such as `WHERE state != <state>`), which can
behave subtly differently from standard PostgreSQL under race conditions.
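To make the conditional-update pattern concrete, here is a minimal sketch that uses the standard-library `sqlite3` module as a stand-in for PostgreSQL or YugabyteDB. The table layout and the `set_state` helper are hypothetical and heavily simplified; the sketch only illustrates how `WHERE state != ?` combined with the row count behaves on a single-node database, and does not reproduce the distributed behavior described above.

```python
import sqlite3


def set_state(conn: sqlite3.Connection, task_id: str, new_state: str) -> bool:
    """Hypothetical helper: update the state only if it differs from new_state.

    Returns True only when a row was actually changed.
    """
    cur = conn.execute(
        "UPDATE task_instance SET state = ? WHERE task_id = ? AND state != ?",
        (new_state, task_id, new_state),
    )
    conn.commit()
    return cur.rowcount > 0


# In-memory database with a simplified, illustrative task_instance table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (task_id TEXT PRIMARY KEY, state TEXT)")
conn.execute("INSERT INTO task_instance VALUES ('t1', 'queued')")
conn.execute("INSERT INTO task_instance VALUES ('t2', NULL)")
conn.commit()

first = set_state(conn, "t1", "running")       # state changes: queued -> running
second = set_state(conn, "t1", "running")      # already running: no row matches
null_case = set_state(conn, "t2", "running")   # NULL != 'running' is NULL: no row matches
print(first, second, null_case)
```

One detail worth keeping in mind with this predicate: in SQL, a `NULL` state never satisfies `state != ?`, so a row whose state is `NULL` is not updated by it.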
Tested scenarios matrix
| Scenario | Database | Multiple Schedulers | Works? |
|----------------------------------------------|--------------|----------------------|--------|
| Standard PostgreSQL | ✅ Yes | ✅ Yes | ✅ |
| YugabyteDB (distributed Postgres-compatible) | ✅ Yes | ✅ Yes | ❌ |
| YugabyteDB (distributed Postgres-compatible) | ✅ Yes | ❌ No | ✅ |
### What you think should happen instead?
The `_set_state` refactor below ensures consistent and reliable updates to
task states across all setups.
The improved method:
```
# Method of TaskInstance; requires `from sqlalchemy import update`.
@staticmethod
@internal_api_call
def _set_state(
    ti: TaskInstance | TaskInstancePydantic, state: TaskInstanceState, session: Session
) -> bool:
    if isinstance(ti, TaskInstancePydantic):
        ti.state = TaskInstanceState[ti.state]
    logging.info("Setting task state for %s to %s", ti, state)
    current_time = timezone.utcnow()
    start_date = ti.start_date or current_time
    # Finished states and UP_FOR_RETRY get an end date and a duration.
    # The check uses the target `state`; `ti.state` still holds the previous value here.
    if state in State.finished or state == TaskInstanceState.UP_FOR_RETRY:
        end_date = ti.end_date or current_time
        duration = (
            (end_date - start_date).total_seconds() if start_date and end_date else None
        )
    else:
        end_date = None
        duration = None
    # Conditional UPDATE: touch the row only if its state differs from the target.
    result = session.execute(
        update(TaskInstance)
        .where(
            TaskInstance.task_id == ti.task_id,
            TaskInstance.dag_id == ti.dag_id,
            TaskInstance.run_id == ti.run_id,
            TaskInstance.map_index == ti.map_index,
            TaskInstance.state != state,
        )
        .values(
            state=state,
            start_date=start_date,
            end_date=end_date,
            duration=duration,
        )
    )
    # True only if this call actually changed the row.
    return result.rowcount > 0
```
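The timestamp handling in the method above can be looked at in isolation. The following is a minimal sketch, assuming the decision is keyed on the state being set; `END_DATE_STATES` and `derive_dates` are hypothetical names, and the set of states is an illustrative stand-in for `State.finished` plus `UP_FOR_RETRY`, not Airflow's own definition.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone

# Illustrative stand-in for State.finished plus UP_FOR_RETRY (assumption).
END_DATE_STATES = frozenset(
    {"success", "failed", "skipped", "upstream_failed", "removed", "up_for_retry"}
)


def derive_dates(
    new_state: str,
    start_date: datetime | None,
    end_date: datetime | None,
    now: datetime,
) -> tuple[datetime, datetime | None, float | None]:
    """Return (start_date, end_date, duration_seconds) for the target state."""
    start = start_date or now
    if new_state in END_DATE_STATES:
        end = end_date or now
        return start, end, (end - start).total_seconds()
    # States that are still in progress carry no end date or duration.
    return start, None, None


started = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
now = started + timedelta(seconds=90)

finished = derive_dates("success", started, None, now)
in_progress = derive_dates("running", None, None, now)
print(finished[2], in_progress[1])
```

Keeping this computation free of database access makes it easy to unit-test separately from the conditional `UPDATE`.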
### How to reproduce
Run Airflow against YugabyteDB (a distributed Postgres-compatible database)
with multiple schedulers. A single scheduler does not trigger the deadlock
because there are no concurrent updates.
### Operating System
RockyLinux 9.5
### Versions of Apache Airflow Providers
Latest stable versions
### Deployment
Other 3rd-party Helm chart
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [x] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)