Repository: spark
Updated Branches:
  refs/heads/master 366376425 -> 079a2609d


[SPARK-17788][SPARK-21033][SQL] fix the potential OOM in UnsafeExternalSorter 
and ShuffleExternalSorter

## What changes were proposed in this pull request?

In `UnsafeInMemorySorter`, one record may take 32 bytes: 1 `long` for pointer, 
1 `long` for key-prefix, and another 2 `long`s as the temporary buffer for 
radix sort.

In `UnsafeExternalSorter`, we set the 
`DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD` to be `1024 * 1024 * 1024 / 2`, and 
hoping the max size of point array to be 8 GB. However this is wrong, `1024 * 
1024 * 1024 / 2 * 32` is actually 16 GB, and if we grow the point array before 
reach this limitation, we may hit the max-page-size error.

Users may see exception like this on large dataset:
```
Caused by: java.lang.IllegalArgumentException: Cannot allocate a page with more 
than 17179869176 bytes
at 
org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:241)
at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:121)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:374)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:396)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:94)
...
```

Setting `DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD` to a smaller number is not 
enough, users can still set the config to a big number and trigger the too 
large page size issue. This PR fixes it by explicitly handling the too large 
page size exception in the sorter and spill.

This PR also change the type of 
`spark.shuffle.spill.numElementsForceSpillThreshold` to int, because it's only 
compared with `numRecords`, which is an int. This is an internal conf so we 
don't have a serious compatibility issue.

## How was this patch tested?

TODO

Author: Wenchen Fan <[email protected]>

Closes #18251 from cloud-fan/sort.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/079a2609
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/079a2609
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/079a2609

Branch: refs/heads/master
Commit: 079a2609d7ad0a7dd2ec3eaa594e6ed8801a8008
Parents: 3663764
Author: Wenchen Fan <[email protected]>
Authored: Mon Oct 30 17:53:06 2017 +0100
Committer: Wenchen Fan <[email protected]>
Committed: Mon Oct 30 17:53:06 2017 +0100

----------------------------------------------------------------------
 .../org/apache/spark/memory/MemoryConsumer.java |  8 ++++++-
 .../apache/spark/memory/TaskMemoryManager.java  |  5 ++--
 .../spark/memory/TooLargePageException.java     | 24 ++++++++++++++++++++
 .../shuffle/sort/ShuffleExternalSorter.java     | 16 ++++++++-----
 .../unsafe/sort/UnsafeExternalSorter.java       | 21 +++++++++--------
 .../apache/spark/internal/config/package.scala  | 10 ++++++++
 .../unsafe/sort/UnsafeExternalSorterSuite.java  | 11 +++++----
 .../sql/execution/UnsafeExternalRowSorter.java  |  5 ++--
 .../org/apache/spark/sql/internal/SQLConf.scala |  6 ++---
 .../UnsafeFixedWidthAggregationMap.java         |  6 ++---
 .../sql/execution/UnsafeKVExternalSorter.java   |  4 ++--
 .../aggregate/ObjectAggregationIterator.scala   |  6 ++---
 .../aggregate/ObjectAggregationMap.scala        |  5 ++--
 ...ernalAppendOnlyUnsafeRowArrayBenchmark.scala |  3 ++-
 .../execution/UnsafeKVExternalSorterSuite.scala |  4 ++--
 15 files changed, 92 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/079a2609/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java 
