This is an automated email from the ASF dual-hosted git repository.
irashid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2e28622 [SPARK-28211][CORE][SHUFFLE] Propose Shuffle Driver Components API
2e28622 is described below
commit 2e28622d8aeb9ce2460e803bb7d994196bcc0253
Author: Yifei Huang <[email protected]>
AuthorDate: Tue Oct 15 12:26:49 2019 -0500
[SPARK-28211][CORE][SHUFFLE] Propose Shuffle Driver Components API
### What changes were proposed in this pull request?
This is the next step in the SPARK-25299 work to propose a new shuffle
storage API. This patch adds the plugin components that hook into the
driver: driver-side shuffle initialization, application cleanup, and
shuffle cleanup.
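For illustration only (not part of this patch), a minimal sketch of what a third-party plugin's driver-side hook could look like against the new API. The `RemoteStoreDriverComponents` class, the external "remote store" it talks to, and the `remote.store.endpoint` key are all hypothetical:

```java
import java.util.Collections;
import java.util.Map;

import org.apache.spark.shuffle.api.ShuffleDriverComponents;

// Hypothetical plugin: registers the application with an external shuffle
// storage service and passes that service's endpoint on to the executors.
public class RemoteStoreDriverComponents implements ShuffleDriverComponents {

  @Override
  public Map<String, String> initializeApplication() {
    // Set up per-application state in the remote store here. The returned
    // entries are stored in the SparkConf under the
    // "spark.shuffle.plugin.__config__." prefix and handed back (with the
    // prefix stripped) to ShuffleExecutorComponents#initializeExecutor on
    // each executor.
    return Collections.singletonMap("remote.store.endpoint", "host:port");
  }

  @Override
  public void cleanupApplication() {
    // Delete all shuffle state owned by this application in the remote store.
  }

  @Override
  public void removeShuffle(int shuffleId, boolean blocking) {
    // Delete the data for a single shuffle when the ContextCleaner asks.
  }
}
```

The matching `ShuffleDataIO` implementation would return this object from `driver()` and be selected through the `SHUFFLE_IO_PLUGIN_CLASS` setting, as the `TestShuffleDataIO` example in the new suite does.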
### How was this patch tested?
Existing unit tests, plus a new test covering the interaction between driver
and executor initialization.
Closes #25823 from yifeih/yh/upstream/driver-lifecycle.
Lead-authored-by: Yifei Huang <[email protected]>
Co-authored-by: mccheah <[email protected]>
Signed-off-by: Imran Rashid <[email protected]>
---
.../apache/spark/shuffle/api/ShuffleDataIO.java | 6 ++
.../spark/shuffle/api/ShuffleDriverComponents.java | 64 +++++++++++++++
.../shuffle/api/ShuffleExecutorComponents.java | 12 ++-
.../shuffle/sort/io/LocalDiskShuffleDataIO.java | 8 +-
....java => LocalDiskShuffleDriverComponents.java} | 35 +++++---
.../io/LocalDiskShuffleExecutorComponents.java | 7 +-
.../scala/org/apache/spark/ContextCleaner.scala | 8 +-
.../main/scala/org/apache/spark/Dependency.scala | 1 +
.../main/scala/org/apache/spark/SparkContext.scala | 17 +++-
.../apache/spark/shuffle/ShuffleDataIOUtils.scala | 42 ++++++++++
.../spark/shuffle/sort/SortShuffleManager.scala | 15 ++--
.../apache/spark/InternalAccumulatorSuite.scala | 3 +-
.../shuffle/ShuffleDriverComponentsSuite.scala | 94 ++++++++++++++++++++++
13 files changed, 281 insertions(+), 31 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDataIO.java b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDataIO.java
index e9e50ec..e4554bd 100644
--- a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDataIO.java
+++ b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDataIO.java
@@ -46,4 +46,10 @@ public interface ShuffleDataIO {
* are only invoked on the executors.
*/
ShuffleExecutorComponents executor();
+
+ /**
+ * Called once on driver process to bootstrap the shuffle metadata modules that
+ * are maintained by the driver.
+ */
+ ShuffleDriverComponents driver();
}
diff --git a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDriverComponents.java b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDriverComponents.java
new file mode 100644
index 0000000..b4cec17
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDriverComponents.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.api;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.Private;
+
+/**
+ * :: Private ::
+ * An interface for building shuffle support modules for the Driver.
+ */
+@Private
+public interface ShuffleDriverComponents {
+
+ /**
+ * Called once in the driver to bootstrap this module that is specific to this application.
+ * This method is called before submitting executor requests to the cluster manager.
+ *
+ * This method should prepare the module with its shuffle components, e.g. registering against
+ * an external file server or shuffle service, or creating tables in a shuffle
+ * storage database.
+ *
+ * @return additional SparkConf settings necessary for initializing the executor components.
+ * This would include configurations that cannot be statically set on the application, like
+ * the host:port of external services for shuffle storage.
+ */
+ Map<String, String> initializeApplication();
+
+ /**
+ * Called once at the end of the Spark application to clean up any existing shuffle state.
+ */
+ void cleanupApplication();
+
+ /**
+ * Called once per shuffle id when the shuffle id is first generated for a shuffle stage.
+ *
+ * @param shuffleId The unique identifier for the shuffle stage.
+ */
+ default void registerShuffle(int shuffleId) {}
+
+ /**
+ * Removes shuffle data associated with the given shuffle.
+ *
+ * @param shuffleId The unique identifier for the shuffle stage.
+ * @param blocking Whether this call should block on the deletion of the data.
+ */
+ default void removeShuffle(int shuffleId, boolean blocking) {}
+}
diff --git a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java
index d30f3da..30ca177 100644
--- a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java
+++ b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java
@@ -18,6 +18,7 @@
package org.apache.spark.shuffle.api;
import java.io.IOException;
+import java.util.Map;
import java.util.Optional;
import org.apache.spark.annotation.Private;
@@ -34,21 +35,26 @@ public interface ShuffleExecutorComponents {
/**
* Called once per executor to bootstrap this module with state that is specific to
* that executor, specifically the application ID and executor ID.
+ *
+ * @param appId The Spark application id
+ * @param execId The unique identifier of the executor being initialized
+ * @param extraConfigs Extra configs that were returned by
+ * {@link ShuffleDriverComponents#initializeApplication()}
*/
- void initializeExecutor(String appId, String execId);
+ void initializeExecutor(String appId, String execId, Map<String, String> extraConfigs);
/**
* Called once per map task to create a writer that will be responsible for persisting all the
* partitioned bytes written by that map task.
*
* @param shuffleId Unique identifier for the shuffle the map task is a part of
- * @param mapId An ID of the map task. The ID is unique within this Spark application.
+ * @param mapTaskId An ID of the map task. The ID is unique within this Spark application.
* @param numPartitions The number of partitions that will be written by the map task. Some of
* these partitions may be empty.
*/
ShuffleMapOutputWriter createMapOutputWriter(
int shuffleId,
- long mapId,
+ long mapTaskId,
int numPartitions) throws IOException;
/**
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDataIO.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDataIO.java
index cabcb17..50eb2f1 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDataIO.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDataIO.java
@@ -18,8 +18,9 @@
package org.apache.spark.shuffle.sort.io;
import org.apache.spark.SparkConf;
-import org.apache.spark.shuffle.api.ShuffleExecutorComponents;
import org.apache.spark.shuffle.api.ShuffleDataIO;
+import org.apache.spark.shuffle.api.ShuffleDriverComponents;
+import org.apache.spark.shuffle.api.ShuffleExecutorComponents;
/**
* Implementation of the {@link ShuffleDataIO} plugin system that replicates the local shuffle
@@ -37,4 +38,9 @@ public class LocalDiskShuffleDataIO implements ShuffleDataIO {
public ShuffleExecutorComponents executor() {
return new LocalDiskShuffleExecutorComponents(sparkConf);
}
+
+ @Override
+ public ShuffleDriverComponents driver() {
+ return new LocalDiskShuffleDriverComponents();
+ }
}
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDataIO.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDriverComponents.java
similarity index 50%
copy from core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDataIO.java
copy to core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDriverComponents.java
index cabcb17..92b4b31 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDataIO.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDriverComponents.java
@@ -17,24 +17,33 @@
package org.apache.spark.shuffle.sort.io;
-import org.apache.spark.SparkConf;
-import org.apache.spark.shuffle.api.ShuffleExecutorComponents;
-import org.apache.spark.shuffle.api.ShuffleDataIO;
+import java.util.Collections;
+import java.util.Map;
-/**
- * Implementation of the {@link ShuffleDataIO} plugin system that replicates the local shuffle
- * storage and index file functionality that has historically been used from Spark 2.4 and earlier.
- */
-public class LocalDiskShuffleDataIO implements ShuffleDataIO {
+import org.apache.spark.SparkEnv;
+import org.apache.spark.shuffle.api.ShuffleDriverComponents;
+import org.apache.spark.storage.BlockManagerMaster;
+
+public class LocalDiskShuffleDriverComponents implements ShuffleDriverComponents {
+
+ private BlockManagerMaster blockManagerMaster;
- private final SparkConf sparkConf;
+ @Override
+ public Map<String, String> initializeApplication() {
+ blockManagerMaster = SparkEnv.get().blockManager().master();
+ return Collections.emptyMap();
+ }
- public LocalDiskShuffleDataIO(SparkConf sparkConf) {
- this.sparkConf = sparkConf;
+ @Override
+ public void cleanupApplication() {
+ // nothing to clean up
}
@Override
- public ShuffleExecutorComponents executor() {
- return new LocalDiskShuffleExecutorComponents(sparkConf);
+ public void removeShuffle(int shuffleId, boolean blocking) {
+ if (blockManagerMaster == null) {
+ throw new IllegalStateException("Driver components must be initialized before using");
+ }
+ blockManagerMaster.removeShuffle(shuffleId, blocking);
}
}
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
index a0c7d3c..eb4d9d9 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
@@ -17,6 +17,7 @@
package org.apache.spark.shuffle.sort.io;
+import java.util.Map;
import java.util.Optional;
import com.google.common.annotations.VisibleForTesting;
@@ -50,7 +51,7 @@ public class LocalDiskShuffleExecutorComponents implements ShuffleExecutorCompon
}
@Override
- public void initializeExecutor(String appId, String execId) {
+ public void initializeExecutor(String appId, String execId, Map<String, String> extraConfigs) {
blockManager = SparkEnv.get().blockManager();
if (blockManager == null) {
throw new IllegalStateException("No blockManager available from the SparkEnv.");
@@ -61,14 +62,14 @@ public class LocalDiskShuffleExecutorComponents implements ShuffleExecutorCompon
@Override
public ShuffleMapOutputWriter createMapOutputWriter(
int shuffleId,
- long mapId,
+ long mapTaskId,
int numPartitions) {
if (blockResolver == null) {
throw new IllegalStateException(
"Executor components must be initialized before getting writers.");
}
return new LocalDiskShuffleMapOutputWriter(
- shuffleId, mapId, numPartitions, blockResolver, sparkConf);
+ shuffleId, mapTaskId, numPartitions, blockResolver, sparkConf);
}
@Override
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index dfbd7d1..9506c36 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -27,6 +27,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
+import org.apache.spark.shuffle.api.ShuffleDriverComponents
import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, ThreadUtils, Utils}
/**
@@ -58,7 +59,9 @@ private class CleanupTaskWeakReference(
* to be processed when the associated object goes out of scope of the application. Actual
* cleanup is performed in a separate daemon thread.
*/
-private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
+private[spark] class ContextCleaner(
+ sc: SparkContext,
+ shuffleDriverComponents: ShuffleDriverComponents) extends Logging {
/**
* A buffer to ensure that `CleanupTaskWeakReference`s are not garbage collected as long as they
@@ -221,7 +224,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
try {
logDebug("Cleaning shuffle " + shuffleId)
mapOutputTrackerMaster.unregisterShuffle(shuffleId)
- blockManagerMaster.removeShuffle(shuffleId, blocking)
+ shuffleDriverComponents.removeShuffle(shuffleId, blocking)
listeners.asScala.foreach(_.shuffleCleaned(shuffleId))
logDebug("Cleaned shuffle " + shuffleId)
} catch {
@@ -269,7 +272,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}
}
- private def blockManagerMaster = sc.env.blockManager.master
private def broadcastManager = sc.env.broadcastManager
private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
}
diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index f0ac9ac..ba8e4d6 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -96,6 +96,7 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
shuffleId, this)
_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
+ _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
}
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4792c0a..2db8809 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -58,6 +58,8 @@ import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
import org.apache.spark.scheduler.local.LocalSchedulerBackend
+import org.apache.spark.shuffle.ShuffleDataIOUtils
+import org.apache.spark.shuffle.api.ShuffleDriverComponents
import org.apache.spark.status.{AppStatusSource, AppStatusStore}
import org.apache.spark.status.api.v1.ThreadStackTrace
import org.apache.spark.storage._
@@ -217,6 +219,7 @@ class SparkContext(config: SparkConf) extends Logging {
private var _statusStore: AppStatusStore = _
private var _heartbeater: Heartbeater = _
private var _resources: scala.collection.immutable.Map[String, ResourceInformation] = _
+ private var _shuffleDriverComponents: ShuffleDriverComponents = _
/* ------------------------------------------------------------------------------------- *
| Accessors and public fields. These provide access to the internal state of the |
@@ -319,6 +322,8 @@ class SparkContext(config: SparkConf) extends Logging {
_dagScheduler = ds
}
+ private[spark] def shuffleDriverComponents: ShuffleDriverComponents = _shuffleDriverComponents
+
/**
* A unique identifier for the Spark application.
* Its format depends on the scheduler implementation.
@@ -524,6 +529,11 @@ class SparkContext(config: SparkConf) extends Logging {
executorEnvs ++= _conf.getExecutorEnv
executorEnvs("SPARK_USER") = sparkUser
+ _shuffleDriverComponents = ShuffleDataIOUtils.loadShuffleDataIO(config).driver()
+ _shuffleDriverComponents.initializeApplication().asScala.foreach { case (k, v) =>
+ _conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v)
+ }
+
// We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
// retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
_heartbeatReceiver = env.rpcEnv.setupEndpoint(
@@ -576,7 +586,7 @@ class SparkContext(config: SparkConf) extends Logging {
_cleaner =
if (_conf.get(CLEANER_REFERENCE_TRACKING)) {
- Some(new ContextCleaner(this))
+ Some(new ContextCleaner(this, _shuffleDriverComponents))
} else {
None
}
@@ -1975,6 +1985,11 @@ class SparkContext(config: SparkConf) extends Logging {
}
_heartbeater = null
}
+ if (_shuffleDriverComponents != null) {
+ Utils.tryLogNonFatalError {
+ _shuffleDriverComponents.cleanupApplication()
+ }
+ }
if (env != null && _heartbeatReceiver != null) {
Utils.tryLogNonFatalError {
env.rpcEnv.stop(_heartbeatReceiver)
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleDataIOUtils.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleDataIOUtils.scala
new file mode 100644
index 0000000..e9507a7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleDataIOUtils.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.SHUFFLE_IO_PLUGIN_CLASS
+import org.apache.spark.shuffle.api.ShuffleDataIO
+import org.apache.spark.util.Utils
+
+private[spark] object ShuffleDataIOUtils {
+
+ /**
+ * The prefix of spark config keys that are passed from the driver to the executor.
+ */
+ val SHUFFLE_SPARK_CONF_PREFIX = "spark.shuffle.plugin.__config__."
+
+ def loadShuffleDataIO(conf: SparkConf): ShuffleDataIO = {
+ val configuredPluginClass = conf.get(SHUFFLE_IO_PLUGIN_CLASS)
+ val maybeIO = Utils.loadExtensions(
+ classOf[ShuffleDataIO], Seq(configuredPluginClass), conf)
+ require(maybeIO.nonEmpty, s"A valid shuffle plugin must be specified by config " +
+ s"${SHUFFLE_IO_PLUGIN_CLASS.key}, but $configuredPluginClass resulted in zero valid " +
+ s"plugins.")
+ maybeIO.head
+ }
+
+}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index b21ce9c..5adfd71 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -19,6 +19,8 @@ package org.apache.spark.shuffle.sort
import java.util.concurrent.ConcurrentHashMap
+import scala.collection.JavaConverters._
+
import org.apache.spark._
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.shuffle._
@@ -236,12 +238,13 @@ private[spark] object SortShuffleManager extends Logging {
}
private def loadShuffleExecutorComponents(conf: SparkConf): ShuffleExecutorComponents = {
- val configuredPluginClasses = conf.get(config.SHUFFLE_IO_PLUGIN_CLASS)
- val maybeIO = Utils.loadExtensions(
- classOf[ShuffleDataIO], Seq(configuredPluginClasses), conf)
- require(maybeIO.size == 1, s"Failed to load plugins of type $configuredPluginClasses")
- val executorComponents = maybeIO.head.executor()
- executorComponents.initializeExecutor(conf.getAppId, SparkEnv.get.executorId)
+ val executorComponents = ShuffleDataIOUtils.loadShuffleDataIO(conf).executor()
+ val extraConfigs = conf.getAllWithPrefix(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX)
+ .toMap
+ executorComponents.initializeExecutor(
+ conf.getAppId,
+ SparkEnv.get.executorId,
+ extraConfigs.asJava)
executorComponents
}
}
diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
index 8433a6f..b982626 100644
--- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
@@ -211,7 +211,8 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
/**
* A special [[ContextCleaner]] that saves the IDs of the accumulators registered for cleanup.
*/
- private class SaveAccumContextCleaner(sc: SparkContext) extends ContextCleaner(sc) {
+ private class SaveAccumContextCleaner(sc: SparkContext) extends
+ ContextCleaner(sc, null) {
private val accumsRegistered = new ArrayBuffer[Long]
override def registerAccumulatorForCleanup(a: AccumulatorV2[_, _]): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleDriverComponentsSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleDriverComponentsSuite.scala
new file mode 100644
index 0000000..d8657ec
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleDriverComponentsSuite.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import java.util.{Map => JMap}
+import java.util.concurrent.atomic.AtomicBoolean
+
+import com.google.common.collect.ImmutableMap
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.internal.config.SHUFFLE_IO_PLUGIN_CLASS
+import org.apache.spark.shuffle.api.{ShuffleDataIO, ShuffleDriverComponents, ShuffleExecutorComponents, ShuffleMapOutputWriter}
+import org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO
+
+class ShuffleDriverComponentsSuite
+ extends SparkFunSuite with LocalSparkContext with BeforeAndAfterEach {
+
+ test("test serialization of shuffle initialization conf to executors") {
+ val testConf = new SparkConf()
+ .setAppName("testing")
+ .set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + "test-plugin-key", "user-set-value")
+ .set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + "test-user-key", "user-set-value")
+ .setMaster("local-cluster[2,1,1024]")
+ .set(SHUFFLE_IO_PLUGIN_CLASS, "org.apache.spark.shuffle.TestShuffleDataIO")
+
+ sc = new SparkContext(testConf)
+
+ val out = sc.parallelize(Seq((1, "one"), (2, "two"), (3, "three")), 3)
+ .groupByKey()
+ .foreach { _ =>
+ if (!TestShuffleExecutorComponentsInitialized.initialized.get()) {
+ throw new RuntimeException("TestShuffleExecutorComponents wasn't initialized")
+ }
+ }
+ }
+}
+
+class TestShuffleDataIO(sparkConf: SparkConf) extends ShuffleDataIO {
+ private val delegate = new LocalDiskShuffleDataIO(sparkConf)
+
+ override def driver(): ShuffleDriverComponents = new TestShuffleDriverComponents()
+
+ override def executor(): ShuffleExecutorComponents =
+ new TestShuffleExecutorComponentsInitialized(delegate.executor())
+}
+
+class TestShuffleDriverComponents extends ShuffleDriverComponents {
+ override def initializeApplication(): JMap[String, String] = {
+ ImmutableMap.of("test-plugin-key", "plugin-set-value")
+ }
+
+ override def cleanupApplication(): Unit = {}
+}
+
+object TestShuffleExecutorComponentsInitialized {
+ val initialized = new AtomicBoolean(false)
+}
+
+class TestShuffleExecutorComponentsInitialized(delegate: ShuffleExecutorComponents)
+ extends ShuffleExecutorComponents {
+
+ override def initializeExecutor(
+ appId: String,
+ execId: String,
+ extraConfigs: JMap[String, String]): Unit = {
+ delegate.initializeExecutor(appId, execId, extraConfigs)
+ assert(extraConfigs.get("test-plugin-key") == "plugin-set-value", extraConfigs)
+ assert(extraConfigs.get("test-user-key") == "user-set-value")
+ TestShuffleExecutorComponentsInitialized.initialized.set(true)
+ }
+
+ override def createMapOutputWriter(
+ shuffleId: Int,
+ mapTaskId: Long,
+ numPartitions: Int): ShuffleMapOutputWriter = {
+ delegate.createMapOutputWriter(shuffleId, mapTaskId, numPartitions)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]