This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 64cc9e5  [SPARK-26477][CORE] Use ConfigEntry for hardcoded configs for unsafe category
64cc9e5 is described below

commit 64cc9e572e0213d5dea241b2b48ecdd68a5c6c99
Author: Kazuaki Ishizaki <ishiz...@jp.ibm.com>
AuthorDate: Fri Jan 18 23:57:04 2019 -0800

    [SPARK-26477][CORE] Use ConfigEntry for hardcoded configs for unsafe category

    ## What changes were proposed in this pull request?

    The PR changes hardcoded `spark.unsafe` configs to use `ConfigEntry` and puts them in the `config` package.

    ## How was this patch tested?

    Existing UTs

    Closes #23412 from kiszk/SPARK-26477.

    Authored-by: Kazuaki Ishizaki <ishiz...@jp.ibm.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../unsafe/sort/UnsafeSorterSpillReader.java       | 35 +++++++++-------------
 .../scala/org/apache/spark/executor/Executor.scala |  2 +-
 .../org/apache/spark/internal/config/package.scala | 21 +++++++++++++
 .../test/scala/org/apache/spark/FailureSuite.scala |  3 +-
 .../scala/org/apache/spark/ml/util/MLTest.scala    |  3 +-
 .../apache/spark/sql/test/SharedSparkSession.scala |  3 +-
 .../org/apache/spark/sql/hive/test/TestHive.scala  |  3 +-
 7 files changed, 44 insertions(+), 26 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index fb179d0..99303e6 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -21,13 +21,13 @@ import com.google.common.io.ByteStreams;
 import com.google.common.io.Closeables;
 import org.apache.spark.SparkEnv;
 import org.apache.spark.TaskContext;
+import org.apache.spark.internal.config.package$;
+import org.apache.spark.internal.config.ConfigEntry;
 import org.apache.spark.io.NioBufferedFileInputStream;
 import org.apache.spark.io.ReadAheadInputStream;
 import org.apache.spark.serializer.SerializerManager;
 import org.apache.spark.storage.BlockId;
 import org.apache.spark.unsafe.Platform;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 import java.io.*;

@@ -36,9 +36,7 @@ import java.io.*;
  * of the file format).
  */
 public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implements Closeable {
-  private static final Logger logger = LoggerFactory.getLogger(UnsafeSorterSpillReader.class);
-  private static final int DEFAULT_BUFFER_SIZE_BYTES = 1024 * 1024; // 1 MB
-  private static final int MAX_BUFFER_SIZE_BYTES = 16777216; // 16 mb
+  public static final int MAX_BUFFER_SIZE_BYTES = 16777216; // 16 mb

   private InputStream in;
   private DataInputStream din;
@@ -59,28 +57,23 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen
       File file,
       BlockId blockId) throws IOException {
     assert (file.length() > 0);
-    long bufferSizeBytes =
-        SparkEnv.get() == null ?
-            DEFAULT_BUFFER_SIZE_BYTES:
-            SparkEnv.get().conf().getSizeAsBytes("spark.unsafe.sorter.spill.reader.buffer.size",
-                                                 DEFAULT_BUFFER_SIZE_BYTES);
-    if (bufferSizeBytes > MAX_BUFFER_SIZE_BYTES || bufferSizeBytes < DEFAULT_BUFFER_SIZE_BYTES) {
-      // fall back to a sane default value
-      logger.warn("Value of config \"spark.unsafe.sorter.spill.reader.buffer.size\" = {} not in " +
-          "allowed range [{}, {}). Falling back to default value : {} bytes", bufferSizeBytes,
-          DEFAULT_BUFFER_SIZE_BYTES, MAX_BUFFER_SIZE_BYTES, DEFAULT_BUFFER_SIZE_BYTES);
-      bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES;
-    }
+    final ConfigEntry<Object> bufferSizeConfigEntry =
+        package$.MODULE$.UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE();
+    // This value must be less than or equal to MAX_BUFFER_SIZE_BYTES. Cast to int is always safe.
+    final int DEFAULT_BUFFER_SIZE_BYTES =
+        ((Long) bufferSizeConfigEntry.defaultValue().get()).intValue();
+    int bufferSizeBytes = SparkEnv.get() == null ? DEFAULT_BUFFER_SIZE_BYTES :
+        ((Long) SparkEnv.get().conf().get(bufferSizeConfigEntry)).intValue();

-    final boolean readAheadEnabled = SparkEnv.get() != null &&
-        SparkEnv.get().conf().getBoolean("spark.unsafe.sorter.spill.read.ahead.enabled", true);
+    final boolean readAheadEnabled = SparkEnv.get() != null && (boolean)SparkEnv.get().conf().get(
+        package$.MODULE$.UNSAFE_SORTER_SPILL_READ_AHEAD_ENABLED());

     final InputStream bs =
-        new NioBufferedFileInputStream(file, (int) bufferSizeBytes);
+        new NioBufferedFileInputStream(file, bufferSizeBytes);
     try {
       if (readAheadEnabled) {
         this.in = new ReadAheadInputStream(serializerManager.wrapStream(blockId, bs),
-            (int) bufferSizeBytes);
+            bufferSizeBytes);
       } else {
         this.in = serializerManager.wrapStream(blockId, bs);
       }
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a6759c4..a3644e7 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -431,7 +431,7 @@ private[spark] class Executor(

         if (freedMemory > 0 && !threwException) {
           val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
-          if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
+          if (conf.get(UNSAFE_EXCEPTION_ON_MEMORY_LEAK)) {
             throw new SparkException(errMsg)
           } else {
             logWarning(errMsg)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 6488d53..1e72800 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -25,6 +25,7 @@ import org.apache.spark.scheduler.EventLoggingListener
 import org.apache.spark.storage.{DefaultTopologyMapper, RandomBlockReplicationPolicy}
 import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.Utils
+import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.MAX_BUFFER_SIZE_BYTES

 package object config {

@@ -899,6 +900,26 @@ package object config {
       .checkValue(v => v > 0, "The max failures should be a positive value.")
       .createWithDefault(40)

+  private[spark] val UNSAFE_EXCEPTION_ON_MEMORY_LEAK =
+    ConfigBuilder("spark.unsafe.exceptionOnMemoryLeak")
+      .internal()
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val UNSAFE_SORTER_SPILL_READ_AHEAD_ENABLED =
+    ConfigBuilder("spark.unsafe.sorter.spill.read.ahead.enabled")
+      .internal()
+      .booleanConf
+      .createWithDefault(true)
+
+  private[spark] val UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE =
+    ConfigBuilder("spark.unsafe.sorter.spill.reader.buffer.size")
+      .internal()
+      .bytesConf(ByteUnit.BYTE)
+      .checkValue(v => 1024 * 1024 <= v && v <= MAX_BUFFER_SIZE_BYTES,
+        s"The value must be in allowed range [1,048,576, ${MAX_BUFFER_SIZE_BYTES}].")
+      .createWithDefault(1024 * 1024)
+
   private[spark] val EXECUTOR_PLUGINS =
     ConfigBuilder("spark.executor.plugins")
       .doc("Comma-separated list of class names for \"plugins\" implementing " +
diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index f2d97d4..5f79b52 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark

 import java.io.{IOException, NotSerializableException, ObjectInputStream}

+import org.apache.spark.internal.config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK
 import org.apache.spark.memory.TestMemoryConsumer
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.NonSerializable
@@ -144,7 +145,7 @@ class FailureSuite extends SparkFunSuite with LocalSparkContext {
   }

   test("managed memory leak error should not mask other failures (SPARK-9266") {
-    val conf = new SparkConf().set("spark.unsafe.exceptionOnMemoryLeak", "true")
+    val conf = new SparkConf().set(UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true)
     sc = new SparkContext("local[1,1]", "test", conf)

     // If a task leaks memory but fails due to some other cause, then make sure that the original
diff --git a/mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala b/mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala
index acac171..514fa7f 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.scalatest.Suite

 import org.apache.spark.{DebugFilesystem, SparkConf, SparkContext}
+import org.apache.spark.internal.config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK
 import org.apache.spark.ml.{PredictionModel, Transformer}
 import org.apache.spark.ml.linalg.Vector
 import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row}
@@ -40,7 +41,7 @@ trait MLTest extends StreamTest with TempDirectory { self: Suite =>

   protected override def sparkConf = {
     new SparkConf()
       .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
-      .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+      .set(UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true)
       .set(SQLConf.CODEGEN_FALLBACK.key, "false")
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
index e7e0ce6..8734639 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -23,6 +23,7 @@ import org.scalatest.{BeforeAndAfterEach, Suite}
 import org.scalatest.concurrent.Eventually

 import org.apache.spark.{DebugFilesystem, SparkConf}
+import org.apache.spark.internal.config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
 import org.apache.spark.sql.internal.SQLConf
@@ -38,7 +39,7 @@ trait SharedSparkSession
   protected def sparkConf = {
     new SparkConf()
       .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
-      .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+      .set(UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true)
       .set(SQLConf.CODEGEN_FALLBACK.key, "false")
       // Disable ConvertToLocalRelation for better test coverage. Test cases built on
       // LocalRelation will exercise the optimization rules better by disabling it as
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 23dd350..1db57b7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe

 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -61,7 +62,7 @@ object TestHive
         .set("spark.sql.warehouse.dir", TestHiveContext.makeWarehouseDir().toURI.getPath)
         // SPARK-8910
         .set(UI_ENABLED, false)
-        .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+        .set(config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true)
         // Disable ConvertToLocalRelation for better test coverage. Test cases built on
         // LocalRelation will exercise the optimization rules better by disabling it as
         // this rule may potentially block testing of other optimization rules such as

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
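
For readers skimming the diff above, here is a minimal sketch of the ConfigEntry pattern this commit applies to the spark.unsafe keys: a typed entry declared once with ConfigBuilder, then written and read through SparkConf instead of a hardcoded key string and an ad-hoc default. The spark.example.flag key and the ExampleConfigs/ExampleUsage objects are hypothetical, and ConfigBuilder as well as the typed SparkConf.set/get overloads are private[spark], so a snippet like this only compiles inside Spark's own source tree (for example, next to the definitions in the config package object).

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.ConfigBuilder

// Hypothetical entry, declared the same way as UNSAFE_EXCEPTION_ON_MEMORY_LEAK above:
// the key string, value type, and default live in exactly one place.
object ExampleConfigs {
  val EXAMPLE_FLAG =
    ConfigBuilder("spark.example.flag")
      .internal()
      .booleanConf
      .createWithDefault(false)
}

object ExampleUsage {
  def main(args: Array[String]): Unit = {
    // Call sites set and read the typed entry instead of repeating
    // conf.getBoolean("spark.example.flag", false) at every use.
    val conf = new SparkConf().set(ExampleConfigs.EXAMPLE_FLAG, true)
    val flag: Boolean = conf.get(ExampleConfigs.EXAMPLE_FLAG)
    println(s"spark.example.flag = $flag")  // prints: spark.example.flag = true
  }
}

This is the same move the diff makes for spark.unsafe.exceptionOnMemoryLeak: Executor.scala now reads conf.get(UNSAFE_EXCEPTION_ON_MEMORY_LEAK), the test suites set the entry with .set(UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true), and the buffer-size entry additionally centralizes its range validation via checkValue, which replaces the hand-written fallback logic removed from UnsafeSorterSpillReader.java.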