Samuli Holopainen created AIRFLOW-2438:
------------------------------------------

             Summary: Checking for concurrency limit causes session to be 
committed early and potentially results in duplicate task runs
                 Key: AIRFLOW-2438
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2438
             Project: Apache Airflow
          Issue Type: Bug
          Components: core
    Affects Versions: 1.9.0
            Reporter: Samuli Holopainen


When the {{DagTISlotsAvailableDep}} dependency is checked, the 
{{concurrency_reached}} property of the DAG instance is accessed 
[here|https://github.com/apache/incubator-airflow/blob/1.9.0/airflow/ti_deps/deps/dag_ti_slots_available_dep.py#L24].
 The session from the outer scope is not passed to the property getter, causing 
the session to be committed in the {{provide_session}} decorator.

As a result, the transaction in which the task instance is locked for 
update in the method {{_check_and_change_state_before_execution}} is committed 
in the second call to {{are_dependencies_met}} 
[here|https://github.com/apache/incubator-airflow/blob/1.9.0/airflow/models.py#L1376],
 i.e. before the state of the task instance has been set to RUNNING. Two 
simultaneous runs of the {{_check_and_change_state_before_execution}} method 
can therefore both return {{True}}, causing the same task to be run more than 
once concurrently.
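
The interleaving can be modelled with a small deterministic simulation. This is a toy model under stated assumptions, not Airflow code: the row lock is a plain dictionary field, the two workers are generators stepped round-robin, and {{check_and_change_state}} and {{simulate}} are hypothetical names. Each {{yield}} marks a point where the other worker may run.

```python
def check_and_change_state(row, worker, early_commit):
    """Hypothetical model of one worker; each yield is a possible context switch."""
    # SELECT ... FOR UPDATE: wait until no other transaction holds the row lock.
    while row["locked_by"] not in (None, worker):
        yield None
    row["locked_by"] = worker
    # Read the state while holding the lock.
    runnable = row["state"] != "running"
    yield None
    if early_commit:
        # The dependency check commits the shared session, releasing the lock
        # before the state has been changed.
        row["locked_by"] = None
        yield None
    if runnable:
        row["state"] = "running"
    # The final commit releases the lock.
    row["locked_by"] = None
    yield runnable


def simulate(early_commit):
    """Step two workers round-robin and return what each one decided."""
    row = {"state": "queued", "locked_by": None}
    active = [
        (index, check_and_change_state(row, name, early_commit))
        for index, name in enumerate(("a", "b"))
    ]
    results = {}
    while active:
        for item in list(active):
            index, worker = item
            try:
                value = next(worker)
            except StopIteration:
                active.remove(item)
                continue
            if value is not None:
                results[index] = value
    return [results[0], results[1]]
```

With {{early_commit=False}} the second worker waits for the lock and then sees the state already set to running, so only one worker proceeds. With {{early_commit=True}} the lock is released too soon and both workers decide to run the task.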

I have verified this behavior by stepping through the code with a debugger 
while trying to run the same task twice at the same time.

Passing the session from the outer scope to the concurrency reached check 
should probably fix this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
