[ https://issues.apache.org/jira/browse/SPARK-18113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15806897#comment-15806897 ]

jin xing commented on SPARK-18113:
----------------------------------

[~xq2005], [~aash]
I sometimes see this issue in my cluster. When
*OutputCommitCoordinatorEndpoint* receives *AskPermissionToCommitOutput* for
the first time, *OutputCommitCoordinatorEndpoint* marks the task attempt
as the committer in *authorizedCommittersByStage* and sends back the response. But
if the worker fails to get the response within *spark.rpc.timeout*, it retries
sending *AskPermissionToCommitOutput*. That retry is denied by
*OutputCommitCoordinatorEndpoint*, because a committer is already registered
for the partition, even though the registered committer and the
worker are the same.
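A minimal Scala sketch of the "first asker wins" bookkeeping described above, under a simplified model: `SimpleCommitCoordinator`, `stageStart`, and `LostResponseDemo` are hypothetical stand-ins, not Spark's `OutputCommitCoordinator`, and attempt numbers are plain `Int`s. It only shows that a retried ask from the attempt that was already authorized gets refused.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the driver-side bookkeeping (NOT Spark's
// OutputCommitCoordinator): one authorized attempt number per (stage, partition).
class SimpleCommitCoordinator {
  private val NoAuthorizedCommitter = -1

  // stage -> per-partition authorized attempt number
  private val authorizedCommittersByStage = mutable.Map[Int, Array[Int]]()

  def stageStart(stage: Int, numPartitions: Int): Unit = synchronized {
    authorizedCommittersByStage(stage) = Array.fill(numPartitions)(NoAuthorizedCommitter)
  }

  def handleAskPermissionToCommit(stage: Int, partition: Int, attempt: Int): Boolean =
    synchronized {
      authorizedCommittersByStage.get(stage) match {
        case Some(committers) if committers(partition) == NoAuthorizedCommitter =>
          committers(partition) = attempt // record the committer, then reply true
          true
        case Some(_) =>
          false // any later ask is denied, even from the same attempt
        case None =>
          false // stage already completed
      }
    }
}

object LostResponseDemo extends App {
  val coordinator = new SimpleCommitCoordinator
  coordinator.stageStart(stage = 2, numPartitions = 25)
  // The first ask is granted, but assume the reply misses the RPC timeout.
  val firstReply = coordinator.handleAskPermissionToCommit(2, 24, 0)
  // The executor resends the same message and is refused.
  val retryReply = coordinator.handleAskPermissionToCommit(2, 24, 0)
  println(s"first=$firstReply retry=$retryReply") // prints "first=true retry=false"
}
```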
It is easy to reproduce by adding a sleep to *handleAskPermissionToCommit*:
{code:title=OutputCommitCoordinator.scala|borderStyle=solid}
  // ......
  // Marked private[scheduler] instead of private so this can be mocked in tests
  private[scheduler] def handleAskPermissionToCommit(
      stage: StageId,
      partition: PartitionId,
      attemptNumber: TaskAttemptNumber): Boolean = synchronized {
    authorizedCommittersByStage.get(stage) match {
      case Some(authorizedCommitters) =>
        authorizedCommitters(partition) match {
          case NO_AUTHORIZED_COMMITTER =>
            logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " +
              s"partition=$partition")
            authorizedCommitters(partition) = attemptNumber
            // Injected for reproduction only: delay the reply by 150 s, longer than
            // spark.rpc.timeout (120 s), so the executor times out and resends the ask.
            Thread.sleep(150000)
            true
          case existingCommitter =>
            logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
              s"partition=$partition; existingCommitter = $existingCommitter")
            false
        }
      case None =>
        logDebug(s"Stage $stage has completed, so not allowing attempt number $attemptNumber of " +
          s"partition $partition to commit")
        false
    }
  }
  // ......
{code}
With this change, the first time a worker asks to be registered as the committer,
the coordinator sleeps for 150 seconds, which is longer than *spark.rpc.timeout*
(120 seconds). When the worker retries *AskPermissionToCommitOutput*, it gets
*CommitDeniedException*, and the task fails with reason *TaskCommitDenied*.
Because *TaskCommitDenied* is not counted as a task failure (SPARK-11178), the
TaskScheduler reschedules this task indefinitely.
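Why the rescheduling never stops can be sketched as a toy Scala model. `FailureReason`, `countsAsFailure`, and `RetryLoopSketch` are hypothetical names, not Spark's API, and `maxFailures = 4` is only an example limit in the spirit of `spark.task.maxFailures` (whose default is 4). Failures that do not count never advance the counter compared against the limit, so the job is never aborted; the sketch caps the number of simulated attempts only so that it terminates.

```scala
// Hypothetical failure reasons: only some of them count toward the limit.
sealed trait FailureReason { def countsAsFailure: Boolean }
case object CommitDenied extends FailureReason { val countsAsFailure = false }
case object ExceptionFailure extends FailureReason { val countsAsFailure = true }

object RetryLoopSketch {
  // Launches up to `attemptsToSimulate` attempts of one task, each failing with
  // `reason`; returns (attempts launched, whether the job was aborted).
  def simulate(reason: FailureReason, maxFailures: Int, attemptsToSimulate: Int): (Int, Boolean) = {
    var countedFailures = 0
    var launched = 0
    while (launched < attemptsToSimulate) {
      launched += 1
      if (reason.countsAsFailure) countedFailures += 1
      if (countedFailures >= maxFailures) return (launched, true) // abort the job
    }
    (launched, false) // limit never reached: the task would keep being rescheduled
  }

  def main(args: Array[String]): Unit = {
    println(simulate(ExceptionFailure, maxFailures = 4, attemptsToSimulate = 1000)) // prints (4,true)
    println(simulate(CommitDenied, maxFailures = 4, attemptsToSimulate = 1000))     // prints (1000,false)
  }
}
```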

[~xq2005]
If you don't have time, may I make a PR for this?

> Sending AskPermissionToCommitOutput failed, driver enter into task deadloop
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-18113
>                 URL: https://issues.apache.org/jira/browse/SPARK-18113
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 2.0.1
>         Environment: # cat /etc/redhat-release 
> Red Hat Enterprise Linux Server release 7.2 (Maipo)
>            Reporter: xuqing
>
> The executor failed to send *AskPermissionToCommitOutput* to the driver and
> retried the send. The driver received both AskPermissionToCommitOutput messages
> and handled them, but the executor missed the first response (true) and received
> only the second response (false). The TaskAttemptNumber for this partition in
> authorizedCommittersByStage is then locked forever, and the driver enters an
> infinite loop.
> h4. Driver Log:
> {noformat}
> 16/10/25 05:38:28 INFO TaskSetManager: Starting task 24.0 in stage 2.0 (TID 110, cwss04.sh01.com, partition 24, PROCESS_LOCAL, 5248 bytes)
> ...
> 16/10/25 05:39:00 WARN TaskSetManager: Lost task 24.0 in stage 2.0 (TID 110, cwss04.sh01.com): TaskCommitDenied (Driver denied task commit) for job: 2, partition: 24, attemptNumber: 0
> ...
> 16/10/25 05:39:00 INFO OutputCommitCoordinator: Task was denied committing, stage: 2, partition: 24, attempt: 0
> ...
> 16/10/26 15:53:03 INFO TaskSetManager: Starting task 24.1 in stage 2.0 (TID 119, cwss04.sh01.com, partition 24, PROCESS_LOCAL, 5248 bytes)
> ...
> 16/10/26 15:53:05 WARN TaskSetManager: Lost task 24.1 in stage 2.0 (TID 119, cwss04.sh01.com): TaskCommitDenied (Driver denied task commit) for job: 2, partition: 24, attemptNumber: 1
> 16/10/26 15:53:05 INFO OutputCommitCoordinator: Task was denied committing, stage: 2, partition: 24, attempt: 1
> ...
> 16/10/26 15:53:05 INFO TaskSetManager: Starting task 24.28654 in stage 2.0 (TID 28733, cwss04.sh01.com, partition 24, PROCESS_LOCAL, 5248 bytes)
> ...
> {noformat}
> h4. Executor Log:
> {noformat}
> ...
> 16/10/25 05:38:42 INFO Executor: Running task 24.0 in stage 2.0 (TID 110)
> ...
> 16/10/25 05:39:10 WARN NettyRpcEndpointRef: Error sending message [message = AskPermissionToCommitOutput(2,24,0)] in 1 attempts
> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.rpc.askTimeout
>         at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
>         at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
>         at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>         at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>         at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
>         at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
>         at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
>         at org.apache.spark.scheduler.OutputCommitCoordinator.canCommit(OutputCommitCoordinator.scala:95)
>         at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:73)
>         at org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:106)
>         at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1212)
>         at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1190)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>         at org.apache.spark.scheduler.Task.run(Task.scala:86)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:279)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         at java.lang.Thread.run(Thread.java:785)
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
>         at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>         at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>         at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
>         at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>         at scala.concurrent.Await$.result(package.scala:190)
>         at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:81)
>         ... 13 more
> ...
> 16/10/25 05:39:16 INFO Executor: Running task 24.1 in stage 2.0 (TID 119)
> ...
> 16/10/25 05:39:24 INFO SparkHadoopMapRedUtil: attempt_201610250536_0002_m_000024_119: Not committed because the driver did not authorize commit
> ...
> {noformat}
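One possible way to make the coordinator tolerate the lost-response case in the issue description is to treat a repeated ask from the attempt that already holds the commit right as granted. This is a hedged sketch under a simplified model, with a hypothetical `IdempotentCommitCoordinator` class and plain `Int` attempt numbers; it is not Spark's code and not necessarily how the issue was resolved.

```scala
import scala.collection.mutable

// Hypothetical idempotent variant of a simplified coordinator model (NOT Spark's
// code): a duplicate ask from the already-authorized attempt is answered true again.
class IdempotentCommitCoordinator {
  private val NoAuthorizedCommitter = -1

  // stage -> per-partition authorized attempt number
  private val authorizedCommittersByStage = mutable.Map[Int, Array[Int]]()

  def stageStart(stage: Int, numPartitions: Int): Unit = synchronized {
    authorizedCommittersByStage(stage) = Array.fill(numPartitions)(NoAuthorizedCommitter)
  }

  def handleAskPermissionToCommit(stage: Int, partition: Int, attempt: Int): Boolean =
    synchronized {
      authorizedCommittersByStage.get(stage) match {
        case Some(committers) =>
          val existing = committers(partition)
          if (existing == NoAuthorizedCommitter) {
            committers(partition) = attempt // first ask: authorize this attempt
            true
          } else {
            // Duplicate request (e.g. a resent RPC whose first reply was lost) from the
            // authorized attempt is granted again; any other attempt is still denied.
            existing == attempt
          }
        case None =>
          false // stage already completed
      }
    }
}

object IdempotentDemo extends App {
  val coordinator = new IdempotentCommitCoordinator
  coordinator.stageStart(stage = 2, numPartitions = 25)
  val first = coordinator.handleAskPermissionToCommit(2, 24, 0)
  val retry = coordinator.handleAskPermissionToCommit(2, 24, 0)
  val other = coordinator.handleAskPermissionToCommit(2, 24, 1)
  println(s"first=$first retry=$retry other=$other") // prints "first=true retry=true other=false"
}
```

The sketch still keeps exactly one authorized attempt per partition; only a duplicate request from that same attempt changes its answer, so in the logs above the resent ask from attempt 0 of partition 24 would be granted, while attempt 1 would still be denied.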



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)