Repository: spark
Updated Branches:
  refs/heads/master f725b2ec1 -> 369127f03


[SPARK-12130] Replace shuffleManagerClass with shortShuffleMgrNames in 
ExternalShuffleBlockResolver

Replace shuffleManagerClassName with shortShuffleMgrName to reduce the time 
spent on string comparison, and check the sort-based shuffle managers first. 
cc JoshRosen andrewor14

Author: Lianhui Wang <lianhuiwan...@gmail.com>

Closes #10131 from lianhuiwang/spark-12130.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/369127f0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/369127f0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/369127f0

Branch: refs/heads/master
Commit: 369127f03257e7081d2aa1fc445e773b26f0d5e3
Parents: f725b2e
Author: Lianhui Wang <lianhuiwan...@gmail.com>
Authored: Tue Dec 15 18:16:22 2015 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Tue Dec 15 18:17:48 2015 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/shuffle/ShuffleManager.scala  | 4 ++++
 .../org/apache/spark/shuffle/hash/HashShuffleManager.scala    | 2 ++
 .../org/apache/spark/shuffle/sort/SortShuffleManager.scala    | 2 ++
 .../main/scala/org/apache/spark/storage/BlockManager.scala    | 2 +-
 .../spark/network/shuffle/ExternalShuffleBlockResolver.java   | 7 +++----
 .../org/apache/spark/network/sasl/SaslIntegrationSuite.java   | 3 +--
 .../network/shuffle/ExternalShuffleBlockResolverSuite.java    | 6 +++---
 .../network/shuffle/ExternalShuffleIntegrationSuite.java      | 4 ++--
 8 files changed, 18 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/369127f0/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala 
