This is an automated email from the ASF dual-hosted git repository. joshrosen pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 1ad1c18fc28 [SPARK-39283][CORE] Fix deadlock between TaskMemoryManager and UnsafeExternalSorter.SpillableIterator 1ad1c18fc28 is described below commit 1ad1c18fc283acea6d18bc4c8753d3b6e50408ed Author: sandeepvinayak <sandeep....@outlook.com> AuthorDate: Tue May 31 15:28:07 2022 -0700 [SPARK-39283][CORE] Fix deadlock between TaskMemoryManager and UnsafeExternalSorter.SpillableIterator ### What changes were proposed in this pull request? This PR fixes a deadlock between TaskMemoryManager and UnsafeExternalSorter.SpillableIterator. Instead of calling `freePage` and `UnsafeInMemorySorter.freeMemory()` (both of which lock the `TaskMemoryManager`) while holding the `UnsafeExternalSorter` or `SpillableIterator` lock, the code now collects the pages and in-memory sorter to release while holding the lock, and frees them only after the lock has been released. ### Why are the changes needed? We are hitting a deadlock between TaskMemoryManager and UnsafeExternalSorter.SpillableIterator during a join. It turns out that `UnsafeExternalSorter.SpillableIterator#spill()` acquires the locks on `UnsafeExternalSorter.SpillableIterator` and `UnsafeExternalSorter`. It then calls `freePage` to free all allocated pages except the last one, and `freePage` takes the lock on `TaskMemoryManager`. At the same time, another `MemoryConsumer` that uses `UnsafeExternalSorter` for sorting can call `allocatePage`, which acquires the lock on `TaskMemoryManager`. That allocation can trigger a spill, which in turn needs the lock on `UnsafeExternalSorter`. Each thread then holds one lock while waiting for the other, so neither can proceed and they deadlock. A similar deadlock was fixed in SPARK-27338: https://issues.apache.org/jira/browse/SPARK-27338 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing unit tests. Closes #36680 from sandeepvinayak/SPARK-39283. 
Authored-by: sandeepvinayak <sandeep....@outlook.com> Signed-off-by: Josh Rosen <joshro...@databricks.com> (cherry picked from commit 8d0c035f102b005c2e85f03253f1c0c24f0a539f) Signed-off-by: Josh Rosen <joshro...@databricks.com> --- .../unsafe/sort/UnsafeExternalSorter.java | 160 +++++++++++++-------- 1 file changed, 104 insertions(+), 56 deletions(-) diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index c38327cae8c..ac8170c9d97 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -21,6 +21,7 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.util.LinkedList; +import java.util.List; import java.util.Queue; import java.util.function.Supplier; @@ -298,16 +299,30 @@ public final class UnsafeExternalSorter extends MemoryConsumer { * @return the number of bytes freed. */ private long freeMemory() { - updatePeakMemoryUsed(); + List<MemoryBlock> pagesToFree = clearAndGetAllocatedPagesToFree(); long memoryFreed = 0; - for (MemoryBlock block : allocatedPages) { + for (MemoryBlock block : pagesToFree) { memoryFreed += block.size(); freePage(block); } + return memoryFreed; + } + + /** + * Clear the allocated pages and return the list of allocated pages to let + * the caller free the page. This is to prevent the deadlock by nested locks + * if the caller locks the UnsafeExternalSorter and call freePage which locks the + * TaskMemoryManager and cause nested locks. 
+ * + * @return list of allocated pages to free + */ + private List<MemoryBlock> clearAndGetAllocatedPagesToFree() { + updatePeakMemoryUsed(); + List<MemoryBlock> pagesToFree = new LinkedList<>(allocatedPages); allocatedPages.clear(); currentPage = null; pageCursor = 0; - return memoryFreed; + return pagesToFree; } /** @@ -328,12 +343,27 @@ public final class UnsafeExternalSorter extends MemoryConsumer { * Frees this sorter's in-memory data structures and cleans up its spill files. */ public void cleanupResources() { - synchronized (this) { - deleteSpillFiles(); - freeMemory(); - if (inMemSorter != null) { - inMemSorter.freeMemory(); - inMemSorter = null; + // To avoid deadlocks, we can't call methods that lock the TaskMemoryManager + // (such as various free() methods) while synchronizing on the UnsafeExternalSorter. + // Instead, we will manipulate UnsafeExternalSorter state inside the synchronized + // lock and perform the actual free() calls outside it. + UnsafeInMemorySorter inMemSorterToFree = null; + List<MemoryBlock> pagesToFree = null; + try { + synchronized (this) { + deleteSpillFiles(); + pagesToFree = clearAndGetAllocatedPagesToFree(); + if (inMemSorter != null) { + inMemSorterToFree = inMemSorter; + inMemSorter = null; + } + } + } finally { + for (MemoryBlock pageToFree : pagesToFree) { + freePage(pageToFree); + } + if (inMemSorterToFree != null) { + inMemSorterToFree.freeMemory(); } } } @@ -576,58 +606,76 @@ public final class UnsafeExternalSorter extends MemoryConsumer { } public long spill() throws IOException { - synchronized (this) { - if (inMemSorter == null) { - return 0L; - } - - long currentPageNumber = upstream.getCurrentPageNumber(); + UnsafeInMemorySorter inMemSorterToFree = null; + List<MemoryBlock> pagesToFree = new LinkedList<>(); + try { + synchronized (this) { + if (inMemSorter == null) { + return 0L; + } - ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics(); - if (numRecords > 0) { - // Iterate over the records that have not 
been returned and spill them. - final UnsafeSorterSpillWriter spillWriter = new UnsafeSorterSpillWriter( - blockManager, fileBufferSizeBytes, writeMetrics, numRecords); - spillIterator(upstream, spillWriter); - spillWriters.add(spillWriter); - upstream = spillWriter.getReader(serializerManager); - } else { - // Nothing to spill as all records have been read already, but do not return yet, as the - // memory still has to be freed. - upstream = null; - } + long currentPageNumber = upstream.getCurrentPageNumber(); + + ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics(); + if (numRecords > 0) { + // Iterate over the records that have not been returned and spill them. + final UnsafeSorterSpillWriter spillWriter = new UnsafeSorterSpillWriter( + blockManager, fileBufferSizeBytes, writeMetrics, numRecords); + spillIterator(upstream, spillWriter); + spillWriters.add(spillWriter); + upstream = spillWriter.getReader(serializerManager); + } else { + // Nothing to spill as all records have been read already, but do not return yet, as the + // memory still has to be freed. + upstream = null; + } - long released = 0L; - synchronized (UnsafeExternalSorter.this) { - // release the pages except the one that is used. There can still be a caller that - // is accessing the current record. We free this page in that caller's next loadNext() - // call. - for (MemoryBlock page : allocatedPages) { - if (!loaded || page.pageNumber != currentPageNumber) { - released += page.size(); - freePage(page); - } else { - lastPage = page; + long released = 0L; + synchronized (UnsafeExternalSorter.this) { + // release the pages except the one that is used. There can still be a caller that + // is accessing the current record. We free this page in that caller's next loadNext() + // call. + for (MemoryBlock page : allocatedPages) { + if (!loaded || page.pageNumber != currentPageNumber) { + released += page.size(); + // Do not free the page, while we are locking `SpillableIterator`. 
The `freePage` + // method locks the `TaskMemoryManager`, and it's not a good idea to lock 2 objects + // in sequence. We may hit dead lock if another thread locks `TaskMemoryManager` + // and `SpillableIterator` in sequence, which may happen in + // `TaskMemoryManager.acquireExecutionMemory`. + pagesToFree.add(page); + } else { + lastPage = page; + } + } + allocatedPages.clear(); + if (lastPage != null) { + // Add the last page back to the list of allocated pages to make sure it gets freed in + // case loadNext() never gets called again. + allocatedPages.add(lastPage); } } - allocatedPages.clear(); - if (lastPage != null) { - // Add the last page back to the list of allocated pages to make sure it gets freed in - // case loadNext() never gets called again. - allocatedPages.add(lastPage); - } - } - // in-memory sorter will not be used after spilling - assert(inMemSorter != null); - released += inMemSorter.getMemoryUsage(); - totalSortTimeNanos += inMemSorter.getSortTimeNanos(); - inMemSorter.freeMemory(); - inMemSorter = null; - taskContext.taskMetrics().incMemoryBytesSpilled(released); - taskContext.taskMetrics().incDiskBytesSpilled(writeMetrics.bytesWritten()); - totalSpillBytes += released; - return released; + // in-memory sorter will not be used after spilling + assert (inMemSorter != null); + released += inMemSorter.getMemoryUsage(); + totalSortTimeNanos += inMemSorter.getSortTimeNanos(); + // Do not free the sorter while we are locking `SpillableIterator`, + // as this can cause a deadlock. 
+ inMemSorterToFree = inMemSorter; + inMemSorter = null; + taskContext.taskMetrics().incMemoryBytesSpilled(released); + taskContext.taskMetrics().incDiskBytesSpilled(writeMetrics.bytesWritten()); + totalSpillBytes += released; + return released; + } + } finally { + for (MemoryBlock pageToFree : pagesToFree) { + freePage(pageToFree); + } + if (inMemSorterToFree != null) { + inMemSorterToFree.freeMemory(); + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org