Repository: spark
Updated Branches:
  refs/heads/master 1ab3d3e47 -> 543577a1e


[SPARK-24243][CORE] Expose exceptions from InProcessAppHandle

Adds a new method to SparkAppHandle called getError which returns
the exception (if present) that caused the underlying Spark app to
fail.

New tests added to SparkLauncherSuite for the new method.

Closes #21849

Closes #23221 from vanzin/SPARK-24243.

Signed-off-by: Marcelo Vanzin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/543577a1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/543577a1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/543577a1

Branch: refs/heads/master
Commit: 543577a1e8c0904048b73008fa7c4cee33f69894
Parents: 1ab3d3e
Author: Sahil Takiar <[email protected]>
Authored: Fri Dec 7 10:33:42 2018 -0800
Committer: Marcelo Vanzin <[email protected]>
Committed: Fri Dec 7 10:34:33 2018 -0800

----------------------------------------------------------------------
 .../spark/launcher/SparkLauncherSuite.java      | 102 +++++++++++++++++--
 .../spark/launcher/ChildProcAppHandle.java      |  20 +++-
 .../spark/launcher/InProcessAppHandle.java      |  13 +++
 .../apache/spark/launcher/OutputRedirector.java |  25 +++++
 .../apache/spark/launcher/SparkAppHandle.java   |   8 ++
 project/MimaExcludes.scala                      |   3 +
 6 files changed, 159 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/543577a1/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java 
