Repository: spark
Updated Branches:
  refs/heads/branch-1.5 4f9d1f8c7 -> 1e61ff4ca


[SPARK-14357][CORE] Properly handle the root cause being a commit denied 
exception

## What changes were proposed in this pull request?

When deciding whether a CommitDeniedException caused a task to fail, consider
the root cause of the exception rather than only the outermost exception.

## How was this patch tested?

Added a test suite for the component that extracts the root cause of the error.
Made a distribution after cherry-picking this commit to branch-1.6 and used it to
run our Spark application, which would quite often fail due to the
CommitDeniedException.

Author: Jason Moore <jasonmoor...@outlook.com>

Closes #12228 from jasonmoore2k/SPARK-14357.

(cherry picked from commit 22014e6fb919a35c31d852b7c2f5b7eb05751208)
Signed-off-by: Andrew Or <and...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e61ff4c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e61ff4c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e61ff4c

Branch: refs/heads/branch-1.5
Commit: 1e61ff4ca57eeb449bcca7980136fde2e250e11d
Parents: 4f9d1f8
Author: Jason Moore <jasonmoor...@outlook.com>
Authored: Sat Apr 9 23:34:57 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Sat Apr 9 23:35:30 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/executor/Executor.scala    |  2 +-
 .../scala/org/apache/spark/util/CausedBy.scala  | 36 +++++++++++++
 .../org/apache/spark/util/CausedBySuite.scala   | 56 ++++++++++++++++++++
 3 files changed, 93 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1e61ff4c/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 42a85e4..d73abe3 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -287,7 +287,7 @@ private[spark] class Executor(
           logInfo(s"Executor killed $taskName (TID $taskId)")
           execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
 
-        case cDE: CommitDeniedException =>
+        case CausedBy(cDE: CommitDeniedException) =>
           val reason = cDE.toTaskEndReason
           execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1e61ff4c/core/src/main/scala/org/apache/spark/util/CausedBy.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/CausedBy.scala b/core/src/main/scala/org/apache/spark/util/CausedBy.scala
new file mode 100644
index 0000000..73df446
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/CausedBy.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+/**
+ * Extractor Object for pulling out the root cause of an error.
+ * If the error contains no cause, it will return the error itself.
+ *
+ * Usage:
+ * try {
+ *   ...
+ * } catch {
+ *   case CausedBy(ex: CommitDeniedException) => ...
+ * }
+ */
+private[spark] object CausedBy {
+
+  def unapply(e: Throwable): Option[Throwable] = {
+    Option(e.getCause).flatMap(cause => unapply(cause)).orElse(Some(e))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1e61ff4c/core/src/test/scala/org/apache/spark/util/CausedBySuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/CausedBySuite.scala b/core/src/test/scala/org/apache/spark/util/CausedBySuite.scala
new file mode 100644
index 0000000..4a80e3f
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/CausedBySuite.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.apache.spark.SparkFunSuite
+
+class CausedBySuite extends SparkFunSuite {
+
+  test("For an error without a cause, should return the error") {
+    val error = new Exception
+
+    val causedBy = error match {
+      case CausedBy(e) => e
+    }
+
+    assert(causedBy === error)
+  }
+
+  test("For an error with a cause, should return the cause of the error") {
+    val cause = new Exception
+    val error = new Exception(cause)
+
+    val causedBy = error match {
+      case CausedBy(e) => e
+    }
+
+    assert(causedBy === cause)
+  }
+
+  test("For an error with a cause that itself has a cause, return the root cause") {
+    val causeOfCause = new Exception
+    val cause = new Exception(causeOfCause)
+    val error = new Exception(cause)
+
+    val causedBy = error match {
+      case CausedBy(e) => e
+    }
+
+    assert(causedBy === causeOfCause)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to