b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
index 0efae16..2dff241 100644
--- a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
+++ b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
@@ -83,7 +83,13 @@ public abstract class MemoryConsumer {
   public abstract long spill(long size, MemoryConsumer trigger) throws 
IOException;
 
   /**
-   * Allocates a LongArray of `size`.
+   * Allocates a LongArray of `size`. Note that this method may throw 
`OutOfMemoryError` if Spark
+   * doesn't have enough memory for this allocation, or throw 
`TooLargePageException` if this
+   * `LongArray` is too large to fit in a single page. The caller side should 
take care of these
+   * two exceptions, or make sure the `size` is small enough that won't 
trigger exceptions.
+   *
+   * @throws OutOfMemoryError
+   * @throws TooLargePageException
    */
   public LongArray allocateArray(long size) {
     long required = size * 8L;

http://git-wip-us.apache.org/repos/asf/spark/blob/079a2609/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index 44b60c1..f6b5ea3 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -270,13 +270,14 @@ public class TaskMemoryManager {
    *
    * Returns `null` if there was not enough memory to allocate the page. May 
return a page that
    * contains fewer bytes than requested, so callers should verify the size of 
returned pages.
+   *
+   * @throws TooLargePageException
    */
   public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
     assert(consumer != null);
     assert(consumer.getMode() == tungstenMemoryMode);
     if (size > MAXIMUM_PAGE_SIZE_BYTES) {
-      throw new IllegalArgumentException(
-        "Cannot allocate a page with more than " + MAXIMUM_PAGE_SIZE_BYTES + " 
bytes");
+      throw new TooLargePageException(size);
     }
 
     long acquired = acquireExecutionMemory(size, consumer);

http://git-wip-us.apache.org/repos/asf/spark/blob/079a2609/core/src/main/java/org/apache/spark/memory/TooLargePageException.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/memory/TooLargePageException.java 
b/core/src/main/java/org/apache/spark/memory/TooLargePageException.java
new file mode 100644
index 0000000..4abee77
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/memory/TooLargePageException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory;
+
+public class TooLargePageException extends RuntimeException {
+  TooLargePageException(long size) {
+    super("Cannot allocate a page of " + size + " bytes.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/079a2609/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java 
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index b4f4630..e80f973 100644
--- 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ 
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -31,8 +31,10 @@ import org.slf4j.LoggerFactory;
 import org.apache.spark.SparkConf;
 import org.apache.spark.TaskContext;
 import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.internal.config.package$;
 import org.apache.spark.memory.MemoryConsumer;
 import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.memory.TooLargePageException;
 import org.apache.spark.serializer.DummySerializerInstance;
 import org.apache.spark.serializer.SerializerInstance;
 import org.apache.spark.storage.BlockManager;
@@ -43,7 +45,6 @@ import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.LongArray;
 import org.apache.spark.unsafe.memory.MemoryBlock;
 import org.apache.spark.util.Utils;
-import org.apache.spark.internal.config.package$;
 
 /**
  * An external sorter that is specialized for sort-based shuffle.
@@ -75,10 +76,9 @@ final class ShuffleExternalSorter extends MemoryConsumer {
   private final ShuffleWriteMetrics writeMetrics;
 
   /**
-   * Force this sorter to spill when there are this many elements in memory. 
The default value is
-   * 1024 * 1024 * 1024, which allows the maximum size of the pointer array to 
be 8G.
+   * Force this sorter to spill when there are this many elements in memory.
    */
-  private final long numElementsForSpillThreshold;
+  private final int numElementsForSpillThreshold;
 
   /** The buffer size to use when writing spills using DiskBlockObjectWriter */
   private final int fileBufferSizeBytes;
@@ -123,7 +123,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
     this.fileBufferSizeBytes =
         (int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 
1024;
     this.numElementsForSpillThreshold =
-      conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", 1024 
* 1024 * 1024);
+        (int) 
conf.get(package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD());
     this.writeMetrics = writeMetrics;
     this.inMemSorter = new ShuffleInMemorySorter(
       this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", 
true));
@@ -325,7 +325,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
    * array and grows the array if additional space is required. If the 
required space cannot be
    * obtained, then the in-memory data will be spilled to disk.
    */
-  private void growPointerArrayIfNecessary() {
+  private void growPointerArrayIfNecessary() throws IOException {
     assert(inMemSorter != null);
     if (!inMemSorter.hasSpaceForAnotherRecord()) {
       long used = inMemSorter.getMemoryUsage();
@@ -333,6 +333,10 @@ final class ShuffleExternalSorter extends MemoryConsumer {
       try {
         // could trigger spilling
         array = allocateArray(used / 8 * 2);
+      } catch (TooLargePageException e) {
+        // The pointer array is too big to fix in a single page, spill.
+        spill();
+        return;
       } catch (OutOfMemoryError e) {
         // should have trigger spilling
         if (!inMemSorter.hasSpaceForAnotherRecord()) {

http://git-wip-us.apache.org/repos/asf/spark/blob/079a2609/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index e749f7b..8b8e15e 100644
--- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -32,6 +32,7 @@ import org.apache.spark.TaskContext;
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.memory.MemoryConsumer;
 import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.memory.TooLargePageException;
 import org.apache.spark.serializer.SerializerManager;
 import org.apache.spark.storage.BlockManager;
 import org.apache.spark.unsafe.Platform;
@@ -68,12 +69,10 @@ public final class UnsafeExternalSorter extends 
MemoryConsumer {
   private final int fileBufferSizeBytes;
 
   /**
-   * Force this sorter to spill when there are this many elements in memory. 
The default value is
-   * 1024 * 1024 * 1024 / 2 which allows the maximum size of the pointer array 
to be 8G.
+   * Force this sorter to spill when there are this many elements in memory.
    */
-  public static final long DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD = 1024 * 
1024 * 1024 / 2;
+  private final int numElementsForSpillThreshold;
 
-  private final long numElementsForSpillThreshold;
   /**
    * Memory pages that hold the records being sorted. The pages in this list 
are freed when
    * spilling, although in principle we could recycle these pages across 
spills (on the other hand,
@@ -103,11 +102,11 @@ public final class UnsafeExternalSorter extends 
MemoryConsumer {
       PrefixComparator prefixComparator,
       int initialSize,
       long pageSizeBytes,
-      long numElementsForSpillThreshold,
+      int numElementsForSpillThreshold,
       UnsafeInMemorySorter inMemorySorter) throws IOException {
     UnsafeExternalSorter sorter = new UnsafeExternalSorter(taskMemoryManager, 
blockManager,
       serializerManager, taskContext, recordComparatorSupplier, 
prefixComparator, initialSize,
-        numElementsForSpillThreshold, pageSizeBytes, inMemorySorter, false /* 
ignored */);
+        pageSizeBytes, numElementsForSpillThreshold, inMemorySorter, false /* 
ignored */);
     sorter.spill(Long.MAX_VALUE, sorter);
     // The external sorter will be used to insert records, in-memory sorter is 
not needed.
     sorter.inMemSorter = null;
@@ -123,7 +122,7 @@ public final class UnsafeExternalSorter extends 
MemoryConsumer {
       PrefixComparator prefixComparator,
       int initialSize,
       long pageSizeBytes,
-      long numElementsForSpillThreshold,
+      int numElementsForSpillThreshold,
       boolean canUseRadixSort) {
     return new UnsafeExternalSorter(taskMemoryManager, blockManager, 
serializerManager,
       taskContext, recordComparatorSupplier, prefixComparator, initialSize, 
pageSizeBytes,
@@ -139,7 +138,7 @@ public final class UnsafeExternalSorter extends 
MemoryConsumer {
       PrefixComparator prefixComparator,
       int initialSize,
       long pageSizeBytes,
-      long numElementsForSpillThreshold,
+      int numElementsForSpillThreshold,
       @Nullable UnsafeInMemorySorter existingInMemorySorter,
       boolean canUseRadixSort) {
     super(taskMemoryManager, pageSizeBytes, 
taskMemoryManager.getTungstenMemoryMode());
@@ -338,7 +337,7 @@ public final class UnsafeExternalSorter extends 
MemoryConsumer {
    * array and grows the array if additional space is required. If the 
required space cannot be
    * obtained, then the in-memory data will be spilled to disk.
    */
-  private void growPointerArrayIfNecessary() {
+  private void growPointerArrayIfNecessary() throws IOException {
     assert(inMemSorter != null);
     if (!inMemSorter.hasSpaceForAnotherRecord()) {
       long used = inMemSorter.getMemoryUsage();
@@ -346,6 +345,10 @@ public final class UnsafeExternalSorter extends 
MemoryConsumer {
       try {
         // could trigger spilling
         array = allocateArray(used / 8 * 2);
+      } catch (TooLargePageException e) {
+        // The pointer array is too big to fix in a single page, spill.
+        spill();
+        return;
       } catch (OutOfMemoryError e) {
         // should have trigger spilling
         if (!inMemSorter.hasSpaceForAnotherRecord()) {

http://git-wip-us.apache.org/repos/asf/spark/blob/079a2609/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 6f0247b..57e2da8 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -475,4 +475,14 @@ package object config {
     .stringConf
     .toSequence
     .createOptional
+
+  private[spark] val SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD =
+    ConfigBuilder("spark.shuffle.spill.numElementsForceSpillThreshold")
+      .internal()
+      .doc("The maximum number of elements in memory before forcing the 
shuffle sorter to spill. " +
+        "By default it's Integer.MAX_VALUE, which means we never force the 
sorter to spill, " +
+        "until we reach some limitations, like the max page size limitation 
for the pointer " +
+        "array in the sorter.")
+      .intConf
+      .createWithDefault(Integer.MAX_VALUE)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/079a2609/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
 
b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index d0d0334..af4975c 100644
--- 
a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ 
b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -36,6 +36,7 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.TaskContext;
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.internal.config.package$;
 import org.apache.spark.memory.TestMemoryManager;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.serializer.JavaSerializer;
@@ -86,6 +87,9 @@ public class UnsafeExternalSorterSuite {
 
   private final long pageSizeBytes = 
conf.getSizeAsBytes("spark.buffer.pageSize", "4m");
 
+  private final int spillThreshold =
+    (int) 
conf.get(package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD());
+
   @Before
   public void setUp() {
     MockitoAnnotations.initMocks(this);
@@ -159,7 +163,7 @@ public class UnsafeExternalSorterSuite {
       prefixComparator,
       /* initialSize */ 1024,
       pageSizeBytes,
-      UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD,
+      spillThreshold,
       shouldUseRadixSort());
   }
 
@@ -383,7 +387,7 @@ public class UnsafeExternalSorterSuite {
       null,
       /* initialSize */ 1024,
       pageSizeBytes,
-      UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD,
+      spillThreshold,
       shouldUseRadixSort());
     long[] record = new long[100];
     int recordSize = record.length * 8;
@@ -445,7 +449,7 @@ public class UnsafeExternalSorterSuite {
       prefixComparator,
       1024,
       pageSizeBytes,
-      UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD,
+      spillThreshold,
       shouldUseRadixSort());
 
     // Peak memory should be monotonically increasing. More specifically, 
every time
@@ -548,4 +552,3 @@ public class UnsafeExternalSorterSuite {
     }
   }
 }
-

http://git-wip-us.apache.org/repos/asf/spark/blob/079a2609/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index 12a123e..6b002f0 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -27,6 +27,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.spark.SparkEnv;
 import org.apache.spark.TaskContext;
+import org.apache.spark.internal.config.package$;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.types.StructType;
@@ -89,8 +90,8 @@ public final class UnsafeExternalRowSorter {
       sparkEnv.conf().getInt("spark.shuffle.sort.initialBufferSize",
                              DEFAULT_INITIAL_SORT_BUFFER_SIZE),
       pageSizeBytes,
-      
SparkEnv.get().conf().getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
-        UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
+      (int) SparkEnv.get().conf().get(
+        package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD()),
       canUseRadixSort
     );
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/079a2609/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 5203e88..ede116e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -884,7 +884,7 @@ object SQLConf {
       .internal()
       .doc("Threshold for number of rows to be spilled by window operator")
       .intConf
-      
.createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)
+      
.createWithDefault(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
 
   val SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
     buildConf("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold")
@@ -899,7 +899,7 @@ object SQLConf {
       .internal()
       .doc("Threshold for number of rows to be spilled by sort merge join 
operator")
       .intConf
-      
.createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)
+      
.createWithDefault(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
 
   val CARTESIAN_PRODUCT_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
     buildConf("spark.sql.cartesianProductExec.buffer.in.memory.threshold")
@@ -914,7 +914,7 @@ object SQLConf {
       .internal()
       .doc("Threshold for number of rows to be spilled by cartesian product 
operator")
       .intConf
-      
.createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)
+      
.createWithDefault(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
 
   val SUPPORT_QUOTED_REGEX_COLUMN_NAME = 
buildConf("spark.sql.parser.quotedRegexColumnNames")
     .doc("When true, quoted Identifiers (using backticks) in SELECT statement 
are interpreted" +

http://git-wip-us.apache.org/repos/asf/spark/blob/079a2609/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
index 8fea46a..c7c4c7b 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution;
 import java.io.IOException;
 
 import org.apache.spark.SparkEnv;
+import org.apache.spark.internal.config.package$;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
@@ -29,7 +30,6 @@ import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.KVIterator;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.map.BytesToBytesMap;
-import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter;
 
 /**
  * Unsafe-based HashMap for performing aggregations where the aggregated 
values are fixed-width.
@@ -238,8 +238,8 @@ public final class UnsafeFixedWidthAggregationMap {
       SparkEnv.get().blockManager(),
       SparkEnv.get().serializerManager(),
       map.getPageSizeBytes(),
-      
SparkEnv.get().conf().getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
-        UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
+      (int) SparkEnv.get().conf().get(
+        package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD()),
       map);
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/079a2609/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index 6aa52f1..eb2fe82 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -57,7 +57,7 @@ public final class UnsafeKVExternalSorter {
       BlockManager blockManager,
       SerializerManager serializerManager,
       long pageSizeBytes,
-      long numElementsForSpillThreshold) throws IOException {
+      int numElementsForSpillThreshold) throws IOException {
     this(keySchema, valueSchema, blockManager, serializerManager, 
pageSizeBytes,
       numElementsForSpillThreshold, null);
   }
@@ -68,7 +68,7 @@ public final class UnsafeKVExternalSorter {
       BlockManager blockManager,
       SerializerManager serializerManager,
       long pageSizeBytes,
-      long numElementsForSpillThreshold,
+      int numElementsForSpillThreshold,
       @Nullable BytesToBytesMap map) throws IOException {
     this.keySchema = keySchema;
     this.valueSchema = valueSchema;

http://git-wip-us.apache.org/repos/asf/spark/blob/079a2609/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
index c68dbc7..43514f5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.aggregate
 
 import org.apache.spark.{SparkEnv, TaskContext}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -315,9 +315,7 @@ class SortBasedAggregator(
       SparkEnv.get.blockManager,
       SparkEnv.get.serializerManager,
       TaskContext.get().taskMemoryManager().pageSizeBytes,
-      SparkEnv.get.conf.getLong(
-        "spark.shuffle.spill.numElementsForceSpillThreshold",
-        UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
+      
SparkEnv.get.conf.get(config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD),
       null
     )
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/079a2609/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationMap.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationMap.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationMap.scala
index f2d4f6c..b5372bc 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationMap.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationMap.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.aggregate
 import java.{util => ju}
 
 import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.config
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection, 
UnsafeRow}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, 
TypedImperativeAggregate}
@@ -73,9 +74,7 @@ class ObjectAggregationMap() {
       SparkEnv.get.blockManager,
       SparkEnv.get.serializerManager,
       TaskContext.get().taskMemoryManager().pageSizeBytes,
-      SparkEnv.get.conf.getLong(
-        "spark.shuffle.spill.numElementsForceSpillThreshold",
-        UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
+      
SparkEnv.get.conf.get(config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD),
       null
     )
 

http://git-wip-us.apache.org/repos/asf/spark/blob/079a2609/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
index efe28af..59397db 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{SparkConf, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.internal.config
 import org.apache.spark.memory.MemoryTestingUtils
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.util.Benchmark
@@ -231,6 +232,6 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
     ExternalAppendOnlyUnsafeRowArray                 5 /    6         29.8     
     33.5       0.8X
     */
     testAgainstRawUnsafeExternalSorter(
-      UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt, 10 
* 1000, 1 << 4)
+      
config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get, 10 * 
1000, 1 << 4)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/079a2609/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index 3d869c7..359525f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -22,13 +22,13 @@ import java.util.Properties
 import scala.util.Random
 
 import org.apache.spark._
+import org.apache.spark.internal.config
 import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager}
 import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, 
UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
-import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
 
 /**
  * Test suite for [[UnsafeKVExternalSorter]], with randomly generated test 
data.
@@ -125,7 +125,7 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite 
with SharedSQLContext {
 
     val sorter = new UnsafeKVExternalSorter(
       keySchema, valueSchema, SparkEnv.get.blockManager, 
SparkEnv.get.serializerManager,
-      pageSize, UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD)
+      pageSize, 
config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
 
     // Insert the keys and values into the sorter
     inputData.foreach { case (k, v) =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to