[ https://issues.apache.org/jira/browse/SPARK-26265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718297#comment-16718297 ]

ASF GitHub Bot commented on SPARK-26265:
----------------------------------------

viirya closed pull request #23289: [SPARK-26265][Core][BRANCH-2.4] Fix deadlock 
in BytesToBytesMap.MapIterator when locking both BytesToBytesMap.MapIterator 
and TaskMemoryManager
URL: https://github.com/apache/spark/pull/23289
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 9b6cbab38cbcc..64650336c9371 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -267,11 +267,18 @@ private MapIterator(int numRecords, Location loc, boolean destructive) {
     }
 
     private void advanceToNextPage() {
+      // SPARK-26265: We will first lock this `MapIterator` and then `TaskMemoryManager` when going
+      // to free a memory page by calling `freePage`. At the same time, it is possible that another
+      // memory consumer first locks `TaskMemoryManager` and then this `MapIterator` when it
+      // acquires memory and causes spilling on this `MapIterator`. To avoid deadlock here, we keep
+      // a reference to the page to free and free it after releasing the lock of `MapIterator`.
+      MemoryBlock pageToFree = null;
+
       synchronized (this) {
         int nextIdx = dataPages.indexOf(currentPage) + 1;
         if (destructive && currentPage != null) {
           dataPages.remove(currentPage);
-          freePage(currentPage);
+          pageToFree = currentPage;
           nextIdx --;
         }
         if (dataPages.size() > nextIdx) {
@@ -295,6 +302,9 @@ private void advanceToNextPage() {
           }
         }
       }
+      if (pageToFree != null) {
+        freePage(pageToFree);
+      }
     }
 
     @Override
diff --git a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
index 0bbaea6b834b8..6aa577d1bf797 100644
--- a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
+++ b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
@@ -38,12 +38,12 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
     return used;
   }
 
-  void use(long size) {
+  public void use(long size) {
     long got = taskMemoryManager.acquireExecutionMemory(size, this);
     used += got;
   }
 
-  void free(long size) {
+  public void free(long size) {
     used -= size;
     taskMemoryManager.releaseExecutionMemory(size, this);
   }
diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 53a233f698c7a..278d28f7bf479 100644
--- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -33,6 +33,8 @@
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.memory.TestMemoryConsumer;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.memory.TestMemoryManager;
 import org.apache.spark.network.util.JavaUtils;
@@ -667,4 +669,49 @@ public void testPeakMemoryUsed() {
     }
   }
 
+  @Test
+  public void avoidDeadlock() throws InterruptedException {
+    memoryManager.limit(PAGE_SIZE_BYTES);
+    MemoryMode mode = useOffHeapMemoryAllocator() ? MemoryMode.OFF_HEAP: MemoryMode.ON_HEAP;
+    TestMemoryConsumer c1 = new TestMemoryConsumer(taskMemoryManager, mode);
+    BytesToBytesMap map =
+      new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 1, 0.5, 1024, false);
+
+    Thread thread = new Thread(() -> {
+      int i = 0;
+      long used = 0;
+      while (i < 10) {
+        c1.use(10000000);
+        used += 10000000;
+        i++;
+      }
+      c1.free(used);
+    });
+
+    try {
+      int i;
+      for (i = 0; i < 1024; i++) {
+        final long[] arr = new long[]{i};
+        final BytesToBytesMap.Location loc = map.lookup(arr, Platform.LONG_ARRAY_OFFSET, 8);
+        loc.append(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8);
+      }
+
+      // Start acquiring memory in another memory consumer.
+      thread.start();
+
+      BytesToBytesMap.MapIterator iter = map.destructiveIterator();
+      for (i = 0; i < 1024; i++) {
+        iter.next();
+      }
+      assertFalse(iter.hasNext());
+    } finally {
+      map.free();
+      thread.join();
+      for (File spillFile : spillFilesCreated) {
+        assertFalse("Spill file " + spillFile.getPath() + " was not cleaned 
up",
+          spillFile.exists());
+      }
+    }
+  }
+
 }
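
For illustration only, the pattern the patch applies can be sketched outside of Spark as a small, self-contained Java example. Every name below (PageManager, PageIterator, release, advanceSafe) is a hypothetical stand-in rather than a Spark API; the point is simply that the page to free is recorded while holding the iterator's monitor and actually freed only after that monitor is released, so the iterator lock is never held while the manager lock is being requested:

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the "defer the free" pattern used by the patch above.
class PageManager {
  synchronized void release(Object page) {
    // Returning a page to the pool requires the PageManager monitor.
  }
}

class PageIterator {
  private final PageManager manager;
  private final List<Object> pages = new ArrayList<>();

  PageIterator(PageManager manager) { this.manager = manager; }

  // Deadlock-prone shape: acquires PageManager while still holding PageIterator,
  // i.e. lock order PageIterator -> PageManager.
  void advanceUnsafe() {
    synchronized (this) {
      if (!pages.isEmpty()) {
        manager.release(pages.remove(0));
      }
    }
  }

  // Fixed shape: remember the page inside the critical section and release it
  // only after the PageIterator monitor has been dropped.
  void advanceSafe() {
    Object pageToFree = null;
    synchronized (this) {
      if (!pages.isEmpty()) {
        pageToFree = pages.remove(0);
      }
    }
    if (pageToFree != null) {
      manager.release(pageToFree); // only the PageManager monitor is taken here
    }
  }
}

With advanceSafe, a thread that already holds the PageManager monitor (for example, one that triggers spilling) can still obtain the PageIterator monitor, because no thread ever holds the two monitors in the opposite order.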


 



> deadlock between TaskMemoryManager and BytesToBytesMap$MapIterator
> ------------------------------------------------------------------
>
>                 Key: SPARK-26265
>                 URL: https://issues.apache.org/jira/browse/SPARK-26265
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.2
>            Reporter: qian han
>            Assignee: Liang-Chi Hsieh
>            Priority: Major
>             Fix For: 2.4.1, 3.0.0
>
>
> The application is running on a cluster with 72000 cores and 182000G mem.
> Environment:
> |spark.dynamicAllocation.minExecutors|5|
> |spark.dynamicAllocation.initialExecutors|30|
> |spark.dynamicAllocation.maxExecutors|400|
> |spark.executor.cores|4|
> |spark.executor.memory|20g|
>  
>   
> Stage description:
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:364)
> org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:422)
> org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:357)
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:193)
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> java.lang.reflect.Method.invoke(Method.java:498)
> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
> org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>  
> jstack information as follows:
> Found one Java-level deadlock:
> =============================
> "Thread-ScriptTransformation-Feed":
>   waiting to lock monitor 0x0000000000e0cb18 (object 0x00000002f1641538, a org.apache.spark.memory.TaskMemoryManager),
>   which is held by "Executor task launch worker for task 18899"
> "Executor task launch worker for task 18899":
>   waiting to lock monitor 0x0000000000e09788 (object 0x0000000302faa3b0, a org.apache.spark.unsafe.map.BytesToBytesMap$MapIterator),
>   which is held by "Thread-ScriptTransformation-Feed"
>
> Java stack information for the threads listed above:
> ===================================================
> "Thread-ScriptTransformation-Feed":
>   at org.apache.spark.memory.TaskMemoryManager.freePage(TaskMemoryManager.java:332)
>   - waiting to lock <0x00000002f1641538> (a org.apache.spark.memory.TaskMemoryManager)
>   at org.apache.spark.memory.MemoryConsumer.freePage(MemoryConsumer.java:130)
>   at org.apache.spark.unsafe.map.BytesToBytesMap.access$300(BytesToBytesMap.java:66)
>   at org.apache.spark.unsafe.map.BytesToBytesMap$MapIterator.advanceToNextPage(BytesToBytesMap.java:274)
>   - locked <0x0000000302faa3b0> (a org.apache.spark.unsafe.map.BytesToBytesMap$MapIterator)
>   at org.apache.spark.unsafe.map.BytesToBytesMap$MapIterator.next(BytesToBytesMap.java:313)
>   at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap$1.next(UnsafeFixedWidthAggregationMap.java:173)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>   at org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread$$anonfun$run$1.apply$mcV$sp(ScriptTransformationExec.scala:281)
>   at org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread$$anonfun$run$1.apply(ScriptTransformationExec.scala:270)
>   at org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread$$anonfun$run$1.apply(ScriptTransformationExec.scala:270)
>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1995)
>   at org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread.run(ScriptTransformationExec.scala:270)
> "Executor task launch worker for task 18899":
>   at org.apache.spark.unsafe.map.BytesToBytesMap$MapIterator.spill(BytesToBytesMap.java:345)
>   - waiting to lock <0x0000000302faa3b0> (a org.apache.spark.unsafe.map.BytesToBytesMap$MapIterator)
>   at org.apache.spark.unsafe.map.BytesToBytesMap.spill(BytesToBytesMap.java:772)
>   at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:180)
>   - locked <0x00000002f1641538> (a org.apache.spark.memory.TaskMemoryManager)
>   at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:283)
>   at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:117)
>   at org.apache.spark.shuffle.sort.ShuffleExternalSorter.acquireNewPageIfNecessary(ShuffleExternalSorter.java:371)
>   at org.apache.spark.shuffle.sort.ShuffleExternalSorter.insertRecord(ShuffleExternalSorter.java:394)
>   at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:267)
>   at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:188)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>   at org.apache.spark.scheduler.Task.run(Task.scala:109)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:748)
>
> Found 1 deadlock.
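
The trace above has the classic AB-BA shape: the "Thread-ScriptTransformation-Feed" thread holds the MapIterator monitor and waits for the TaskMemoryManager monitor, while the task thread holds the TaskMemoryManager monitor and waits for the MapIterator monitor. A rough, hypothetical reproduction of that shape (plain Object monitors standing in for the two Spark objects; the sleeps only make the bad interleaving reliable) looks like this:

// Hypothetical reproduction of the AB-BA lock-ordering deadlock seen in the jstack output.
public class AbBaDeadlock {
  private static final Object iteratorLock = new Object(); // stands in for the MapIterator monitor
  private static final Object managerLock = new Object();  // stands in for the TaskMemoryManager monitor

  public static void main(String[] args) {
    Thread feeder = new Thread(() -> {
      synchronized (iteratorLock) {        // like MapIterator.advanceToNextPage
        sleep(100);
        synchronized (managerLock) {       // like TaskMemoryManager.freePage
          System.out.println("feeder freed a page");
        }
      }
    }, "feed-thread");

    Thread worker = new Thread(() -> {
      synchronized (managerLock) {         // like TaskMemoryManager.acquireExecutionMemory
        sleep(100);
        synchronized (iteratorLock) {      // like MapIterator.spill
          System.out.println("worker spilled the iterator");
        }
      }
    }, "task-thread");

    feeder.start();
    worker.start(); // with the sleeps in place, each thread blocks on the other's monitor forever
  }

  private static void sleep(long ms) {
    try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
  }
}

The fix in the PR breaks this cycle by ensuring the MapIterator side never requests the TaskMemoryManager monitor while still holding its own.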


