[ https://issues.apache.org/jira/browse/AIRFLOW-2438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Samuli Holopainen updated AIRFLOW-2438:
---------------------------------------
    Description: 
When the {{DagTISlotsAvailableDep}} dependency is checked, the 
{{concurrency_reached}} property of the DAG instance is accessed 
[here|https://github.com/apache/incubator-airflow/blob/1.9.0/airflow/ti_deps/deps/dag_ti_slots_available_dep.py#L24].
The session from the outer scope is not passed to the property getter, so the 
{{provide_session}} decorator supplies a session itself and commits it when the 
getter returns, which also commits the outer scope's open transaction.
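
A minimal sketch of the mechanism, assuming the session factory behaves like SQLAlchemy's {{scoped_session}} (one shared session per thread), so the session the decorator creates is the same object the caller is already using. {{FakeSession}}, {{Session}}, the simplified {{provide_session}} and {{check_before_execution}} are illustrative stand-ins, not Airflow's actual code:

```python
import functools
import threading


class FakeSession:
    """Stand-in for a SQLAlchemy session that only tracks transaction state."""

    def __init__(self):
        self.commits = 0
        self.held_locks = []  # row locks taken in the current transaction

    def commit(self):
        # Committing ends the transaction, which releases its row locks.
        self.commits += 1
        self.held_locks.clear()


_local = threading.local()


def Session():
    """Assumed scoped_session-like factory: one shared session per thread."""
    if not hasattr(_local, "session"):
        _local.session = FakeSession()
    return _local.session


def provide_session(func):
    """Simplified provide_session-style decorator (keyword argument only)."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if kwargs.get("session") is not None:
            return func(*args, **kwargs)  # caller manages the transaction
        session = Session()  # the SAME object the caller may already be using
        kwargs["session"] = session
        result = func(*args, **kwargs)
        session.commit()  # also commits the caller's pending work
        return result

    return wrapper


class Dag:
    concurrency = 16

    @property
    @provide_session
    def concurrency_reached(self, session=None):
        # A property getter is called with `self` only, so `session` is never
        # passed in and the decorator always supplies (and commits) one.
        running = 0  # placeholder for the COUNT(*) query on `session`
        return running >= self.concurrency


def check_before_execution(dag):
    """Hypothetical caller that locks the task instance row, then checks deps."""
    session = Session()
    session.held_locks.append("task_instance row (SELECT ... FOR UPDATE)")
    _ = dag.concurrency_reached  # nested check commits the outer transaction
    return list(session.held_locks)


print(check_before_execution(Dag()))  # [] : the row lock is already gone
```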

As a result, the transaction in which the task instance is locked for update 
in {{_check_and_change_state_before_execution}} is committed during the second 
call to {{are_dependencies_met}} 
[here|https://github.com/apache/incubator-airflow/blob/1.9.0/airflow/models.py#L1376],
i.e. before the state of the task instance has been set to RUNNING, and that 
commit releases the row lock. Two simultaneous runs of the 
{{_check_and_change_state_before_execution}} method can therefore both return 
{{True}}, allowing the same task to run more than once concurrently.
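
The race can be replayed deterministically with a simplified model that assumes a {{threading.Lock}} can stand in for the database row lock and that the two workers' steps run in the order shown. {{TaskRow}}, {{lock_and_check}}, {{commit}} and the two interleaving functions are hypothetical names, not Airflow code:

```python
import threading


class TaskRow:
    """Hypothetical stand-in for one task_instance row."""

    def __init__(self):
        self.state = None
        self.lock = threading.Lock()  # models SELECT ... FOR UPDATE


def lock_and_check(row):
    """Lock the row and report whether the task may start."""
    # A database would make a second locker wait; here we fail loudly instead
    # of blocking, so a wrong interleaving cannot hang the example.
    if not row.lock.acquire(blocking=False):
        raise RuntimeError("row is locked by another transaction")
    return row.state != "running"


def commit(row):
    """Committing ends the transaction and releases the row lock."""
    row.lock.release()


def buggy_interleaving():
    row = TaskRow()
    a = lock_and_check(row)  # worker A: state is None, may run
    commit(row)              # A: premature commit inside the concurrency check
    b = lock_and_check(row)  # worker B: lock is free, state is still None
    commit(row)              # B: same premature commit
    row.state = "running"    # A sets RUNNING
    row.state = "running"    # B sets RUNNING as well
    return a, b


def intended_interleaving():
    row = TaskRow()
    a = lock_and_check(row)  # worker A: state is None, may run
    row.state = "running"    # A sets RUNNING while still holding the lock
    commit(row)              # A commits only now
    b = lock_and_check(row)  # worker B: sees RUNNING, must not run
    commit(row)
    return a, b


print(buggy_interleaving())     # (True, True): both workers start the task
print(intended_interleaving())  # (True, False)
```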

I have verified this behavior by stepping through the code with a debugger 
while trying to run the same task twice at the same time.

Passing the session from the outer scope to the concurrency reached check 
should probably fix this.
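
One way the suggested fix could look, sketched under the same assumptions (thread-local shared session, simplified decorator). Because a property getter cannot accept arguments, the sketch moves the query into a method, hypothetically named {{get_concurrency_reached}}, which a hypothetical {{dag_slots_available}} check calls with the outer session; the property is kept as a thin wrapper for callers that have no session:

```python
import functools
import threading


class FakeSession:
    """Stand-in for a SQLAlchemy session that only counts commits."""

    def __init__(self):
        self.commits = 0

    def commit(self):
        self.commits += 1


_local = threading.local()


def Session():
    """Assumed scoped_session-like factory: one shared session per thread."""
    if not hasattr(_local, "session"):
        _local.session = FakeSession()
    return _local.session


def provide_session(func):
    """Simplified decorator: only inspects the `session` keyword argument."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if kwargs.get("session") is not None:
            return func(*args, **kwargs)  # caller owns the transaction
        session = Session()
        kwargs["session"] = session
        result = func(*args, **kwargs)
        session.commit()
        return result

    return wrapper


class Dag:
    concurrency = 16

    @provide_session
    def get_concurrency_reached(self, session=None):
        running = 0  # placeholder for the COUNT(*) query on `session`
        return running >= self.concurrency

    @property
    def concurrency_reached(self):
        # Kept for callers that have no session of their own.
        return self.get_concurrency_reached()


def dag_slots_available(dag, session):
    """Hypothetical dependency check that reuses the caller's session."""
    return not dag.get_concurrency_reached(session=session)
```

Passing {{session=session}} means the decorator leaves the transaction alone, so the row lock would stay held until the caller commits after setting the state to RUNNING.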

  was:
When the {{DagTISlotsAvailableDep}} dependency is checked, the 
{{concurrency_reached}} property of the DAG instance is accessed 
[here|https://github.com/apache/incubator-airflow/blob/1.9.0/airflow/ti_deps/deps/dag_ti_slots_available_dep.py#L24].
The session from the outer scope is not passed to the property getter, so the 
{{provide_session}} decorator supplies a session itself and commits it when the 
getter returns, which also commits the outer scope's open transaction.

As a result, the transaction in which the task instance is locked for update 
in {{_check_and_change_state_before_execution}} is committed during the second 
call to {{are_dependencies_met}} 
[here|https://github.com/apache/incubator-airflow/blob/1.9.0/airflow/models.py#L1376],
i.e. before the state of the task instance has been set to RUNNING, and that 
commit releases the row lock. Two simultaneous runs of the 
{{_check_and_change_state_before_execution}} method can therefore both return 
{{True}}, causing the same task to run more than once concurrently.

I have verified this behavior by stepping through the code with a debugger 
while trying to run the same task twice at the same time.

Passing the session from the outer scope to the concurrency reached check 
should probably fix this.


> Checking for concurrency limit causes session to be committed early and 
> potentially results in duplicate task runs
> ------------------------------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-2438
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2438
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.9.0
>            Reporter: Samuli Holopainen
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
