Repository: hive
Updated Branches:
  refs/heads/master 3dd28fbbb -> 8fd767079
HIVE-15237: Propagate Spark job failure to Hive (Rui reviewed by Xuefu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8fd76707
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8fd76707
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8fd76707

Branch: refs/heads/master
Commit: 8fd76707958db9dbd787487d074739c97e875963
Parents: 3dd28fb
Author: Rui Li <[email protected]>
Authored: Thu Nov 24 16:26:18 2016 +0800
Committer: Rui Li <[email protected]>
Committed: Thu Nov 24 16:26:18 2016 +0800

----------------------------------------------------------------------
 .../spark/status/RemoteSparkJobMonitor.java     | 24 +++++++++++++++++++++++-
 .../ql/exec/spark/status/SparkJobStatus.java    |  2 ++
 .../spark/status/impl/LocalSparkJobStatus.java  | 12 ++++++++++++
 .../spark/status/impl/RemoteSparkJobStatus.java |  5 +++++
 .../org/apache/hive/spark/client/JobHandle.java |  5 +++++
 .../apache/hive/spark/client/JobHandleImpl.java |  5 +++++
 6 files changed, 52 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/8fd76707/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index bdb1527..77038fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -116,7 +116,29 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
             done = true;
             break;
           case FAILED:
-            console.printError("Status: Failed");
+            String detail = sparkJobStatus.getError().getMessage(); // NOTE(review): getError() is dereferenced without a null check - confirm it is never null in the FAILED state
+            StringBuilder errBuilder = new StringBuilder();
+            errBuilder.append("Job failed with ");
+            if (detail == null) {
+              errBuilder.append("UNKNOWN reason");
+            } else {
+              // We SerDe the Throwable as String, parse it for the root cause
+              final String CAUSE_CAPTION = "Caused by: ";
+              int index = detail.lastIndexOf(CAUSE_CAPTION); // the last "Caused by: " occurrence is taken as the root cause
+              if (index != -1) {
+                String rootCause = detail.substring(index + CAUSE_CAPTION.length());
+                index = rootCause.indexOf(System.getProperty("line.separator")); // end of the root cause's first line, if any
+                if (index != -1) {
+                  errBuilder.append(rootCause.substring(0, index));
+                } else {
+                  errBuilder.append(rootCause);
+                }
+              } else {
+                errBuilder.append(detail);
+              }
+              detail = System.getProperty("line.separator") + detail; // full text is passed as the detail argument, prefixed with a line separator
+            }
+            console.printError(errBuilder.toString(), detail);
             running = false;
             done = true;
             rc = 3;

http://git-wip-us.apache.org/repos/asf/hive/blob/8fd76707/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
index 7959089..72ce439 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
@@ -44,4 +44,6 @@ public interface SparkJobStatus {
   SparkStatistics getSparkStatistics();
 
   void cleanup();
+
+  Throwable getError(); // NOTE(review): LocalSparkJobStatus returns null when its future is not done or did not throw - presumably null means "no error"; confirm
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/8fd76707/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
index 4e93979..a94d4ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
@@ -159,6 +159,18 @@ public class LocalSparkJobStatus implements SparkJobStatus {
     }
   }
 
+  @Override
+  public Throwable getError() {
+    if (future.isDone()) {
+      try {
+        future.get(); // only reached once isDone() is true; called solely to surface a failure as an exception
+      } catch (Throwable e) {
+        return e; // NOTE(review): this is whatever get() threw, possibly a wrapper rather than the underlying cause - confirm callers expect that
+      }
+    }
+    return null; // future not done yet, or get() completed without throwing
+  }
+
   private SparkJobInfo getJobInfo() {
     return sparkContext.statusTracker().getJobInfo(jobId);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/8fd76707/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index 9fc717f..e87a21a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -136,6 +136,11 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
 
   }
 
+  @Override
+  public Throwable getError() {
+    return jobHandle.getError(); // delegates to the job handle
+  }
+
   private SparkJobInfo getSparkJobInfo() throws HiveException {
     Integer sparkJobId = jobHandle.getSparkJobIds().size() == 1 ?
jobHandle.getSparkJobIds().get(0) : null;

http://git-wip-us.apache.org/repos/asf/hive/blob/8fd76707/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java b/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
index c02c403..dffd60c 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
@@ -61,6 +61,11 @@ public interface JobHandle<T extends Serializable> extends Future<T> {
   State getState();
 
   /**
+   * Return the error if the job has failed.
+   */
+  Throwable getError();
+
+  /**
    * The current state of the submitted job.
    */
   static enum State {

http://git-wip-us.apache.org/repos/asf/hive/blob/8fd76707/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
index 7645702..2881252 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
@@ -128,6 +128,11 @@ class JobHandleImpl<T extends Serializable> implements JobHandle<T> {
     return state;
   }
 
+  @Override
+  public Throwable getError() {
+    return promise.cause(); // NOTE(review): presumably null unless the promise has failed - confirm against the promise API
+  }
+
   public void setSparkCounters(SparkCounters sparkCounters) {
     this.sparkCounters = sparkCounters;
   }
