This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 64cc9e5  [SPARK-26477][CORE] Use ConfigEntry for hardcoded configs for unsafe category
64cc9e5 is described below

commit 64cc9e572e0213d5dea241b2b48ecdd68a5c6c99
Author: Kazuaki Ishizaki <ishiz...@jp.ibm.com>
AuthorDate: Fri Jan 18 23:57:04 2019 -0800

    [SPARK-26477][CORE] Use ConfigEntry for hardcoded configs for unsafe category
    
    ## What changes were proposed in this pull request?
    
    The PR makes hardcoded `spark.unsafe` configs use `ConfigEntry` and puts them in the `config` package.
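    
    As a quick illustration of the pattern (not part of this patch), a minimal sketch follows. `ConfigBuilder`, `ConfigEntry`, and the typed `SparkConf.get` overload are `private[spark]`, so this only compiles inside the Spark source tree; the package and object names below are made up for the example, while the entry itself mirrors the one added to `config/package.scala` in this commit.
    
    ```scala
    // Hypothetical location inside the Spark namespace, for illustration only.
    package org.apache.spark.demo
    
    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config.ConfigBuilder
    
    object UnsafeConfigSketch {
      // Key, visibility, type, and default are declared in one place.
      val UNSAFE_EXCEPTION_ON_MEMORY_LEAK =
        ConfigBuilder("spark.unsafe.exceptionOnMemoryLeak")
          .internal()
          .booleanConf
          .createWithDefault(false)
    
      def exceptionOnLeak(conf: SparkConf): Boolean = {
        // Before: conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)
        // After: the key and default come from the ConfigEntry definition itself.
        conf.get(UNSAFE_EXCEPTION_ON_MEMORY_LEAK)
      }
    }
    ```
    
    Java callers reach the same entries through the generated `package$.MODULE$` accessor, as the `UnsafeSorterSpillReader` change below does.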
    
    ## How was this patch tested?
    
    Existing UTs
    
    Closes #23412 from kiszk/SPARK-26477.
    
    Authored-by: Kazuaki Ishizaki <ishiz...@jp.ibm.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../unsafe/sort/UnsafeSorterSpillReader.java       | 35 +++++++++-------------
 .../scala/org/apache/spark/executor/Executor.scala |  2 +-
 .../org/apache/spark/internal/config/package.scala | 21 +++++++++++++
 .../test/scala/org/apache/spark/FailureSuite.scala |  3 +-
 .../scala/org/apache/spark/ml/util/MLTest.scala    |  3 +-
 .../apache/spark/sql/test/SharedSparkSession.scala |  3 +-
 .../org/apache/spark/sql/hive/test/TestHive.scala  |  3 +-
 7 files changed, 44 insertions(+), 26 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index fb179d0..99303e6 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -21,13 +21,13 @@ import com.google.common.io.ByteStreams;
 import com.google.common.io.Closeables;
 import org.apache.spark.SparkEnv;
 import org.apache.spark.TaskContext;
+import org.apache.spark.internal.config.package$;
+import org.apache.spark.internal.config.ConfigEntry;
 import org.apache.spark.io.NioBufferedFileInputStream;
 import org.apache.spark.io.ReadAheadInputStream;
 import org.apache.spark.serializer.SerializerManager;
 import org.apache.spark.storage.BlockId;
 import org.apache.spark.unsafe.Platform;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.*;
 
@@ -36,9 +36,7 @@ import java.io.*;
  * of the file format).
  */
 public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implements Closeable {
-  private static final Logger logger = LoggerFactory.getLogger(UnsafeSorterSpillReader.class);
-  private static final int DEFAULT_BUFFER_SIZE_BYTES = 1024 * 1024; // 1 MB
-  private static final int MAX_BUFFER_SIZE_BYTES = 16777216; // 16 mb
+  public static final int MAX_BUFFER_SIZE_BYTES = 16777216; // 16 mb
 
   private InputStream in;
   private DataInputStream din;
@@ -59,28 +57,23 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen
       File file,
       BlockId blockId) throws IOException {
     assert (file.length() > 0);
-    long bufferSizeBytes =
-        SparkEnv.get() == null ?
-            DEFAULT_BUFFER_SIZE_BYTES:
-            SparkEnv.get().conf().getSizeAsBytes("spark.unsafe.sorter.spill.reader.buffer.size",
-                                                 DEFAULT_BUFFER_SIZE_BYTES);
-    if (bufferSizeBytes > MAX_BUFFER_SIZE_BYTES || bufferSizeBytes < DEFAULT_BUFFER_SIZE_BYTES) {
-      // fall back to a sane default value
-      logger.warn("Value of config \"spark.unsafe.sorter.spill.reader.buffer.size\" = {} not in " +
-        "allowed range [{}, {}). Falling back to default value : {} bytes", bufferSizeBytes,
-        DEFAULT_BUFFER_SIZE_BYTES, MAX_BUFFER_SIZE_BYTES, DEFAULT_BUFFER_SIZE_BYTES);
-      bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES;
-    }
+    final ConfigEntry<Object> bufferSizeConfigEntry =
+        package$.MODULE$.UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE();
+    // This value must be less than or equal to MAX_BUFFER_SIZE_BYTES. Cast to int is always safe.
+    final int DEFAULT_BUFFER_SIZE_BYTES =
+        ((Long) bufferSizeConfigEntry.defaultValue().get()).intValue();
+    int bufferSizeBytes = SparkEnv.get() == null ? DEFAULT_BUFFER_SIZE_BYTES :
+        ((Long) SparkEnv.get().conf().get(bufferSizeConfigEntry)).intValue();
 
-    final boolean readAheadEnabled = SparkEnv.get() != null &&
-        SparkEnv.get().conf().getBoolean("spark.unsafe.sorter.spill.read.ahead.enabled", true);
+    final boolean readAheadEnabled = SparkEnv.get() != null && (boolean)SparkEnv.get().conf().get(
+        package$.MODULE$.UNSAFE_SORTER_SPILL_READ_AHEAD_ENABLED());
 
     final InputStream bs =
-        new NioBufferedFileInputStream(file, (int) bufferSizeBytes);
+        new NioBufferedFileInputStream(file, bufferSizeBytes);
     try {
       if (readAheadEnabled) {
+        this.in = new ReadAheadInputStream(serializerManager.wrapStream(blockId, bs),
-                (int) bufferSizeBytes);
+                bufferSizeBytes);
       } else {
         this.in = serializerManager.wrapStream(blockId, bs);
       }
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a6759c4..a3644e7 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -431,7 +431,7 @@ private[spark] class Executor(
 
           if (freedMemory > 0 && !threwException) {
             val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
-            if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
+            if (conf.get(UNSAFE_EXCEPTION_ON_MEMORY_LEAK)) {
               throw new SparkException(errMsg)
             } else {
               logWarning(errMsg)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 6488d53..1e72800 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -25,6 +25,7 @@ import org.apache.spark.scheduler.EventLoggingListener
 import org.apache.spark.storage.{DefaultTopologyMapper, RandomBlockReplicationPolicy}
 import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.Utils
+import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.MAX_BUFFER_SIZE_BYTES
 
 package object config {
 
@@ -899,6 +900,26 @@ package object config {
       .checkValue(v => v > 0, "The max failures should be a positive value.")
       .createWithDefault(40)
 
+  private[spark] val UNSAFE_EXCEPTION_ON_MEMORY_LEAK =
+    ConfigBuilder("spark.unsafe.exceptionOnMemoryLeak")
+      .internal()
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val UNSAFE_SORTER_SPILL_READ_AHEAD_ENABLED =
+    ConfigBuilder("spark.unsafe.sorter.spill.read.ahead.enabled")
+      .internal()
+      .booleanConf
+      .createWithDefault(true)
+
+  private[spark] val UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE =
+    ConfigBuilder("spark.unsafe.sorter.spill.reader.buffer.size")
+      .internal()
+      .bytesConf(ByteUnit.BYTE)
+      .checkValue(v => 1024 * 1024 <= v && v <= MAX_BUFFER_SIZE_BYTES,
+        s"The value must be in allowed range [1,048,576, 
${MAX_BUFFER_SIZE_BYTES}].")
+      .createWithDefault(1024 * 1024)
+
   private[spark] val EXECUTOR_PLUGINS =
     ConfigBuilder("spark.executor.plugins")
       .doc("Comma-separated list of class names for \"plugins\" implementing " 
+
diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index f2d97d4..5f79b52 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark
 
 import java.io.{IOException, NotSerializableException, ObjectInputStream}
 
+import org.apache.spark.internal.config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK
 import org.apache.spark.memory.TestMemoryConsumer
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.NonSerializable
@@ -144,7 +145,7 @@ class FailureSuite extends SparkFunSuite with LocalSparkContext {
   }
 
   test("managed memory leak error should not mask other failures (SPARK-9266") 
{
-    val conf = new SparkConf().set("spark.unsafe.exceptionOnMemoryLeak", "true")
+    val conf = new SparkConf().set(UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true)
     sc = new SparkContext("local[1,1]", "test", conf)
 
     // If a task leaks memory but fails due to some other cause, then make sure that the original
diff --git a/mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala b/mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala
index acac171..514fa7f 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.scalatest.Suite
 
 import org.apache.spark.{DebugFilesystem, SparkConf, SparkContext}
+import org.apache.spark.internal.config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK
 import org.apache.spark.ml.{PredictionModel, Transformer}
 import org.apache.spark.ml.linalg.Vector
 import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row}
@@ -40,7 +41,7 @@ trait MLTest extends StreamTest with TempDirectory { self: Suite =>
   protected override def sparkConf = {
     new SparkConf()
       .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
-      .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+      .set(UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true)
       .set(SQLConf.CODEGEN_FALLBACK.key, "false")
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
index e7e0ce6..8734639 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -23,6 +23,7 @@ import org.scalatest.{BeforeAndAfterEach, Suite}
 import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.{DebugFilesystem, SparkConf}
+import org.apache.spark.internal.config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
 import org.apache.spark.sql.internal.SQLConf
@@ -38,7 +39,7 @@ trait SharedSparkSession
   protected def sparkConf = {
     new SparkConf()
       .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
-      .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+      .set(UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true)
       .set(SQLConf.CODEGEN_FALLBACK.key, "false")
       // Disable ConvertToLocalRelation for better test coverage. Test cases built on
       // LocalRelation will exercise the optimization rules better by disabling it as
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 23dd350..1db57b7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -61,7 +62,7 @@ object TestHive
         .set("spark.sql.warehouse.dir", 
TestHiveContext.makeWarehouseDir().toURI.getPath)
         // SPARK-8910
         .set(UI_ENABLED, false)
-        .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+        .set(config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true)
         // Disable ConvertToLocalRelation for better test coverage. Test cases built on
         // LocalRelation will exercise the optimization rules better by disabling it as
         // this rule may potentially block testing of other optimization rules such as


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
