Repository: spark
Updated Branches:
  refs/heads/master 28e0e500a -> 5028a001d
[SPARK-12317][SQL] Support units (m,k,g) in SQLConf

This PR continues the previously closed PR 10314. With this change, SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE accepts memory strings as input, following Spark's usual size conventions. For example, the user can now specify 10g for SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE in SQLConf.

marmbrus srowen: Can you help review these code changes? Thanks.

Author: Kevin Yu <[email protected]>

Closes #10629 from kevinyu98/spark-12317.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5028a001
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5028a001
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5028a001

Branch: refs/heads/master
Commit: 5028a001d51a9e9a13e3c39f6a080618f3425d87
Parents: 28e0e50
Author: Kevin Yu <[email protected]>
Authored: Thu Jan 7 21:13:17 2016 -0800
Committer: Reynold Xin <[email protected]>
Committed: Thu Jan 7 21:13:17 2016 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLConf.scala    | 22 ++++++++++-
 .../org/apache/spark/sql/SQLConfSuite.scala     | 39 ++++++++++++++++++++
 2 files changed, 60 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5028a001/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 26c00dc..7976795 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -26,6 +26,7 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter
 
 import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.catalyst.parser.ParserConf
+import org.apache.spark.util.Utils
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////
 // This file defines the configuration options for Spark SQL.
@@ -115,6 +116,25 @@ private[spark] object SQLConf {
       }
     }, _.toString, doc, isPublic)
 
+  def longMemConf(
+      key: String,
+      defaultValue: Option[Long] = None,
+      doc: String = "",
+      isPublic: Boolean = true): SQLConfEntry[Long] =
+    SQLConfEntry(key, defaultValue, { v =>
+      try {
+        v.toLong
+      } catch {
+        case _: NumberFormatException =>
+          try {
+            Utils.byteStringAsBytes(v)
+          } catch {
+            case _: NumberFormatException =>
+              throw new IllegalArgumentException(s"$key should be long, but was $v")
+          }
+      }
+    }, _.toString, doc, isPublic)
+
   def doubleConf(
       key: String,
       defaultValue: Option[Double] = None,
@@ -235,7 +255,7 @@ private[spark] object SQLConf {
     doc = "The default number of partitions to use when shuffling data for joins or aggregations.")
 
   val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE =
-    longConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize",
+    longMemConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize",
       defaultValue = Some(64 * 1024 * 1024),
       doc = "The target post-shuffle input size in bytes of a task.")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5028a001/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
index 43300cd..a2eddc8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
@@ -92,4 +92,43 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
     }
     assert(e.getMessage === s"${SQLConf.CASE_SENSITIVE.key} should be boolean, but was 10")
   }
+
+  test("Test SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE's method") {
+    sqlContext.conf.clear()
+
+    sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "100")
+    assert(sqlContext.conf.targetPostShuffleInputSize === 100)
+
+    sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1k")
+    assert(sqlContext.conf.targetPostShuffleInputSize === 1024)
+
+    sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1M")
+    assert(sqlContext.conf.targetPostShuffleInputSize === 1048576)
+
+    sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1g")
+    assert(sqlContext.conf.targetPostShuffleInputSize === 1073741824)
+
+    sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-1")
+    assert(sqlContext.conf.targetPostShuffleInputSize === -1)
+
+    // Test overflow exception
+    intercept[IllegalArgumentException] {
+      // This value exceeds Long.MaxValue
+      // Utils.byteStringAsBytes("90000000000g")
+      sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "90000000000g")
+    }
+
+    intercept[IllegalArgumentException] {
+      // This value is less than Long.MinValue
+      // Utils.byteStringAsBytes("-90000000000g")
+      sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-90000000000g")
+    }
+    // Test invalid input
+    intercept[IllegalArgumentException] {
+      // A negative size with a unit suffix is not a valid memory string
+      // Utils.byteStringAsBytes("-1g")
+      sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-1g")
+    }
+    sqlContext.conf.clear()
+  }
 }
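
To make the fallback in longMemConf easier to follow outside of the SQLConf plumbing, here is a minimal self-contained sketch of the same idea. It is only an illustration: the real conversion is done by Utils.byteStringAsBytes (backed by JavaUtils, which also accepts suffixes such as "kb", "mb", "gb"), and the names below (MemConfSketch, parseBytes, the small unit table) are invented for this sketch.

// Illustrative sketch only, not Spark's implementation.
object MemConfSketch {
  private val units = Map(
    "b" -> 1L,
    "k" -> 1024L,
    "m" -> 1024L * 1024,
    "g" -> 1024L * 1024 * 1024,
    "t" -> 1024L * 1024 * 1024 * 1024)

  def parseBytes(key: String, v: String): Long =
    try {
      v.toLong  // a plain byte count such as "100" or "-1" is accepted as-is
    } catch {
      case _: NumberFormatException =>
        // Fall back to a memory string such as "1k", "1M" or "64g".
        val s = v.trim.toLowerCase
        val (digits, suffix) = s.span(_.isDigit)
        units.get(suffix) match {
          case Some(multiplier) if digits.nonEmpty =>
            try Math.multiplyExact(digits.toLong, multiplier)
            catch {
              // e.g. "90000000000g" would overflow a Long
              case _: ArithmeticException | _: NumberFormatException =>
                throw new IllegalArgumentException(s"$key should be long, but was $v")
            }
          case _ =>
            // covers garbage and negative memory strings such as "-1g"
            throw new IllegalArgumentException(s"$key should be long, but was $v")
        }
    }

  def main(args: Array[String]): Unit = {
    val key = "spark.sql.adaptive.shuffle.targetPostShuffleInputSize"
    println(parseBytes(key, "1k"))   // 1024
    println(parseBytes(key, "1M"))   // 1048576
    println(parseBytes(key, "-1"))   // -1
  }
}

The design point mirrored here is that a plain long is tried first, so existing byte-count settings keep working, and only non-numeric strings are interpreted as memory strings.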

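From the user's side, the change comes down to the usage sketch below. It assumes a Spark 1.6-era session where a SQLContext named sqlContext is already in scope (as in spark-shell); it is illustrative only.

// Memory strings with k/m/g suffixes are now accepted for this key:
sqlContext.setConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "10g")

// The raw byte-count form accepted before this patch still works,
// so existing configurations are unaffected (64 MB is the default):
sqlContext.setConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize",
  (64 * 1024 * 1024).toString)

// A value that is neither a long nor a valid memory string is rejected
// up front with an IllegalArgumentException, e.g. a negative size with a unit:
//   sqlContext.setConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "-1g")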