Repository: spark
Updated Branches:
  refs/heads/master bd2ae857d -> e702fb1d5


[SPARK-24519][CORE] Compute SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS only once

## What changes were proposed in this pull request?
Previously SPARK-24519 created a modifiable config 
SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS. However, the config is being parsed 
for every creation of MapStatus, which could be very expensive. Another problem 
with the previous approach is that it created the illusion that this can be 
changed dynamically at runtime, which was not true. This PR changes it so the 
config is computed only once.

## How was this patch tested?
Removed a test case that's no longer valid.

Closes #22521 from rxin/SPARK-24519.

Authored-by: Reynold Xin <r...@databricks.com>
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e702fb1d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e702fb1d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e702fb1d

Branch: refs/heads/master
Commit: e702fb1d5218d062fcb8e618b92dad7958eb4062
Parents: bd2ae85
Author: Reynold Xin <r...@databricks.com>
Authored: Wed Sep 26 10:15:16 2018 -0700
Committer: Dongjoon Hyun <dongj...@apache.org>
Committed: Wed Sep 26 10:15:16 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/MapStatus.scala  | 12 ++++++---
 .../apache/spark/scheduler/MapStatusSuite.scala | 28 --------------------
 2 files changed, 9 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e702fb1d/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala 
b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 659694d..0e221ed 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -49,10 +49,16 @@ private[spark] sealed trait MapStatus {
 
 private[spark] object MapStatus {
 
+  /**
+   * Min partition number to use [[HighlyCompressedMapStatus]]. A bit ugly 
here because in test
+   * code we can't assume SparkEnv.get exists.
+   */
+  private lazy val minPartitionsToUseHighlyCompressMapStatus = 
Option(SparkEnv.get)
+    .map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS))
+    
.getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)
+
   def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
-    if (uncompressedSizes.length >  Option(SparkEnv.get)
-      .map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS))
-      
.getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)) {
+    if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) {
       HighlyCompressedMapStatus(loc, uncompressedSizes)
     } else {
       new CompressedMapStatus(loc, uncompressedSizes)

http://git-wip-us.apache.org/repos/asf/spark/blob/e702fb1d/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
index 354e638..2155a0f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -188,32 +188,4 @@ class MapStatusSuite extends SparkFunSuite {
       assert(count === 3000)
     }
   }
-
-  test("SPARK-24519: HighlyCompressedMapStatus has configurable threshold") {
-    val conf = new SparkConf()
-    val env = mock(classOf[SparkEnv])
-    doReturn(conf).when(env).conf
-    SparkEnv.set(env)
-    val sizes = Array.fill[Long](500)(150L)
-    // Test default value
-    val status = MapStatus(null, sizes)
-    assert(status.isInstanceOf[CompressedMapStatus])
-    // Test Non-positive values
-    for (s <- -1 to 0) {
-      assertThrows[IllegalArgumentException] {
-        conf.set(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS, s)
-        val status = MapStatus(null, sizes)
-      }
-    }
-    // Test positive values
-    Seq(1, 100, 499, 500, 501).foreach { s =>
-      conf.set(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS, s)
-      val status = MapStatus(null, sizes)
-      if(sizes.length > s) {
-        assert(status.isInstanceOf[HighlyCompressedMapStatus])
-      } else {
-        assert(status.isInstanceOf[CompressedMapStatus])
-      }
-    }
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to