Repository: spark
Updated Branches:
  refs/heads/master 71c2b81aa -> bebd2e1ce


[SPARK-22222][CORE] Fix the ARRAY_MAX in BufferHolder and add a test

## What changes were proposed in this pull request?

We should not break the assumption that the length of the allocated byte array 
is word rounded:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java#L170
So we want to use `Integer.MAX_VALUE - 15` (which is a multiple of 8) instead of 
`Integer.MAX_VALUE - 8` (which is not) as the upper bound of an allocated byte array.

cc: srowen gatorsmile
## How was this patch tested?

Since the Spark unit test JVM has less than 1GB of heap, we run the test code 
as a submit job, so that it can run on a JVM that has 4GB of memory.

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Feng Liu <[email protected]>

Closes #19460 from liufengdb/fix_array_max.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bebd2e1c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bebd2e1c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bebd2e1c

Branch: refs/heads/master
Commit: bebd2e1ce10a460555f75cda75df33f39a783469
Parents: 71c2b81
Author: Feng Liu <[email protected]>
Authored: Mon Oct 9 21:34:37 2017 -0700
Committer: gatorsmile <[email protected]>
Committed: Mon Oct 9 21:34:37 2017 -0700

----------------------------------------------------------------------
 .../spark/unsafe/array/ByteArrayMethods.java    |  7 ++
 .../spark/unsafe/map/HashMapGrowthStrategy.java |  6 +-
 .../util/collection/PartitionedPairBuffer.scala |  6 +-
 .../apache/spark/deploy/SparkSubmitSuite.scala  | 52 +++++++------
 .../expressions/codegen/BufferHolder.java       |  7 +-
 .../codegen/BufferHolderSparkSubmitSutie.scala  | 78 ++++++++++++++++++++
 .../vectorized/WritableColumnVector.java        |  3 +-
 7 files changed, 124 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bebd2e1c/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
----------------------------------------------------------------------
diff --git 
a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
 
b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
index 9c551ab..f121b1c 100644
--- 
a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
+++ 
b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
@@ -40,6 +40,13 @@ public class ByteArrayMethods {
     }
   }
 
+  // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max 
is somewhat smaller.
+  // Be conservative and lower the cap a little.
+  // Refer to 
"http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/tip/src/share/classes/java/util/ArrayList.java#l229";
+  // This value is word rounded. Use this value if the allocated byte arrays 
are used to store other
+  // types rather than bytes.
+  public static int MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15;
+
   private static final boolean unaligned = Platform.unaligned();
   /**
    * Optimized byte array equality check for byte arrays.

http://git-wip-us.apache.org/repos/asf/spark/blob/bebd2e1c/core/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java 
b/core/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java
index b8c2294..ee6d9f7 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java
@@ -17,6 +17,8 @@
 
 package org.apache.spark.unsafe.map;
 
+import org.apache.spark.unsafe.array.ByteArrayMethods;
+
 /**
  * Interface that defines how we can grow the size of a hash map when it is 
over a threshold.
  */
