Samuli Holopainen created AIRFLOW-2438:
------------------------------------------
Summary: Checking for concurrency limit causes session to be
committed early and potentially results in duplicate task runs
Key: AIRFLOW-2438
URL: https://issues.apache.org/jira/browse/AIRFLOW-2438
Project: Apache Airflow
Issue Type: Bug
Components: core
Affects Versions: 1.9.0
Reporter: Samuli Holopainen
When the {{DagTISlotsAvailableDep}} dependency is checked, the
{{concurrency_reached}} property of the DAG instance is accessed
[here|https://github.com/apache/incubator-airflow/blob/1.9.0/airflow/ti_deps/deps/dag_ti_slots_available_dep.py#L24].
The session from the outer scope is not passed to the property getter, causing
the session to be committed in the {{provide_session}} decorator.
As a result, the transaction in which the task instance is locked for
update in the method {{_check_and_change_state_before_execution}} is committed
in the second call to {{are_dependencies_met}}
[here|https://github.com/apache/incubator-airflow/blob/1.9.0/airflow/models.py#L1376],
i.e. before the state of the task instance has been set to RUNNING. Two
simultaneous runs of the {{_check_and_change_state_before_execution}} method
can therefore both return {{True}}, causing the same task to be run more than
once concurrently.
I have verified this behavior by stepping through the code with a debugger
while trying to run the same task twice at the same time.
Passing the session from the outer scope to the concurrency reached check
should probably fix this.