Repository: hive
Updated Branches:
  refs/heads/master 204d715e3 -> 2f0b41bc6


HIVE-18831: Differentiate errors that are thrown by Spark tasks (Sahil Takiar 
reviewed by Rui Li)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2f0b41bc
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2f0b41bc
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2f0b41bc

Branch: refs/heads/master
Commit: 2f0b41bc6863333aa89876f70819cb2113c57091
Parents: 204d715
Author: Sahil Takiar <takiar.sa...@gmail.com>
Authored: Sun Apr 15 19:48:11 2018 -0700
Committer: Sahil Takiar <stak...@cloudera.com>
Committed: Sun Apr 15 19:48:11 2018 -0700

----------------------------------------------------------------------
 .../test/resources/testconfiguration.properties |   8 +-
 .../hadoop/hive/cli/control/CliConfigs.java     |   4 +-
 .../org/apache/hadoop/hive/ql/ErrorMsg.java     |   4 +-
 .../org/apache/hadoop/hive/ql/exec/Task.java    |   2 +-
 .../hadoop/hive/ql/exec/spark/SparkTask.java    |  84 +++++++---
 .../exec/spark/status/LocalSparkJobMonitor.java |   2 +-
 .../spark/status/RemoteSparkJobMonitor.java     |  28 +---
 .../ql/exec/spark/status/SparkJobStatus.java    |   6 +-
 .../spark/status/impl/LocalSparkJobStatus.java  |  22 +--
 .../spark/status/impl/RemoteSparkJobStatus.java |  14 +-
 .../hive/ql/exec/spark/TestSparkTask.java       | 125 ++++++++++++++-
 .../queries/clientnegative/spark_task_failure.q |   9 ++
 .../spark/spark_task_failure.q.out              |  53 +++++++
 .../apache/hive/spark/client/BaseProtocol.java  |  18 ++-
 .../hive/spark/client/JobResultSerializer.java  | 113 +++++++++++++
 .../hive/spark/client/SparkClientImpl.java      |   4 +-
 .../hive/spark/client/rpc/KryoMessageCodec.java |   6 +
 .../spark/client/TestJobResultSerializer.java   | 157 +++++++++++++++++++
 .../hive/spark/client/TestSparkClient.java      |  61 ++++++-
 19 files changed, 630 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties 
b/itests/src/test/resources/testconfiguration.properties
index 2db98c9..f99d6a1 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1683,9 +1683,11 @@ 
spark.query.negative.files=groupby2_map_skew_multi_distinct.q,\
   groupby2_multi_distinct.q,\
   groupby3_map_skew_multi_distinct.q,\
   groupby3_multi_distinct.q,\
-  groupby_grouping_sets7.q,\
-  spark_job_max_tasks.q,\
-  spark_stage_max_tasks.q
+  groupby_grouping_sets7.q
+
+spark.only.query.negative.files=spark_job_max_tasks.q,\
+  spark_stage_max_tasks.q,\
+  spark_task_failure.q
 
 spark.perf.disabled.query.files=query14.q,\
   query64.q

