Repository: spark
Updated Branches:
  refs/heads/branch-1.5 b3f1e6533 -> 49355d0e0


[SPARK-10474] [SQL] Aggregation fails to allocate memory for pointer array

When `TungstenAggregation` hits memory pressure, it switches from hash-based to
sort-based aggregation in place. However, in the process we try to allocate the
pointer array for writing to the new `UnsafeExternalSorter` *before* actually
freeing the memory from the hash map. This led to the following exception:
```
 java.io.IOException: Could not acquire 65536 bytes of memory
        at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.initializeForWriting(UnsafeExternalSorter.java:169)
        at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:220)
        at org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:126)
        at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:257)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:435)
```
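
For intuition, here is a minimal, self-contained sketch of the ordering problem; the
names are hypothetical (`MemoryBudget` stands in for Spark's `ShuffleMemoryManager`),
and the fixed 65536-byte budget simply mirrors the figure in the stack trace:
```java
// A minimal sketch of the ordering bug, under the assumptions stated above.
final class MemoryBudget {
  private long free;
  MemoryBudget(long bytes) { this.free = bytes; }

  // Grants up to `bytes` and returns the amount actually granted.
  long tryToAcquire(long bytes) {
    long granted = Math.min(bytes, free);
    free -= granted;
    return granted;
  }

  void release(long bytes) { free += bytes; }
}

public class PointerArrayOrderingSketch {
  public static void main(String[] args) {
    MemoryBudget budget = new MemoryBudget(65536);
    long mapPages = budget.tryToAcquire(65536); // the hash map holds the whole budget

    // Pre-fix ordering: spill() immediately re-allocates the sorter's pointer
    // array while the hash map still holds all of its pages, so the request fails.
    long granted = budget.tryToAcquire(65536);
    if (granted < 65536) {
      budget.release(granted);
      System.out.println("Could not acquire 65536 bytes of memory");
    }

    // Post-fix ordering: spill without re-initializing, free the map's pages,
    // and only then allocate the pointer array.
    budget.release(mapPages);              // map.free()
    long ptr = budget.tryToAcquire(65536); // sorter.initializeForWriting()
    System.out.println("acquired " + ptr + " bytes after freeing the map");
  }
}
```
The actual fix, in the diff below, threads a `shouldInitializeForWriting` flag through
`spill()` so that `UnsafeKVExternalSorter` can defer the pointer-array allocation
until after `map.free()` has returned the hash map's pages.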

Author: Andrew Or <and...@databricks.com>

Closes #8827 from andrewor14/allocate-pointer-array.

(cherry picked from commit 7ff8d68cc19299e16dedfd819b9e96480fa6cf44)
Signed-off-by: Andrew Or <and...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/49355d0e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/49355d0e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/49355d0e

Branch: refs/heads/branch-1.5
Commit: 49355d0e032cfe82b907e6cb45c0b894387ba46b
Parents: b3f1e65
Author: Andrew Or <and...@databricks.com>
Authored: Fri Sep 18 23:58:25 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Fri Sep 18 23:58:36 2015 -0700

----------------------------------------------------------------------
 .../unsafe/sort/UnsafeExternalSorter.java       | 14 +++++-
 .../sql/execution/UnsafeKVExternalSorter.java   |  8 +++-
 .../UnsafeFixedWidthAggregationMapSuite.scala   | 49 +++++++++++++++++++-
 3 files changed, 66 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/49355d0e/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index fc364e0..14b6aaf 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -159,7 +159,7 @@ public final class UnsafeExternalSorter {
   /**
   * Allocates new sort data structures. Called when creating the sorter and after each spill.
    */
-  private void initializeForWriting() throws IOException {
+  public void initializeForWriting() throws IOException {
     this.writeMetrics = new ShuffleWriteMetrics();
     final long pointerArrayMemory =
       UnsafeInMemorySorter.getMemoryRequirementsForPointerArray(initialSize);
@@ -187,6 +187,14 @@ public final class UnsafeExternalSorter {
    * Sort and spill the current records in response to memory pressure.
    */
   public void spill() throws IOException {
+    spill(true);
+  }
+
+  /**
+   * Sort and spill the current records in response to memory pressure.
+   * @param shouldInitializeForWriting whether to allocate memory for writing after the spill
+   */
+  public void spill(boolean shouldInitializeForWriting) throws IOException {
     assert(inMemSorter != null);
     logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
       Thread.currentThread().getId(),
@@ -217,7 +225,9 @@ public final class UnsafeExternalSorter {
     // written to disk. This also counts the space needed to store the sorter's pointer array.
     taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
 
-    initializeForWriting();
+    if (shouldInitializeForWriting) {
+      initializeForWriting();
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/49355d0e/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index 7db6b7f..b81f67a 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -85,6 +85,7 @@ public final class UnsafeKVExternalSorter {
       // We will use the number of elements in the map as the initialSize of the
       // UnsafeInMemorySorter. Because UnsafeInMemorySorter does not accept 0 as the initialSize,
       // we will use 1 as its initial size if the map is empty.
+      // TODO: track pointer array memory used by this in-memory sorter!
       final UnsafeInMemorySorter inMemSorter = new UnsafeInMemorySorter(
         taskMemoryManager, recordComparator, prefixComparator, Math.max(1, map.numElements()));
 
@@ -123,8 +124,13 @@ public final class UnsafeKVExternalSorter {
         pageSizeBytes,
         inMemSorter);
 
-      sorter.spill();
+      // Note: This spill doesn't actually release any memory, so if we try to allocate a new
+      // pointer array immediately after the spill then we may fail to acquire sufficient space
+      // for it (SPARK-10474). For this reason, we must initialize for writing explicitly *after*
+      // we have actually freed memory from our map.
+      sorter.spill(false /* initialize for writing */);
       map.free();
+      sorter.initializeForWriting();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/49355d0e/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
index d1f0b2b..ada4d42 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
@@ -23,9 +23,10 @@ import scala.util.{Try, Random}
 
 import org.scalatest.Matchers
 
-import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, UnsafeProjection}
 import org.apache.spark.{TaskContextImpl, TaskContext, SparkFunSuite}
+import org.apache.spark.shuffle.ShuffleMemoryManager
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, UnsafeProjection}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator, TaskMemoryManager}
@@ -325,7 +326,7 @@ class UnsafeFixedWidthAggregationMapSuite
       // At here, we also test if copy is correct.
       iter.getKey.copy()
       iter.getValue.copy()
-      count += 1;
+      count += 1
     }
 
     // 1 record was from the map and 4096 records were explicitly inserted.
@@ -333,4 +334,48 @@ class UnsafeFixedWidthAggregationMapSuite
 
     map.free()
   }
+
+  testWithMemoryLeakDetection("convert to external sorter under memory pressure (SPARK-10474)") {
+    val smm = ShuffleMemoryManager.createForTesting(65536)
+    val pageSize = 4096
+    val map = new UnsafeFixedWidthAggregationMap(
+      emptyAggregationBuffer,
+      aggBufferSchema,
+      groupKeySchema,
+      taskMemoryManager,
+      smm,
+      128, // initial capacity
+      pageSize,
+      false // disable perf metrics
+    )
+
+    // Insert into the map until we've run out of space
+    val rand = new Random(42)
+    var hasSpace = true
+    while (hasSpace) {
+      val str = rand.nextString(1024)
+      val buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
+      if (buf == null) {
+        hasSpace = false
+      } else {
+        buf.setInt(0, str.length)
+      }
+    }
+
+    // Ensure we're actually maxed out by asserting that we can't acquire even just 1 byte
+    assert(smm.tryToAcquire(1) === 0)
+
+    // Convert the map into a sorter. This used to fail before the fix for SPARK-10474
+    // because we would try to acquire space for the in-memory sorter pointer array before
+    // actually releasing the pages despite having spilled all of them.
+    var sorter: UnsafeKVExternalSorter = null
+    try {
+      sorter = map.destructAndCreateExternalSorter()
+    } finally {
+      if (sorter != null) {
+        sorter.cleanupResources()
+      }
+    }
+  }
+
 }

