[ 
https://issues.apache.org/jira/browse/SPARK-17582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xukun updated SPARK-17582:
--------------------------
    Description: 
When an executor is lost, the SparkUI Executors tab still shows its executor info.

 class HeartbeatReceiver.scala
{code:borderStyle=solid}
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, 
Unit] = {

    // Messages sent and received locally
    case ExecutorRegistered(executorId) =>
      executorLastSeen(executorId) = clock.getTimeMillis()
      context.reply(true)
    case ExecutorRemoved(executorId) =>
      executorLastSeen.remove(executorId)
      context.reply(true)
    case TaskSchedulerIsSet =>
      scheduler = sc.taskScheduler
      context.reply(true)
    case ExpireDeadHosts =>
      expireDeadHosts()
      context.reply(true)

    // Messages received from executors
    case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
      if (scheduler != null) {
        if (executorLastSeen.contains(executorId)) {
          executorLastSeen(executorId) = clock.getTimeMillis()
          eventLoopThread.submit(new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              val unknownExecutor = !scheduler.executorHeartbeatReceived(
                executorId, taskMetrics, blockManagerId)
              val response = HeartbeatResponse(reregisterBlockManager = 
unknownExecutor)
              context.reply(response)
            }
          })
        } else {
          // This may happen if we get an executor's in-flight heartbeat 
immediately
          // after we just removed it. It's not really an error condition so we 
should
          // not log warning here. Otherwise there may be a lot of noise 
especially if
          // we explicitly remove executors (SPARK-4134).
          logDebug(s"Received heartbeat from unknown executor $executorId")
          context.reply(HeartbeatResponse(reregisterBlockManager = false))
        }
      } else {
        // Because Executor will sleep several seconds before sending the first 
"Heartbeat", this
        // case rarely happens. However, if it really happens, log it and ask 
the executor to
        // register itself again.
        logWarning(s"Dropping $heartbeat because TaskScheduler is not ready 
yet")
        context.reply(HeartbeatResponse(reregisterBlockManager = true))
      }
  }
{code}

HeartbeatReceiver receives two messages: Heartbeat and ExecutorRemoved.
If they are processed in the following order:
1. Heartbeat is processed, but the task submitted to eventLoopThread has not yet returned its result
2. ExecutorRemoved is processed
then the variable unknownExecutor will be true, which leads to reregisterBlockManager.
The result is that the executor is lost but its blockManager is still alive.



  was:
When executor is losted, SparkUI ExecutorsTab still show its executor info.

 class HeartbeatReceiver.scala
```
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, 
Unit] = {

    // Messages sent and received locally
    case ExecutorRegistered(executorId) =>
      executorLastSeen(executorId) = clock.getTimeMillis()
      context.reply(true)
    case ExecutorRemoved(executorId) =>
      executorLastSeen.remove(executorId)
      context.reply(true)
    case TaskSchedulerIsSet =>
      scheduler = sc.taskScheduler
      context.reply(true)
    case ExpireDeadHosts =>
      expireDeadHosts()
      context.reply(true)

    // Messages received from executors
    case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
      if (scheduler != null) {
        if (executorLastSeen.contains(executorId)) {
          executorLastSeen(executorId) = clock.getTimeMillis()
          eventLoopThread.submit(new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              val unknownExecutor = !scheduler.executorHeartbeatReceived(
                executorId, taskMetrics, blockManagerId)
              val response = HeartbeatResponse(reregisterBlockManager = 
unknownExecutor)
              context.reply(response)
            }
          })
        } else {
          // This may happen if we get an executor's in-flight heartbeat 
immediately
          // after we just removed it. It's not really an error condition so we 
should
          // not log warning here. Otherwise there may be a lot of noise 
especially if
          // we explicitly remove executors (SPARK-4134).
          logDebug(s"Received heartbeat from unknown executor $executorId")
          context.reply(HeartbeatResponse(reregisterBlockManager = false))
        }
      } else {
        // Because Executor will sleep several seconds before sending the first 
"Heartbeat", this
        // case rarely happens. However, if it really happens, log it and ask 
the executor to
        // register itself again.
        logWarning(s"Dropping $heartbeat because TaskScheduler is not ready 
yet")
        context.reply(HeartbeatResponse(reregisterBlockManager = true))
      }
  }
```


HeartbeatReceiver receive message:  Heartbeat and ExecutorRemoved; 
If the process like listed:
1. process HeartBeat and eventLoopThread not return result
 2.process ExecutorRemoved
variables unknownExecutor will  be true,it will lead to reregisterBlockManager.
The result is that executor is lost and blockManager is still alive.  




> Error Executor info in SparkUI ExecutorsTab
> -------------------------------------------
>
>                 Key: SPARK-17582
>                 URL: https://issues.apache.org/jira/browse/SPARK-17582
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>            Reporter: xukun
>            Priority: Minor
>
> When an executor is lost, the SparkUI Executors tab still shows its executor info.
>  class HeartbeatReceiver.scala
> {code:borderStyle=solid}
>   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, 
> Unit] = {
>     // Messages sent and received locally
>     case ExecutorRegistered(executorId) =>
>       executorLastSeen(executorId) = clock.getTimeMillis()
>       context.reply(true)
>     case ExecutorRemoved(executorId) =>
>       executorLastSeen.remove(executorId)
>       context.reply(true)
>     case TaskSchedulerIsSet =>
>       scheduler = sc.taskScheduler
>       context.reply(true)
>     case ExpireDeadHosts =>
>       expireDeadHosts()
>       context.reply(true)
>     // Messages received from executors
>     case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
>       if (scheduler != null) {
>         if (executorLastSeen.contains(executorId)) {
>           executorLastSeen(executorId) = clock.getTimeMillis()
>           eventLoopThread.submit(new Runnable {
>             override def run(): Unit = Utils.tryLogNonFatalError {
>               val unknownExecutor = !scheduler.executorHeartbeatReceived(
>                 executorId, taskMetrics, blockManagerId)
>               val response = HeartbeatResponse(reregisterBlockManager = 
> unknownExecutor)
>               context.reply(response)
>             }
>           })
>         } else {
>           // This may happen if we get an executor's in-flight heartbeat 
> immediately
>           // after we just removed it. It's not really an error condition so 
> we should
>           // not log warning here. Otherwise there may be a lot of noise 
> especially if
>           // we explicitly remove executors (SPARK-4134).
>           logDebug(s"Received heartbeat from unknown executor $executorId")
>           context.reply(HeartbeatResponse(reregisterBlockManager = false))
>         }
>       } else {
>         // Because Executor will sleep several seconds before sending the 
> first "Heartbeat", this
>         // case rarely happens. However, if it really happens, log it and ask 
> the executor to
>         // register itself again.
>         logWarning(s"Dropping $heartbeat because TaskScheduler is not ready 
> yet")
>         context.reply(HeartbeatResponse(reregisterBlockManager = true))
>       }
>   }
> {code}
> HeartbeatReceiver receives two messages: Heartbeat and ExecutorRemoved.
> If they are processed in the following order:
> 1. Heartbeat is processed, but the task submitted to eventLoopThread has not yet returned its result
> 2. ExecutorRemoved is processed
> then the variable unknownExecutor will be true, which leads to
> reregisterBlockManager.
> The result is that the executor is lost but its blockManager is still alive.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to