[ https://issues.apache.org/jira/browse/SPARK-26265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718297#comment-16718297 ]
ASF GitHub Bot commented on SPARK-26265: ---------------------------------------- viirya closed pull request #23289: [SPARK-26265][Core][BRANCH-2.4] Fix deadlock in BytesToBytesMap.MapIterator when locking both BytesToBytesMap.MapIterator and TaskMemoryManager URL: https://github.com/apache/spark/pull/23289 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 9b6cbab38cbcc..64650336c9371 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -267,11 +267,18 @@ private MapIterator(int numRecords, Location loc, boolean destructive) { } private void advanceToNextPage() { + // SPARK-26265: We will first lock this `MapIterator` and then `TaskMemoryManager` when going + // to free a memory page by calling `freePage`. At the same time, it is possibly that another + // memory consumer first locks `TaskMemoryManager` and then this `MapIterator` when it + // acquires memory and causes spilling on this `MapIterator`. To avoid deadlock here, we keep + // reference to the page to free and free it after releasing the lock of `MapIterator`. + MemoryBlock pageToFree = null; + synchronized (this) { int nextIdx = dataPages.indexOf(currentPage) + 1; if (destructive && currentPage != null) { dataPages.remove(currentPage); - freePage(currentPage); + pageToFree = currentPage; nextIdx --; } if (dataPages.size() > nextIdx) { @@ -295,6 +302,9 @@ private void advanceToNextPage() { } } } + if (pageToFree != null) { + freePage(pageToFree); + } } @Override diff --git a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java index 0bbaea6b834b8..6aa577d1bf797 100644 --- a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java +++ b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java @@ -38,12 +38,12 @@ public long spill(long size, MemoryConsumer trigger) throws IOException { return used; } - void use(long size) { + public void use(long size) { long got = taskMemoryManager.acquireExecutionMemory(size, this); used += got; } - void free(long size) { + public void free(long size) { used -= size; taskMemoryManager.releaseExecutionMemory(size, this); } diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java index 53a233f698c7a..278d28f7bf479 100644 --- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java +++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java @@ -33,6 +33,8 @@ import org.apache.spark.SparkConf; import org.apache.spark.executor.ShuffleWriteMetrics; +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.memory.TestMemoryConsumer; import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.memory.TestMemoryManager; import org.apache.spark.network.util.JavaUtils; @@ -667,4 +669,49 @@ public void testPeakMemoryUsed() { } } + @Test + public void avoidDeadlock() throws InterruptedException { + memoryManager.limit(PAGE_SIZE_BYTES); + MemoryMode mode = useOffHeapMemoryAllocator() ? MemoryMode.OFF_HEAP: MemoryMode.ON_HEAP; + TestMemoryConsumer c1 = new TestMemoryConsumer(taskMemoryManager, mode); + BytesToBytesMap map = + new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 1, 0.5, 1024, false); + + Thread thread = new Thread(() -> { + int i = 0; + long used = 0; + while (i < 10) { + c1.use(10000000); + used += 10000000; + i++; + } + c1.free(used); + }); + + try { + int i; + for (i = 0; i < 1024; i++) { + final long[] arr = new long[]{i}; + final BytesToBytesMap.Location loc = map.lookup(arr, Platform.LONG_ARRAY_OFFSET, 8); + loc.append(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8); + } + + // Starts to require memory at another memory consumer. + thread.start(); + + BytesToBytesMap.MapIterator iter = map.destructiveIterator(); + for (i = 0; i < 1024; i++) { + iter.next(); + } + assertFalse(iter.hasNext()); + } finally { + map.free(); + thread.join(); + for (File spillFile : spillFilesCreated) { + assertFalse("Spill file " + spillFile.getPath() + " was not cleaned up", + spillFile.exists()); + } + } + } + } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > deadlock between TaskMemoryManager and BytesToBytesMap$MapIterator > ------------------------------------------------------------------ > > Key: SPARK-26265 > URL: https://issues.apache.org/jira/browse/SPARK-26265 > Project: Spark > Issue Type: Bug > Components: Spark Core > Affects Versions: 2.3.2 > Reporter: qian han > Assignee: Liang-Chi Hsieh > Priority: Major > Fix For: 2.4.1, 3.0.0 > > > The application is running on a cluster with 72000 cores and 182000G mem. > Enviroment: > |spark.dynamicAllocation.minExecutors|5| > |spark.dynamicAllocation.initialExecutors|30| > |spark.dynamicAllocation.maxExecutors|400| > |spark.executor.cores|4| > |spark.executor.memory|20g| > > > Stage description: > org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:364) > org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:422) > org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:357) > org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:193) > > org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala) > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > java.lang.reflect.Method.invoke(Method.java:498) > org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) > org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894) > org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198) > org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228) > org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137) > org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) > > jstack information as follow: > Found one Java-level deadlock: ============================= > "Thread-ScriptTransformation-Feed": waiting to lock monitor > 0x0000000000e0cb18 (object 0x00000002f1641538, a > org.apache.spark.memory.TaskMemoryManager), which is held by "Executor task > launch worker for task 18899" "Executor task launch worker for task 18899": > waiting to lock monitor 0x0000000000e09788 (object 0x0000000302faa3b0, a > org.apache.spark.unsafe.map.BytesToBytesMap$MapIterator), which is held by > "Thread-ScriptTransformation-Feed" Java stack information for the threads > listed above: =================================================== > "Thread-ScriptTransformation-Feed": at > org.apache.spark.memory.TaskMemoryManager.freePage(TaskMemoryManager.java:332) > - waiting to lock <0x00000002f1641538> (a > org.apache.spark.memory.TaskMemoryManager) at > org.apache.spark.memory.MemoryConsumer.freePage(MemoryConsumer.java:130) at > org.apache.spark.unsafe.map.BytesToBytesMap.access$300(BytesToBytesMap.java:66) > at > org.apache.spark.unsafe.map.BytesToBytesMap$MapIterator.advanceToNextPage(BytesToBytesMap.java:274) > - locked <0x0000000302faa3b0> (a > org.apache.spark.unsafe.map.BytesToBytesMap$MapIterator) at > org.apache.spark.unsafe.map.BytesToBytesMap$MapIterator.next(BytesToBytesMap.java:313) > at > org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap$1.next(UnsafeFixedWidthAggregationMap.java:173) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown > Source) at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at > scala.collection.Iterator$class.foreach(Iterator.scala:893) at > scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at > org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread$$anonfun$run$1.apply$mcV$sp(ScriptTransformationExec.scala:281) > at > org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread$$anonfun$run$1.apply(ScriptTransformationExec.scala:270) > at > org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread$$anonfun$run$1.apply(ScriptTransformationExec.scala:270) > at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1995) at > org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread.run(ScriptTransformationExec.scala:270) > "Executor task launch worker for task 18899": at > org.apache.spark.unsafe.map.BytesToBytesMap$MapIterator.spill(BytesToBytesMap.java:345) > - waiting to lock <0x0000000302faa3b0> (a > org.apache.spark.unsafe.map.BytesToBytesMap$MapIterator) at > org.apache.spark.unsafe.map.BytesToBytesMap.spill(BytesToBytesMap.java:772) > at > org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:180) > - locked <0x00000002f1641538> (a org.apache.spark.memory.TaskMemoryManager) > at > org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:283) > at > org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:117) > at > org.apache.spark.shuffle.sort.ShuffleExternalSorter.acquireNewPageIfNecessary(ShuffleExternalSorter.java:371) > at > org.apache.spark.shuffle.sort.ShuffleExternalSorter.insertRecord(ShuffleExternalSorter.java:394) > at > org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:267) > at > org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:188) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at > org.apache.spark.scheduler.Task.run(Task.scala:109) at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:748) Found 1 deadlock. -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org