Repository: spark
Updated Branches:
  refs/heads/branch-2.4 c67c597b6 -> 43c62e797


[SPARK-24918][CORE] Executor Plugin API

## What changes were proposed in this pull request?

A continuation of squito's executor plugin task. At his request I took his code, 
added tests, and moved the plugin initialization to a separate thread.

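Executor plugins now run on one separate thread, so the executor does not wait 
on them. Added testing. (Note: in the code as committed below, each plugin's
init() is called synchronously while the Executor is being constructed; plugins
are expected to start their own threads for polling, blocking, or intensive work.)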

## How was this patch tested?

Added test cases that exercise sample plugins.

Closes #22192 from NiharS/executorPlugin.

Lead-authored-by: Nihar Sheth <[email protected]>
Co-authored-by: NiharS <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
(cherry picked from commit 2f51e72356babac703cc20a531b4dcc7712f34af)
Signed-off-by: Marcelo Vanzin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/43c62e79
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/43c62e79
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/43c62e79

Branch: refs/heads/branch-2.4
Commit: 43c62e7974445cc2fa8574fd6bf2dbfa94195a0c
Parents: c67c597
Author: Nihar Sheth <[email protected]>
Authored: Thu Sep 20 11:52:20 2018 -0700
Committer: Marcelo Vanzin <[email protected]>
Committed: Thu Sep 20 11:52:31 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/spark/ExecutorPlugin.java   |  57 ++++++++
 .../org/apache/spark/executor/Executor.scala    |  35 +++++
 .../apache/spark/internal/config/package.scala  |  10 ++
 .../scala/org/apache/spark/util/Utils.scala     |  13 ++
 .../org/apache/spark/ExecutorPluginSuite.java   | 139 +++++++++++++++++++
 5 files changed, 254 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/43c62e79/core/src/main/java/org/apache/spark/ExecutorPlugin.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/ExecutorPlugin.java 
b/core/src/main/java/org/apache/spark/ExecutorPlugin.java
new file mode 100644
index 0000000..ec0b57f
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/ExecutorPlugin.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import org.apache.spark.annotation.DeveloperApi;
+
+/**
+ * A plugin which can be automatically instantiated within each Spark executor. 
 Users can specify
+ * plugins which should be created with the "spark.executor.plugins" 
configuration.  An instance
+ * of each plugin will be created for every executor, including those created 
by dynamic allocation,
+ * before the executor starts running any tasks.
+ *
+ * The specific API exposed to end users is still considered to be very 
unstable.  We will
+ * hopefully be able to keep compatibility by providing default 
implementations for any methods
+ * added, but make no guarantees this will always be possible across all Spark 
releases.
+ *
+ * Spark does nothing to verify the plugin is doing legitimate things, or to 
manage the resources
+ * it uses.  A plugin acquires the same privileges as the user running the 
task.  A bad plugin
+ * could also interfere with task execution and make the executor fail in 
unexpected ways.
+ */
+@DeveloperApi
+public interface ExecutorPlugin {
+
+  /**
+   * Initialize the executor plugin.
+   *
+   * <p>Each executor will, during its initialization, invoke this method on 
each
+   * plugin provided in the spark.executor.plugins configuration.</p>
+   *
+   * <p>Plugins should create threads in their implementation of this method 
for
+   * any polling, blocking, or intensive computation.</p>
+   */
+  default void init() {}
+
+  /**
+   * Clean up and terminate this plugin.
+   *
+   * <p>This function is called during the executor shutdown phase. The 
executor
+   * will wait for the plugin to terminate before continuing its own 
shutdown.</p>
+   */
+  default void shutdown() {}
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/43c62e79/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 86b1957..27f7ec8 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -136,6 +136,29 @@ private[spark] class Executor(
   // for fetching remote cached RDD blocks, so need to make sure it uses the 
right classloader too.
   env.serializerManager.setDefaultClassLoader(replClassLoader)
 
+  // Executor plugins named in spark.executor.plugins (EXECUTOR_PLUGINS); Nil when none are
+  // configured. They are loaded and their init() is called eagerly, on the thread that is
+  // constructing this Executor, with replClassLoader installed as the context class loader.
+  // NOTE(review): an exception thrown by one plugin's init() propagates out of this initializer
+  // without calling shutdown() on plugins that were already initialized -- confirm intended.
+  private val executorPlugins: Seq[ExecutorPlugin] = {
+    val pluginNames = conf.get(EXECUTOR_PLUGINS)
+    if (pluginNames.nonEmpty) {
+      logDebug(s"Initializing the following plugins: ${pluginNames.mkString(", 
")}")
+
+      // Plugins need to load using a class loader that includes the 
executor's user classpath
+      val pluginList: Seq[ExecutorPlugin] =
+        Utils.withContextClassLoader(replClassLoader) {
+          val plugins = Utils.loadExtensions(classOf[ExecutorPlugin], 
pluginNames, conf)
+          plugins.foreach { plugin =>
+            plugin.init()
+            logDebug(s"Successfully loaded plugin " + 
plugin.getClass().getCanonicalName())
+          }
+          plugins
+        }
+
+      logDebug("Finished initializing plugins")
+      pluginList
+    } else {
+      Nil
+    }
+  }
+
   // Max size of direct result. If task result is bigger than this, we use the 
block manager
   // to send the result back.
   private val maxDirectResultSize = Math.min(
@@ -219,6 +242,18 @@ private[spark] class Executor(
     heartbeater.shutdown()
     heartbeater.awaitTermination(10, TimeUnit.SECONDS)
     threadPool.shutdown()
+
+    // Notify plugins that executor is shutting down so they can terminate 
cleanly
+    Utils.withContextClassLoader(replClassLoader) {
+      executorPlugins.foreach { plugin =>
+        try {
+          plugin.shutdown()
+        } catch {
+          case e: Exception =>
+            logWarning("Plugin " + plugin.getClass().getCanonicalName() + " 
shutdown failed", e)
+        }
+      }
+    }
     if (!isLocal) {
       env.stop()
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/43c62e79/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 7f1eb1e..e723819 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -618,4 +618,14 @@ package object config {
       .intConf
       .checkValue(v => v > 0, "The max failures should be a positive value.")
       .createWithDefault(40)
+
+  // Comma-separated ExecutorPlugin class names to instantiate in every executor; empty by default.
+  private[spark] val EXECUTOR_PLUGINS =
+    ConfigBuilder("spark.executor.plugins")
+      .doc("Comma-separated list of class names for \"plugins\" implementing " 
+
+        "org.apache.spark.ExecutorPlugin.  Plugins have the same privileges as 
any task " +
+        "in a Spark executor.  They can also interfere with task execution and 
fail in " +
+        "unexpected ways.  So be sure to only use this for trusted plugins.")
+      .stringConf
+      .toSequence
+      .createWithDefault(Nil)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/43c62e79/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 14f68cd..c8b148b 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -240,6 +240,19 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Run a segment of code using a different context class loader in the 
current thread
+   * and restore the thread's previous context class loader afterwards, even if `fn` throws.
+   *
+   * @param ctxClassLoader class loader to install as the current thread's context class loader
+   * @param fn code to run while `ctxClassLoader` is installed
+   * @return the value produced by `fn`
+   */
+  def withContextClassLoader[T](ctxClassLoader: ClassLoader)(fn: => T): T = {
+    val oldClassLoader = Thread.currentThread().getContextClassLoader()
+    try {
+      Thread.currentThread().setContextClassLoader(ctxClassLoader)
+      fn
+    } finally {
+      Thread.currentThread().setContextClassLoader(oldClassLoader)
+    }
+  }
+
+  /**
    * Primitive often used when writing [[java.nio.ByteBuffer]] to 
[[java.io.DataOutput]]
    */
   def writeByteBuffer(bb: ByteBuffer, out: DataOutput): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/43c62e79/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java 
b/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java
new file mode 100644
index 0000000..686eb28
--- /dev/null
+++ b/core/src/test/java/org/apache/spark/ExecutorPluginSuite.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import org.apache.spark.api.java.JavaSparkContext;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for loading {@link ExecutorPlugin}s through "spark.executor.plugins" with a local master:
+ * an unknown class name fails SparkContext creation, configured plugins are initialized by the
+ * time the context is created and shut down when it is stopped, and a plugin whose shutdown()
+ * throws does not prevent the remaining plugins from shutting down.
+ */
+public class ExecutorPluginSuite {
+  private static final String EXECUTOR_PLUGIN_CONF_NAME = 
"spark.executor.plugins";
+  private static final String testBadPluginName = 
TestBadShutdownPlugin.class.getName();
+  private static final String testPluginName = 
TestExecutorPlugin.class.getName();
+  private static final String testSecondPluginName = 
TestSecondPlugin.class.getName();
+
+  // Static value modified by testing plugins to ensure plugins loaded 
correctly.
+  public static int numSuccessfulPlugins = 0;
+
+  // Static value modified by testing plugins to verify plugins shut down 
properly.
+  public static int numSuccessfulTerminations = 0;
+
+  private JavaSparkContext sc;
+
+  @Before
+  public void setUp() {
+    sc = null;
+    numSuccessfulPlugins = 0;
+    numSuccessfulTerminations = 0;
+  }
+
+  @After
+  public void tearDown() {
+    if (sc != null) {
+      sc.stop();
+      sc = null;
+    }
+  }
+
+  private SparkConf initializeSparkConf(String pluginNames) {
+    return new SparkConf()
+        .setMaster("local")
+        .setAppName("test")
+        .set(EXECUTOR_PLUGIN_CONF_NAME, pluginNames);
+  }
+
+  @Test
+  public void testPluginClassDoesNotExist() {
+    SparkConf conf = initializeSparkConf("nonexistant.plugin");
+    try {
+      sc = new JavaSparkContext(conf);
+      fail("No exception thrown for nonexistant plugin");
+    } catch (Exception e) {
+      // We cannot catch ClassNotFoundException directly because Java doesn't 
think it'll be thrown
+      assertTrue(e.toString().startsWith("java.lang.ClassNotFoundException"));
+    }
+  }
+
+  @Test
+  public void testAddPlugin() throws InterruptedException {
+    // Load the sample TestExecutorPlugin, which will change the value of 
numSuccessfulPlugins
+    SparkConf conf = initializeSparkConf(testPluginName);
+    sc = new JavaSparkContext(conf);
+    assertEquals(1, numSuccessfulPlugins);
+    sc.stop();
+    sc = null;
+    assertEquals(1, numSuccessfulTerminations);
+  }
+
+  @Test
+  public void testAddMultiplePlugins() throws InterruptedException {
+    // Load two plugins and verify they both execute.
+    SparkConf conf = initializeSparkConf(testPluginName + "," + 
testSecondPluginName);
+    sc = new JavaSparkContext(conf);
+    assertEquals(2, numSuccessfulPlugins);
+    sc.stop();
+    sc = null;
+    assertEquals(2, numSuccessfulTerminations);
+  }
+
+  @Test
+  public void testPluginShutdownWithException() {
+    // Verify an exception in one plugin shutdown does not affect the others
+    String pluginNames = testPluginName + "," + testBadPluginName + "," + 
testPluginName;
+    SparkConf conf = initializeSparkConf(pluginNames);
+    sc = new JavaSparkContext(conf);
+    assertEquals(3, numSuccessfulPlugins);
+    sc.stop();
+    sc = null;
+    assertEquals(2, numSuccessfulTerminations);
+  }
+
+  public static class TestExecutorPlugin implements ExecutorPlugin {
+    public void init() {
+      ExecutorPluginSuite.numSuccessfulPlugins++;
+    }
+
+    public void shutdown() {
+      ExecutorPluginSuite.numSuccessfulTerminations++;
+    }
+  }
+
+  public static class TestSecondPlugin implements ExecutorPlugin {
+    public void init() {
+      ExecutorPluginSuite.numSuccessfulPlugins++;
+    }
+
+    public void shutdown() {
+      ExecutorPluginSuite.numSuccessfulTerminations++;
+    }
+  }
+
+  public static class TestBadShutdownPlugin implements ExecutorPlugin {
+    public void init() {
+      ExecutorPluginSuite.numSuccessfulPlugins++;
+    }
+
+    public void shutdown() {
+      throw new RuntimeException("This plugin will fail to cleanly shut down");
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to