Repository: spark Updated Branches: refs/heads/branch-1.5 4f9d1f8c7 -> 1e61ff4ca
[SPARK-14357][CORE] Properly handle the root cause being a commit denied exception ## What changes were proposed in this pull request? When deciding whether a CommitDeniedException caused a task to fail, consider the root cause of the Exception. ## How was this patch tested? Added a test suite for the component that extracts the root cause of the error. Made a distribution after cherry-picking this commit to branch-1.6 and used to run our Spark application that would quite often fail due to the CommitDeniedException. Author: Jason Moore <jasonmoor...@outlook.com> Closes #12228 from jasonmoore2k/SPARK-14357. (cherry picked from commit 22014e6fb919a35c31d852b7c2f5b7eb05751208) Signed-off-by: Andrew Or <and...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e61ff4c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e61ff4c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e61ff4c Branch: refs/heads/branch-1.5 Commit: 1e61ff4ca57eeb449bcca7980136fde2e250e11d Parents: 4f9d1f8 Author: Jason Moore <jasonmoor...@outlook.com> Authored: Sat Apr 9 23:34:57 2016 -0700 Committer: Andrew Or <and...@databricks.com> Committed: Sat Apr 9 23:35:30 2016 -0700 ---------------------------------------------------------------------- .../org/apache/spark/executor/Executor.scala | 2 +- .../scala/org/apache/spark/util/CausedBy.scala | 36 +++++++++++++ .../org/apache/spark/util/CausedBySuite.scala | 56 ++++++++++++++++++++ 3 files changed, 93 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/1e61ff4c/core/src/main/scala/org/apache/spark/executor/Executor.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 42a85e4..d73abe3 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -287,7 +287,7 @@ private[spark] class Executor( logInfo(s"Executor killed $taskName (TID $taskId)") execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled)) - case cDE: CommitDeniedException => + case CausedBy(cDE: CommitDeniedException) => val reason = cDE.toTaskEndReason execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) http://git-wip-us.apache.org/repos/asf/spark/blob/1e61ff4c/core/src/main/scala/org/apache/spark/util/CausedBy.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/CausedBy.scala b/core/src/main/scala/org/apache/spark/util/CausedBy.scala new file mode 100644 index 0000000..73df446 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/CausedBy.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +/** + * Extractor Object for pulling out the root cause of an error. + * If the error contains no cause, it will return the error itself. + * + * Usage: + * try { + * ... + * } catch { + * case CausedBy(ex: CommitDeniedException) => ... + * } + */ +private[spark] object CausedBy { + + def unapply(e: Throwable): Option[Throwable] = { + Option(e.getCause).flatMap(cause => unapply(cause)).orElse(Some(e)) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/1e61ff4c/core/src/test/scala/org/apache/spark/util/CausedBySuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/CausedBySuite.scala b/core/src/test/scala/org/apache/spark/util/CausedBySuite.scala new file mode 100644 index 0000000..4a80e3f --- /dev/null +++ b/core/src/test/scala/org/apache/spark/util/CausedBySuite.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import org.apache.spark.SparkFunSuite + +class CausedBySuite extends SparkFunSuite { + + test("For an error without a cause, should return the error") { + val error = new Exception + + val causedBy = error match { + case CausedBy(e) => e + } + + assert(causedBy === error) + } + + test("For an error with a cause, should return the cause of the error") { + val cause = new Exception + val error = new Exception(cause) + + val causedBy = error match { + case CausedBy(e) => e + } + + assert(causedBy === cause) + } + + test("For an error with a cause that itself has a cause, return the root cause") { + val causeOfCause = new Exception + val cause = new Exception(causeOfCause) + val error = new Exception(cause) + + val causedBy = error match { + case CausedBy(e) => e + } + + assert(causedBy === causeOfCause) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org