Repository: spark
Updated Branches:
  refs/heads/branch-2.4 dc6047613 -> 8d1720079


[SPARK-24519][CORE] Compute SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS only once

## What changes were proposed in this pull request?
Previously SPARK-24519 made SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS a 
modifiable config. However, the config was parsed on every creation of a 
MapStatus, which could be very expensive. Another problem with the previous 
approach is that it created the illusion that the value could be changed 
dynamically at runtime, which was not true. This PR changes it so the config 
is read and cached only once.
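
For illustration, here is a minimal, self-contained Scala sketch of the 
before/after pattern. The `Conf` class, key name, and method names are 
hypothetical stand-ins rather than Spark's actual API; 2000 stands in for 
the config's default threshold.

```scala
object ConfigCachingSketch {
  // Stand-in for a config source whose lookup is expensive on every call,
  // mirroring per-call config parsing.
  final class Conf {
    def getInt(key: String, default: Int): Int =
      sys.props.get(key).map(_.trim.toInt).getOrElse(default)
  }

  val conf = new Conf

  // Before: the threshold was re-read on every MapStatus creation.
  def useHighlyCompressedBefore(numPartitions: Int): Boolean =
    numPartitions > conf.getInt("shuffle.minNumPartsToHighlyCompress", 2000)

  // After: `lazy val` resolves the threshold once, on first use, and reuses
  // it thereafter; it also makes explicit that runtime changes are ignored.
  private lazy val minPartsThreshold: Int =
    conf.getInt("shuffle.minNumPartsToHighlyCompress", 2000)

  def useHighlyCompressedAfter(numPartitions: Int): Boolean =
    numPartitions > minPartsThreshold

  def main(args: Array[String]): Unit = {
    println(useHighlyCompressedAfter(3000)) // true  -> highly compressed
    println(useHighlyCompressedAfter(100))  // false -> plain compressed
  }
}
```

The same trade-off applies to any hot-path config read: cache the value once 
if it is not meant to change at runtime.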

## How was this patch tested?
Removed a test case that's no longer valid.

Closes #22521 from rxin/SPARK-24519.

Authored-by: Reynold Xin <r...@databricks.com>
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
(cherry picked from commit e702fb1d5218d062fcb8e618b92dad7958eb4062)
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8d172007
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8d172007
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8d172007

Branch: refs/heads/branch-2.4
Commit: 8d172007968dc4f1d4a091ccb9e16cd785c0a363
Parents: dc60476
Author: Reynold Xin <r...@databricks.com>
Authored: Wed Sep 26 10:15:16 2018 -0700
Committer: Dongjoon Hyun <dongj...@apache.org>
Committed: Wed Sep 26 10:22:50 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/MapStatus.scala  | 12 ++++++---
 .../apache/spark/scheduler/MapStatusSuite.scala | 28 --------------------
 2 files changed, 9 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8d172007/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 659694d..0e221ed 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -49,10 +49,16 @@ private[spark] sealed trait MapStatus {
 
 private[spark] object MapStatus {
 
+  /**
+   * Min partition number to use [[HighlyCompressedMapStatus]]. A bit ugly here because in test
+   * code we can't assume SparkEnv.get exists.
+   */
+  private lazy val minPartitionsToUseHighlyCompressMapStatus = Option(SparkEnv.get)
+    .map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS))
+    .getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)
+
   def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
-    if (uncompressedSizes.length >  Option(SparkEnv.get)
-      .map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS))
-      .getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)) {
+    if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) {
       HighlyCompressedMapStatus(loc, uncompressedSizes)
     } else {
       new CompressedMapStatus(loc, uncompressedSizes)
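
A note on the Option(SparkEnv.get) expression above: tests may construct a 
MapStatus without a live SparkEnv, so the lookup falls back to the config's 
default value. A tiny standalone sketch of that null-safe fallback pattern 
(the names below are hypothetical stand-ins, not Spark's API):

```scala
object FallbackSketch {
  final case class Env(threshold: Int)

  // Null when no environment has been set up, e.g. in unit tests.
  @volatile var current: Env = null

  val Default = 2000 // stands in for the config's default value

  // Option(x) is None when x is null, so tests silently get the default.
  def threshold: Int = Option(current).map(_.threshold).getOrElse(Default)

  def main(args: Array[String]): Unit = {
    println(threshold)   // 2000: no env present
    current = Env(5000)
    println(threshold)   // 5000: env present, its value wins
  }
}
```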

http://git-wip-us.apache.org/repos/asf/spark/blob/8d172007/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
index 354e638..2155a0f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -188,32 +188,4 @@ class MapStatusSuite extends SparkFunSuite {
       assert(count === 3000)
     }
   }
-
-  test("SPARK-24519: HighlyCompressedMapStatus has configurable threshold") {
-    val conf = new SparkConf()
-    val env = mock(classOf[SparkEnv])
-    doReturn(conf).when(env).conf
-    SparkEnv.set(env)
-    val sizes = Array.fill[Long](500)(150L)
-    // Test default value
-    val status = MapStatus(null, sizes)
-    assert(status.isInstanceOf[CompressedMapStatus])
-    // Test Non-positive values
-    for (s <- -1 to 0) {
-      assertThrows[IllegalArgumentException] {
-        conf.set(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS, s)
-        val status = MapStatus(null, sizes)
-      }
-    }
-    // Test positive values
-    Seq(1, 100, 499, 500, 501).foreach { s =>
-      conf.set(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS, s)
-      val status = MapStatus(null, sizes)
-      if(sizes.length > s) {
-        assert(status.isInstanceOf[HighlyCompressedMapStatus])
-      } else {
-        assert(status.isInstanceOf[CompressedMapStatus])
-      }
-    }
-  }
 }

