[ https://issues.apache.org/jira/browse/SPARK-18113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15806897#comment-15806897 ]
jin xing commented on SPARK-18113:
----------------------------------
[~xq2005], [~aash]
I am seeing this issue in my cluster from time to time. When
*OutputCommitCoordinatorEndpoint* receives *AskPermissionToCommitOutput* for
the first time, it marks the task attempt as the committer in
*authorizedCommittersByStage* and sends back the response. But if the worker
fails to receive the response within *spark.rpc.askTimeout*, it retries sending
*AskPermissionToCommitOutput*. The retry is then denied by
*OutputCommitCoordinatorEndpoint*, because a committer is already registered
for the partition, even though the registered committer and the retrying worker
are the same task attempt.
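For context, the executor side goes through *OutputCommitCoordinator.canCommit*, which blocks on *askWithRetry* (see the stack trace in the executor log below), so a reply that does not arrive within *spark.rpc.askTimeout* makes the executor resend the very same *AskPermissionToCommitOutput* message. Roughly, as a simplified sketch rather than the exact Spark source:
{code:title=canCommit (simplified sketch, not the exact source)|borderStyle=solid}
def canCommit(
    stage: StageId,
    partition: PartitionId,
    attemptNumber: TaskAttemptNumber): Boolean = {
  val msg = AskPermissionToCommitOutput(stage, partition, attemptNumber)
  coordinatorRef match {
    case Some(endpointRef) =>
      // askWithRetry resends msg after a timeout; the driver side cannot tell
      // a retried request apart from a brand-new one.
      endpointRef.askWithRetry[Boolean](msg)
    case None =>
      logError("canCommit called after coordinator was stopped (is SparkEnv shutdown in progress?)")
      false
  }
}
{code}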
Reproducing is easy:
{code:title=OutputCommitCoordinator.scala|borderStyle=solid}
......
  // Marked private[scheduler] instead of private so this can be mocked in tests
  private[scheduler] def handleAskPermissionToCommit(
      stage: StageId,
      partition: PartitionId,
      attemptNumber: TaskAttemptNumber): Boolean = synchronized {
    authorizedCommittersByStage.get(stage) match {
      case Some(authorizedCommitters) =>
        authorizedCommitters(partition) match {
          case NO_AUTHORIZED_COMMITTER =>
            logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " +
              s"partition=$partition")
            authorizedCommitters(partition) = attemptNumber
            Thread.sleep(150000)
            true
          case existingCommitter =>
            logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
              s"partition=$partition; existingCommitter = $existingCommitter")
            false
        }
      case None =>
        logDebug(s"Stage $stage has completed, so not allowing attempt number $attemptNumber of " +
          s"partition $partition to commit")
        false
    }
  }
......
{code}
When the worker asks to be registered as a committer for the first time, the
handler sleeps for 150 seconds, which is longer than *spark.rpc.askTimeout=120
seconds*. When the worker then retries *AskPermissionToCommitOutput*, it gets a
*CommitDeniedException*, and the task fails with reason *TaskCommitDenied*,
which is not counted as a task failure (SPARK-11178), so TaskScheduler keeps
rescheduling the task indefinitely.
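One possible direction for a fix (just a sketch of the idea, not a finished patch): make *handleAskPermissionToCommit* idempotent, so that a retried *AskPermissionToCommitOutput* from the attempt that is already registered as the committer is answered with *true* again instead of being denied:
{code:title=OutputCommitCoordinator.scala (sketch of a possible fix)|borderStyle=solid}
  private[scheduler] def handleAskPermissionToCommit(
      stage: StageId,
      partition: PartitionId,
      attemptNumber: TaskAttemptNumber): Boolean = synchronized {
    authorizedCommittersByStage.get(stage) match {
      case Some(authorizedCommitters) =>
        authorizedCommitters(partition) match {
          case NO_AUTHORIZED_COMMITTER =>
            logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " +
              s"partition=$partition")
            authorizedCommitters(partition) = attemptNumber
            true
          case existingCommitter if existingCommitter == attemptNumber =>
            // The registered committer is the same attempt retrying after an RPC timeout;
            // answer true again instead of denying, so the task is not failed with
            // TaskCommitDenied and rescheduled forever.
            logDebug(s"attemptNumber=$attemptNumber is already the authorized committer for " +
              s"stage=$stage, partition=$partition; allowing the retried request")
            true
          case existingCommitter =>
            logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
              s"partition=$partition; existingCommitter = $existingCommitter")
            false
        }
      case None =>
        logDebug(s"Stage $stage has completed, so not allowing attempt number $attemptNumber of " +
          s"partition $partition to commit")
        false
    }
  }
{code}
A different attempt asking to commit the same partition would still be denied exactly as before; only the retry from the already-registered attempt is treated as idempotent.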
[~xq2005]
If you don't have time, could I make a PR for this?
> Sending AskPermissionToCommitOutput failed, driver enter into task deadloop
> ---------------------------------------------------------------------------
>
> Key: SPARK-18113
> URL: https://issues.apache.org/jira/browse/SPARK-18113
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 2.0.1
> Environment: # cat /etc/redhat-release
> Red Hat Enterprise Linux Server release 7.2 (Maipo)
> Reporter: xuqing
>
> The executor's *AskPermissionToCommitOutput* to the driver fails to get a
> reply, so it retries the send. The driver receives two
> AskPermissionToCommitOutput messages and handles both. The executor, however,
> never sees the first response (true) and only receives the second response
> (false). The TaskAttemptNumber for this partition in
> authorizedCommittersByStage stays locked forever, and the driver enters an
> infinite retry loop.
> h4. Driver Log:
> {noformat}
> 16/10/25 05:38:28 INFO TaskSetManager: Starting task 24.0 in stage 2.0 (TID 110, cwss04.sh01.com, partition 24, PROCESS_LOCAL, 5248 bytes)
> ...
> 16/10/25 05:39:00 WARN TaskSetManager: Lost task 24.0 in stage 2.0 (TID 110, cwss04.sh01.com): TaskCommitDenied (Driver denied task commit) for job: 2, partition: 24, attemptNumber: 0
> ...
> 16/10/25 05:39:00 INFO OutputCommitCoordinator: Task was denied committing, stage: 2, partition: 24, attempt: 0
> ...
> 16/10/26 15:53:03 INFO TaskSetManager: Starting task 24.1 in stage 2.0 (TID 119, cwss04.sh01.com, partition 24, PROCESS_LOCAL, 5248 bytes)
> ...
> 16/10/26 15:53:05 WARN TaskSetManager: Lost task 24.1 in stage 2.0 (TID 119, cwss04.sh01.com): TaskCommitDenied (Driver denied task commit) for job: 2, partition: 24, attemptNumber: 1
> 16/10/26 15:53:05 INFO OutputCommitCoordinator: Task was denied committing, stage: 2, partition: 24, attempt: 1
> ...
> 16/10/26 15:53:05 INFO TaskSetManager: Starting task 24.28654 in stage 2.0 (TID 28733, cwss04.sh01.com, partition 24, PROCESS_LOCAL, 5248 bytes)
> ...
> {noformat}
> h4. Executor Log:
> {noformat}
> ...
> 16/10/25 05:38:42 INFO Executor: Running task 24.0 in stage 2.0 (TID 110)
> ...
> 16/10/25 05:39:10 WARN NettyRpcEndpointRef: Error sending message [message = AskPermissionToCommitOutput(2,24,0)] in 1 attempts
> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.rpc.askTimeout
>     at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
>     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
>     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>     at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
>     at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
>     at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
>     at org.apache.spark.scheduler.OutputCommitCoordinator.canCommit(OutputCommitCoordinator.scala:95)
>     at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:73)
>     at org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:106)
>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1212)
>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1190)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>     at org.apache.spark.scheduler.Task.run(Task.scala:86)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:279)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.lang.Thread.run(Thread.java:785)
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
>     at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>     at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>     at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
>     at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>     at scala.concurrent.Await$.result(package.scala:190)
>     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:81)
>     ... 13 more
> ...
> 16/10/25 05:39:16 INFO Executor: Running task 24.1 in stage 2.0 (TID 119)
> ...
> 16/10/25 05:39:24 INFO SparkHadoopMapRedUtil: attempt_201610250536_0002_m_000024_119: Not committed because the driver did not authorize commit
> ...
> {noformat}