@@ -31,9 +33,7 @@ public interface HashMapGrowthStrategy {
 
   class Doubling implements HashMapGrowthStrategy {
 
-    // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max 
is somewhat
-    // smaller. Be conservative and lower the cap a little.
-    private static final int ARRAY_MAX = Integer.MAX_VALUE - 8;
+    private static final int ARRAY_MAX = 
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
 
     @Override
     public int nextCapacity(int currentCapacity) {

http://git-wip-us.apache.org/repos/asf/spark/blob/bebd2e1c/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala
 
b/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala
index b755e5d..e17a9de 100644
--- 
a/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala
+++ 
b/core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala
@@ -19,6 +19,8 @@ package org.apache.spark.util.collection
 
 import java.util.Comparator
 
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.collection.WritablePartitionedPairCollection._
 
 /**
@@ -96,7 +98,5 @@ private[spark] class PartitionedPairBuffer[K, 
V](initialCapacity: Int = 64)
 }
 
 private object PartitionedPairBuffer {
-  // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max 
is somewhat
-  // smaller. Be conservative and lower the cap a little.
-  val MAXIMUM_CAPACITY: Int = (Int.MaxValue - 8) / 2
+  val MAXIMUM_CAPACITY: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 2
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/bebd2e1c/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index b06f2e2..b52da4c 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -100,6 +100,8 @@ class SparkSubmitSuite
   with TimeLimits
   with TestPrematureExit {
 
+  import SparkSubmitSuite._
+
   override def beforeEach() {
     super.beforeEach()
     System.setProperty("spark.testing", "true")
@@ -974,30 +976,6 @@ class SparkSubmitSuite
     }
   }
 
-  // NOTE: This is an expensive operation in terms of time (10 seconds+). Use 
sparingly.
-  private def runSparkSubmit(args: Seq[String]): Unit = {
-    val sparkHome = sys.props.getOrElse("spark.test.home", 
fail("spark.test.home is not set!"))
-    val sparkSubmitFile = if (Utils.isWindows) {
-      new File("..\\bin\\spark-submit.cmd")
-    } else {
-      new File("../bin/spark-submit")
-    }
-    val process = Utils.executeCommand(
-      Seq(sparkSubmitFile.getCanonicalPath) ++ args,
-      new File(sparkHome),
-      Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
-
-    try {
-      val exitCode = failAfter(60 seconds) { process.waitFor() }
-      if (exitCode != 0) {
-        fail(s"Process returned with exit code $exitCode. See the log4j logs 
for more detail.")
-      }
-    } finally {
-      // Ensure we still kill the process in case it timed out
-      process.destroy()
-    }
-  }
-
   private def forConfDir(defaults: Map[String, String]) (f: String => Unit) = {
     val tmpDir = Utils.createTempDir()
 
@@ -1020,6 +998,32 @@ class SparkSubmitSuite
   }
 }
 
+object SparkSubmitSuite extends SparkFunSuite with TimeLimits {
+  // NOTE: This is an expensive operation in terms of time (10 seconds+). Use 
sparingly.
+  def runSparkSubmit(args: Seq[String], root: String = ".."): Unit = {
+    val sparkHome = sys.props.getOrElse("spark.test.home", 
fail("spark.test.home is not set!"))
+    val sparkSubmitFile = if (Utils.isWindows) {
+      new File(s"$root\\bin\\spark-submit.cmd")
+    } else {
+      new File(s"$root/bin/spark-submit")
+    }
+    val process = Utils.executeCommand(
+      Seq(sparkSubmitFile.getCanonicalPath) ++ args,
+      new File(sparkHome),
+      Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
+
+    try {
+      val exitCode = failAfter(60 seconds) { process.waitFor() }
+      if (exitCode != 0) {
+        fail(s"Process returned with exit code $exitCode. See the log4j logs 
for more detail.")
+      }
+    } finally {
+      // Ensure we still kill the process in case it timed out
+      process.destroy()
+    }
+  }
+}
+
 object JarCreationTest extends Logging {
   def main(args: Array[String]) {
     Utils.configTestLog4j("INFO")

http://git-wip-us.apache.org/repos/asf/spark/blob/bebd2e1c/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
index 971d199..2599761 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions.codegen;
 
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.array.ByteArrayMethods;
 
 /**
  * A helper class to manage the data buffer for an unsafe row.  The data 
buffer can grow and
@@ -36,9 +37,7 @@ import org.apache.spark.unsafe.Platform;
  */
 public class BufferHolder {
 
-  // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max 
is somewhat
-  // smaller. Be conservative and lower the cap a little.
-  private static final int ARRAY_MAX = Integer.MAX_VALUE - 8;
+  private static final int ARRAY_MAX = 
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
 
   public byte[] buffer;
   public int cursor = Platform.BYTE_ARRAY_OFFSET;
@@ -51,7 +50,7 @@ public class BufferHolder {
 
   public BufferHolder(UnsafeRow row, int initialSize) {
     int bitsetWidthInBytes = 
UnsafeRow.calculateBitSetWidthInBytes(row.numFields());
-    if (row.numFields() > (Integer.MAX_VALUE - initialSize - 
bitsetWidthInBytes) / 8) {
+    if (row.numFields() > (ARRAY_MAX - initialSize - bitsetWidthInBytes) / 8) {
       throw new UnsupportedOperationException(
         "Cannot create BufferHolder for input UnsafeRow because there are " +
           "too many fields (number of fields: " + row.numFields() + ")");

http://git-wip-us.apache.org/repos/asf/spark/blob/bebd2e1c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSutie.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSutie.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSutie.scala
new file mode 100644
index 0000000..1167d2f
--- /dev/null
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSutie.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import org.scalatest.{BeforeAndAfterEach, Matchers}
+import org.scalatest.concurrent.Timeouts
+
+import org.apache.spark.{SparkFunSuite, TestUtils}
+import org.apache.spark.deploy.SparkSubmitSuite
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.unsafe.array.ByteArrayMethods
+import org.apache.spark.util.ResetSystemProperties
+
+// A test for growing the buffer holder to nearly 2GB. Due to the heap size 
limitation of the Spark
+// unit tests JVM, the actually test code is running as a submit job.
+class BufferHolderSparkSubmitSuite
+  extends SparkFunSuite
+    with Matchers
+    with BeforeAndAfterEach
+    with ResetSystemProperties
+    with Timeouts {
+
+  test("SPARK-22222: Buffer holder should be able to allocate memory larger 
than 1GB") {
+    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+
+    val argsForSparkSubmit = Seq(
+      "--class", 
BufferHolderSparkSubmitSuite.getClass.getName.stripSuffix("$"),
+      "--name", "SPARK-22222",
+      "--master", "local-cluster[2,1,1024]",
+      "--driver-memory", "4g",
+      "--conf", "spark.ui.enabled=false",
+      "--conf", "spark.master.rest.enabled=false",
+      "--conf", "spark.driver.extraJavaOptions=-ea",
+      unusedJar.toString)
+    SparkSubmitSuite.runSparkSubmit(argsForSparkSubmit, "../..")
+  }
+}
+
+object BufferHolderSparkSubmitSuite {
+
+  def main(args: Array[String]): Unit = {
+
+    val ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+    val holder = new BufferHolder(new UnsafeRow(1000))
+
+    holder.reset()
+    holder.grow(roundToWord(ARRAY_MAX / 2))
+
+    holder.reset()
+    holder.grow(roundToWord(ARRAY_MAX / 2 + 8))
+
+    holder.reset()
+    holder.grow(roundToWord(Integer.MAX_VALUE / 2))
+
+    holder.reset()
+    holder.grow(roundToWord(Integer.MAX_VALUE))
+  }
+
+  private def roundToWord(len: Int): Int = {
+    ByteArrayMethods.roundNumberOfBytesToNearestWord(len)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/bebd2e1c/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index da72954..d3a14b9 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.array.ByteArrayMethods;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
@@ -595,7 +596,7 @@ public abstract class WritableColumnVector extends 
ColumnVector {
    * Upper limit for the maximum capacity for this column.
    */
   @VisibleForTesting
-  protected int MAX_CAPACITY = Integer.MAX_VALUE - 8;
+  protected int MAX_CAPACITY = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
 
   /**
    * Number of nulls in this column. This is an optimization for the reader, 
to skip NULL checks.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to