b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index 6a1a38c..773c390 100644
--- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -41,6 +41,8 @@ import org.apache.spark.util.Utils;
 public class SparkLauncherSuite extends BaseSuite {
 
   private static final NamedThreadFactory TF = new 
NamedThreadFactory("SparkLauncherSuite-%d");
+  private static final String EXCEPTION_MESSAGE = "dummy-exception";
+  private static final RuntimeException DUMMY_EXCEPTION = new 
RuntimeException(EXCEPTION_MESSAGE);
 
   private final SparkLauncher launcher = new SparkLauncher();
 
@@ -130,17 +132,8 @@ public class SparkLauncherSuite extends BaseSuite {
     try {
       inProcessLauncherTestImpl();
     } finally {
-      Properties p = new Properties();
-      for (Map.Entry<Object, Object> e : properties.entrySet()) {
-        p.put(e.getKey(), e.getValue());
-      }
-      System.setProperties(p);
-      // Here DAGScheduler is stopped, while SparkContext.clearActiveContext 
may not be called yet.
-      // Wait for a reasonable amount of time to avoid creating two active 
SparkContext in JVM.
-      // See SPARK-23019 and SparkContext.stop() for details.
-      eventually(Duration.ofSeconds(5), Duration.ofMillis(10), () -> {
-        assertTrue("SparkContext is still alive.", 
SparkContext$.MODULE$.getActive().isEmpty());
-      });
+      restoreSystemProperties(properties);
+      waitForSparkContextShutdown();
     }
   }
 
@@ -227,6 +220,82 @@ public class SparkLauncherSuite extends BaseSuite {
     assertEquals(SparkAppHandle.State.LOST, handle.getState());
   }
 
+  @Test
+  public void testInProcessLauncherGetError() throws Exception {
+    // Because this test runs SparkLauncher in process and in client mode, it 
pollutes the system
+    // properties, and that can cause test failures down the test pipeline. So 
restore the original
+    // system properties after this test runs.
+    Map<Object, Object> properties = new HashMap<>(System.getProperties());
+
+    SparkAppHandle handle = null;
+    try {
+      handle = new InProcessLauncher()
+        .setMaster("local")
+        .setAppResource(SparkLauncher.NO_RESOURCE)
+        .setMainClass(ErrorInProcessTestApp.class.getName())
+        .addAppArgs("hello")
+        .startApplication();
+
+      final SparkAppHandle _handle = handle;
+      eventually(Duration.ofSeconds(60), Duration.ofMillis(1000), () -> {
+        assertEquals(SparkAppHandle.State.FAILED, _handle.getState());
+      });
+
+      assertNotNull(handle.getError());
+      assertTrue(handle.getError().isPresent());
+      assertSame(handle.getError().get(), DUMMY_EXCEPTION);
+    } finally {
+      if (handle != null) {
+        handle.kill();
+      }
+      restoreSystemProperties(properties);
+      waitForSparkContextShutdown();
+    }
+  }
+
+  @Test
+  public void testSparkLauncherGetError() throws Exception {
+    SparkAppHandle handle = null;
+    try {
+      handle = new SparkLauncher()
+        .setMaster("local")
+        .setAppResource(SparkLauncher.NO_RESOURCE)
+        .setMainClass(ErrorInProcessTestApp.class.getName())
+        .addAppArgs("hello")
+        .startApplication();
+
+      final SparkAppHandle _handle = handle;
+      eventually(Duration.ofSeconds(60), Duration.ofMillis(1000), () -> {
+        assertEquals(SparkAppHandle.State.FAILED, _handle.getState());
+      });
+
+      assertNotNull(handle.getError());
+      assertTrue(handle.getError().isPresent());
+      
assertTrue(handle.getError().get().getMessage().contains(EXCEPTION_MESSAGE));
+    } finally {
+      if (handle != null) {
+        handle.kill();
+      }
+    }
+  }
+
+  private void restoreSystemProperties(Map<Object, Object> properties) {
+    Properties p = new Properties();
+    for (Map.Entry<Object, Object> e : properties.entrySet()) {
+      p.put(e.getKey(), e.getValue());
+    }
+    System.setProperties(p);
+  }
+
+  private void waitForSparkContextShutdown() throws Exception {
+    // Here DAGScheduler is stopped, while SparkContext.clearActiveContext may 
not be called yet.
+    // Wait for a reasonable amount of time to avoid creating two active 
SparkContext in JVM.
+    // See SPARK-23019 and SparkContext.stop() for details.
+    eventually(Duration.ofSeconds(5), Duration.ofMillis(10), () -> {
+      assertTrue("SparkContext is still alive.", 
SparkContext$.MODULE$.getActive().isEmpty());
+    });
+  }
+
   public static class SparkLauncherTestApp {
 
     public static void main(String[] args) throws Exception {
@@ -264,4 +333,15 @@ public class SparkLauncherSuite extends BaseSuite {
 
   }
 
+  /**
+   * Similar to {@link InProcessTestApp}, except it throws an exception.
+   */
+  public static class ErrorInProcessTestApp {
+
+    public static void main(String[] args) {
+      assertNotEquals(0, args.length);
+      assertEquals(args[0], "hello");
+      throw DUMMY_EXCEPTION;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/543577a1/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
----------------------------------------------------------------------
diff --git 
a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java 
b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
index 5609f84..7dfcf0e 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
@@ -18,6 +18,7 @@
 package org.apache.spark.launcher;
 
 import java.io.InputStream;
+import java.util.Optional;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -29,7 +30,7 @@ class ChildProcAppHandle extends AbstractAppHandle {
   private static final Logger LOG = 
Logger.getLogger(ChildProcAppHandle.class.getName());
 
   private volatile Process childProc;
-  private OutputRedirector redirector;
+  private volatile OutputRedirector redirector;
 
   ChildProcAppHandle(LauncherServer server) {
     super(server);
@@ -46,6 +47,23 @@ class ChildProcAppHandle extends AbstractAppHandle {
     }
   }
 
+  /**
+   * Parses the logs of {@code spark-submit} and returns the last exception 
thrown.
+   * <p>
+   * Since {@link SparkLauncher} runs {@code spark-submit} in a sub-process, 
it's difficult to
+   * accurately retrieve the full {@link Throwable} from the {@code 
spark-submit} process.
+   * This method parses the logs of the sub-process and provides a best-effort 
attempt at
+   * returning the last exception thrown by the {@code spark-submit} process. 
Only the exception
+   * message is parsed; the associated stacktrace is meaningless.
+   *
+   * @return an {@link Optional} containing a {@link RuntimeException} with 
the parsed
+   * exception, otherwise returns an empty {@link Optional}
+   */
+  @Override
+  public Optional<Throwable> getError() {
+    return redirector != null ? Optional.ofNullable(redirector.getError()) : 
Optional.empty();
+  }
+
   @Override
   public synchronized void kill() {
     if (!isDisposed()) {

http://git-wip-us.apache.org/repos/asf/spark/blob/543577a1/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java
----------------------------------------------------------------------
diff --git 
a/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java 
b/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java
index 15fbca0..ba09050 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java
@@ -17,7 +17,9 @@
 
 package org.apache.spark.launcher;
 
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -31,6 +33,8 @@ class InProcessAppHandle extends AbstractAppHandle {
   // Avoid really long thread names.
   private static final int MAX_APP_NAME_LEN = 16;
 
+  private volatile Throwable error;
+
   private Thread app;
 
   InProcessAppHandle(LauncherServer server) {
@@ -51,6 +55,11 @@ class InProcessAppHandle extends AbstractAppHandle {
     }
   }
 
+  @Override
+  public Optional<Throwable> getError() {
+    return Optional.ofNullable(error);
+  }
+
   synchronized void start(String appName, Method main, String[] args) {
     CommandBuilderUtils.checkState(app == null, "Handle already started.");
 
@@ -62,7 +71,11 @@ class InProcessAppHandle extends AbstractAppHandle {
       try {
         main.invoke(null, (Object) args);
       } catch (Throwable t) {
+        if (t instanceof InvocationTargetException) {
+          t = t.getCause();
+        }
         LOG.log(Level.WARNING, "Application failed with exception.", t);
+        error = t;
         setState(State.FAILED);
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/543577a1/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java
----------------------------------------------------------------------
diff --git 
a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java 
b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java
index 6f4b0bb..0f097f8 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java
@@ -37,6 +37,7 @@ class OutputRedirector {
   private final ChildProcAppHandle callback;
 
   private volatile boolean active;
+  private volatile Throwable error;
 
   OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) {
     this(in, loggerName, tf, null);
@@ -61,6 +62,10 @@ class OutputRedirector {
       while ((line = reader.readLine()) != null) {
         if (active) {
           sink.info(line.replaceFirst("\\s*$", ""));
+          if ((containsIgnoreCase(line, "Error") || containsIgnoreCase(line, 
"Exception")) &&
+              !line.contains("at ")) {
+            error = new RuntimeException(line);
+          }
         }
       }
     } catch (IOException e) {
@@ -85,4 +90,24 @@ class OutputRedirector {
     return thread.isAlive();
   }
 
+  Throwable getError() {
+    return error;
+  }
+
+  /**
+   * Copied from Apache Commons Lang {@code 
StringUtils#containsIgnoreCase(String, String)}
+   */
+  private static boolean containsIgnoreCase(String str, String searchStr) {
+    if (str == null || searchStr == null) {
+      return false;
+    }
+    int len = searchStr.length();
+    int max = str.length() - len;
+    for (int i = 0; i <= max; i++) {
+      if (str.regionMatches(true, i, searchStr, 0, len)) {
+        return true;
+      }
+    }
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/543577a1/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
----------------------------------------------------------------------
diff --git 
a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java 
b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
index cefb4d1..afec270 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
@@ -17,6 +17,8 @@
 
 package org.apache.spark.launcher;
 
+import java.util.Optional;
+
 /**
  * A handle to a running Spark application.
  * <p>
@@ -101,6 +103,12 @@ public interface SparkAppHandle {
   void disconnect();
 
   /**
+   * If the application failed due to an error, return the underlying error. 
If the app
+   * succeeded, this method returns an empty {@link Optional}.
+   */
+  Optional<Throwable> getError();
+
+  /**
    * Listener for updates to a handle's state. The callbacks do not receive 
information about
    * what exactly has changed, just that an update has occurred.
    *

http://git-wip-us.apache.org/repos/asf/spark/blob/543577a1/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 1c83cf5..4eeebb8 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -36,6 +36,9 @@ object MimaExcludes {
 
   // Exclude rules for 3.0.x
   lazy val v30excludes = v24excludes ++ Seq(
+    // [SPARK-24243][CORE] Expose exceptions from InProcessAppHandle
+    
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.launcher.SparkAppHandle.getError"),
+
     // [SPARK-25867] Remove KMeans computeCost
     
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.clustering.KMeansModel.computeCost"),
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to