http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
----------------------------------------------------------------------
diff --git 
a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java 
b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
index fa4de0f..8ee7fb9 100644
--- 
a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
+++ 
b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
@@ -358,9 +358,8 @@ public class CliConfigs {
         setQueryDir("ql/src/test/queries/clientnegative");
 
         excludesFrom(testConfigProps, "minimr.query.negative.files");
+        excludesFrom(testConfigProps, "spark.only.query.negative.files");
         excludeQuery("authorization_uri_import.q");
-        excludeQuery("spark_job_max_tasks.q");
-        excludeQuery("spark_stage_max_tasks.q");
 
         setResultsDir("ql/src/test/results/clientnegative");
         setLogDir("itests/qtest/target/qfile-results/clientnegative");
@@ -595,6 +594,7 @@ public class CliConfigs {
         setQueryDir("ql/src/test/queries/clientnegative");
 
         includesFrom(testConfigProps, "spark.query.negative.files");
+        includesFrom(testConfigProps, "spark.only.query.negative.files");
 
         setResultsDir("ql/src/test/results/clientnegative/spark");
         
setLogDir("itests/qtest-spark/target/qfile-results/clientnegative/spark");

http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java 
b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index f3e40eb..fde16f8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -584,8 +584,8 @@ public enum ErrorMsg {
 
   //========================== 40000 range starts here 
========================//
 
-  SPARK_JOB_RUNTIME_ERROR(40001,
-      "Spark job failed during runtime. Please check stacktrace for the root 
cause.")
+  SPARK_JOB_RUNTIME_ERROR(40001, "Spark job failed due to: {0}", true),
+  SPARK_TASK_RUNTIME_ERROR(40002, "Spark job failed due to task failures: 
{0}", true)
   ;
 
   private int errorCode;

http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index 1e8857b..f329570 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -603,7 +603,7 @@ public abstract class Task<T extends Serializable> 
implements Serializable, Node
   public void shutdown() {
   }
 
-  Throwable getException() {
+  public Throwable getException() {
     return exception;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 3083e30..bfa2da6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -26,15 +26,19 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Pattern;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
+
 import com.google.common.base.Throwables;
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.DriverContext;
@@ -72,7 +76,6 @@ import 
org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.collect.Lists;
-import org.apache.spark.SparkException;
 
 public class SparkTask extends Task<SparkWork> {
   private static final String CLASS_NAME = SparkTask.class.getName();
@@ -153,7 +156,8 @@ public class SparkTask extends Task<SparkWork> {
 
       // Get the final state of the Spark job and parses its job info
       SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
-      getSparkJobInfo(sparkJobStatus, rc);
+      getSparkJobInfo(sparkJobStatus);
+      setSparkException(sparkJobStatus, rc);
 
       if (rc == 0) {
         sparkStatistics = sparkJobStatus.getSparkStatistics();
@@ -457,7 +461,7 @@ public class SparkTask extends Task<SparkWork> {
     return counters;
   }
 
-  private void getSparkJobInfo(SparkJobStatus sparkJobStatus, int rc) {
+  private void getSparkJobInfo(SparkJobStatus sparkJobStatus) {
     try {
       stageIds = new ArrayList<Integer>();
       int[] ids = sparkJobStatus.getStageIds();
@@ -482,36 +486,68 @@ public class SparkTask extends Task<SparkWork> {
       succeededTaskCount = sumComplete;
       totalTaskCount = sumTotal;
       failedTaskCount = sumFailed;
-      if (rc != 0) {
-        Throwable error = sparkJobStatus.getError();
-        if (error != null) {
-          if ((error instanceof InterruptedException) ||
-              (error instanceof HiveException &&
-              error.getCause() instanceof InterruptedException)) {
-            LOG.info("Killing Spark job since query was interrupted");
-            killJob();
-          }
-          HiveException he;
-          if (isOOMError(error)) {
-            he = new HiveException(error, ErrorMsg.SPARK_RUNTIME_OOM);
-          } else {
-            he = new HiveException(error, ErrorMsg.SPARK_JOB_RUNTIME_ERROR);
-          }
-          setException(he);
-        }
-      }
     } catch (Exception e) {
       LOG.error("Failed to get Spark job information", e);
     }
   }
 
+  @VisibleForTesting
+  void setSparkException(SparkJobStatus sparkJobStatus, int rc) {
+    if (rc != 0) {
+
+      // Set the Spark Job Exception
+      Throwable sparkJobException = sparkJobStatus.getSparkJobException();
+      if (sparkJobException != null) {
+        HiveException he;
+        if (isOOMError(sparkJobException)) {
+          he = new HiveException(sparkJobException, 
ErrorMsg.SPARK_RUNTIME_OOM);
+        } else if (isTaskFailure(sparkJobException)) {
+          he = new HiveException(sparkJobException, 
ErrorMsg.SPARK_TASK_RUNTIME_ERROR,
+                  Throwables.getRootCause(sparkJobException).getMessage());
+        } else {
+          he = new HiveException(sparkJobException, 
ErrorMsg.SPARK_JOB_RUNTIME_ERROR,
+                  Throwables.getRootCause(sparkJobException).getMessage());
+        }
+        setException(he);
+      }
+
+      // Set the Monitor Error
+      Throwable monitorError = sparkJobStatus.getMonitorError();
+      if (monitorError != null) {
+        if ((monitorError instanceof InterruptedException) ||
+                (monitorError instanceof HiveException &&
+                        monitorError.getCause() instanceof 
InterruptedException)) {
+          LOG.info("Killing Spark job since query was interrupted");
+          killJob();
+        }
+
+        // Prefer to propagate errors from the Spark job rather than the 
monitor, as errors from
+        // the Spark job are likely to be more relevant
+        if (getException() == null) {
+          setException(monitorError);
+        }
+      }
+    }
+  }
+
+  private boolean isTaskFailure(Throwable error) {
+    Pattern taskFailedPattern = Pattern.compile("Task.*in 
stage.*failed.*times");
+    while (error != null) {
+      if (taskFailedPattern.matcher(error.getMessage()).find()) {
+        return true;
+      }
+      error = error.getCause();
+    }
+    return false;
+  }
+
   private boolean isOOMError(Throwable error) {
     while (error != null) {
       if (error instanceof OutOfMemoryError) {
         return true;
-      } else if (error instanceof SparkException) {
-        String sts = Throwables.getStackTraceAsString(error);
-        return sts.contains("Container killed by YARN for exceeding memory 
limits");
+      } else if (error.getMessage().contains("Container killed by YARN for 
exceeding memory " +
+              "limits")) {
+        return true;
       }
       error = error.getCause();
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
index 0525315..4ce9f53 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
@@ -128,7 +128,7 @@ public class LocalSparkJobMonitor extends SparkJobMonitor {
         console.printError(msg, "\n" + 
org.apache.hadoop.util.StringUtils.stringifyException(e));
         rc = 1;
         done = true;
-        sparkJobStatus.setError(e);
+        sparkJobStatus.setMonitorError(e);
       } finally {
         if (done) {
           break;

http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index a132f74..98c228b 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -77,7 +77,7 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
             HiveException he = new 
HiveException(ErrorMsg.SPARK_JOB_MONITOR_TIMEOUT,
                 Long.toString(timeCount));
             console.printError(he.getMessage());
-            sparkJobStatus.setError(he);
+            sparkJobStatus.setMonitorError(he);
             running = false;
             done = true;
             rc = 2;
@@ -147,29 +147,7 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor 
{
           done = true;
           break;
         case FAILED:
-          String detail = sparkJobStatus.getError().getMessage();
-          StringBuilder errBuilder = new StringBuilder();
-          errBuilder.append("Job failed with ");
-          if (detail == null) {
-            errBuilder.append("UNKNOWN reason");
-          } else {
-            // We SerDe the Throwable as String, parse it for the root cause
-            final String CAUSE_CAPTION = "Caused by: ";
-            int index = detail.lastIndexOf(CAUSE_CAPTION);
-            if (index != -1) {
-              String rootCause = detail.substring(index + 
CAUSE_CAPTION.length());
-              index = rootCause.indexOf(System.getProperty("line.separator"));
-              if (index != -1) {
-                errBuilder.append(rootCause.substring(0, index));
-              } else {
-                errBuilder.append(rootCause);
-              }
-            } else {
-              errBuilder.append(detail);
-            }
-            detail = System.getProperty("line.separator") + detail;
-          }
-          console.printError(errBuilder.toString(), detail);
+          LOG.error("Spark job[" + sparkJobStatus.getJobId() + "] failed", 
sparkJobStatus.getSparkJobException());
           running = false;
           done = true;
           rc = 3;
@@ -202,7 +180,7 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
         }
         rc = 1;
         done = true;
-        sparkJobStatus.setError(finalException);
+        sparkJobStatus.setMonitorError(finalException);
       } finally {
         if (done) {
           break;

http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
index 8474afc..1e584f4 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
@@ -47,7 +47,9 @@ public interface SparkJobStatus {
 
   void cleanup();
 
-  Throwable getError();
+  Throwable getMonitorError();
 
-  void setError(Throwable e);
+  void setMonitorError(Throwable e);
+
+  Throwable getSparkJobException();
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
index 03f8a0b..4368eb0 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
@@ -22,6 +22,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.base.Throwables;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -181,10 +183,17 @@ public class LocalSparkJobStatus implements 
SparkJobStatus {
   }
 
   @Override
-  public Throwable getError() {
-    if (error != null) {
-      return error;
-    }
+  public Throwable getMonitorError() {
+    return error;
+  }
+
+  @Override
+  public void setMonitorError(Throwable e) {
+    this.error = e;
+  }
+
+  @Override
+  public Throwable getSparkJobException() {
     if (future.isDone()) {
       try {
         future.get();
@@ -195,11 +204,6 @@ public class LocalSparkJobStatus implements SparkJobStatus 
{
     return null;
   }
 
-  @Override
-  public void setError(Throwable e) {
-    this.error = e;
-  }
-
   private SparkJobInfo getJobInfo() {
     return sparkContext.statusTracker().getJobInfo(jobId);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index ff969e0..e4a53fb 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -165,18 +165,20 @@ public class RemoteSparkJobStatus implements 
SparkJobStatus {
   }
 
   @Override
-  public Throwable getError() {
-    if (error != null) {
-      return error;
-    }
-    return jobHandle.getError();
+  public Throwable getMonitorError() {
+    return error;
   }
 
   @Override
-  public void setError(Throwable e) {
+  public void setMonitorError(Throwable e) {
     this.error = e;
   }
 
+  @Override
+  public Throwable getSparkJobException() {
+    return jobHandle.getError();
+  }
+
   /**
    * Indicates whether the remote context is active. SparkJobMonitor can use 
this to decide whether
    * to stop monitoring.

http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
index 75b4151..368fa9f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
@@ -17,36 +17,48 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
+import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
 import org.apache.hadoop.hive.ql.exec.spark.status.RemoteSparkJobMonitor;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.spark.client.JobHandle.State;
+
+import org.apache.spark.SparkException;
+
 import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 public class TestSparkTask {
 
   @Test
   public void sparkTask_updates_Metrics() throws IOException {
 
-    Metrics mockMetrics = Mockito.mock(Metrics.class);
+    Metrics mockMetrics = mock(Metrics.class);
 
     SparkTask sparkTask = new SparkTask();
     sparkTask.updateTaskMetrics(mockMetrics);
@@ -89,7 +101,7 @@ public class TestSparkTask {
 
   @Test
   public void testRemoteSparkCancel() {
-    RemoteSparkJobStatus jobSts = Mockito.mock(RemoteSparkJobStatus.class);
+    RemoteSparkJobStatus jobSts = mock(RemoteSparkJobStatus.class);
     when(jobSts.getRemoteJobState()).thenReturn(State.CANCELLED);
     when(jobSts.isRemoteActive()).thenReturn(true);
     HiveConf hiveConf = new HiveConf();
@@ -111,6 +123,109 @@ public class TestSparkTask {
     Assert.assertTrue(statsString.contains("1"));
   }
 
+  @Test
+  public void testSetSparkExceptionWithJobError() {
+    SparkTask sparkTask = new SparkTask();
+    SparkJobStatus mockSparkJobStatus = mock(SparkJobStatus.class);
+
+    ExecutionException ee = new ExecutionException("Exception thrown by job",
+            new SparkException("Job aborted due to stage failure: Not a task 
or OOM error"));
+
+    when(mockSparkJobStatus.getSparkJobException()).thenReturn(ee);
+
+    sparkTask.setSparkException(mockSparkJobStatus, 3);
+
+    Assert.assertTrue(sparkTask.getException() instanceof HiveException);
+    Assert.assertEquals(((HiveException) 
sparkTask.getException()).getCanonicalErrorMsg(),
+            ErrorMsg.SPARK_JOB_RUNTIME_ERROR);
+    Assert.assertTrue(sparkTask.getException().getMessage().contains("Not a 
task or OOM error"));
+  }
+
+  @Test
+  public void testSetSparkExceptionWithTimeoutError() {
+    SparkTask sparkTask = new SparkTask();
+    SparkJobStatus mockSparkJobStatus = mock(SparkJobStatus.class);
+    when(mockSparkJobStatus.getMonitorError()).thenReturn(new 
HiveException(ErrorMsg
+            .SPARK_JOB_MONITOR_TIMEOUT, Long.toString(60)));
+
+    sparkTask.setSparkException(mockSparkJobStatus, 3);
+
+    Assert.assertTrue(sparkTask.getException() instanceof HiveException);
+    Assert.assertEquals(((HiveException) 
sparkTask.getException()).getCanonicalErrorMsg(),
+            ErrorMsg.SPARK_JOB_MONITOR_TIMEOUT);
+    Assert.assertTrue(sparkTask.getException().getMessage().contains("60s"));
+  }
+
+  @Test
+  public void testSetSparkExceptionWithOOMError() {
+    SparkTask sparkTask = new SparkTask();
+    SparkJobStatus mockSparkJobStatus = mock(SparkJobStatus.class);
+
+    ExecutionException jobError = new ExecutionException(
+            new SparkException("Container killed by YARN for exceeding memory 
limits"));
+    when(mockSparkJobStatus.getSparkJobException()).thenReturn(jobError);
+
+    sparkTask.setSparkException(mockSparkJobStatus, 3);
+
+    Assert.assertTrue(sparkTask.getException() instanceof HiveException);
+    Assert.assertEquals(((HiveException) 
sparkTask.getException()).getCanonicalErrorMsg(),
+            ErrorMsg.SPARK_RUNTIME_OOM);
+  }
+
+  @Test
+  public void testSparkExceptionAndMonitorError() {
+    SparkTask sparkTask = new SparkTask();
+    SparkJobStatus mockSparkJobStatus = mock(SparkJobStatus.class);
+    when(mockSparkJobStatus.getMonitorError()).thenReturn(new 
RuntimeException());
+    when(mockSparkJobStatus.getSparkJobException()).thenReturn(
+            new ExecutionException(new SparkException("")));
+
+    sparkTask.setSparkException(mockSparkJobStatus, 3);
+
+    Assert.assertTrue(sparkTask.getException() instanceof HiveException);
+    Assert.assertEquals(((HiveException) 
sparkTask.getException()).getCanonicalErrorMsg(),
+            ErrorMsg.SPARK_JOB_RUNTIME_ERROR);
+  }
+
+  @Test
+  public void testHandleInterruptedException() throws Exception {
+    HiveConf hiveConf = new HiveConf();
+
+    SparkTask sparkTask = new SparkTask();
+    sparkTask.setWork(mock(SparkWork.class));
+
+    DriverContext mockDriverContext = mock(DriverContext.class);
+
+    QueryState mockQueryState = mock(QueryState.class);
+    when(mockQueryState.getConf()).thenReturn(hiveConf);
+
+    sparkTask.initialize(mockQueryState, null, mockDriverContext, null);
+
+    SparkJobStatus mockSparkJobStatus = mock(SparkJobStatus.class);
+    when(mockSparkJobStatus.getMonitorError()).thenReturn(new 
InterruptedException());
+
+    SparkSession mockSparkSession = mock(SparkSession.class);
+    SparkJobRef mockSparkJobRef = mock(SparkJobRef.class);
+
+    when(mockSparkJobRef.monitorJob()).thenReturn(2);
+    when(mockSparkJobRef.getSparkJobStatus()).thenReturn(mockSparkJobStatus);
+    when(mockSparkSession.submit(any(), any())).thenReturn(mockSparkJobRef);
+
+    SessionState.start(hiveConf);
+    SessionState.get().setSparkSession(mockSparkSession);
+
+    sparkTask.execute(mockDriverContext);
+
+    verify(mockSparkJobRef, atLeastOnce()).cancelJob();
+
+    when(mockSparkJobStatus.getMonitorError()).thenReturn(
+            new HiveException(new InterruptedException()));
+
+    sparkTask.execute(mockDriverContext);
+
+    verify(mockSparkJobRef, atLeastOnce()).cancelJob();
+  }
+
   private boolean isEmptySparkWork(SparkWork sparkWork) {
     List<BaseWork> allWorks = sparkWork.getAllWork();
     boolean allWorksIsEmtpy = true;

http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/ql/src/test/queries/clientnegative/spark_task_failure.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientnegative/spark_task_failure.q 
b/ql/src/test/queries/clientnegative/spark_task_failure.q
new file mode 100644
index 0000000..7bb8c50
--- /dev/null
+++ b/ql/src/test/queries/clientnegative/spark_task_failure.q
@@ -0,0 +1,9 @@
+ADD FILE ../../data/scripts/error_script;
+
+EXPLAIN
+SELECT TRANSFORM(src.key, src.value) USING 'error_script' AS (tkey, tvalue)
+FROM src;
+
+SELECT TRANSFORM(src.key, src.value) USING 'error_script' AS (tkey, tvalue)
+FROM src;
+

http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/ql/src/test/results/clientnegative/spark/spark_task_failure.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/spark/spark_task_failure.q.out 
b/ql/src/test/results/clientnegative/spark/spark_task_failure.q.out
new file mode 100644
index 0000000..1e68002
--- /dev/null
+++ b/ql/src/test/results/clientnegative/spark/spark_task_failure.q.out
@@ -0,0 +1,53 @@
+PREHOOK: query: EXPLAIN
+SELECT TRANSFORM(src.key, src.value) USING 'error_script' AS (tkey, tvalue)
+FROM src
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT TRANSFORM(src.key, src.value) USING 'error_script' AS (tkey, tvalue)
+FROM src
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: 
COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string), value (type: string)
+                    outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: 
COMPLETE Column stats: NONE
+                    Transform Operator
+                      command: error_script
+                      output info:
+                          input format: 
org.apache.hadoop.mapred.TextInputFormat
+                          output format: 
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                          serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: 
COMPLETE Column stats: NONE
+                      File Output Operator
+                        compressed: false
+                        Statistics: Num rows: 500 Data size: 5312 Basic stats: 
COMPLETE Column stats: NONE
+                        table:
+                            input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
+                            output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                            serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT TRANSFORM(src.key, src.value) USING 'error_script' AS 
(tkey, tvalue)
+FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+FAILED: Execution Error, return code 3 from 
org.apache.hadoop.hive.ql.exec.spark.SparkTask. Spark job failed due to task 
failures: [Error 20003]: An error occurred when trying to close the Operator 
running your custom script.

http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
----------------------------------------------------------------------
diff --git 
a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java 
b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
index 4dbc490..6a988a4 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
@@ -23,9 +23,8 @@ import org.apache.hive.spark.client.metrics.Metrics;
 import org.apache.hive.spark.client.rpc.RpcDispatcher;
 import org.apache.hive.spark.counter.SparkCounters;
 
-import com.google.common.base.Throwables;
 
-abstract class BaseProtocol extends RpcDispatcher {
+public abstract class BaseProtocol extends RpcDispatcher {
 
   protected static class CancelJob implements Serializable {
 
@@ -97,17 +96,17 @@ abstract class BaseProtocol extends RpcDispatcher {
 
   }
 
-  protected static class JobResult<T extends Serializable> implements 
Serializable {
+  public static class JobResult<T extends Serializable> implements 
Serializable {
 
     final String id;
     final T result;
-    final String error;
+    final Throwable error;
     final SparkCounters sparkCounters;
 
     JobResult(String id, T result, Throwable error, SparkCounters 
sparkCounters) {
       this.id = id;
       this.result = result;
-      this.error = error != null ? Throwables.getStackTraceAsString(error) : 
null;
+      this.error = error;
       this.sparkCounters = sparkCounters;
     }
 
@@ -115,6 +114,15 @@ abstract class BaseProtocol extends RpcDispatcher {
       this(null, null, null, null);
     }
 
+    @Override
+    public String toString() {
+      return "JobResult{" +
+              "id='" + id + '\'' +
+              ", result=" + result +
+              ", error=" + error +
+              ", sparkCounters=" + sparkCounters +
+              '}';
+    }
   }
 
   protected static class JobStarted implements Serializable {

http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/spark-client/src/main/java/org/apache/hive/spark/client/JobResultSerializer.java
----------------------------------------------------------------------
diff --git 
a/spark-client/src/main/java/org/apache/hive/spark/client/JobResultSerializer.java
 
b/spark-client/src/main/java/org/apache/hive/spark/client/JobResultSerializer.java
new file mode 100644
index 0000000..97cbcf0
--- /dev/null
+++ 
b/spark-client/src/main/java/org/apache/hive/spark/client/JobResultSerializer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+import java.util.Objects;
+
+
+/**
+ * A custom {@link Serializer} for serializing / deserializing {@link 
BaseProtocol.JobResult}
+ * objects. This class uses Java serialization to write / read the JobResult 
objects. This has
+ * the nice property that it is able to successfully serialize Java {@link 
Throwable}s. Whereas
+ * the Kryo serializer cannot (because certain objects in a Throwable don't 
have public zero-arg
+ * constructors.
+ *
+ * <p>
+ *   Given that any developer can write a custom exception that contains 
non-serializable objects
+ *   (e.g. objects that don't implement {@link java.io.Serializable}), this 
class needs to handle
+ *   the case where the given Throwable cannot be serialized by Java. In this 
case, the
+ *   serializer will recursively go through the {@link Throwable} and wrap all 
objects with a
+ *   {@link RuntimeException} which is guaranteed to be serializable.
+ * </p>
+ */
+public class JobResultSerializer extends Serializer<BaseProtocol.JobResult<?>> 
{
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(JobResultSerializer.class);
+
+  @Override
+  public BaseProtocol.JobResult<?> read(Kryo kryo, Input input, Class type) {
+    try {
+      return (BaseProtocol.JobResult<?>) new 
ObjectInputStream(input).readObject();
+    } catch (Exception e) {
+      throw new KryoException("Error during Java deserialization.", e);
+    }
+  }
+
+  @Override
+  public void write(Kryo kryo, Output output, BaseProtocol.JobResult<?> 
object) {
+    try {
+      safeWriteToOutput(output, object);
+    } catch (Exception e) {
+      LOG.warn("Unable to serialize JobResult object " + object, e);
+
+      BaseProtocol.JobResult<?> serializableJobResult = new 
BaseProtocol.JobResult<>(object.id,
+              object.result, convertToSerializableSparkException(object.error),
+              object.sparkCounters);
+      try {
+        safeWriteToOutput(output, serializableJobResult);
+      } catch (Exception ex) {
+        throw new KryoException("Error during Java serialization.", ex);
+      }
+    }
+  }
+
+  private void safeWriteToOutput(Output output,
+                                 BaseProtocol.JobResult<?> jobResult) throws 
IOException {
+    ByteArrayOutputStream boas = new ByteArrayOutputStream();
+    ObjectOutputStream oos = new ObjectOutputStream(boas);
+
+    oos.writeObject(jobResult);
+    oos.flush();
+
+    output.write(boas.toByteArray());
+    output.flush();
+  }
+
+  @VisibleForTesting
+  static RuntimeException convertToSerializableSparkException(Throwable error) 
{
+    RuntimeException serializableThrowable = new RuntimeException(
+            error.getClass().getName() + ": " + 
Objects.toString(error.getMessage(), ""),
+            error.getCause() == null ? null : 
convertToSerializableSparkException(
+                    error.getCause()));
+
+    serializableThrowable.setStackTrace(error.getStackTrace());
+
+    Arrays.stream(error.getSuppressed())
+            .map(JobResultSerializer::convertToSerializableSparkException)
+            .forEach(serializableThrowable::addSuppressed);
+
+    return serializableThrowable;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
----------------------------------------------------------------------
diff --git 
a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java 
b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 665ed92..e4f72a3 100644
--- 
a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ 
b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -38,7 +38,6 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
-import java.io.PrintStream;
 import java.io.Serializable;
 import java.io.Writer;
 import java.net.URI;
@@ -64,7 +63,6 @@ import org.apache.hive.spark.client.rpc.Rpc;
 import org.apache.hive.spark.client.rpc.RpcConfiguration;
 import org.apache.hive.spark.client.rpc.RpcServer;
 import org.apache.spark.SparkContext;
-import org.apache.spark.SparkException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -592,7 +590,7 @@ class SparkClientImpl implements SparkClient {
       if (handle != null) {
         LOG.info("Received result for {}", msg.id);
         handle.setSparkCounters(msg.sparkCounters);
-        Throwable error = msg.error != null ? new SparkException(msg.error) : 
null;
+        Throwable error = msg.error;
         if (error == null) {
           handle.setSuccess(msg.result);
         } else {

http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java
----------------------------------------------------------------------
diff --git 
a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java
 
b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java
index 9e789cf..5454ec2 100644
--- 
a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java
+++ 
b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java
@@ -23,7 +23,11 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.hive.spark.client.BaseProtocol;
+import org.apache.hive.spark.client.JobResultSerializer;
+
 import org.objenesis.strategy.StdInstantiatorStrategy;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,6 +35,7 @@ import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.ByteBufferInputStream;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
+
 import com.google.common.base.Preconditions;
 
 import io.netty.buffer.ByteBuf;
@@ -60,6 +65,7 @@ class KryoMessageCodec extends ByteToMessageCodec<Object> {
         kryo.register(klass, REG_ID_BASE + count);
         count++;
       }
+      kryo.register(BaseProtocol.JobResult.class, new JobResultSerializer(), 
count);
       kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new 
StdInstantiatorStrategy()));
       return kryo;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/spark-client/src/test/java/org/apache/hive/spark/client/TestJobResultSerializer.java
----------------------------------------------------------------------
diff --git 
a/spark-client/src/test/java/org/apache/hive/spark/client/TestJobResultSerializer.java
 
b/spark-client/src/test/java/org/apache/hive/spark/client/TestJobResultSerializer.java
new file mode 100644
index 0000000..81e12c9
--- /dev/null
+++ 
b/spark-client/src/test/java/org/apache/hive/spark/client/TestJobResultSerializer.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import com.google.common.base.Strings;
+import org.apache.hive.spark.counter.SparkCounters;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+
+public class TestJobResultSerializer {
+
+  @Test
+  public void testSerializablableExceptionSingleBlankException() {
+    RuntimeException blankRuntimeException = new RuntimeException();
+
+    RuntimeException serializableException = 
JobResultSerializer.convertToSerializableSparkException(
+            blankRuntimeException);
+
+    assertException(serializableException, blankRuntimeException);
+  }
+
+  @Test
+  public void testSerializablableExceptionSingleException() {
+    RuntimeException blankRuntimeException = new RuntimeException("hello");
+
+    RuntimeException serializableException = 
JobResultSerializer.convertToSerializableSparkException(
+            blankRuntimeException);
+
+    assertException(serializableException, blankRuntimeException);
+  }
+
+  @Test
+  public void testSerializablableExceptionNestedBlankException() {
+    RuntimeException nestedBlankRuntimeException = new RuntimeException();
+    RuntimeException blankRuntimeException = new 
RuntimeException(nestedBlankRuntimeException);
+
+    RuntimeException serializableException = 
JobResultSerializer.convertToSerializableSparkException(
+            blankRuntimeException);
+
+    assertNestedException(serializableException, blankRuntimeException);
+  }
+
+  @Test
+  public void testSerializablableExceptionNestedException() {
+    RuntimeException nestedRuntimeException = new RuntimeException("hello");
+    RuntimeException blankRuntimeException = new 
RuntimeException(nestedRuntimeException);
+
+    RuntimeException serializableException = 
JobResultSerializer.convertToSerializableSparkException(
+            blankRuntimeException);
+
+    assertNestedException(serializableException, blankRuntimeException);
+
+    nestedRuntimeException = new RuntimeException();
+    blankRuntimeException = new RuntimeException("hello", 
nestedRuntimeException);
+
+    serializableException = 
JobResultSerializer.convertToSerializableSparkException(
+            blankRuntimeException);
+
+    assertNestedException(serializableException, blankRuntimeException);
+
+    nestedRuntimeException = new RuntimeException("hello");
+    blankRuntimeException = new RuntimeException("hello", 
nestedRuntimeException);
+
+    serializableException = 
JobResultSerializer.convertToSerializableSparkException(
+            blankRuntimeException);
+
+    assertNestedException(serializableException, blankRuntimeException);
+  }
+
+  private void assertException(Throwable serializedException, Throwable 
originalException) {
+    Assert.assertEquals(originalException.getClass().getName() + ": " + 
Strings.nullToEmpty(
+            originalException.getMessage()), serializedException.getMessage());
+    Assert.assertArrayEquals(originalException.getStackTrace(),
+            serializedException.getStackTrace());
+  }
+
+  private void assertNestedException(Throwable serializedException, Throwable 
originalException) {
+    assertException(serializedException, originalException);
+    assertException(serializedException.getCause(), 
originalException.getCause());
+  }
+
+  @Test
+  public void testSerializeNonSerializableObject() {
+    Kryo kryo = new Kryo();
+    kryo.addDefaultSerializer(BaseProtocol.JobResult.class, new 
JobResultSerializer());
+
+    ByteArrayOutputStream boas = new ByteArrayOutputStream();
+    Output output = new Output(boas);
+
+    String id = "1";
+    String result = "result";
+    SparkCounters counters = new SparkCounters(null);
+
+    BaseProtocol.JobResult<String> jobResult = new 
BaseProtocol.JobResult<>(id, result, new
+            NonSerializableException("content"), counters);
+
+    kryo.writeClassAndObject(output, jobResult);
+    output.flush();
+
+    Input kryoIn = new Input(new ByteArrayInputStream(boas.toByteArray()));
+    Object deserializedObject = kryo.readClassAndObject(kryoIn);
+
+    Assert.assertTrue(deserializedObject instanceof BaseProtocol.JobResult);
+
+    BaseProtocol.JobResult<String> deserializedJobResult = 
(BaseProtocol.JobResult<String>) deserializedObject;
+
+    Assert.assertEquals(id, deserializedJobResult.id);
+    Assert.assertEquals(result, deserializedJobResult.result);
+    Assert.assertEquals(counters.toString(), 
deserializedJobResult.sparkCounters.toString());
+    Assert.assertTrue(deserializedJobResult.error instanceof RuntimeException);
+  }
+
+  private static final class NonSerializableException extends Exception {
+
+    private static final long serialVersionUID = 2548414562750016219L;
+
+    private final NonSerializableObject nonSerializableObject;
+
+    private NonSerializableException(String content) {
+      this.nonSerializableObject = new NonSerializableObject(content);
+    }
+  }
+
+  private static final class NonSerializableObject {
+
+    private String content;
+
+    private NonSerializableObject(String content) {
+      this.content = content;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2f0b41bc/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
----------------------------------------------------------------------
diff --git 
a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java 
b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
index fb31c93..fdf882b 100644
--- 
a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
+++ 
b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
@@ -133,8 +133,36 @@ public class TestSparkClient {
           handle.get(TIMEOUT, TimeUnit.SECONDS);
           fail("Should have thrown an exception.");
         } catch (ExecutionException ee) {
-          assertTrue(ee.getCause() instanceof SparkException);
-          
assertTrue(ee.getCause().getMessage().contains("IllegalStateException: Hello"));
+          assertTrue(ee.getCause() instanceof IllegalStateException);
+          assertTrue(ee.getCause().getMessage().contains("Hello"));
+        }
+
+        // Try an invalid state transition on the handle. This ensures that 
the actual state
+        // change we're interested in actually happened, since internally the 
handle serializes
+        // state changes.
+        
assertFalse(((JobHandleImpl<String>)handle).changeState(JobHandle.State.SENT));
+
+        verify(listener).onJobQueued(handle);
+        verify(listener).onJobStarted(handle);
+        verify(listener).onJobFailed(same(handle), any(Throwable.class));
+      }
+    });
+  }
+
+  @Test
+  public void testErrorJobNotSerializable() throws Exception {
+    runTest(new TestFunction() {
+      @Override
+      public void call(SparkClient client) throws Exception {
+        JobHandle.Listener<String> listener = newListener();
+        List<JobHandle.Listener<String>> listeners = 
Lists.newArrayList(listener);
+        JobHandle<String> handle = client.submit(new 
ErrorJobNotSerializable(), listeners);
+        try {
+          handle.get(TIMEOUT, TimeUnit.SECONDS);
+          fail("Should have thrown an exception.");
+        } catch (ExecutionException ee) {
+          assertTrue(ee.getCause() instanceof RuntimeException);
+          assertTrue(ee.getCause().getMessage().contains("Hello"));
         }
 
         // Try an invalid state transition on the handle. This ensures that 
the actual state
@@ -331,6 +359,35 @@ public class TestSparkClient {
 
   }
 
+  private static class ErrorJobNotSerializable implements Job<String> {
+
+    private static final class NonSerializableException extends Exception {
+
+      private static final long serialVersionUID = 2548414562750016219L;
+
+      private final NonSerializableObject nonSerializableObject;
+
+      private NonSerializableException(String content) {
+        super("Hello");
+        this.nonSerializableObject = new NonSerializableObject(content);
+      }
+    }
+
+    private static final class NonSerializableObject {
+
+      String content;
+
+      private NonSerializableObject(String content) {
+        this.content = content;
+      }
+    }
+
+    @Override
+    public String call(JobContext jc) throws NonSerializableException {
+      throw new NonSerializableException("Hello");
+    }
+  }
+
   private static class SparkJob implements Job<Long> {
 
     @Override

Reply via email to