Repository: spark Updated Branches: refs/heads/branch-2.4 dc6047613 -> 8d1720079
[SPARK-24519][CORE] Compute SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS only once ## What changes were proposed in this pull request? Previously SPARK-24519 created a modifiable config SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS. However, the config is being parsed for every creation of MapStatus, which could be very expensive. Another problem with the previous approach is that it created the illusion that this can be changed dynamically at runtime, which was not true. This PR changes it so the config is computed only once. ## How was this patch tested? Removed a test case that's no longer valid. Closes #22521 from rxin/SPARK-24519. Authored-by: Reynold Xin <r...@databricks.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> (cherry picked from commit e702fb1d5218d062fcb8e618b92dad7958eb4062) Signed-off-by: Dongjoon Hyun <dongj...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8d172007 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8d172007 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8d172007 Branch: refs/heads/branch-2.4 Commit: 8d172007968dc4f1d4a091ccb9e16cd785c0a363 Parents: dc60476 Author: Reynold Xin <r...@databricks.com> Authored: Wed Sep 26 10:15:16 2018 -0700 Committer: Dongjoon Hyun <dongj...@apache.org> Committed: Wed Sep 26 10:22:50 2018 -0700 ---------------------------------------------------------------------- .../org/apache/spark/scheduler/MapStatus.scala | 12 ++++++--- .../apache/spark/scheduler/MapStatusSuite.scala | 28 -------------------- 2 files changed, 9 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/8d172007/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala index 659694d..0e221ed 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala @@ -49,10 +49,16 @@ private[spark] sealed trait MapStatus { private[spark] object MapStatus { + /** + * Min partition number to use [[HighlyCompressedMapStatus]]. A bit ugly here because in test + * code we can't assume SparkEnv.get exists. + */ + private lazy val minPartitionsToUseHighlyCompressMapStatus = Option(SparkEnv.get) + .map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS)) + .getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get) + def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = { - if (uncompressedSizes.length > Option(SparkEnv.get) - .map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS)) - .getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)) { + if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) { HighlyCompressedMapStatus(loc, uncompressedSizes) } else { new CompressedMapStatus(loc, uncompressedSizes) http://git-wip-us.apache.org/repos/asf/spark/blob/8d172007/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala index 354e638..2155a0f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala @@ -188,32 +188,4 @@ class MapStatusSuite extends SparkFunSuite { assert(count === 3000) } } - - test("SPARK-24519: HighlyCompressedMapStatus has configurable threshold") { - val conf = new SparkConf() - val env = mock(classOf[SparkEnv]) - doReturn(conf).when(env).conf - SparkEnv.set(env) - val sizes = Array.fill[Long](500)(150L) - // Test default value - val status = MapStatus(null, sizes) - assert(status.isInstanceOf[CompressedMapStatus]) - // Test Non-positive values - for (s <- -1 to 0) { - assertThrows[IllegalArgumentException] { - conf.set(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS, s) - val status = MapStatus(null, sizes) - } - } - // Test positive values - Seq(1, 100, 499, 500, 501).foreach { s => - conf.set(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS, s) - val status = MapStatus(null, sizes) - if(sizes.length > s) { - assert(status.isInstanceOf[HighlyCompressedMapStatus]) - } else { - assert(status.isInstanceOf[CompressedMapStatus]) - } - } - } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org