Author: brock
Date: Fri Jan 30 20:56:42 2015
New Revision: 1656117
URL: http://svn.apache.org/r1656117
Log:
HIVE-9428: LocalSparkJobStatus may return failed job as successful [Spark Branch] (Rui via Xuefu)
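For context, both files below apply the same pattern: a JavaFutureAction that isDone() is only reported as SUCCEEDED if get() returns without throwing; an exception means the job actually failed. The following is a minimal sketch of that pattern, not part of this commit: it uses a plain java.util.concurrent Future in place of Spark's JavaFutureAction, the class name FutureStateSketch is made up for illustration, and Spark's org.apache.spark.JobExecutionStatus enum is assumed to be on the classpath.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

import org.apache.spark.JobExecutionStatus;

public class FutureStateSketch {

  // Mirrors the patched check: a done future only counts as SUCCEEDED when
  // get() returns normally; any exception is reported as FAILED.
  static JobExecutionStatus stateOf(Future<?> future) {
    if (!future.isDone()) {
      return null; // still running; callers fall back to SparkJobInfo
    }
    try {
      future.get();
      return JobExecutionStatus.SUCCEEDED;
    } catch (Exception e) {
      return JobExecutionStatus.FAILED;
    }
  }

  public static void main(String[] args) {
    Future<Void> ok = CompletableFuture.completedFuture(null);
    CompletableFuture<Void> bad = new CompletableFuture<>();
    bad.completeExceptionally(new RuntimeException("job aborted"));

    System.out.println(stateOf(ok));  // SUCCEEDED
    System.out.println(stateOf(bad)); // FAILED (was reported SUCCEEDED before this patch)
  }
}

Before this change, isDone() alone was treated as success, so a job whose future completed exceptionally was still reported as SUCCEEDED.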
Modified:
hive/branches/branch-1.1/ (props changed)
hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
Propchange: hive/branches/branch-1.1/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jan 30 20:56:42 2015
@@ -1,6 +1,6 @@
/hive/branches/branch-0.11:1480385,1480458,1481120,1481344,1481346,1481348,1481352,1483872,1505184
/hive/branches/cbo:1605012-1627125
-/hive/branches/spark:1608589-1654414,1654553
+/hive/branches/spark:1608589-1654414,1654553,1654869
/hive/branches/tez:1494760-1622766
/hive/branches/vectorization:1466908-1527856
/hive/trunk:1655202,1655210,1655213,1655436,1655460,1655894-1655895,1656114
Modified:
hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java?rev=1656117&r1=1656116&r2=1656117&view=diff
==============================================================================
--- hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java (original)
+++ hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java Fri Jan 30 20:56:42 2015
@@ -22,6 +22,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
@@ -43,6 +45,7 @@ import com.google.common.collect.Maps;
public class LocalSparkJobStatus implements SparkJobStatus {
private final JavaSparkContext sparkContext;
+ private static final Log LOG = LogFactory.getLog(LocalSparkJobStatus.class.getName());
private int jobId;
// After SPARK-2321, we only use JobMetricsListener to get job metrics
// TODO: remove it when the new API provides equivalent functionality
@@ -69,16 +72,20 @@ public class LocalSparkJobStatus impleme
@Override
public JobExecutionStatus getState() {
+ SparkJobInfo sparkJobInfo = getJobInfo();
// For spark job with empty source data, it's not submitted actually, so we would never
// receive JobStart/JobEnd event in JobStateListener, use JavaFutureAction to get current
// job state.
- if (future.isDone()) {
+ if (sparkJobInfo == null && future.isDone()) {
+ try {
+ future.get();
+ } catch (Exception e) {
+ LOG.error("Failed to run job " + jobId, e);
+ return JobExecutionStatus.FAILED;
+ }
return JobExecutionStatus.SUCCEEDED;
- } else {
- // SparkJobInfo may not be available yet
- SparkJobInfo sparkJobInfo = getJobInfo();
- return sparkJobInfo == null ? null : sparkJobInfo.status();
}
+ return sparkJobInfo == null ? null : sparkJobInfo.status();
}
@Override
Modified:
hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java?rev=1656117&r1=1656116&r2=1656117&view=diff
==============================================================================
--- hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java (original)
+++ hive/branches/branch-1.1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java Fri Jan 30 20:56:42 2015
@@ -179,7 +179,15 @@ public class RemoteSparkJobStatus implem
if (list != null && list.size() == 1) {
JavaFutureAction<?> futureAction = list.get(0);
if (futureAction.isDone()) {
- jobInfo = getDefaultJobInfo(sparkJobId, JobExecutionStatus.SUCCEEDED);
+ boolean futureSucceed = true;
+ try {
+ futureAction.get();
+ } catch (Exception e) {
+ LOG.error("Failed to run job " + sparkJobId, e);
+ futureSucceed = false;
+ }
+ jobInfo = getDefaultJobInfo(sparkJobId,
+ futureSucceed ? JobExecutionStatus.SUCCEEDED : JobExecutionStatus.FAILED);
}
}
}