Repository: spark
Updated Branches:
  refs/heads/branch-2.0 5363783af -> 62765cbeb


[SPARK-15652][LAUNCHER] Added a new State (LOST) for the listeners of 
SparkLauncher

## What changes were proposed in this pull request?
This situation can happen when the LauncherConnection gets an exception while 
reading from the socket and terminates silently without notifying anyone, 
making the client/listener think that the job is still in its previous state.
The fix forces a notification to the client that the job finished with an 
unknown status, and lets the client handle it accordingly.

## How was this patch tested?
Added a unit test.

Author: Subroto Sanyal <ssan...@datameer.com>

Closes #13497 from subrotosanyal/SPARK-15652-handle-spark-submit-jvm-crash.

(cherry picked from commit c409e23abd128dad33557025f1e824ef47e6222f)
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/62765cbe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/62765cbe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/62765cbe

Branch: refs/heads/branch-2.0
Commit: 62765cbebe0cb41b0c4fdc344828337ee15e1fd2
Parents: 5363783
Author: Subroto Sanyal <ssan...@datameer.com>
Authored: Mon Jun 6 16:05:40 2016 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Mon Jun 6 16:05:52 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/launcher/LauncherServer.java   |  4 +++
 .../apache/spark/launcher/SparkAppHandle.java   |  4 ++-
 .../spark/launcher/LauncherServerSuite.java     | 31 ++++++++++++++++++++
 3 files changed, 38 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/62765cbe/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
----------------------------------------------------------------------
diff --git 
a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java 
b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
index e3413fd..28e9420 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
@@ -337,6 +337,10 @@ class LauncherServer implements Closeable {
       }
       super.close();
       if (handle != null) {
+       if (!handle.getState().isFinal()) {
+         LOG.log(Level.WARNING, "Lost connection to spark application.");
+         handle.setState(SparkAppHandle.State.LOST);
+       }
         handle.disconnect();
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/62765cbe/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
----------------------------------------------------------------------
diff --git 
a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java 
b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
index 625d026..0aa7bd1 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
@@ -46,7 +46,9 @@ public interface SparkAppHandle {
     /** The application finished with a failed status. */
     FAILED(true),
     /** The application was killed. */
-    KILLED(true);
+    KILLED(true),
+    /** The Spark Submit JVM exited with a unknown status. */
+    LOST(true);
 
     private final boolean isFinal;
 

http://git-wip-us.apache.org/repos/asf/spark/blob/62765cbe/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
----------------------------------------------------------------------
diff --git 
a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java 
b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
index bfe1fcc..12f1a0c 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
@@ -152,6 +152,37 @@ public class LauncherServerSuite extends BaseSuite {
     }
   }
 
+  @Test
+  public void testSparkSubmitVmShutsDown() throws Exception {
+    ChildProcAppHandle handle = LauncherServer.newAppHandle();
+    TestClient client = null;
+    final Semaphore semaphore = new Semaphore(0);
+    try {
+      Socket s = new Socket(InetAddress.getLoopbackAddress(),
+        LauncherServer.getServerInstance().getPort());
+      handle.addListener(new SparkAppHandle.Listener() {
+        public void stateChanged(SparkAppHandle handle) {
+          semaphore.release();
+        }
+        public void infoChanged(SparkAppHandle handle) {
+          semaphore.release();
+        }
+      });
+      client = new TestClient(s);
+      client.send(new Hello(handle.getSecret(), "1.4.0"));
+      assertTrue(semaphore.tryAcquire(30, TimeUnit.SECONDS));
+      // Make sure the server matched the client to the handle.
+      assertNotNull(handle.getConnection());
+      close(client);
+      assertTrue(semaphore.tryAcquire(30, TimeUnit.SECONDS));
+      assertEquals(SparkAppHandle.State.LOST, handle.getState());
+    } finally {
+      kill(handle);
+      close(client);
+      client.clientThread.join();
+    }
+  }
+
   private void kill(SparkAppHandle handle) {
     if (handle != null) {
       handle.kill();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to