b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
index 978366d..a3444bf 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
@@ -28,6 +28,10 @@ import org.apache.spark.{TaskContext, ShuffleDependency}
  * boolean isDriver as parameters.
  */
 private[spark] trait ShuffleManager {
+
+  /** Return short name for the ShuffleManager */
+  val shortName: String
+
   /**
    * Register a shuffle with the manager and obtain a handle for it to pass to 
tasks.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/369127f0/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala 
b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
index d2e2fc4..4f30da0 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
@@ -34,6 +34,8 @@ private[spark] class HashShuffleManager(conf: SparkConf) 
extends ShuffleManager
 
   private val fileShuffleBlockResolver = new FileShuffleBlockResolver(conf)
 
+  override val shortName: String = "hash"
+
   /* Register a shuffle with the manager and obtain a handle for it to pass to 
tasks. */
   override def registerShuffle[K, V, C](
       shuffleId: Int,

http://git-wip-us.apache.org/repos/asf/spark/blob/369127f0/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala 
b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index 66b6bbc..9b1a279 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -79,6 +79,8 @@ private[spark] class SortShuffleManager(conf: SparkConf) 
extends ShuffleManager
    */
   private[this] val numMapsForShuffle = new ConcurrentHashMap[Int, Int]()
 
+  override val shortName: String = "sort"
+
   override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/369127f0/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index ed05143..540e1ec 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -200,7 +200,7 @@ private[spark] class BlockManager(
     val shuffleConfig = new ExecutorShuffleInfo(
       diskBlockManager.localDirs.map(_.toString),
       diskBlockManager.subDirsPerLocalDir,
-      shuffleManager.getClass.getName)
+      shuffleManager.shortName)
 
     val MAX_ATTEMPTS = 3
     val SLEEP_TIME_SECS = 5

http://git-wip-us.apache.org/repos/asf/spark/blob/369127f0/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
----------------------------------------------------------------------
diff --git 
a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
 
b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index e5cb68c..fe933ed 100644
--- 
a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ 
b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -183,11 +183,10 @@ public class ExternalShuffleBlockResolver {
         String.format("Executor is not registered (appId=%s, execId=%s)", 
appId, execId));
     }
 
-    if 
("org.apache.spark.shuffle.hash.HashShuffleManager".equals(executor.shuffleManager))
 {
-      return getHashBasedShuffleBlockData(executor, blockId);
-    } else if 
("org.apache.spark.shuffle.sort.SortShuffleManager".equals(executor.shuffleManager)
-      || 
"org.apache.spark.shuffle.unsafe.UnsafeShuffleManager".equals(executor.shuffleManager))
 {
+    if ("sort".equals(executor.shuffleManager) || 
"tungsten-sort".equals(executor.shuffleManager)) {
       return getSortBasedShuffleBlockData(executor, shuffleId, mapId, 
reduceId);
+    } else if ("hash".equals(executor.shuffleManager)) {
+      return getHashBasedShuffleBlockData(executor, blockId);
     } else {
       throw new UnsupportedOperationException(
         "Unsupported shuffle manager: " + executor.shuffleManager);

http://git-wip-us.apache.org/repos/asf/spark/blob/369127f0/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
----------------------------------------------------------------------
diff --git 
a/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
 
b/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
index f573d96..0ea631e 100644
--- 
a/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
+++ 
b/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
@@ -221,8 +221,7 @@ public class SaslIntegrationSuite {
 
       // Register an executor so that the next steps work.
       ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
-        new String[] { System.getProperty("java.io.tmpdir") }, 1,
-        "org.apache.spark.shuffle.sort.SortShuffleManager");
+        new String[] { System.getProperty("java.io.tmpdir") }, 1, "sort");
       RegisterExecutor regmsg = new RegisterExecutor("app-1", "0", 
executorInfo);
       client1.sendRpcSync(regmsg.toByteBuffer(), TIMEOUT_MS);
 

http://git-wip-us.apache.org/repos/asf/spark/blob/369127f0/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
----------------------------------------------------------------------
diff --git 
a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
 
b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
index a995823..60a1b8b 100644
--- 
a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
+++ 
b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
@@ -83,7 +83,7 @@ public class ExternalShuffleBlockResolverSuite {
 
     // Nonexistent shuffle block
     resolver.registerExecutor("app0", "exec3",
-      
dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager"));
+      dataContext.createExecutorInfo("sort"));
     try {
       resolver.getBlockData("app0", "exec3", "shuffle_1_1_0");
       fail("Should have failed");
@@ -96,7 +96,7 @@ public class ExternalShuffleBlockResolverSuite {
   public void testSortShuffleBlocks() throws IOException {
     ExternalShuffleBlockResolver resolver = new 
ExternalShuffleBlockResolver(conf, null);
     resolver.registerExecutor("app0", "exec0",
-      
dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager"));
+      dataContext.createExecutorInfo("sort"));
 
     InputStream block0Stream =
       resolver.getBlockData("app0", "exec0", 
"shuffle_0_0_0").createInputStream();
@@ -115,7 +115,7 @@ public class ExternalShuffleBlockResolverSuite {
   public void testHashShuffleBlocks() throws IOException {
     ExternalShuffleBlockResolver resolver = new 
ExternalShuffleBlockResolver(conf, null);
     resolver.registerExecutor("app0", "exec0",
-      
dataContext.createExecutorInfo("org.apache.spark.shuffle.hash.HashShuffleManager"));
+      dataContext.createExecutorInfo("hash"));
 
     InputStream block0Stream =
       resolver.getBlockData("app0", "exec0", 
"shuffle_1_0_0").createInputStream();

http://git-wip-us.apache.org/repos/asf/spark/blob/369127f0/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
----------------------------------------------------------------------
diff --git 
a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
 
b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index 2095f41..5e706bf 100644
--- 
a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ 
b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -49,8 +49,8 @@ import org.apache.spark.network.util.TransportConf;
 public class ExternalShuffleIntegrationSuite {
 
   static String APP_ID = "app-id";
-  static String SORT_MANAGER = 
"org.apache.spark.shuffle.sort.SortShuffleManager";
-  static String HASH_MANAGER = 
"org.apache.spark.shuffle.hash.HashShuffleManager";
+  static String SORT_MANAGER = "sort";
+  static String HASH_MANAGER = "hash";
 
   // Executor 0 is sort-based
   static TestShuffleDataContext dataContext0;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to