Repository: spark Updated Branches: refs/heads/branch-2.0 5363783af -> 62765cbeb
[SPARK-15652][LAUNCHER] Added a new State (LOST) for the listeners of SparkLauncher ## What changes were proposed in this pull request? An application handle can be left stuck in a stale, non-final state. This can happen when the LauncherConnection gets an exception while reading from the socket and terminates silently without notifying anyone, making the client/listener think that the job is still in its previous state. The fix forces a notification to the client that the job finished with an unknown status (the new LOST state) and lets the client handle it accordingly. ## How was this patch tested? Added a unit test. Author: Subroto Sanyal <ssan...@datameer.com> Closes #13497 from subrotosanyal/SPARK-15652-handle-spark-submit-jvm-crash. (cherry picked from commit c409e23abd128dad33557025f1e824ef47e6222f) Signed-off-by: Marcelo Vanzin <van...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/62765cbe Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/62765cbe Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/62765cbe Branch: refs/heads/branch-2.0 Commit: 62765cbebe0cb41b0c4fdc344828337ee15e1fd2 Parents: 5363783 Author: Subroto Sanyal <ssan...@datameer.com> Authored: Mon Jun 6 16:05:40 2016 -0700 Committer: Marcelo Vanzin <van...@cloudera.com> Committed: Mon Jun 6 16:05:52 2016 -0700 ---------------------------------------------------------------------- .../apache/spark/launcher/LauncherServer.java | 4 +++ .../apache/spark/launcher/SparkAppHandle.java | 4 ++- .../spark/launcher/LauncherServerSuite.java | 31 ++++++++++++++++++++ 3 files changed, 38 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/62765cbe/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java ---------------------------------------------------------------------- diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java 
b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java index e3413fd..28e9420 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java +++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java @@ -337,6 +337,10 @@ class LauncherServer implements Closeable { } super.close(); if (handle != null) { + if (!handle.getState().isFinal()) { + LOG.log(Level.WARNING, "Lost connection to spark application."); + handle.setState(SparkAppHandle.State.LOST); + } handle.disconnect(); } } http://git-wip-us.apache.org/repos/asf/spark/blob/62765cbe/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java ---------------------------------------------------------------------- diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java index 625d026..0aa7bd1 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java @@ -46,7 +46,9 @@ public interface SparkAppHandle { /** The application finished with a failed status. */ FAILED(true), /** The application was killed. */ - KILLED(true); + KILLED(true), + /** The Spark Submit JVM exited with a unknown status. 
*/ + LOST(true); private final boolean isFinal; http://git-wip-us.apache.org/repos/asf/spark/blob/62765cbe/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java ---------------------------------------------------------------------- diff --git a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java index bfe1fcc..12f1a0c 100644 --- a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java +++ b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java @@ -152,6 +152,37 @@ public class LauncherServerSuite extends BaseSuite { } } + @Test + public void testSparkSubmitVmShutsDown() throws Exception { + ChildProcAppHandle handle = LauncherServer.newAppHandle(); + TestClient client = null; + final Semaphore semaphore = new Semaphore(0); + try { + Socket s = new Socket(InetAddress.getLoopbackAddress(), + LauncherServer.getServerInstance().getPort()); + handle.addListener(new SparkAppHandle.Listener() { + public void stateChanged(SparkAppHandle handle) { + semaphore.release(); + } + public void infoChanged(SparkAppHandle handle) { + semaphore.release(); + } + }); + client = new TestClient(s); + client.send(new Hello(handle.getSecret(), "1.4.0")); + assertTrue(semaphore.tryAcquire(30, TimeUnit.SECONDS)); + // Make sure the server matched the client to the handle. + assertNotNull(handle.getConnection()); + close(client); + assertTrue(semaphore.tryAcquire(30, TimeUnit.SECONDS)); + assertEquals(SparkAppHandle.State.LOST, handle.getState()); + } finally { + kill(handle); + close(client); + client.clientThread.join(); + } + } + private void kill(SparkAppHandle handle) { if (handle != null) { handle.kill(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org