This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 606830e9cae [SPARK-39283][CORE] Fix deadlock between TaskMemoryManager 
and UnsafeExternalSorter.SpillableIterator
606830e9cae is described below

commit 606830e9caefeec90bd0556f367395b1acbc827c
Author: sandeepvinayak <sandeep....@outlook.com>
AuthorDate: Tue May 31 15:28:07 2022 -0700

    [SPARK-39283][CORE] Fix deadlock between TaskMemoryManager and 
UnsafeExternalSorter.SpillableIterator
    
    ### What changes were proposed in this pull request?
    
    This PR fixes a deadlock between TaskMemoryManager and 
UnsafeExternalSorter.SpillableIterator.
    
    ### Why are the changes needed?
    
    We are hitting a deadlock between TaskMemoryManager and
    UnsafeExternalSorter.SpillableIterator during a join. It turns out that
    UnsafeExternalSorter.SpillableIterator#spill() acquires the locks on
    `UnsafeExternalSorter.SpillableIterator` and `UnsafeExternalSorter` and then
    calls `freePage` to free all allocated pages except the last one; `freePage`
    in turn takes the lock on `TaskMemoryManager`.
    At the same time, another `MemoryConsumer` that uses `UnsafeExternalSorter`
    for sorting may call `allocatePage`, which needs the lock on
    `TaskMemoryManager`. That allocation can trigger a spill, which in turn
    requires the lock on `UnsafeExternalSorter`. Because the two threads acquire
    the same locks in opposite order, they deadlock.
    
    There is a similar fix here as well:
    https://issues.apache.org/jira/browse/SPARK-27338
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing unit tests.
    
    Closes #36680 from sandeepvinayak/SPARK-39283.
    
    Authored-by: sandeepvinayak <sandeep....@outlook.com>
    Signed-off-by: Josh Rosen <joshro...@databricks.com>
    (cherry picked from commit 8d0c035f102b005c2e85f03253f1c0c24f0a539f)
    Signed-off-by: Josh Rosen <joshro...@databricks.com>
---
 .../unsafe/sort/UnsafeExternalSorter.java          | 160 +++++++++++++--------
 1 file changed, 104 insertions(+), 56 deletions(-)

diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index c38327cae8c..ac8170c9d97 100644
--- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -21,6 +21,7 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Queue;
 import java.util.function.Supplier;
 
