This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new f5dc06ed [SPARK-32119][CORE][3.0] ExecutorPlugin doesn't work with Standalone Cluster and Kubernetes with --jars
f5dc06ed is described below

commit f5dc06ed694978bea2738bd778d3c63d2c516aea
Author: Kousuke Saruta <[email protected]>
AuthorDate: Wed Oct 28 14:21:32 2020 -0700

    [SPARK-32119][CORE][3.0] ExecutorPlugin doesn't work with Standalone Cluster and Kubernetes with --jars
    
    ### What changes were proposed in this pull request?
    
    This is a backport PR for branch-3.0.
    
    This PR changes Executor to load the jars and files added by --jars and --files during executor initialization.
    To avoid downloading those jars/files twice, they are associated with `startTime` as their upload timestamp.
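
    Roughly, the driver records the URLs of the jars/files added on submission (together with `startTime`) in the SparkConf under `spark.app.initial.jar.urls` / `spark.app.initial.file.urls`, and each executor replays them before plugins are initialized. A minimal sketch of the executor-side bootstrapping, simplified from the patch below (error handling and edge cases omitted):

    ```scala
    // Read back the URLs the driver recorded on submission and stamp them with the
    // application start time, so identical entries in later task descriptions are
    // not downloaded a second time.
    val appStartTime = conf.getLong("spark.app.startTime", 0L)

    val Seq(initialUserJars, initialUserFiles) = Seq("jar", "file").map { key =>
      conf.getOption(s"spark.app.initial.$key.urls")
        .map(urls => urls.split(",").map(url => url -> appStartTime).toMap)
        .getOrElse(Map.empty[String, Long])
    }

    // updateDependencies is the executor's existing helper that fetches files/jars
    // and adds jars to the class loader; plugins are initialized after this call.
    updateDependencies(initialUserFiles, initialUserJars)
    ```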
    
    ### Why are the changes needed?
    
    ExecutorPlugin can't work with Standalone Cluster and Kubernetes
    when a jar containing plugins and the files used by those plugins are added via the --jars and --files options of spark-submit.

    This is because jars and files added by --jars and --files are not loaded on executor initialization.
    I confirmed it works with YARN because jars/files are distributed there via the distributed cache.
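
    For reference, a plugin in this scenario implements `org.apache.spark.api.plugin.SparkPlugin` and reads, from its executor-side `init`, a resource shipped with --files. A hypothetical sketch (class and file names are made up for illustration; the plugin interfaces are the same ones exercised by the test below):

    ```scala
    import java.util.{Map => JMap}
    import scala.io.Source

    import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}

    // Hypothetical executor plugin that depends on a file distributed via --files.
    class MyExecutorPlugin extends ExecutorPlugin {
      override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
        // "plugin-config.txt" is assumed to have been submitted with --files; it is only
        // visible here if the executor downloaded --jars/--files before initializing plugins.
        val firstLine = Source.fromFile("plugin-config.txt").getLines().next()
        require(firstLine.nonEmpty)
      }
    }

    class MySparkPlugin extends SparkPlugin {
      override def driverPlugin(): DriverPlugin = null
      override def executorPlugin(): ExecutorPlugin = new MyExecutorPlugin()
    }
    ```

    Such a plugin would typically be submitted with something like `--conf spark.plugins=MySparkPlugin --jars myplugin.jar --files plugin-config.txt`; before this change, that worked on YARN but failed on Standalone Cluster and Kubernetes.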
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Jars/files added by --jars and --files are now downloaded by each executor at initialization.
    
    ### How was this patch tested?
    
    Added a new test case.
    
    Closes #29621 from sarutak/fix-plugin-issue-3.0.
    
    Lead-authored-by: Kousuke Saruta <[email protected]>
    Co-authored-by: Luca Canali <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../main/scala/org/apache/spark/SparkContext.scala | 27 +++++--
 .../scala/org/apache/spark/executor/Executor.scala | 14 ++++
 .../org/apache/spark/JavaSparkContextSuite.java    |  7 +-
 .../org/apache/spark/deploy/SparkSubmitSuite.scala | 83 +++++++++++++++++++++-
 docs/monitoring.md                                 |  6 --
 5 files changed, 122 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 9f9d611..3ccbea5 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -380,6 +380,7 @@ class SparkContext(config: SparkConf) extends Logging {
   try {
     _conf = config.clone()
     _conf.validateSettings()
+    _conf.set("spark.app.startTime", startTime.toString)
 
     if (!_conf.contains("spark.master")) {
       throw new SparkException("A master URL must be set in your configuration")
@@ -487,11 +488,17 @@ class SparkContext(config: SparkConf) extends Logging {
 
     // Add each JAR given through the constructor
     if (jars != null) {
-      jars.foreach(addJar)
+      jars.foreach(jar => addJar(jar, true))
+      if (addedJars.nonEmpty) {
+        _conf.set("spark.app.initial.jar.urls", 
addedJars.keys.toSeq.mkString(","))
+      }
     }
 
     if (files != null) {
-      files.foreach(addFile)
+      files.foreach(file => addFile(file, false, true))
+      if (addedFiles.nonEmpty) {
+        _conf.set("spark.app.initial.file.urls", 
addedFiles.keys.toSeq.mkString(","))
+      }
     }
 
     _executorMemory = _conf.getOption(EXECUTOR_MEMORY.key)
@@ -1495,7 +1502,7 @@ class SparkContext(config: SparkConf) extends Logging {
   * @note A path can be added only once. Subsequent additions of the same path are ignored.
    */
   def addFile(path: String): Unit = {
-    addFile(path, false)
+    addFile(path, false, false)
   }
 
   /**
@@ -1517,6 +1524,10 @@ class SparkContext(config: SparkConf) extends Logging {
   * @note A path can be added only once. Subsequent additions of the same path are ignored.
    */
   def addFile(path: String, recursive: Boolean): Unit = {
+    addFile(path, recursive, false)
+  }
+
+  private def addFile(path: String, recursive: Boolean, addedOnSubmit: Boolean): Unit = {
     val uri = new Path(path).toUri
     val schemeCorrectedURI = uri.getScheme match {
       case null => new File(path).getCanonicalFile.toURI
@@ -1554,7 +1565,7 @@ class SparkContext(config: SparkConf) extends Logging {
           path
         }
     }
-    val timestamp = System.currentTimeMillis
+    val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
     if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
       logInfo(s"Added file $path at $key with timestamp $timestamp")
       // Fetch the file locally so that closures which are run on the driver can still use the
@@ -1564,7 +1575,7 @@ class SparkContext(config: SparkConf) extends Logging {
       postEnvironmentUpdate()
     } else {
       logWarning(s"The path $path has been added already. Overwriting of added 
paths " +
-       "is not supported in the current version.")
+        "is not supported in the current version.")
     }
   }
 
@@ -1827,6 +1838,10 @@ class SparkContext(config: SparkConf) extends Logging {
   * @note A path can be added only once. Subsequent additions of the same path are ignored.
    */
   def addJar(path: String): Unit = {
+    addJar(path, false)
+  }
+
+  private def addJar(path: String, addedOnSubmit: Boolean): Unit = {
     def addLocalJarFile(file: File): String = {
       try {
         if (!file.exists()) {
@@ -1891,7 +1906,7 @@ class SparkContext(config: SparkConf) extends Logging {
         }
       }
       if (key != null) {
-        val timestamp = System.currentTimeMillis
+        val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
         if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
           logInfo(s"Added JAR $path at $key with timestamp $timestamp")
           postEnvironmentUpdate()
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 83e4a73..baf9e46 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -213,6 +213,20 @@ private[spark] class Executor(
 
   heartbeater.start()
 
+  private val appStartTime = conf.getLong("spark.app.startTime", 0)
+
+  // To allow users to distribute plugins and their required files
+  // specified by --jars and --files on application submission, those jars/files should be
+  // downloaded and added to the class loader via updateDependencies.
+  // This should be done before plugin initialization below
+  // because executors search plugins from the class loader and initialize them.
+  private val Seq(initialUserJars, initialUserFiles) = Seq("jar", "file").map { key =>
+    conf.getOption(s"spark.app.initial.$key.urls").map { urls =>
+      Map(urls.split(",").map(url => (url, appStartTime)): _*)
+    }.getOrElse(Map.empty)
+  }
+  updateDependencies(initialUserFiles, initialUserJars)
+
   // Plugins need to load using a class loader that includes the executor's user classpath.
   // Plugins also needs to be initialized after the heartbeater started
   // to avoid blocking to send heartbeat (see SPARK-32175).
diff --git a/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java b/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java
index 0f489fb..b188ee1 100644
--- a/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java
+++ b/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 
 import org.apache.spark.api.java.*;
 import org.apache.spark.*;
+import org.apache.spark.util.Utils;
 
 /**
  * Java apps can use both Java-friendly JavaSparkContext and Scala SparkContext.
@@ -35,14 +36,16 @@ import org.apache.spark.*;
 public class JavaSparkContextSuite implements Serializable {
 
   @Test
-  public void javaSparkContext() {
+  public void javaSparkContext() throws IOException {
+    File tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark");
+    String dummyJarFile = File.createTempFile(tempDir.toString(), "jarFile").toString();
     String[] jars = new String[] {};
     java.util.Map<String, String> environment = new java.util.HashMap<>();
 
     new JavaSparkContext(new SparkConf().setMaster("local").setAppName("name")).stop();
     new JavaSparkContext("local", "name", new SparkConf()).stop();
     new JavaSparkContext("local", "name").stop();
-    new JavaSparkContext("local", "name", "sparkHome", "jarFile").stop();
+    new JavaSparkContext("local", "name", "sparkHome", dummyJarFile).stop();
     new JavaSparkContext("local", "name", "sparkHome", jars).stop();
     new JavaSparkContext("local", "name", "sparkHome", jars, 
environment).stop();
   }
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 0715087..fb2a65e 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -566,7 +566,8 @@ class SparkSubmitSuite
       }
     }
 
-    val clArgs2 = Seq("--class", "org.SomeClass", "thejar.jar")
+    val dummyJarFile = TestUtils.createJarWithClasses(Seq.empty)
+    val clArgs2 = Seq("--class", "org.SomeClass", dummyJarFile.toString)
     val appArgs2 = new SparkSubmitArguments(clArgs2)
     val (_, _, conf2, _) = submit.prepareSubmitEnvironment(appArgs2)
     assert(!conf2.contains(UI_SHOW_CONSOLE_PROGRESS))
@@ -1216,6 +1217,86 @@ class SparkSubmitSuite
     testRemoteResources(enableHttpFs = true, blacklistSchemes = Seq("*"))
   }
 
+  test("SPARK-32119: Jars and files should be loaded when Executors launch for 
plugins") {
+    val tempDir = Utils.createTempDir()
+    val tempFileName = "test.txt"
+    val tempFile = new File(tempDir, tempFileName)
+
+    // scalastyle:off println
+    Utils.tryWithResource {
+      new PrintWriter(tempFile)
+    } { writer =>
+      writer.println("SparkPluginTest")
+    }
+    // scalastyle:on println
+
+    val sparkPluginCodeBody =
+      """
+        |@Override
+        |public org.apache.spark.api.plugin.ExecutorPlugin executorPlugin() {
+        |  return new TestExecutorPlugin();
+        |}
+        |
+        |@Override
+        |public org.apache.spark.api.plugin.DriverPlugin driverPlugin() { return null; }
+      """.stripMargin
+    val executorPluginCodeBody =
+      s"""
+        |@Override
+        |public void init(
+        |    org.apache.spark.api.plugin.PluginContext ctx,
+        |    java.util.Map<String, String> extraConf) {
+        |  String str = null;
+        |  try (java.io.BufferedReader reader =
+        |    new java.io.BufferedReader(new java.io.InputStreamReader(
+        |      new java.io.FileInputStream("$tempFileName")))) {
+        |    str = reader.readLine();
+        |  } catch (java.io.IOException e) {
+        |    throw new RuntimeException(e);
+        |  } finally {
+        |    assert str == "SparkPluginTest";
+        |  }
+        |}
+      """.stripMargin
+
+    val compiledExecutorPlugin = TestUtils.createCompiledClass(
+      "TestExecutorPlugin",
+      tempDir,
+      "",
+      null,
+      Seq.empty,
+      Seq("org.apache.spark.api.plugin.ExecutorPlugin"),
+      executorPluginCodeBody)
+
+    val thisClassPath =
+      sys.props("java.class.path").split(File.pathSeparator).map(p => new 
File(p).toURI.toURL)
+    val compiledSparkPlugin = TestUtils.createCompiledClass(
+      "TestSparkPlugin",
+      tempDir,
+      "",
+      null,
+      Seq(tempDir.toURI.toURL) ++ thisClassPath,
+      Seq("org.apache.spark.api.plugin.SparkPlugin"),
+      sparkPluginCodeBody)
+
+    val jarUrl = TestUtils.createJar(
+      Seq(compiledSparkPlugin, compiledExecutorPlugin),
+      new File(tempDir, "testplugin.jar"))
+
+    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+    val unusedFile = Files.createTempFile(tempDir.toPath, "unused", null)
+    val args = Seq(
+      "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
+      "--name", "testApp",
+      "--master", "local-cluster[1,1,1024]",
+      "--conf", "spark.plugins=TestSparkPlugin",
+      "--conf", "spark.ui.enabled=false",
+      "--jars", jarUrl.toString + "," + unusedJar.toString,
+      "--files", tempFile.toString + "," + unusedFile.toString,
+      unusedJar.toString)
+    runSparkSubmit(args)
+  }
+
   private def testRemoteResources(
       enableHttpFs: Boolean,
       blacklistSchemes: Seq[String] = Nil): Unit = {
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 4608a4e..8471417 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -1326,9 +1326,3 @@ Both take a comma-separated list of class names that implement the
 possible for one list to be placed in the Spark default config file, allowing users to
 easily add other plugins from the command line without overwriting the config file's list. Duplicate
 plugins are ignored.
-
-Distribution of the jar files containing the plugin code is currently not done by Spark. The user
-or admin should make sure that the jar files are available to Spark applications, for example, by
-including the plugin jar with the Spark distribution. The exception to this rule is the YARN
-backend, where the <code>--jars</code> command line option (or equivalent config entry) can be
-used to make the plugin code available to both executors and cluster-mode drivers.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
