Repository: spark
Updated Branches:
  refs/heads/master 613b71a12 -> cfcd74668


[SPARK-11035][CORE] Add in-process Spark app launcher.

This change adds a new launcher that allows applications to be run
in a separate thread in the same process as the calling code. To
achieve that, some code from the child process implementation was
moved to abstract classes that implement the common functionality,
and the new launcher inherits from those.

The new launcher was added as a new class, instead of being implemented
as a new option on the existing SparkLauncher, to avoid ambiguous
APIs. For example, SparkLauncher has ways to set the child app's
environment, modify SPARK_HOME, or control the logging of the
child process, none of which apply to in-process apps.
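
For reference, launching through the new class looks roughly like the
new unit test below (a sketch; the master and main class here are
placeholders):

    import org.apache.spark.launcher.InProcessLauncher;
    import org.apache.spark.launcher.SparkAppHandle;
    import org.apache.spark.launcher.SparkLauncher;

    public class InProcessExample {
      public static void main(String[] args) throws Exception {
        // Runs the app on a thread inside this JVM; Spark must be visible
        // to the calling thread's context class loader.
        SparkAppHandle handle = new InProcessLauncher()
          .setMaster("local")                         // placeholder master
          .setAppResource(SparkLauncher.NO_RESOURCE)
          .setMainClass("my.app.Main")                // placeholder main class
          .addAppArgs("hello")
          .startApplication();

        while (!handle.getState().isFinal()) {
          Thread.sleep(100);                          // crude wait for a final state
        }
      }
    }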

The in-process launcher has limitations: it needs Spark in the
context class loader of the calling thread, and it's bound by
Spark's current limitation of a single client-mode application
per JVM. It also relies on the recently added SparkApplication
trait to make sure different apps don't mess up each other's
configuration, so config isolation is currently limited to cluster mode.

I also chose to keep the same socket-based communication for in-process
apps, even though it might be possible to avoid it for in-process
mode. That helps both implementations share more code.
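
Concretely, since environment variables can't be injected into an
in-process child, the handshake info travels through the Spark config
instead; LauncherBackend.connect() now checks these keys before falling
back to the existing environment variables (see LauncherProtocol in the
diff below):

    spark.launcher.port     // LauncherServer port to connect back to
    spark.launcher.secret   // per-handle secret sent in the Hello message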

Tested with new and existing unit tests, and with a simple app that
uses the launcher; also made sure the app ran fine with an older
launcher jar to check binary compatibility.

Author: Marcelo Vanzin <[email protected]>

Closes #19591 from vanzin/SPARK-11035.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cfcd7466
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cfcd7466
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cfcd7466

Branch: refs/heads/master
Commit: cfcd746689c2b84824745fa6d327ffb584c7a17d
Parents: 613b71a
Author: Marcelo Vanzin <[email protected]>
Authored: Thu Dec 28 17:00:49 2017 -0600
Committer: Imran Rashid <[email protected]>
Committed: Thu Dec 28 17:00:49 2017 -0600

----------------------------------------------------------------------
 core/pom.xml                                    |   8 +
 .../apache/spark/launcher/LauncherBackend.scala |  11 +-
 .../cluster/StandaloneSchedulerBackend.scala    |   1 +
 .../scheduler/local/LocalSchedulerBackend.scala |   1 +
 .../spark/launcher/SparkLauncherSuite.java      |  70 +++-
 .../spark/launcher/AbstractAppHandle.java       | 129 +++++++
 .../apache/spark/launcher/AbstractLauncher.java | 307 ++++++++++++++++
 .../spark/launcher/ChildProcAppHandle.java      | 117 +------
 .../spark/launcher/InProcessAppHandle.java      |  83 +++++
 .../spark/launcher/InProcessLauncher.java       | 110 ++++++
 .../apache/spark/launcher/LauncherProtocol.java |   6 +
 .../apache/spark/launcher/LauncherServer.java   | 113 +++---
 .../apache/spark/launcher/SparkLauncher.java    | 351 +++++--------------
 .../launcher/SparkSubmitCommandBuilder.java     |   2 +-
 .../org/apache/spark/launcher/package-info.java |  28 +-
 .../org/apache/spark/launcher/BaseSuite.java    |  35 +-
 .../spark/launcher/ChildProcAppHandleSuite.java |  16 -
 .../spark/launcher/InProcessLauncherSuite.java  | 170 +++++++++
 .../spark/launcher/LauncherServerSuite.java     |  82 ++---
 .../MesosCoarseGrainedSchedulerBackend.scala    |   2 +
 .../org/apache/spark/deploy/yarn/Client.scala   |   2 +
 21 files changed, 1139 insertions(+), 505 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cfcd7466/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index fa138d3..0a5bd95 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -351,6 +351,14 @@
       <artifactId>spark-tags_${scala.binary.version}</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-launcher_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
     <!--
      This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
       them will yield errors.

http://git-wip-us.apache.org/repos/asf/spark/blob/cfcd7466/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala b/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala
index a5d41a1..aaae33c 100644
--- a/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala
+++ b/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala
@@ -19,7 +19,7 @@ package org.apache.spark.launcher
 
 import java.net.{InetAddress, Socket}
 
-import org.apache.spark.SPARK_VERSION
+import org.apache.spark.{SPARK_VERSION, SparkConf}
 import org.apache.spark.launcher.LauncherProtocol._
 import org.apache.spark.util.{ThreadUtils, Utils}
 
@@ -36,9 +36,14 @@ private[spark] abstract class LauncherBackend {
   private var lastState: SparkAppHandle.State = _
   @volatile private var _isConnected = false
 
+  protected def conf: SparkConf
+
   def connect(): Unit = {
-    val port = sys.env.get(LauncherProtocol.ENV_LAUNCHER_PORT).map(_.toInt)
-    val secret = sys.env.get(LauncherProtocol.ENV_LAUNCHER_SECRET)
+    val port = conf.getOption(LauncherProtocol.CONF_LAUNCHER_PORT)
+      .orElse(sys.env.get(LauncherProtocol.ENV_LAUNCHER_PORT))
+      .map(_.toInt)
+    val secret = conf.getOption(LauncherProtocol.CONF_LAUNCHER_SECRET)
+      .orElse(sys.env.get(LauncherProtocol.ENV_LAUNCHER_SECRET))
     if (port != None && secret != None) {
       val s = new Socket(InetAddress.getLoopbackAddress(), port.get)
       connection = new BackendConnection(s)

http://git-wip-us.apache.org/repos/asf/spark/blob/cfcd7466/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 505c342..f73a58f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -45,6 +45,7 @@ private[spark] class StandaloneSchedulerBackend(
   private var client: StandaloneAppClient = null
   private val stopping = new AtomicBoolean(false)
   private val launcherBackend = new LauncherBackend() {
+    override protected def conf: SparkConf = sc.conf
    override protected def onStopRequest(): Unit = stop(SparkAppHandle.State.KILLED)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cfcd7466/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
index 35509bc..4c614c5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
@@ -105,6 +105,7 @@ private[spark] class LocalSchedulerBackend(
   private val userClassPath = getUserClasspath(conf)
   private val listenerBus = scheduler.sc.listenerBus
   private val launcherBackend = new LauncherBackend() {
+    override def conf: SparkConf = LocalSchedulerBackend.this.conf
     override def onStopRequest(): Unit = stop(SparkAppHandle.State.KILLED)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cfcd7466/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index ac4391e..c2261c2 100644
--- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -18,30 +18,26 @@
 package org.apache.spark.launcher;
 
 import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.bridge.SLF4JBridgeHandler;
 import static org.junit.Assert.*;
 import static org.junit.Assume.*;
+import static org.mockito.Mockito.*;
 
+import org.apache.spark.SparkContext;
 import org.apache.spark.internal.config.package$;
 import org.apache.spark.util.Utils;
 
 /**
  * These tests require the Spark assembly to be built before they can be run.
  */
-public class SparkLauncherSuite {
+public class SparkLauncherSuite extends BaseSuite {
 
-  static {
-    SLF4JBridgeHandler.removeHandlersForRootLogger();
-    SLF4JBridgeHandler.install();
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(SparkLauncherSuite.class);
   private static final NamedThreadFactory TF = new NamedThreadFactory("SparkLauncherSuite-%d");
 
   private final SparkLauncher launcher = new SparkLauncher();
@@ -123,6 +119,50 @@ public class SparkLauncherSuite {
     assertEquals(0, app.waitFor());
   }
 
+  @Test
+  public void testInProcessLauncher() throws Exception {
+    // Because this test runs SparkLauncher in process and in client mode, it pollutes the system
+    // properties, and that can cause test failures down the test pipeline. So restore the original
+    // system properties after this test runs.
+    Map<Object, Object> properties = new HashMap<>(System.getProperties());
+    try {
+      inProcessLauncherTestImpl();
+    } finally {
+      Properties p = new Properties();
+      for (Map.Entry<Object, Object> e : properties.entrySet()) {
+        p.put(e.getKey(), e.getValue());
+      }
+      System.setProperties(p);
+    }
+  }
+
+  private void inProcessLauncherTestImpl() throws Exception {
+    final List<SparkAppHandle.State> transitions = new ArrayList<>();
+    SparkAppHandle.Listener listener = mock(SparkAppHandle.Listener.class);
+    doAnswer(invocation -> {
+      SparkAppHandle h = (SparkAppHandle) invocation.getArguments()[0];
+      transitions.add(h.getState());
+      return null;
+    }).when(listener).stateChanged(any(SparkAppHandle.class));
+
+    SparkAppHandle handle = new InProcessLauncher()
+      .setMaster("local")
+      .setAppResource(SparkLauncher.NO_RESOURCE)
+      .setMainClass(InProcessTestApp.class.getName())
+      .addAppArgs("hello")
+      .startApplication(listener);
+
+    waitFor(handle);
+    assertEquals(SparkAppHandle.State.FINISHED, handle.getState());
+
+    // Matches the behavior of LocalSchedulerBackend.
+    List<SparkAppHandle.State> expected = Arrays.asList(
+      SparkAppHandle.State.CONNECTED,
+      SparkAppHandle.State.RUNNING,
+      SparkAppHandle.State.FINISHED);
+    assertEquals(expected, transitions);
+  }
+
   public static class SparkLauncherTestApp {
 
     public static void main(String[] args) throws Exception {
@@ -134,4 +174,14 @@ public class SparkLauncherSuite {
 
   }
 
+  public static class InProcessTestApp {
+
+    public static void main(String[] args) throws Exception {
+      assertNotEquals(0, args.length);
+      assertEquals(args[0], "hello");
+      new SparkContext().stop();
+    }
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cfcd7466/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java
new file mode 100644
index 0000000..df1e731
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+abstract class AbstractAppHandle implements SparkAppHandle {
+
+  private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName());
+
+  private final LauncherServer server;
+
+  private LauncherConnection connection;
+  private List<Listener> listeners;
+  private State state;
+  private String appId;
+  private boolean disposed;
+
+  protected AbstractAppHandle(LauncherServer server) {
+    this.server = server;
+    this.state = State.UNKNOWN;
+  }
+
+  @Override
+  public synchronized void addListener(Listener l) {
+    if (listeners == null) {
+      listeners = new ArrayList<>();
+    }
+    listeners.add(l);
+  }
+
+  @Override
+  public State getState() {
+    return state;
+  }
+
+  @Override
+  public String getAppId() {
+    return appId;
+  }
+
+  @Override
+  public void stop() {
+    CommandBuilderUtils.checkState(connection != null, "Application is still not connected.");
+    try {
+      connection.send(new LauncherProtocol.Stop());
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  @Override
+  public synchronized void disconnect() {
+    if (!disposed) {
+      disposed = true;
+      if (connection != null) {
+        try {
+          connection.close();
+        } catch (IOException ioe) {
+          // no-op.
+        }
+      }
+      server.unregister(this);
+    }
+  }
+
+  void setConnection(LauncherConnection connection) {
+    this.connection = connection;
+  }
+
+  LauncherConnection getConnection() {
+    return connection;
+  }
+
+  boolean isDisposed() {
+    return disposed;
+  }
+
+  void setState(State s) {
+    setState(s, false);
+  }
+
+  synchronized void setState(State s, boolean force) {
+    if (force || !state.isFinal()) {
+      state = s;
+      fireEvent(false);
+    } else {
+      LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
+        new Object[] { state, s });
+    }
+  }
+
+  synchronized void setAppId(String appId) {
+    this.appId = appId;
+    fireEvent(true);
+  }
+
+  private void fireEvent(boolean isInfoChanged) {
+    if (listeners != null) {
+      for (Listener l : listeners) {
+        if (isInfoChanged) {
+          l.infoChanged(this);
+        } else {
+          l.stateChanged(this);
+        }
+      }
+    }
+  }
+
+}
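
Note: in the handle above, fireEvent(false) ends up calling the
listeners' stateChanged() callback and fireEvent(true) calls
infoChanged(). A minimal sketch of a listener built on the public
SparkAppHandle.Listener API:

    SparkAppHandle.Listener listener = new SparkAppHandle.Listener() {
      @Override
      public void stateChanged(SparkAppHandle handle) {
        // Driven by setState() / fireEvent(false) above.
        System.out.println("state: " + handle.getState());
      }

      @Override
      public void infoChanged(SparkAppHandle handle) {
        // Driven by setAppId() / fireEvent(true) above.
        System.out.println("app id: " + handle.getAppId());
      }
    };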

http://git-wip-us.apache.org/repos/asf/spark/blob/cfcd7466/launcher/src/main/java/org/apache/spark/launcher/AbstractLauncher.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractLauncher.java
new file mode 100644
index 0000000..44e69fc
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractLauncher.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.spark.launcher.CommandBuilderUtils.*;
+
+/**
+ * Base class for launcher implementations.
+ *
+ * @since Spark 2.3.0
+ */
+public abstract class AbstractLauncher<T extends AbstractLauncher> {
+
+  final SparkSubmitCommandBuilder builder;
+
+  AbstractLauncher() {
+    this.builder = new SparkSubmitCommandBuilder();
+  }
+
+  /**
+   * Set a custom properties file with Spark configuration for the application.
+   *
+   * @param path Path to custom properties file to use.
+   * @return This launcher.
+   */
+  public T setPropertiesFile(String path) {
+    checkNotNull(path, "path");
+    builder.setPropertiesFile(path);
+    return self();
+  }
+
+  /**
+   * Set a single configuration value for the application.
+   *
+   * @param key Configuration key.
+   * @param value The value to use.
+   * @return This launcher.
+   */
+  public T setConf(String key, String value) {
+    checkNotNull(key, "key");
+    checkNotNull(value, "value");
+    checkArgument(key.startsWith("spark."), "'key' must start with 'spark.'");
+    builder.conf.put(key, value);
+    return self();
+  }
+
+  /**
+   * Set the application name.
+   *
+   * @param appName Application name.
+   * @return This launcher.
+   */
+  public T setAppName(String appName) {
+    checkNotNull(appName, "appName");
+    builder.appName = appName;
+    return self();
+  }
+
+  /**
+   * Set the Spark master for the application.
+   *
+   * @param master Spark master.
+   * @return This launcher.
+   */
+  public T setMaster(String master) {
+    checkNotNull(master, "master");
+    builder.master = master;
+    return self();
+  }
+
+  /**
+   * Set the deploy mode for the application.
+   *
+   * @param mode Deploy mode.
+   * @return This launcher.
+   */
+  public T setDeployMode(String mode) {
+    checkNotNull(mode, "mode");
+    builder.deployMode = mode;
+    return self();
+  }
+
+  /**
+   * Set the main application resource. This should be the location of a jar file for Scala/Java
+   * applications, or a python script for PySpark applications.
+   *
+   * @param resource Path to the main application resource.
+   * @return This launcher.
+   */
+  public T setAppResource(String resource) {
+    checkNotNull(resource, "resource");
+    builder.appResource = resource;
+    return self();
+  }
+
+  /**
+   * Sets the application class name for Java/Scala applications.
+   *
+   * @param mainClass Application's main class.
+   * @return This launcher.
+   */
+  public T setMainClass(String mainClass) {
+    checkNotNull(mainClass, "mainClass");
+    builder.mainClass = mainClass;
+    return self();
+  }
+
+  /**
+   * Adds a no-value argument to the Spark invocation. If the argument is known, this method
+   * validates whether the argument is indeed a no-value argument, and throws an exception
+   * otherwise.
+   * <p>
+   * Use this method with caution. It is possible to create an invalid Spark command by passing
+   * unknown arguments to this method, since those are allowed for forward compatibility.
+   *
+   * @since 1.5.0
+   * @param arg Argument to add.
+   * @return This launcher.
+   */
+  public T addSparkArg(String arg) {
+    SparkSubmitOptionParser validator = new ArgumentValidator(false);
+    validator.parse(Arrays.asList(arg));
+    builder.sparkArgs.add(arg);
+    return self();
+  }
+
+  /**
+   * Adds an argument with a value to the Spark invocation. If the argument name corresponds to
+   * a known argument, the code validates that the argument actually expects a value, and throws
+   * an exception otherwise.
+   * <p>
+   * It is safe to add arguments modified by other methods in this class (such as
+   * {@link #setMaster(String)} - the last invocation will be the one to take effect.
+   * <p>
+   * Use this method with caution. It is possible to create an invalid Spark command by passing
+   * unknown arguments to this method, since those are allowed for forward compatibility.
+   *
+   * @since 1.5.0
+   * @param name Name of argument to add.
+   * @param value Value of the argument.
+   * @return This launcher.
+   */
+  public T addSparkArg(String name, String value) {
+    SparkSubmitOptionParser validator = new ArgumentValidator(true);
+    if (validator.MASTER.equals(name)) {
+      setMaster(value);
+    } else if (validator.PROPERTIES_FILE.equals(name)) {
+      setPropertiesFile(value);
+    } else if (validator.CONF.equals(name)) {
+      String[] vals = value.split("=", 2);
+      setConf(vals[0], vals[1]);
+    } else if (validator.CLASS.equals(name)) {
+      setMainClass(value);
+    } else if (validator.JARS.equals(name)) {
+      builder.jars.clear();
+      for (String jar : value.split(",")) {
+        addJar(jar);
+      }
+    } else if (validator.FILES.equals(name)) {
+      builder.files.clear();
+      for (String file : value.split(",")) {
+        addFile(file);
+      }
+    } else if (validator.PY_FILES.equals(name)) {
+      builder.pyFiles.clear();
+      for (String file : value.split(",")) {
+        addPyFile(file);
+      }
+    } else {
+      validator.parse(Arrays.asList(name, value));
+      builder.sparkArgs.add(name);
+      builder.sparkArgs.add(value);
+    }
+    return self();
+  }
+
+  /**
+   * Adds command line arguments for the application.
+   *
+   * @param args Arguments to pass to the application's main class.
+   * @return This launcher.
+   */
+  public T addAppArgs(String... args) {
+    for (String arg : args) {
+      checkNotNull(arg, "arg");
+      builder.appArgs.add(arg);
+    }
+    return self();
+  }
+
+  /**
+   * Adds a jar file to be submitted with the application.
+   *
+   * @param jar Path to the jar file.
+   * @return This launcher.
+   */
+  public T addJar(String jar) {
+    checkNotNull(jar, "jar");
+    builder.jars.add(jar);
+    return self();
+  }
+
+  /**
+   * Adds a file to be submitted with the application.
+   *
+   * @param file Path to the file.
+   * @return This launcher.
+   */
+  public T addFile(String file) {
+    checkNotNull(file, "file");
+    builder.files.add(file);
+    return self();
+  }
+
+  /**
+   * Adds a python file / zip / egg to be submitted with the application.
+   *
+   * @param file Path to the file.
+   * @return This launcher.
+   */
+  public T addPyFile(String file) {
+    checkNotNull(file, "file");
+    builder.pyFiles.add(file);
+    return self();
+  }
+
+  /**
+   * Enables verbose reporting for SparkSubmit.
+   *
+   * @param verbose Whether to enable verbose output.
+   * @return This launcher.
+   */
+  public T setVerbose(boolean verbose) {
+    builder.verbose = verbose;
+    return self();
+  }
+
+  /**
+   * Starts a Spark application.
+   *
+   * <p>
+   * This method returns a handle that provides information about the running application and can
+   * be used to do basic interaction with it.
+   * <p>
+   * The returned handle assumes that the application will instantiate a single SparkContext
+   * during its lifetime. Once that context reports a final state (one that indicates the
+   * SparkContext has stopped), the handle will not perform new state transitions, so anything
+   * that happens after that cannot be monitored. If the underlying application is launched as
+   * a child process, {@link SparkAppHandle#kill()} can still be used to kill the child process.
+   *
+   * @since 1.6.0
+   * @param listeners Listeners to add to the handle before the app is launched.
+   * @return A handle for the launched application.
+   */
+  public abstract SparkAppHandle startApplication(SparkAppHandle.Listener... listeners)
+    throws IOException;
+
+  abstract T self();
+
+  private static class ArgumentValidator extends SparkSubmitOptionParser {
+
+    private final boolean hasValue;
+
+    ArgumentValidator(boolean hasValue) {
+      this.hasValue = hasValue;
+    }
+
+    @Override
+    protected boolean handle(String opt, String value) {
+      if (value == null && hasValue) {
+        throw new IllegalArgumentException(String.format("'%s' expects a value.", opt));
+      }
+      return true;
+    }
+
+    @Override
+    protected boolean handleUnknown(String opt) {
+      // Do not fail on unknown arguments, to support future arguments added to SparkSubmit.
+      return true;
+    }
+
+    protected void handleExtraArgs(List<String> extra) {
+      // No op.
+    }
+
+  }
+
+}
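
Note: the T self() hook above is a self-typed builder pattern: every
setter returns self(), so chained calls on a concrete launcher keep
their concrete type without casts. For example (sketch):

    InProcessLauncher launcher = new InProcessLauncher()
      .setAppName("demo")     // still typed as InProcessLauncher, not AbstractLauncher
      .setVerbose(true);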

http://git-wip-us.apache.org/repos/asf/spark/blob/cfcd7466/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
index 5391d4a..3bb7e12 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
@@ -17,77 +17,29 @@
 
 package org.apache.spark.launcher;
 
-import java.io.IOException;
 import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 /**
  * Handle implementation for monitoring apps started as a child process.
  */
-class ChildProcAppHandle implements SparkAppHandle {
+class ChildProcAppHandle extends AbstractAppHandle {
 
   private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName());
 
-  private final String secret;
-  private final LauncherServer server;
-
   private volatile Process childProc;
-  private boolean disposed;
-  private LauncherConnection connection;
-  private List<Listener> listeners;
-  private State state;
-  private String appId;
   private OutputRedirector redirector;
 
-  ChildProcAppHandle(String secret, LauncherServer server) {
-    this.secret = secret;
-    this.server = server;
-    this.state = State.UNKNOWN;
-  }
-
-  @Override
-  public synchronized void addListener(Listener l) {
-    if (listeners == null) {
-      listeners = new ArrayList<>();
-    }
-    listeners.add(l);
-  }
-
-  @Override
-  public State getState() {
-    return state;
-  }
-
-  @Override
-  public String getAppId() {
-    return appId;
-  }
-
-  @Override
-  public void stop() {
-    CommandBuilderUtils.checkState(connection != null, "Application is still not connected.");
-    try {
-      connection.send(new LauncherProtocol.Stop());
-    } catch (IOException ioe) {
-      throw new RuntimeException(ioe);
-    }
+  ChildProcAppHandle(LauncherServer server) {
+    super(server);
   }
 
   @Override
   public synchronized void disconnect() {
-    if (!disposed) {
-      disposed = true;
-      if (connection != null) {
-        try {
-          connection.close();
-        } catch (IOException ioe) {
-          // no-op.
-        }
-      }
-      server.unregister(this);
+    try {
+      super.disconnect();
+    } finally {
       if (redirector != null) {
         redirector.stop();
       }
@@ -106,10 +58,6 @@ class ChildProcAppHandle implements SparkAppHandle {
     setState(State.KILLED);
   }
 
-  String getSecret() {
-    return secret;
-  }
-
   void setChildProc(Process childProc, String loggerName, InputStream logStream) {
     this.childProc = childProc;
     if (logStream != null) {
@@ -118,39 +66,10 @@ class ChildProcAppHandle implements SparkAppHandle {
     } else {
      // If there is no log redirection, spawn a thread that will wait for the child process
       // to finish.
-      Thread waiter = SparkLauncher.REDIRECTOR_FACTORY.newThread(this::monitorChild);
-      waiter.setDaemon(true);
-      waiter.start();
+      SparkLauncher.REDIRECTOR_FACTORY.newThread(this::monitorChild).start();
     }
   }
 
-  void setConnection(LauncherConnection connection) {
-    this.connection = connection;
-  }
-
-  LauncherServer getServer() {
-    return server;
-  }
-
-  LauncherConnection getConnection() {
-    return connection;
-  }
-
-  synchronized void setState(State s) {
-    if (!state.isFinal()) {
-      state = s;
-      fireEvent(false);
-    } else {
-      LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
-        new Object[] { state, s });
-    }
-  }
-
-  synchronized void setAppId(String appId) {
-    this.appId = appId;
-    fireEvent(true);
-  }
-
   /**
   * Wait for the child process to exit and update the handle's state if necessary, according to
    * the exit code.
@@ -171,7 +90,7 @@ class ChildProcAppHandle implements SparkAppHandle {
     }
 
     synchronized (this) {
-      if (disposed) {
+      if (isDisposed()) {
         return;
       }
 
@@ -185,31 +104,19 @@ class ChildProcAppHandle implements SparkAppHandle {
         ec = 1;
       }
 
+      State currState = getState();
       State newState = null;
       if (ec != 0) {
        // Override state with failure if the current state is not final, or is success.
-        if (!state.isFinal() || state == State.FINISHED) {
+        if (!currState.isFinal() || currState == State.FINISHED) {
           newState = State.FAILED;
         }
-      } else if (!state.isFinal()) {
+      } else if (!currState.isFinal()) {
         newState = State.LOST;
       }
 
       if (newState != null) {
-        state = newState;
-        fireEvent(false);
-      }
-    }
-  }
-
-  private void fireEvent(boolean isInfoChanged) {
-    if (listeners != null) {
-      for (Listener l : listeners) {
-        if (isInfoChanged) {
-          l.infoChanged(this);
-        } else {
-          l.stateChanged(this);
-        }
+        setState(newState, true);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/cfcd7466/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java
new file mode 100644
index 0000000..0d6a73a
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+class InProcessAppHandle extends AbstractAppHandle {
+
+  private static final String THREAD_NAME_FMT = "spark-app-%d: '%s'";
+  private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName());
+  private static final AtomicLong THREAD_IDS = new AtomicLong();
+
+  // Avoid really long thread names.
+  private static final int MAX_APP_NAME_LEN = 16;
+
+  private Thread app;
+
+  InProcessAppHandle(LauncherServer server) {
+    super(server);
+  }
+
+  @Override
+  public synchronized void kill() {
+    LOG.warning("kill() may leave the underlying app running in in-process 
mode.");
+    disconnect();
+
+    // Interrupt the thread. This is not guaranteed to kill the app, though.
+    if (app != null) {
+      app.interrupt();
+    }
+
+    setState(State.KILLED);
+  }
+
+  synchronized void start(String appName, Method main, String[] args) {
+    CommandBuilderUtils.checkState(app == null, "Handle already started.");
+
+    if (appName.length() > MAX_APP_NAME_LEN) {
+      appName = "..." + appName.substring(appName.length() - MAX_APP_NAME_LEN);
+    }
+
+    app = new Thread(() -> {
+      try {
+        main.invoke(null, (Object) args);
+      } catch (Throwable t) {
+        LOG.log(Level.WARNING, "Application failed with exception.", t);
+        setState(State.FAILED);
+      }
+
+      synchronized (InProcessAppHandle.this) {
+        if (!isDisposed()) {
+          disconnect();
+          if (!getState().isFinal()) {
+            setState(State.LOST, true);
+          }
+        }
+      }
+    });
+
+    app.setName(String.format(THREAD_NAME_FMT, THREAD_IDS.incrementAndGet(), appName));
+    app.start();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/cfcd7466/launcher/src/main/java/org/apache/spark/launcher/InProcessLauncher.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/InProcessLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/InProcessLauncher.java
new file mode 100644
index 0000000..6d726b4
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/InProcessLauncher.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.List;
+import java.util.logging.Logger;
+
+/**
+ * In-process launcher for Spark applications.
+ * <p>
+ * Use this class to start Spark applications programmatically. Applications launched using this
+ * class will run in the same process as the caller.
+ * <p>
+ * Because Spark only supports a single active instance of <code>SparkContext</code> per JVM, code
+ * that uses this class should be careful about which applications are launched. It's recommended
+ * that this launcher only be used to launch applications in cluster mode.
+ * <p>
+ * Also note that, when running applications in client mode, JVM-related configurations (like
+ * driver memory or configs which modify the driver's class path) do not take effect. Logging
+ * configuration is also inherited from the parent application.
+ *
+ * @since Spark 2.3.0
+ */
+public class InProcessLauncher extends AbstractLauncher<InProcessLauncher> {
+
+  private static final Logger LOG = Logger.getLogger(InProcessLauncher.class.getName());
+
+  /**
+   * Starts a Spark application.
+   *
+   * @see AbstractLauncher#startApplication(SparkAppHandle.Listener...)
+   * @param listeners Listeners to add to the handle before the app is launched.
+   * @return A handle for the launched application.
+   */
+  @Override
+  public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException {
+    if (builder.isClientMode(builder.getEffectiveConfig())) {
+      LOG.warning("It's not recommended to run client-mode applications using 
InProcessLauncher.");
+    }
+
+    Method main = findSparkSubmit();
+    LauncherServer server = LauncherServer.getOrCreateServer();
+    InProcessAppHandle handle = new InProcessAppHandle(server);
+    for (SparkAppHandle.Listener l : listeners) {
+      handle.addListener(l);
+    }
+
+    String secret = server.registerHandle(handle);
+    setConf(LauncherProtocol.CONF_LAUNCHER_PORT, String.valueOf(server.getPort()));
+    setConf(LauncherProtocol.CONF_LAUNCHER_SECRET, secret);
+
+    List<String> sparkArgs = builder.buildSparkSubmitArgs();
+    String[] argv = sparkArgs.toArray(new String[sparkArgs.size()]);
+
+    String appName = CommandBuilderUtils.firstNonEmpty(builder.appName, builder.mainClass,
+      "<unknown>");
+    handle.start(appName, main, argv);
+    return handle;
+  }
+
+  @Override
+  InProcessLauncher self() {
+    return this;
+  }
+
+  // Visible for testing.
+  Method findSparkSubmit() throws IOException {
+    ClassLoader cl = Thread.currentThread().getContextClassLoader();
+    if (cl == null) {
+      cl = getClass().getClassLoader();
+    }
+
+    Class<?> sparkSubmit;
+    try {
+      sparkSubmit = cl.loadClass("org.apache.spark.deploy.SparkSubmit");
+    } catch (Exception e) {
+      throw new IOException("Cannot find SparkSubmit; make sure necessary jars 
are available.", e);
+    }
+
+    Method main;
+    try {
+      main = sparkSubmit.getMethod("main", String[].class);
+    } catch (Exception e) {
+      throw new IOException("Cannot find SparkSubmit main method.", e);
+    }
+
+    CommandBuilderUtils.checkState(Modifier.isStatic(main.getModifiers()),
+      "main method is not static.");
+    return main;
+  }
+
+}
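
Note: the reflective dispatch performed by findSparkSubmit() and
InProcessAppHandle.start() boils down to roughly the following (a
sketch; assumes java.lang.reflect.Method is imported and uses
--version as a harmless example argument):

    // Static method, so the receiver passed to invoke() is null; the String[]
    // is cast to Object so reflection treats it as a single argument instead
    // of expanding it into varargs.
    Method main = Class.forName("org.apache.spark.deploy.SparkSubmit")
      .getMethod("main", String[].class);
    main.invoke(null, (Object) new String[] { "--version" });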

http://git-wip-us.apache.org/repos/asf/spark/blob/cfcd7466/launcher/src/main/java/org/apache/spark/launcher/LauncherProtocol.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherProtocol.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherProtocol.java
index 042f11c..e1eadb9 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/LauncherProtocol.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherProtocol.java
@@ -32,6 +32,12 @@ final class LauncherProtocol {
  /** Environment variable where the secret for connecting back to the server is stored. */
   static final String ENV_LAUNCHER_SECRET = "_SPARK_LAUNCHER_SECRET";
 
+  /** Spark conf key used to propagate the server port for in-process launches. */
+  static final String CONF_LAUNCHER_PORT = "spark.launcher.port";
+
+  /** Spark conf key used to propagate the app secret for in-process launches. */
+  static final String CONF_LAUNCHER_SECRET = "spark.launcher.secret";
+
   static class Message implements Serializable {
 
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/cfcd7466/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
index 4353e3f..b8999a1 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
@@ -26,6 +26,7 @@ import java.net.Socket;
 import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
@@ -88,31 +89,25 @@ class LauncherServer implements Closeable {
 
   private static volatile LauncherServer serverInstance;
 
-  /**
-   * Creates a handle for an app to be launched. This method will start a server if one hasn't been
-   * started yet. The server is shared for multiple handles, and once all handles are disposed of,
-   * the server is shut down.
-   */
-  static synchronized ChildProcAppHandle newAppHandle() throws IOException {
-    LauncherServer server = serverInstance != null ? serverInstance : new LauncherServer();
+  static synchronized LauncherServer getOrCreateServer() throws IOException {
+    LauncherServer server;
+    do {
+      server = serverInstance != null ? serverInstance : new LauncherServer();
+    } while (!server.running);
+
     server.ref();
     serverInstance = server;
-
-    String secret = server.createSecret();
-    while (server.pending.containsKey(secret)) {
-      secret = server.createSecret();
-    }
-
-    return server.newAppHandle(secret);
+    return server;
   }
 
-  static LauncherServer getServerInstance() {
+  // For testing.
+  static synchronized LauncherServer getServer() {
     return serverInstance;
   }
 
   private final AtomicLong refCount;
   private final AtomicLong threadIds;
-  private final ConcurrentMap<String, ChildProcAppHandle> pending;
+  private final ConcurrentMap<String, AbstractAppHandle> secretToPendingApps;
   private final List<ServerConnection> clients;
   private final ServerSocket server;
   private final Thread serverThread;
@@ -132,7 +127,7 @@ class LauncherServer implements Closeable {
       this.clients = new ArrayList<>();
       this.threadIds = new AtomicLong();
       this.factory = new NamedThreadFactory(THREAD_NAME_FMT);
-      this.pending = new ConcurrentHashMap<>();
+      this.secretToPendingApps = new ConcurrentHashMap<>();
       this.timeoutTimer = new Timer("LauncherServer-TimeoutTimer", true);
       this.server = server;
       this.running = true;
@@ -149,32 +144,38 @@ class LauncherServer implements Closeable {
   }
 
   /**
-   * Creates a new app handle. The handle will wait for an incoming connection for a configurable
-   * amount of time, and if one doesn't arrive, it will transition to an error state.
+   * Registers a handle with the server, and returns the secret the child app needs to connect
+   * back.
    */
-  ChildProcAppHandle newAppHandle(String secret) {
-    ChildProcAppHandle handle = new ChildProcAppHandle(secret, this);
-    ChildProcAppHandle existing = pending.putIfAbsent(secret, handle);
-    CommandBuilderUtils.checkState(existing == null, "Multiple handles with the same secret.");
-    return handle;
+  synchronized String registerHandle(AbstractAppHandle handle) {
+    String secret = createSecret();
+    secretToPendingApps.put(secret, handle);
+    return secret;
   }
 
   @Override
   public void close() throws IOException {
     synchronized (this) {
-      if (running) {
-        running = false;
-        timeoutTimer.cancel();
-        server.close();
-        synchronized (clients) {
-          List<ServerConnection> copy = new ArrayList<>(clients);
-          clients.clear();
-          for (ServerConnection client : copy) {
-            client.close();
-          }
-        }
+      if (!running) {
+        return;
+      }
+      running = false;
+    }
+
+    synchronized (LauncherServer.class) {
+      serverInstance = null;
+    }
+
+    timeoutTimer.cancel();
+    server.close();
+    synchronized (clients) {
+      List<ServerConnection> copy = new ArrayList<>(clients);
+      clients.clear();
+      for (ServerConnection client : copy) {
+        client.close();
       }
     }
+
     if (serverThread != null) {
       try {
         serverThread.join();
@@ -195,8 +196,6 @@ class LauncherServer implements Closeable {
           close();
         } catch (IOException ioe) {
           // no-op.
-        } finally {
-          serverInstance = null;
         }
       }
     }
@@ -210,8 +209,14 @@ class LauncherServer implements Closeable {
   * Removes the client handle from the pending list (in case it's still there), and unrefs
    * the server.
    */
-  void unregister(ChildProcAppHandle handle) {
-    pending.remove(handle.getSecret());
+  void unregister(AbstractAppHandle handle) {
+    for (Map.Entry<String, AbstractAppHandle> e : 
secretToPendingApps.entrySet()) {
+      if (e.getValue().equals(handle)) {
+        String secret = e.getKey();
+        secretToPendingApps.remove(secret);
+        break;
+      }
+    }
     unref();
   }
 
@@ -260,24 +265,30 @@ class LauncherServer implements Closeable {
   }
 
   private String createSecret() {
-    byte[] secret = new byte[128];
-    RND.nextBytes(secret);
-
-    StringBuilder sb = new StringBuilder();
-    for (byte b : secret) {
-      int ival = b >= 0 ? b : Byte.MAX_VALUE - b;
-      if (ival < 0x10) {
-        sb.append("0");
+    while (true) {
+      byte[] secret = new byte[128];
+      RND.nextBytes(secret);
+
+      StringBuilder sb = new StringBuilder();
+      for (byte b : secret) {
+        int ival = b >= 0 ? b : Byte.MAX_VALUE - b;
+        if (ival < 0x10) {
+          sb.append("0");
+        }
+        sb.append(Integer.toHexString(ival));
+      }
+
+      String secretStr = sb.toString();
+      if (!secretToPendingApps.containsKey(secretStr)) {
+        return secretStr;
       }
-      sb.append(Integer.toHexString(ival));
     }
-    return sb.toString();
   }
 
   private class ServerConnection extends LauncherConnection {
 
     private TimerTask timeout;
-    private ChildProcAppHandle handle;
+    private AbstractAppHandle handle;
 
     ServerConnection(Socket socket, TimerTask timeout) throws IOException {
       super(socket);
@@ -291,7 +302,7 @@ class LauncherServer implements Closeable {
           timeout.cancel();
           timeout = null;
           Hello hello = (Hello) msg;
-          ChildProcAppHandle handle = pending.remove(hello.secret);
+          AbstractAppHandle handle = secretToPendingApps.remove(hello.secret);
           if (handle != null) {
             handle.setConnection(this);
             handle.setState(SparkAppHandle.State.CONNECTED);

http://git-wip-us.apache.org/repos/asf/spark/blob/cfcd7466/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
index 75b8ef5..f86d400 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
@@ -21,7 +21,6 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -37,7 +36,7 @@ import static org.apache.spark.launcher.CommandBuilderUtils.*;
  * to allow clients to configure the Spark application and launch it as a child process.
  * </p>
  */
-public class SparkLauncher {
+public class SparkLauncher extends AbstractLauncher<SparkLauncher> {
 
   /** The Spark master. */
   public static final String SPARK_MASTER = "spark.master";
@@ -109,7 +108,6 @@ public class SparkLauncher {
   }
 
   // Visible for testing.
-  final SparkSubmitCommandBuilder builder;
   File workingDir;
   boolean redirectErrorStream;
   ProcessBuilder.Redirect errorStream;
@@ -125,7 +123,6 @@ public class SparkLauncher {
    * @param env Environment variables to set.
    */
   public SparkLauncher(Map<String, String> env) {
-    this.builder = new SparkSubmitCommandBuilder();
     if (env != null) {
       this.builder.childEnv.putAll(env);
     }
@@ -156,297 +153,152 @@ public class SparkLauncher {
   }
 
   /**
-   * Set a custom properties file with Spark configuration for the application.
+   * Sets the working directory of spark-submit.
    *
-   * @param path Path to custom properties file to use.
+   * @param dir The directory to set as spark-submit's working directory.
    * @return This launcher.
    */
-  public SparkLauncher setPropertiesFile(String path) {
-    checkNotNull(path, "path");
-    builder.setPropertiesFile(path);
+  public SparkLauncher directory(File dir) {
+    workingDir = dir;
     return this;
   }
 
   /**
-   * Set a single configuration value for the application.
+   * Specifies that stderr in spark-submit should be redirected to stdout.
    *
-   * @param key Configuration key.
-   * @param value The value to use.
    * @return This launcher.
    */
-  public SparkLauncher setConf(String key, String value) {
-    checkNotNull(key, "key");
-    checkNotNull(value, "value");
-    checkArgument(key.startsWith("spark."), "'key' must start with 'spark.'");
-    builder.conf.put(key, value);
+  public SparkLauncher redirectError() {
+    redirectErrorStream = true;
     return this;
   }
 
   /**
-   * Set the application name.
+   * Redirects error output to the specified Redirect.
    *
-   * @param appName Application name.
+   * @param to The method of redirection.
    * @return This launcher.
    */
-  public SparkLauncher setAppName(String appName) {
-    checkNotNull(appName, "appName");
-    builder.appName = appName;
+  public SparkLauncher redirectError(ProcessBuilder.Redirect to) {
+    errorStream = to;
     return this;
   }
 
   /**
-   * Set the Spark master for the application.
+   * Redirects standard output to the specified Redirect.
    *
-   * @param master Spark master.
+   * @param to The method of redirection.
    * @return This launcher.
    */
-  public SparkLauncher setMaster(String master) {
-    checkNotNull(master, "master");
-    builder.master = master;
+  public SparkLauncher redirectOutput(ProcessBuilder.Redirect to) {
+    outputStream = to;
     return this;
   }
 
   /**
-   * Set the deploy mode for the application.
+   * Redirects error output to the specified File.
    *
-   * @param mode Deploy mode.
+   * @param errFile The file to which stderr is written.
    * @return This launcher.
    */
-  public SparkLauncher setDeployMode(String mode) {
-    checkNotNull(mode, "mode");
-    builder.deployMode = mode;
+  public SparkLauncher redirectError(File errFile) {
+    errorStream = ProcessBuilder.Redirect.to(errFile);
     return this;
   }
 
   /**
-   * Set the main application resource. This should be the location of a jar file for Scala/Java
-   * applications, or a python script for PySpark applications.
+   * Redirects error output to the specified File.
    *
-   * @param resource Path to the main application resource.
+   * @param outFile The file to which stdout is written.
    * @return This launcher.
    */
-  public SparkLauncher setAppResource(String resource) {
-    checkNotNull(resource, "resource");
-    builder.appResource = resource;
+  public SparkLauncher redirectOutput(File outFile) {
+    outputStream = ProcessBuilder.Redirect.to(outFile);
     return this;
   }
 
   /**
-   * Sets the application class name for Java/Scala applications.
+   * Sets all output to be logged and redirected to a logger with the specified name.
    *
-   * @param mainClass Application's main class.
+   * @param loggerName The name of the logger to log stdout and stderr.
    * @return This launcher.
    */
-  public SparkLauncher setMainClass(String mainClass) {
-    checkNotNull(mainClass, "mainClass");
-    builder.mainClass = mainClass;
+  public SparkLauncher redirectToLog(String loggerName) {
+    setConf(CHILD_PROCESS_LOGGER_NAME, loggerName);
     return this;
   }
 
-  /**
-   * Adds a no-value argument to the Spark invocation. If the argument is known, this method
-   * validates whether the argument is indeed a no-value argument, and throws an exception
-   * otherwise.
-   * <p>
-   * Use this method with caution. It is possible to create an invalid Spark command by passing
-   * unknown arguments to this method, since those are allowed for forward compatibility.
-   *
-   * @since 1.5.0
-   * @param arg Argument to add.
-   * @return This launcher.
-   */
-  public SparkLauncher addSparkArg(String arg) {
-    SparkSubmitOptionParser validator = new ArgumentValidator(false);
-    validator.parse(Arrays.asList(arg));
-    builder.sparkArgs.add(arg);
-    return this;
+  // The following methods just delegate to the parent class, but they are needed to keep
+  // binary compatibility with previous versions of this class.
+
+  @Override
+  public SparkLauncher setPropertiesFile(String path) {
+    return super.setPropertiesFile(path);
   }
 
-  /**
-   * Adds an argument with a value to the Spark invocation. If the argument name corresponds to
-   * a known argument, the code validates that the argument actually expects a value, and throws
-   * an exception otherwise.
-   * <p>
-   * It is safe to add arguments modified by other methods in this class (such as
-   * {@link #setMaster(String)} - the last invocation will be the one to take effect.
-   * <p>
-   * Use this method with caution. It is possible to create an invalid Spark command by passing
-   * unknown arguments to this method, since those are allowed for forward compatibility.
-   *
-   * @since 1.5.0
-   * @param name Name of argument to add.
-   * @param value Value of the argument.
-   * @return This launcher.
-   */
-  public SparkLauncher addSparkArg(String name, String value) {
-    SparkSubmitOptionParser validator = new ArgumentValidator(true);
-    if (validator.MASTER.equals(name)) {
-      setMaster(value);
-    } else if (validator.PROPERTIES_FILE.equals(name)) {
-      setPropertiesFile(value);
-    } else if (validator.CONF.equals(name)) {
-      String[] vals = value.split("=", 2);
-      setConf(vals[0], vals[1]);
-    } else if (validator.CLASS.equals(name)) {
-      setMainClass(value);
-    } else if (validator.JARS.equals(name)) {
-      builder.jars.clear();
-      for (String jar : value.split(",")) {
-        addJar(jar);
-      }
-    } else if (validator.FILES.equals(name)) {
-      builder.files.clear();
-      for (String file : value.split(",")) {
-        addFile(file);
-      }
-    } else if (validator.PY_FILES.equals(name)) {
-      builder.pyFiles.clear();
-      for (String file : value.split(",")) {
-        addPyFile(file);
-      }
-    } else {
-      validator.parse(Arrays.asList(name, value));
-      builder.sparkArgs.add(name);
-      builder.sparkArgs.add(value);
-    }
-    return this;
+  @Override
+  public SparkLauncher setConf(String key, String value) {
+    return super.setConf(key, value);
   }
 
-  /**
-   * Adds command line arguments for the application.
-   *
-   * @param args Arguments to pass to the application's main class.
-   * @return This launcher.
-   */
-  public SparkLauncher addAppArgs(String... args) {
-    for (String arg : args) {
-      checkNotNull(arg, "arg");
-      builder.appArgs.add(arg);
-    }
-    return this;
+  @Override
+  public SparkLauncher setAppName(String appName) {
+    return super.setAppName(appName);
   }
 
-  /**
-   * Adds a jar file to be submitted with the application.
-   *
-   * @param jar Path to the jar file.
-   * @return This launcher.
-   */
-  public SparkLauncher addJar(String jar) {
-    checkNotNull(jar, "jar");
-    builder.jars.add(jar);
-    return this;
+  @Override
+  public SparkLauncher setMaster(String master) {
+    return super.setMaster(master);
   }
 
-  /**
-   * Adds a file to be submitted with the application.
-   *
-   * @param file Path to the file.
-   * @return This launcher.
-   */
-  public SparkLauncher addFile(String file) {
-    checkNotNull(file, "file");
-    builder.files.add(file);
-    return this;
+  @Override
+  public SparkLauncher setDeployMode(String mode) {
+    return super.setDeployMode(mode);
   }
 
-  /**
-   * Adds a python file / zip / egg to be submitted with the application.
-   *
-   * @param file Path to the file.
-   * @return This launcher.
-   */
-  public SparkLauncher addPyFile(String file) {
-    checkNotNull(file, "file");
-    builder.pyFiles.add(file);
-    return this;
+  @Override
+  public SparkLauncher setAppResource(String resource) {
+    return super.setAppResource(resource);
   }
 
-  /**
-   * Enables verbose reporting for SparkSubmit.
-   *
-   * @param verbose Whether to enable verbose output.
-   * @return This launcher.
-   */
-  public SparkLauncher setVerbose(boolean verbose) {
-    builder.verbose = verbose;
-    return this;
+  @Override
+  public SparkLauncher setMainClass(String mainClass) {
+    return super.setMainClass(mainClass);
   }
 
-  /**
-   * Sets the working directory of spark-submit.
-   *
-   * @param dir The directory to set as spark-submit's working directory.
-   * @return This launcher.
-   */
-  public SparkLauncher directory(File dir) {
-    workingDir = dir;
-    return this;
+  @Override
+  public SparkLauncher addSparkArg(String arg) {
+    return super.addSparkArg(arg);
   }
 
-  /**
-   * Specifies that stderr in spark-submit should be redirected to stdout.
-   *
-   * @return This launcher.
-   */
-  public SparkLauncher redirectError() {
-    redirectErrorStream = true;
-    return this;
+  @Override
+  public SparkLauncher addSparkArg(String name, String value) {
+    return super.addSparkArg(name, value);
   }
 
-  /**
-   * Redirects error output to the specified Redirect.
-   *
-   * @param to The method of redirection.
-   * @return This launcher.
-   */
-  public SparkLauncher redirectError(ProcessBuilder.Redirect to) {
-    errorStream = to;
-    return this;
+  @Override
+  public SparkLauncher addAppArgs(String... args) {
+    return super.addAppArgs(args);
   }
 
-  /**
-   * Redirects standard output to the specified Redirect.
-   *
-   * @param to The method of redirection.
-   * @return This launcher.
-   */
-  public SparkLauncher redirectOutput(ProcessBuilder.Redirect to) {
-    outputStream = to;
-    return this;
+  @Override
+  public SparkLauncher addJar(String jar) {
+    return super.addJar(jar);
   }
 
-  /**
-   * Redirects error output to the specified File.
-   *
-   * @param errFile The file to which stderr is written.
-   * @return This launcher.
-   */
-  public SparkLauncher redirectError(File errFile) {
-    errorStream = ProcessBuilder.Redirect.to(errFile);
-    return this;
+  @Override
+  public SparkLauncher addFile(String file) {
+    return super.addFile(file);
   }
 
-  /**
-   * Redirects standard output to the specified File.
-   *
-   * @param outFile The file to which stdout is written.
-   * @return This launcher.
-   */
-  public SparkLauncher redirectOutput(File outFile) {
-    outputStream = ProcessBuilder.Redirect.to(outFile);
-    return this;
+  @Override
+  public SparkLauncher addPyFile(String file) {
+    return super.addPyFile(file);
   }
 
-  /**
-   * Sets all output to be logged and redirected to a logger with the specified name.
-   *
-   * @param loggerName The name of the logger to log stdout and stderr.
-   * @return This launcher.
-   */
-  public SparkLauncher redirectToLog(String loggerName) {
-    setConf(CHILD_PROCESS_LOGGER_NAME, loggerName);
-    return this;
+  @Override
+  public SparkLauncher setVerbose(boolean verbose) {
+    return super.setVerbose(verbose);
   }
 
   /**
@@ -479,17 +331,9 @@ public class SparkLauncher {
 
   /**
    * Starts a Spark application.
+   *
    * <p>
-   * This method returns a handle that provides information about the running application and can
-   * be used to do basic interaction with it.
-   * <p>
-   * The returned handle assumes that the application will instantiate a single SparkContext
-   * during its lifetime. Once that context reports a final state (one that indicates the
-   * SparkContext has stopped), the handle will not perform new state transitions, so anything
-   * that happens after that cannot be monitored. If the underlying application is launched as
-   * a child process, {@link SparkAppHandle#kill()} can still be used to kill the child process.
-   * <p>
-   * Currently, all applications are launched as child processes. The child's stdout and stderr
+   * Applications launched by this launcher run as child processes. The child's stdout and stderr
   * are merged and written to a logger (see <code>java.util.logging</code>) only if redirection
   * has not otherwise been configured on this <code>SparkLauncher</code>. The logger's name can be
   * defined by setting {@link #CHILD_PROCESS_LOGGER_NAME} in the app's configuration. If that
@@ -499,15 +343,20 @@ public class SparkLauncher {
    * easily into the configuration of commonly-used logging systems.
    *
    * @since 1.6.0
+   * @see AbstractLauncher#startApplication(SparkAppHandle.Listener...)
   * @param listeners Listeners to add to the handle before the app is launched.
    * @return A handle for the launched application.
    */
+  @Override
   public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException {
-    ChildProcAppHandle handle = LauncherServer.newAppHandle();
+    LauncherServer server = LauncherServer.getOrCreateServer();
+    ChildProcAppHandle handle = new ChildProcAppHandle(server);
     for (SparkAppHandle.Listener l : listeners) {
       handle.addListener(l);
     }
 
+    String secret = server.registerHandle(handle);
+
     String loggerName = getLoggerName();
     ProcessBuilder pb = createBuilder();
 
@@ -540,9 +389,8 @@ public class SparkLauncher {
       pb.redirectErrorStream(true);
     }
 
-    pb.environment().put(LauncherProtocol.ENV_LAUNCHER_PORT,
-      String.valueOf(LauncherServer.getServerInstance().getPort()));
-    pb.environment().put(LauncherProtocol.ENV_LAUNCHER_SECRET, handle.getSecret());
+    pb.environment().put(LauncherProtocol.ENV_LAUNCHER_PORT, String.valueOf(server.getPort()));
+    pb.environment().put(LauncherProtocol.ENV_LAUNCHER_SECRET, secret);
     try {
       Process child = pb.start();
       InputStream logStream = null;
@@ -604,6 +452,11 @@ public class SparkLauncher {
     return pb;
   }
 
+  @Override
+  SparkLauncher self() {
+    return this;
+  }
+
   // Visible for testing.
   String findSparkSubmit() {
     String script = isWindows() ? "spark-submit.cmd" : "spark-submit";
@@ -614,32 +467,4 @@ public class SparkLauncher {
     return builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
   }
 
-  private static class ArgumentValidator extends SparkSubmitOptionParser {
-
-    private final boolean hasValue;
-
-    ArgumentValidator(boolean hasValue) {
-      this.hasValue = hasValue;
-    }
-
-    @Override
-    protected boolean handle(String opt, String value) {
-      if (value == null && hasValue) {
-        throw new IllegalArgumentException(String.format("'%s' expects a value.", opt));
-      }
-      return true;
-    }
-
-    @Override
-    protected boolean handleUnknown(String opt) {
-      // Do not fail on unknown arguments, to support future arguments added to SparkSubmit.
-      return true;
-    }
-
-    protected void handleExtraArgs(List<String> extra) {
-      // No op.
-    }
-
-  }
-
 }
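
For context, the refactoring above keeps the public child-process API intact;
the familiar builder pattern still applies. A minimal sketch of a caller (the
jar path and class names are hypothetical, not part of this patch):

    import java.io.IOException;
    import org.apache.spark.launcher.SparkAppHandle;
    import org.apache.spark.launcher.SparkLauncher;

    public class ChildProcExample {
      public static void main(String[] args) throws IOException {
        SparkAppHandle handle = new SparkLauncher()
          .setAppResource("/path/to/app.jar")     // hypothetical jar
          .setMainClass("com.example.MyApp")      // hypothetical main class
          .setMaster("local[*]")
          .redirectToLog("com.example.launcher")  // sets CHILD_PROCESS_LOGGER_NAME
          .startApplication(new SparkAppHandle.Listener() {
            @Override
            public void stateChanged(SparkAppHandle h) {
              System.out.println("state -> " + h.getState());
            }

            @Override
            public void infoChanged(SparkAppHandle h) { }
          });
      }
    }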

http://git-wip-us.apache.org/repos/asf/spark/blob/cfcd7466/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
index 5f2da03..e0ef22d 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -355,7 +355,7 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
     env.put(submitArgsEnvVariable, submitArgs.toString());
   }
 
-  private boolean isClientMode(Map<String, String> userProps) {
+  boolean isClientMode(Map<String, String> userProps) {
     String userMaster = firstNonEmpty(master, userProps.get(SparkLauncher.SPARK_MASTER));
     String userDeployMode = firstNonEmpty(deployMode, userProps.get(SparkLauncher.DEPLOY_MODE));
     // Default master is "local[*]", so assume client mode in that case
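
Making isClientMode package-private lets other classes in
org.apache.spark.launcher query the effective deploy mode. The actual call
site is not in this hunk, so the following is only a sketch of the pattern
this visibility change enables:

    // Hedged sketch: a launcher-package caller asking the builder whether the
    // app would run in client mode (getEffectiveConfig() may throw IOException).
    List<String> submitArgs = Arrays.asList("--master", "local[*]");
    SparkSubmitCommandBuilder builder = new SparkSubmitCommandBuilder(submitArgs);
    if (builder.isClientMode(builder.getEffectiveConfig())) {
      // In client mode the app would share the caller's JVM when run in-process.
    }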

http://git-wip-us.apache.org/repos/asf/spark/blob/cfcd7466/launcher/src/main/java/org/apache/spark/launcher/package-info.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/package-info.java b/launcher/src/main/java/org/apache/spark/launcher/package-info.java
index d1ac39b..248b6d9 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/package-info.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/package-info.java
@@ -16,17 +16,18 @@
  */
 
 /**
- * Library for launching Spark applications.
+ * Library for launching Spark applications programmatically.
  *
  * <p>
- * This library allows applications to launch Spark programmatically. There's only one entry
- * point to the library - the {@link org.apache.spark.launcher.SparkLauncher} class.
+ * There are two ways to start applications with this library: as a child process, using
+ * {@link org.apache.spark.launcher.SparkLauncher}, or in-process, using
+ * {@link org.apache.spark.launcher.InProcessLauncher}.
  * </p>
  *
  * <p>
- * The {@link org.apache.spark.launcher.SparkLauncher#startApplication(
- * org.apache.spark.launcher.SparkAppHandle.Listener...)} can be used to start Spark and provide
- * a handle to monitor and control the running application:
+ * The {@link org.apache.spark.launcher.AbstractLauncher#startApplication(
+ * org.apache.spark.launcher.SparkAppHandle.Listener...)} method can be used to start Spark and
+ * provide a handle to monitor and control the running application:
  * </p>
  *
  * <pre>
@@ -49,7 +50,20 @@
  * </pre>
  *
  * <p>
- * It's also possible to launch a raw child process, using the
+ * Launching applications as a child process requires a full Spark installation. The installation
+ * directory can be provided to the launcher explicitly in the launcher's configuration, or by
+ * setting the <i>SPARK_HOME</i> environment variable.
+ * </p>
+ *
+ * <p>
+ * Launching applications in-process is only recommended in cluster mode, since Spark cannot run
+ * multiple client-mode applications concurrently in the same process. The in-process launcher
+ * requires the necessary Spark dependencies (such as spark-core and cluster manager-specific
+ * modules) to be present in the caller thread's class loader.
+ * </p>
+ *
+ * <p>
+ * It's also possible to launch a raw child process, without the extra monitoring, using the
  * {@link org.apache.spark.launcher.SparkLauncher#launch()} method:
  * </p>
  *
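
To contrast the two entry points described above, a minimal sketch of the
in-process variant (resource and class names are hypothetical; it assumes
spark-core and the relevant cluster-manager module are already on the calling
thread's class path):

    import org.apache.spark.launcher.InProcessLauncher;
    import org.apache.spark.launcher.SparkAppHandle;

    // Inside some method that may throw IOException:
    SparkAppHandle handle = new InProcessLauncher()
      .setMaster("yarn")
      .setDeployMode("cluster")             // in-process is only recommended in cluster mode
      .setAppResource("/path/to/app.jar")   // hypothetical
      .setMainClass("com.example.MyApp")    // hypothetical
      .startApplication();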

http://git-wip-us.apache.org/repos/asf/spark/blob/cfcd7466/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java
----------------------------------------------------------------------
diff --git a/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java b/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java
index 23e2c64..3e1a90e 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java
@@ -17,10 +17,14 @@
 
 package org.apache.spark.launcher;
 
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
 import org.slf4j.bridge.SLF4JBridgeHandler;
+import static org.junit.Assert.*;
 
 /**
- * Handles configuring the JUL -> SLF4J bridge.
+ * Handles configuring the JUL -> SLF4J bridge, and provides some utility methods for tests.
  */
 class BaseSuite {
 
@@ -29,4 +33,33 @@ class BaseSuite {
     SLF4JBridgeHandler.install();
   }
 
+  @After
+  public void postChecks() {
+    LauncherServer server = LauncherServer.getServer();
+    if (server != null) {
+      // Shut down the server to clean things up for the next test.
+      try {
+        server.close();
+      } catch (Exception e) {
+        // Ignore.
+      }
+    }
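+    // The assertion below fails the test if a server was left running; the
+    // close() above is only cleanup so that later tests start from a clean slate.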
+    assertNull(server);
+  }
+
+  protected void waitFor(SparkAppHandle handle) throws Exception {
+    long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
+    try {
+      while (!handle.getState().isFinal()) {
+        assertTrue("Timed out waiting for handle to transition to final 
state.",
+          System.nanoTime() < deadline);
+        TimeUnit.MILLISECONDS.sleep(10);
+      }
+    } finally {
+      if (!handle.getState().isFinal()) {
+        handle.kill();
+      }
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cfcd7466/launcher/src/test/java/org/apache/spark/launcher/ChildProcAppHandleSuite.java
----------------------------------------------------------------------
diff --git a/launcher/src/test/java/org/apache/spark/launcher/ChildProcAppHandleSuite.java b/launcher/src/test/java/org/apache/spark/launcher/ChildProcAppHandleSuite.java
index 9f59b41..fe44efd 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/ChildProcAppHandleSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/ChildProcAppHandleSuite.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import static java.nio.file.attribute.PosixFilePermission.*;
 
@@ -217,21 +216,6 @@ public class ChildProcAppHandleSuite extends BaseSuite {
     assertEquals(SparkAppHandle.State.FAILED, handle.getState());
   }
 
-  private void waitFor(SparkAppHandle handle) throws Exception {
-    long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
-    try {
-      while (!handle.getState().isFinal()) {
-        assertTrue("Timed out waiting for handle to transition to final 
state.",
-          System.nanoTime() < deadline);
-        TimeUnit.MILLISECONDS.sleep(10);
-      }
-    } finally {
-      if (!handle.getState().isFinal()) {
-        handle.kill();
-      }
-    }
-  }
-
   private static class TestSparkLauncher extends SparkLauncher {
 
     TestSparkLauncher() {

http://git-wip-us.apache.org/repos/asf/spark/blob/cfcd7466/launcher/src/test/java/org/apache/spark/launcher/InProcessLauncherSuite.java
----------------------------------------------------------------------
diff --git a/launcher/src/test/java/org/apache/spark/launcher/InProcessLauncherSuite.java b/launcher/src/test/java/org/apache/spark/launcher/InProcessLauncherSuite.java
new file mode 100644
index 0000000..b23e565
--- /dev/null
+++ b/launcher/src/test/java/org/apache/spark/launcher/InProcessLauncherSuite.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class InProcessLauncherSuite extends BaseSuite {
+
+  // Arguments passed to the test class to identify the test being run.
+  private static final String TEST_SUCCESS = "success";
+  private static final String TEST_FAILURE = "failure";
+  private static final String TEST_KILL = "kill";
+
+  private static final String TEST_FAILURE_MESSAGE = "d'oh";
+
+  private static Throwable lastError;
+
+  @Before
+  public void testSetup() {
+    lastError = null;
+  }
+
+  @Test
+  public void testLauncher() throws Exception {
+    SparkAppHandle app = startTest(TEST_SUCCESS);
+    waitFor(app);
+    assertNull(lastError);
+
+    // Because the test doesn't implement the launcher protocol, the final state here will be
+    // LOST instead of FINISHED.
+    assertEquals(SparkAppHandle.State.LOST, app.getState());
+  }
+
+  @Test
+  public void testKill() throws Exception {
+    SparkAppHandle app = startTest(TEST_KILL);
+    app.kill();
+    waitFor(app);
+    assertNull(lastError);
+    assertEquals(SparkAppHandle.State.KILLED, app.getState());
+  }
+
+  @Test
+  public void testErrorPropagation() throws Exception {
+    SparkAppHandle app = startTest(TEST_FAILURE);
+    waitFor(app);
+    assertEquals(SparkAppHandle.State.FAILED, app.getState());
+
+    assertNotNull(lastError);
+    assertEquals(TEST_FAILURE_MESSAGE, lastError.getMessage());
+  }
+
+  private SparkAppHandle startTest(String test) throws Exception {
+    return new TestInProcessLauncher()
+      .addAppArgs(test)
+      .setAppResource(SparkLauncher.NO_RESOURCE)
+      .startApplication();
+  }
+
+  public static void runTest(String[] args) {
+    try {
+      assertTrue(args.length != 0);
+
+      // Make sure at least the launcher-provided config options are in the args array.
+      final AtomicReference<String> port = new AtomicReference<>();
+      final AtomicReference<String> secret = new AtomicReference<>();
+      SparkSubmitOptionParser parser = new SparkSubmitOptionParser() {
+
+        @Override
+        protected boolean handle(String opt, String value) {
+          if (CONF.equals(opt)) {
+            String[] conf = value.split("=", 2);
+            switch (conf[0]) {
+              case LauncherProtocol.CONF_LAUNCHER_PORT:
+                port.set(conf[1]);
+                break;
+
+              case LauncherProtocol.CONF_LAUNCHER_SECRET:
+                secret.set(conf[1]);
+                break;
+
+              default:
+                // no op
+            }
+          }
+
+          return true;
+        }
+
+        @Override
+        protected boolean handleUnknown(String opt) {
+          return true;
+        }
+
+        @Override
+        protected void handleExtraArgs(List<String> extra) {
+          // no op.
+        }
+
+      };
+
+      parser.parse(Arrays.asList(args));
+      assertNotNull("Launcher port not found.", port.get());
+      assertNotNull("Launcher secret not found.", secret.get());
+
+      String test = args[args.length - 1];
+      switch (test) {
+      case TEST_SUCCESS:
+        break;
+
+      case TEST_FAILURE:
+        throw new IllegalStateException(TEST_FAILURE_MESSAGE);
+
+      case TEST_KILL:
+        try {
+          // Wait for a bounded amount of time so the test doesn't hang forever on
+          // failure, but long enough that the sleep shouldn't time out spuriously
+          // on busy machines.
+          Thread.sleep(10000);
+          fail("Did not get expected interrupt after 10s.");
+        } catch (InterruptedException ie) {
+          // Expected.
+        }
+        break;
+
+      default:
+        fail("Unknown test " + test);
+      }
+    } catch (Throwable t) {
+      lastError = t;
+      throw new RuntimeException(t);
+    }
+  }
+
+  private static class TestInProcessLauncher extends InProcessLauncher {
+
+    @Override
+    Method findSparkSubmit() throws IOException {
+      try {
+        return InProcessLauncherSuite.class.getMethod("runTest", String[].class);
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+
+  }
+
+}
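
The option parser in runTest also documents the handshake for in-process apps:
instead of the environment variables used for child processes, the launcher is
expected to inject the callback coordinates as --conf entries. Roughly (a
sketch of the shape of the submit args, not the launcher's exact code):

    // Hedged sketch: what the in-process path adds to the spark-submit args,
    // mirroring ENV_LAUNCHER_PORT / ENV_LAUNCHER_SECRET in the child-proc path.
    List<String> submitArgs = new ArrayList<>();
    submitArgs.add("--conf");  // SparkSubmitOptionParser.CONF
    submitArgs.add(LauncherProtocol.CONF_LAUNCHER_PORT + "=" + server.getPort());
    submitArgs.add("--conf");
    submitArgs.add(LauncherProtocol.CONF_LAUNCHER_SECRET + "=" + secret);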

http://git-wip-us.apache.org/repos/asf/spark/blob/cfcd7466/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
----------------------------------------------------------------------
diff --git a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
index d0f6abe..7e2b09c 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
@@ -39,39 +39,27 @@ public class LauncherServerSuite extends BaseSuite {
 
   @Test
   public void testLauncherServerReuse() throws Exception {
-    ChildProcAppHandle handle1 = null;
-    ChildProcAppHandle handle2 = null;
-    ChildProcAppHandle handle3 = null;
+    LauncherServer server1 = LauncherServer.getOrCreateServer();
+    ChildProcAppHandle handle = new ChildProcAppHandle(server1);
+    handle.kill();
 
+    LauncherServer server2 = LauncherServer.getOrCreateServer();
     try {
-      handle1 = LauncherServer.newAppHandle();
-      handle2 = LauncherServer.newAppHandle();
-      LauncherServer server1 = handle1.getServer();
-      assertSame(server1, handle2.getServer());
-
-      handle1.kill();
-      handle2.kill();
-
-      handle3 = LauncherServer.newAppHandle();
-      assertNotSame(server1, handle3.getServer());
-
-      handle3.kill();
-
-      assertNull(LauncherServer.getServerInstance());
+      assertNotSame(server1, server2);
     } finally {
-      kill(handle1);
-      kill(handle2);
-      kill(handle3);
+      server2.unref();
     }
   }
 
   @Test
   public void testCommunication() throws Exception {
-    ChildProcAppHandle handle = LauncherServer.newAppHandle();
+    LauncherServer server = LauncherServer.getOrCreateServer();
+    ChildProcAppHandle handle = new ChildProcAppHandle(server);
+    String secret = server.registerHandle(handle);
+
     TestClient client = null;
     try {
-      Socket s = new Socket(InetAddress.getLoopbackAddress(),
-        LauncherServer.getServerInstance().getPort());
+      Socket s = new Socket(InetAddress.getLoopbackAddress(), server.getPort());
 
       final Semaphore semaphore = new Semaphore(0);
       handle.addListener(new SparkAppHandle.Listener() {
@@ -86,7 +74,7 @@ public class LauncherServerSuite extends BaseSuite {
       });
 
       client = new TestClient(s);
-      client.send(new Hello(handle.getSecret(), "1.4.0"));
+      client.send(new Hello(secret, "1.4.0"));
       assertTrue(semaphore.tryAcquire(30, TimeUnit.SECONDS));
 
       // Make sure the server matched the client to the handle.
@@ -104,7 +92,7 @@ public class LauncherServerSuite extends BaseSuite {
       Message stopMsg = client.inbound.poll(30, TimeUnit.SECONDS);
       assertTrue(stopMsg instanceof Stop);
     } finally {
-      kill(handle);
+      handle.kill();
       close(client);
       client.clientThread.join();
     }
@@ -112,34 +100,36 @@ public class LauncherServerSuite extends BaseSuite {
 
   @Test
   public void testTimeout() throws Exception {
-    ChildProcAppHandle handle = null;
+    LauncherServer server = LauncherServer.getOrCreateServer();
+    ChildProcAppHandle handle = new ChildProcAppHandle(server);
+    String secret = server.registerHandle(handle);
+
     TestClient client = null;
     try {
      // LauncherServer will immediately close the server-side socket when the timeout is set
       // to 0.
       SparkLauncher.setConfig(SparkLauncher.CHILD_CONNECTION_TIMEOUT, "0");
 
-      handle = LauncherServer.newAppHandle();
-
-      Socket s = new Socket(InetAddress.getLoopbackAddress(),
-        LauncherServer.getServerInstance().getPort());
+      Socket s = new Socket(InetAddress.getLoopbackAddress(), server.getPort());
       client = new TestClient(s);
-      waitForError(client, handle.getSecret());
+      waitForError(client, secret);
    } finally {
      SparkLauncher.launcherConfig.remove(SparkLauncher.CHILD_CONNECTION_TIMEOUT);
-      kill(handle);
+      handle.kill();
       close(client);
     }
   }
 
   @Test
   public void testSparkSubmitVmShutsDown() throws Exception {
-    ChildProcAppHandle handle = LauncherServer.newAppHandle();
+    LauncherServer server = LauncherServer.getOrCreateServer();
+    ChildProcAppHandle handle = new ChildProcAppHandle(server);
+    String secret = server.registerHandle(handle);
+
     TestClient client = null;
     final Semaphore semaphore = new Semaphore(0);
     try {
-      Socket s = new Socket(InetAddress.getLoopbackAddress(),
-        LauncherServer.getServerInstance().getPort());
+      Socket s = new Socket(InetAddress.getLoopbackAddress(), server.getPort());
       handle.addListener(new SparkAppHandle.Listener() {
         public void stateChanged(SparkAppHandle handle) {
           semaphore.release();
@@ -149,7 +139,7 @@ public class LauncherServerSuite extends BaseSuite {
         }
       });
       client = new TestClient(s);
-      client.send(new Hello(handle.getSecret(), "1.4.0"));
+      client.send(new Hello(secret, "1.4.0"));
       assertTrue(semaphore.tryAcquire(30, TimeUnit.SECONDS));
       // Make sure the server matched the client to the handle.
       assertNotNull(handle.getConnection());
@@ -157,7 +147,7 @@ public class LauncherServerSuite extends BaseSuite {
       assertTrue(semaphore.tryAcquire(30, TimeUnit.SECONDS));
       assertEquals(SparkAppHandle.State.LOST, handle.getState());
     } finally {
-      kill(handle);
+      handle.kill();
       close(client);
       client.clientThread.join();
     }
@@ -165,11 +155,13 @@ public class LauncherServerSuite extends BaseSuite {
 
   @Test
   public void testStreamFiltering() throws Exception {
-    ChildProcAppHandle handle = LauncherServer.newAppHandle();
+    LauncherServer server = LauncherServer.getOrCreateServer();
+    ChildProcAppHandle handle = new ChildProcAppHandle(server);
+    String secret = server.registerHandle(handle);
+
     TestClient client = null;
     try {
-      Socket s = new Socket(InetAddress.getLoopbackAddress(),
-        LauncherServer.getServerInstance().getPort());
+      Socket s = new Socket(InetAddress.getLoopbackAddress(), server.getPort());
 
       client = new TestClient(s);
 
@@ -181,21 +173,15 @@ public class LauncherServerSuite extends BaseSuite {
         // happening for other reasons).
       }
 
-      waitForError(client, handle.getSecret());
+      waitForError(client, secret);
       assertEquals(0, EvilPayload.EVIL_BIT);
     } finally {
-      kill(handle);
+      handle.kill();
       close(client);
       client.clientThread.join();
     }
   }
 
-  private void kill(SparkAppHandle handle) {
-    if (handle != null) {
-      handle.kill();
-    }
-  }
-
   private void close(Closeable c) {
     if (c != null) {
       try {
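
Taken together, these test changes show the reworked LauncherServer lifecycle:
the server is now shared and reference-counted instead of being torn down with
its last handle. The usage pattern the tests exercise:

    // Sketch of the new lifecycle, as exercised by the tests above.
    LauncherServer server = LauncherServer.getOrCreateServer();  // acquires a reference
    ChildProcAppHandle handle = new ChildProcAppHandle(server);
    String secret = server.registerHandle(handle);  // ties a client socket to the handle
    // ... start the app, handing it server.getPort() and the secret ...
    handle.kill();  // disposing of the handle releases its server reference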

http://git-wip-us.apache.org/repos/asf/spark/blob/cfcd7466/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 191415a..53f5f61 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -92,6 +92,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   private[this] var stopCalled: Boolean = false
 
   private val launcherBackend = new LauncherBackend() {
+    override protected def conf: SparkConf = sc.conf
+
     override protected def onStopRequest(): Unit = {
       stopSchedulerBackend()
       setState(SparkAppHandle.State.KILLED)

http://git-wip-us.apache.org/repos/asf/spark/blob/cfcd7466/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 3781b26..15328d0 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -100,6 +100,8 @@ private[spark] class Client(
   private var amKeytabFileName: String = null
 
   private val launcherBackend = new LauncherBackend() {
+    override protected def conf: SparkConf = sparkConf
+
     override def onStopRequest(): Unit = {
       if (isClusterMode && appId != null) {
         yarnClient.killApplication(appId)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
