[ https://issues.apache.org/jira/browse/SPARK-21564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16122651#comment-16122651 ]
Andrew Ash commented on SPARK-21564:
------------------------------------
[~irashid] a possible fix could look roughly like this:
{noformat}
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index a2f1aa22b0..06d72fe106 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.executor
+import java.io.{DataInputStream, NotSerializableException}
 import java.net.URL
 import java.nio.ByteBuffer
 import java.util.Locale
@@ -35,7 +36,7 @@ import org.apache.spark.rpc._
 import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{ByteBufferInputStream, ThreadUtils, Utils}
 private[spark] class CoarseGrainedExecutorBackend(
     override val rpcEnv: RpcEnv,
@@ -93,9 +94,26 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor == null) {
         exitExecutor(1, "Received LaunchTask command but executor was null")
       } else {
-        val taskDesc = TaskDescription.decode(data.value)
-        logInfo("Got assigned task " + taskDesc.taskId)
-        executor.launchTask(this, taskDesc)
+        try {
+          val taskDesc = TaskDescription.decode(data.value)
+          logInfo("Got assigned task " + taskDesc.taskId)
+          executor.launchTask(this, taskDesc)
+        } catch {
+          case e: Exception =>
+            val taskId = new DataInputStream(new ByteBufferInputStream(
+              ByteBuffer.wrap(data.value.array()))).readLong()
+            val ser = env.closureSerializer.newInstance()
+            val serializedTaskEndReason = {
+              try {
+                ser.serialize(new ExceptionFailure(e, Nil))
+              } catch {
+                case _: NotSerializableException =>
+                  // e is not serializable so just send the stacktrace
+                  ser.serialize(new ExceptionFailure(e, Nil, false))
+              }
+            }
+            statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
+        }
       }
     case KillTask(taskId, _, interruptThread, reason) =>
{noformat}
The downside is that this still assumes the TaskDescription is well-formed
enough to read the taskId out of it (the first long in the serialized bytes).
If even that read fails, the task still cannot be reported as failed.
Any other thoughts on how to do this?
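One way to narrow that assumption, as a rough sketch only: read the leading long through a duplicate of the buffer, and only when at least 8 bytes are present. The helper name peekTaskId and the object TaskIdPeek are hypothetical and not part of Spark. The sketch also assumes the encoded TaskDescription starts at position 0 of the buffer, which is the same assumption the patch above makes.

```scala
import java.nio.ByteBuffer

object TaskIdPeek {
  // Hypothetical helper, not part of Spark.
  // Assumption: the encoded TaskDescription starts at position 0 of `buf`
  // and its first 8 bytes are the big-endian taskId.
  def peekTaskId(buf: ByteBuffer): Option[Long] = {
    // duplicate() shares the bytes but has its own position and limit,
    // so the caller's buffer is left exactly as the failed decode left it.
    val view = buf.duplicate()
    view.rewind()
    // Guard against a buffer too short to hold a long at all.
    if (view.remaining() >= 8) Some(view.getLong()) else None
  }
}
```

Unlike ByteBuffer.wrap(data.value.array()), this does not require an array-backed buffer and respects the buffer's array offset. In the catch block it could be called as TaskIdPeek.peekTaskId(data.value), with something like exitExecutor as the fallback when it returns None, though whether exiting is the right fallback is an open question.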
> TaskDescription decoding failure should fail the task
> -----------------------------------------------------
>
> Key: SPARK-21564
> URL: https://issues.apache.org/jira/browse/SPARK-21564
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.2.0
> Reporter: Andrew Ash
>
> cc [~robert3005]
> I was seeing an issue where Spark was throwing this exception:
> {noformat}
> 16:16:28.294 [dispatcher-event-loop-14] ERROR org.apache.spark.rpc.netty.Inbox - Ignoring error
> java.io.EOFException: null
>     at java.io.DataInputStream.readFully(DataInputStream.java:197)
>     at java.io.DataInputStream.readUTF(DataInputStream.java:609)
>     at java.io.DataInputStream.readUTF(DataInputStream.java:564)
>     at org.apache.spark.scheduler.TaskDescription$$anonfun$decode$1.apply(TaskDescription.scala:127)
>     at org.apache.spark.scheduler.TaskDescription$$anonfun$decode$1.apply(TaskDescription.scala:126)
>     at scala.collection.immutable.Range.foreach(Range.scala:160)
>     at org.apache.spark.scheduler.TaskDescription$.decode(TaskDescription.scala:126)
>     at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:95)
>     at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
>     at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
>     at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
>     at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:213)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
> {noformat}
> For details on the cause of that exception, see SPARK-21563.
> We've since changed the application and have a proposed fix in Spark at the
> ticket above, but it was troubling that decoding the TaskDescription wasn't
> failing the tasks. So the Spark job ended up hanging and making no progress,
> permanently stuck, because the driver thinks the task is running but the
> thread has died in the executor.
> We should make a change around
> https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L96
> so that when that decode throws an exception, the task is marked as failed.
> cc [~kayousterhout] [~irashid]