@@ -298,16 +299,30 @@ public final class UnsafeExternalSorter extends 
MemoryConsumer {
    * @return the number of bytes freed.
    */
   private long freeMemory() {
-    updatePeakMemoryUsed();
+    List<MemoryBlock> pagesToFree = clearAndGetAllocatedPagesToFree();
     long memoryFreed = 0;
-    for (MemoryBlock block : allocatedPages) {
+    for (MemoryBlock block : pagesToFree) {
       memoryFreed += block.size();
       freePage(block);
     }
+    return memoryFreed;
+  }
+
+  /**
+   * Clear the allocated pages and return the list of allocated pages to let
+   * the caller free the page. This is to prevent the deadlock by nested locks
+   * if the caller locks the UnsafeExternalSorter and call freePage which 
locks the
+   * TaskMemoryManager and cause nested locks.
+   *
+   * @return list of allocated pages to free
+   */
+  private List<MemoryBlock> clearAndGetAllocatedPagesToFree() {
+    updatePeakMemoryUsed();
+    List<MemoryBlock> pagesToFree = new LinkedList<>(allocatedPages);
     allocatedPages.clear();
     currentPage = null;
     pageCursor = 0;
-    return memoryFreed;
+    return pagesToFree;
   }
 
   /**
@@ -328,12 +343,27 @@ public final class UnsafeExternalSorter extends 
MemoryConsumer {
    * Frees this sorter's in-memory data structures and cleans up its spill 
files.
    */
   public void cleanupResources() {
-    synchronized (this) {
-      deleteSpillFiles();
-      freeMemory();
-      if (inMemSorter != null) {
-        inMemSorter.freeMemory();
-        inMemSorter = null;
+    // To avoid deadlocks, we can't call methods that lock the 
TaskMemoryManager
+    // (such as various free() methods) while synchronizing on the 
UnsafeExternalSorter.
+    // Instead, we will manipulate UnsafeExternalSorter state inside the 
synchronized
+    // lock and perform the actual free() calls outside it.
+    UnsafeInMemorySorter inMemSorterToFree = null;
+    List<MemoryBlock> pagesToFree = null;
+    try {
+      synchronized (this) {
+        deleteSpillFiles();
+        pagesToFree = clearAndGetAllocatedPagesToFree();
+        if (inMemSorter != null) {
+          inMemSorterToFree = inMemSorter;
+          inMemSorter = null;
+        }
+      }
+    } finally {
+      for (MemoryBlock pageToFree : pagesToFree) {
+        freePage(pageToFree);
+      }
+      if (inMemSorterToFree != null) {
+        inMemSorterToFree.freeMemory();
       }
     }
   }
@@ -576,58 +606,76 @@ public final class UnsafeExternalSorter extends 
MemoryConsumer {
     }
 
     public long spill() throws IOException {
-      synchronized (this) {
-        if (inMemSorter == null) {
-          return 0L;
-        }
-
-        long currentPageNumber = upstream.getCurrentPageNumber();
+      UnsafeInMemorySorter inMemSorterToFree = null;
+      List<MemoryBlock> pagesToFree = new LinkedList<>();
+      try {
+        synchronized (this) {
+          if (inMemSorter == null) {
+            return 0L;
+          }
 
-        ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
-        if (numRecords > 0) {
-          // Iterate over the records that have not been returned and spill 
them.
-          final UnsafeSorterSpillWriter spillWriter = new 
UnsafeSorterSpillWriter(
-                  blockManager, fileBufferSizeBytes, writeMetrics, numRecords);
-          spillIterator(upstream, spillWriter);
-          spillWriters.add(spillWriter);
-          upstream = spillWriter.getReader(serializerManager);
-        } else {
-          // Nothing to spill as all records have been read already, but do 
not return yet, as the
-          // memory still has to be freed.
-          upstream = null;
-        }
+          long currentPageNumber = upstream.getCurrentPageNumber();
+
+          ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
+          if (numRecords > 0) {
+            // Iterate over the records that have not been returned and spill 
them.
+            final UnsafeSorterSpillWriter spillWriter = new 
UnsafeSorterSpillWriter(
+                    blockManager, fileBufferSizeBytes, writeMetrics, 
numRecords);
+            spillIterator(upstream, spillWriter);
+            spillWriters.add(spillWriter);
+            upstream = spillWriter.getReader(serializerManager);
+          } else {
+            // Nothing to spill as all records have been read already, but do 
not return yet, as the
+            // memory still has to be freed.
+            upstream = null;
+          }
 
-        long released = 0L;
-        synchronized (UnsafeExternalSorter.this) {
-          // release the pages except the one that is used. There can still be 
a caller that
-          // is accessing the current record. We free this page in that 
caller's next loadNext()
-          // call.
-          for (MemoryBlock page : allocatedPages) {
-            if (!loaded || page.pageNumber != currentPageNumber) {
-              released += page.size();
-              freePage(page);
-            } else {
-              lastPage = page;
+          long released = 0L;
+          synchronized (UnsafeExternalSorter.this) {
+            // release the pages except the one that is used. There can still 
be a caller that
+            // is accessing the current record. We free this page in that 
caller's next loadNext()
+            // call.
+            for (MemoryBlock page : allocatedPages) {
+              if (!loaded || page.pageNumber != currentPageNumber) {
+                released += page.size();
+                // Do not free the page, while we are locking 
`SpillableIterator`. The `freePage`
+                // method locks the `TaskMemoryManager`, and it's not a good 
idea to lock 2 objects
+                // in sequence. We may hit dead lock if another thread locks 
`TaskMemoryManager`
+                // and `SpillableIterator` in sequence, which may happen in
+                // `TaskMemoryManager.acquireExecutionMemory`.
+                pagesToFree.add(page);
+              } else {
+                lastPage = page;
+              }
+            }
+            allocatedPages.clear();
+            if (lastPage != null) {
+              // Add the last page back to the list of allocated pages to make 
sure it gets freed in
+              // case loadNext() never gets called again.
+              allocatedPages.add(lastPage);
             }
           }
-          allocatedPages.clear();
-          if (lastPage != null) {
-            // Add the last page back to the list of allocated pages to make 
sure it gets freed in
-            // case loadNext() never gets called again.
-            allocatedPages.add(lastPage);
-          }
-        }
 
-        // in-memory sorter will not be used after spilling
-        assert(inMemSorter != null);
-        released += inMemSorter.getMemoryUsage();
-        totalSortTimeNanos += inMemSorter.getSortTimeNanos();
-        inMemSorter.freeMemory();
-        inMemSorter = null;
-        taskContext.taskMetrics().incMemoryBytesSpilled(released);
-        
taskContext.taskMetrics().incDiskBytesSpilled(writeMetrics.bytesWritten());
-        totalSpillBytes += released;
-        return released;
+          // in-memory sorter will not be used after spilling
+          assert (inMemSorter != null);
+          released += inMemSorter.getMemoryUsage();
+          totalSortTimeNanos += inMemSorter.getSortTimeNanos();
+          // Do not free the sorter while we are locking `SpillableIterator`,
+          // as this can cause a deadlock.
+          inMemSorterToFree = inMemSorter;
+          inMemSorter = null;
+          taskContext.taskMetrics().incMemoryBytesSpilled(released);
+          
taskContext.taskMetrics().incDiskBytesSpilled(writeMetrics.bytesWritten());
+          totalSpillBytes += released;
+          return released;
+        }
+      } finally {
+        for (MemoryBlock pageToFree : pagesToFree) {
+          freePage(pageToFree);
+        }
+        if (inMemSorterToFree != null) {
+          inMemSorterToFree.freeMemory();
+        }
       }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to