[ 
https://issues.apache.org/jira/browse/SPARK-13122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Or resolved SPARK-13122.
-------------------------------
          Resolution: Fixed
       Fix Version/s: 2.0.0
                      1.6.1
    Target Version/s: 1.6.1, 2.0.0  (was: 1.6.1)

> Race condition in MemoryStore.unrollSafely() causes memory leak
> ---------------------------------------------------------------
>
>                 Key: SPARK-13122
>                 URL: https://issues.apache.org/jira/browse/SPARK-13122
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, Streaming
>    Affects Versions: 1.6.0
>            Reporter: Adam Budde
>            Assignee: Adam Budde
>             Fix For: 1.6.1, 2.0.0
>
>
> The 
> [unrollSafely()|https://github.com/apache/spark/blob/v1.6.0/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala#L249]
>  method in MemoryStore will progressively unroll the contents of a block 
> iterator into memory. It works by reserving an initial chunk of unroll memory 
> and periodically checking if more memory must be reserved as it unrolls the 
> iterator. The memory reserved for performing the unroll is considered 
> "pending" memory and is tracked on a per-task-attempt-ID basis in a map 
> object named pendingUnrollMemoryMap. When the unrolled block is committed to 
> storage memory in the 
> [tryToPut()|https://github.com/apache/spark/blob/v1.6.0/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala#L362]
>  method, a method named 
> [releasePendingUnrollMemoryForThisTask()|https://github.com/apache/spark/blob/v1.6.0/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala#L521]
>  is invoked and this pending memory is released. tryToPut() then proceeds to 
> allocate the storage memory required for the block.
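> The bookkeeping described above can be sketched with a pair of simplified 
> maps. This is an illustrative model only: the field and method names mirror 
> MemoryStore's, but the bodies are hypothetical reductions of the real logic.

```scala
import scala.collection.mutable

// Simplified sketch of MemoryStore's unroll bookkeeping. The names mirror
// the real fields/methods; the implementations are illustrative only.
object UnrollAccounting {
  // taskAttemptId -> bytes currently reserved for an in-progress unroll
  val unrollMemoryMap = mutable.HashMap[Long, Long]().withDefaultValue(0L)
  // taskAttemptId -> bytes held "pending" until tryToPut() commits the block
  val pendingUnrollMemoryMap = mutable.HashMap[Long, Long]().withDefaultValue(0L)

  def reserveUnrollMemoryForThisTask(taskId: Long, bytes: Long): Unit =
    unrollMemoryMap(taskId) += bytes

  def currentUnrollMemoryForThisTask(taskId: Long): Long =
    unrollMemoryMap(taskId)

  // Invoked from tryToPut(): the pending reservation is dropped just before
  // the block's storage memory is acquired.
  def releasePendingUnrollMemoryForThisTask(taskId: Long): Unit =
    pendingUnrollMemoryMap.remove(taskId)
}
```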
> The unrollSafely() method computes the amount of pending memory used for the 
> unroll operation by saving the amount of unroll memory reserved for the 
> particular task attempt ID at the start of the method in a variable named 
> previousMemoryReserved and subtracting this value from the unroll memory 
> dedicated to the task at the end of the method. This value is stored as the 
> variable amountToTransferToPending. This amount is then subtracted from the 
> per-task unrollMemoryMap and added to pendingUnrollMemoryMap.
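> Condensed into Scala, that snapshot-and-transfer sequence looks roughly like 
> the following. This is a hypothetical simplification of the method body, not 
> the real implementation; it only mirrors the arithmetic described above.

```scala
import scala.collection.mutable

// Illustrative condensation of unrollSafely()'s pending-memory transfer:
// the delta reserved since the method started is moved to "pending".
object PendingTransfer {
  val unrollMemoryMap = mutable.HashMap[Long, Long]().withDefaultValue(0L)
  val pendingUnrollMemoryMap = mutable.HashMap[Long, Long]().withDefaultValue(0L)

  def unrollSafely(taskId: Long, bytesNeeded: Long): Unit = {
    // Snapshot taken at the start of the method.
    val previousMemoryReserved = unrollMemoryMap(taskId)
    try {
      // Reserve memory as the iterator is unrolled (collapsed to one step).
      unrollMemoryMap(taskId) += bytesNeeded
    } finally {
      // Everything reserved since the snapshot becomes "pending".
      val amountToTransferToPending =
        unrollMemoryMap(taskId) - previousMemoryReserved
      unrollMemoryMap(taskId) -= amountToTransferToPending
      pendingUnrollMemoryMap(taskId) += amountToTransferToPending
    }
  }
}
```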
> The amount of unroll memory consumed for the task is obtained from 
> unrollMemoryMap via the currentUnrollMemoryForThisTask method. In order for 
> the semantics of unrollSafely() to work, the value of unrollMemoryMap for the 
> task returned by 
> [currentTaskAttemptId()|https://github.com/apache/spark/blob/v1.6.0/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala#L475]
>  must not be mutated between the computation of previousMemoryReserved and 
> amountToTransferToPending. However, since there is no synchronization in 
> place to ensure that computing both variables and updating the memory maps 
> happen atomically, a race condition can occur when two or more threads for 
> which currentTaskAttemptId() returns the same value are simultaneously 
> trying to store blocks. This can lead to a negative value being computed for 
> amountToTransferToPending, corrupting the unrollMemoryMap and 
> pendingUnrollMemoryMap memory maps which in turn can lead to the memory 
> manager leaking unroll memory.
> For example, let's consider how the state of the unrollMemoryMap and 
> pendingUnrollMemoryMap variables might be affected if two threads returning 
> the same value for currentTaskAttemptId() both execute unrollSafely() 
> concurrently:
> ||Thread 1||Thread 2||unrollMemoryMap||pendingUnrollMemoryMap||
> |Enter unrollSafely()|-|0|0|
> |previousMemoryReserved = 0|-|0|0|
> |(perform unroll)|-|2097152 (2 MiB)|0|
> |-|Enter unrollSafely()|2097152 (2 MiB)|0|
> |-|previousMemoryReserved = 2097152|2097152 (2 MiB)|0|
> |-|(perform unroll)|3145728 (3 MiB)|0|
> |Enter finally { }|-|3145728 (3 MiB)|0|
> |amtToTransfer = 3145728|-|3145728 (3 MiB)|0|
> |Update memory maps|-|0|3145728 (3 MiB)|
> |Return|Enter finally { }|0|3145728 (3 MiB)|
> |-|amtToTransfer = -2097152|0|3145728 (3 MiB)|
> |-|Update memory maps|-2097152 (-2 MiB)|1048576 (1 MiB)|
> In this example, we end up leaking 2 MiB of unroll memory since both Thread 1 
> and Thread 2 think that the task has only 1 MiB of unroll memory allocated to 
> it when it actually has 3 MiB. The negative value stored in unrollMemoryMap 
> will also propagate to future invocations of unrollSafely().
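> The interleaving in the table can be replayed deterministically in a 
> single thread to show the corruption. All names and steps below are 
> illustrative reductions of the real code; the replay confirms that the 
> pending map ends at 1 MiB even though 3 MiB were actually acquired, and 
> that the task's unrollMemoryMap entry is left non-zero (corrupted) after 
> both unrolls have completed.

```scala
import scala.collection.mutable

// Deterministic, single-threaded replay of the table's interleaving for two
// logical threads sharing one task attempt ID (here -1, as observed for
// receiver tasks). Illustrative only.
object RaceReplay {
  val unrollMemoryMap = mutable.HashMap[Long, Long]().withDefaultValue(0L)
  val pendingUnrollMemoryMap = mutable.HashMap[Long, Long]().withDefaultValue(0L)
  val taskId = -1L
  val MiB = 1024L * 1024L

  // Returns the bytes leaked: acquired from the memory manager but never
  // recorded as pending, hence never released.
  def replay(): Long = {
    val prev1 = unrollMemoryMap(taskId)          // Thread 1 snapshot: 0
    unrollMemoryMap(taskId) += 2 * MiB           // Thread 1 unrolls 2 MiB
    val prev2 = unrollMemoryMap(taskId)          // Thread 2 snapshot: 2 MiB
    unrollMemoryMap(taskId) += 1 * MiB           // Thread 2 unrolls 1 MiB
    val totalAcquired = 3 * MiB

    // Thread 1's finally block: its delta wrongly includes Thread 2's unroll.
    val amt1 = unrollMemoryMap(taskId) - prev1   // 3 MiB
    unrollMemoryMap(taskId) -= amt1
    pendingUnrollMemoryMap(taskId) += amt1

    // Thread 2's finally block: the stale snapshot yields a negative delta.
    val amt2 = unrollMemoryMap(taskId) - prev2   // 0 - 2 MiB = -2 MiB
    unrollMemoryMap(taskId) -= amt2              // entry left corrupted
    pendingUnrollMemoryMap(taskId) += amt2       // pending drops to 1 MiB

    totalAcquired - pendingUnrollMemoryMap(taskId)
  }
}
```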
> In our particular case, this behavior manifests because the 
> currentTaskAttemptId() method returns -1 for each Spark receiver task. 
> This may itself be a bug and is something I'm going to look into. 
> We noticed that blocks would start to spill over to disk when more than 
> enough storage memory was available, so we inserted log statements into 
> MemoryManager's acquireUnrollMemory() and releaseUnrollMemory() in order to 
> collect the number of unroll bytes acquired and released. When we plot the 
> output, it is apparent that unroll memory is being leaked:
> !https://raw.githubusercontent.com/budde/spark_debug/master/plots/UnifiedMemoryManager/leaked_unroll_bytes.png!
> Running difference between acquire/release bytes:
> !https://raw.githubusercontent.com/budde/spark_debug/master/plots/UnifiedMemoryManager/leaked_unroll_bytes_running_diff.png!
> However, if we change the implementation of unrollSafely() so that the entire 
> method is within a synchronized block, we no longer see this leak:
> !https://raw.githubusercontent.com/budde/spark_debug/master/plots/UnifiedMemoryManager/synchronized_unroll_bytes.png!
> Running difference between acquire/release bytes:
> !https://raw.githubusercontent.com/budde/spark_debug/master/plots/UnifiedMemoryManager/synchronized_unroll_bytes_running_diff.png!
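> In sketch form, the synchronized variant makes the snapshot, the 
> reservation, and the transfer atomic with respect to other threads that 
> share the same task attempt ID. This is an illustrative reduction, not the 
> actual patch:

```scala
import scala.collection.mutable

// Sketch of the synchronized fix: snapshot, reservation, and transfer all
// happen under one lock, so a concurrent caller with the same task attempt
// ID can no longer interleave between them. Illustrative only.
object SynchronizedUnroll {
  val unrollMemoryMap = mutable.HashMap[Long, Long]().withDefaultValue(0L)
  val pendingUnrollMemoryMap = mutable.HashMap[Long, Long]().withDefaultValue(0L)
  private val lock = new Object

  def unrollSafely(taskId: Long, bytesNeeded: Long): Unit = lock.synchronized {
    val previousMemoryReserved = unrollMemoryMap(taskId)
    unrollMemoryMap(taskId) += bytesNeeded
    val amountToTransferToPending =
      unrollMemoryMap(taskId) - previousMemoryReserved
    unrollMemoryMap(taskId) -= amountToTransferToPending
    pendingUnrollMemoryMap(taskId) += amountToTransferToPending
  }
}
```

> Under this locking, no thread can observe a stale previousMemoryReserved, 
> so amountToTransferToPending can never go negative.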



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
