[ https://issues.apache.org/jira/browse/SPARK-22958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shaoquan Zhang updated SPARK-22958:
-----------------------------------
    Description: 
We have encountered the following scenario. We run a very simple job in YARN cluster mode. The job needs only one executor to complete, yet while running it was stuck forever.

After checking the job log, we found an issue in Spark. When an executor fails to register with the driver, YarnAllocator has no way to know it. As a result, the variable numExecutorsRunning maintained by YarnAllocator no longer reflects the number of executors that are actually usable. When this variable is then used to decide whether more resources should be allocated to the running job, the decision is based on a wrong count. For our job, that wrong count made it hang forever.

More details are as follows. The figure below shows how an executor is allocated when the job starts to run. Suppose only one executor is needed. Steps 1, 2 and 3 show how the executor's container is allocated and launched. After the executor is launched, it registers with the driver (step 4) and the driver responds to it (step 5). Only after all 5 steps can the executor be used to run tasks.
!How new executor is registered.png!
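
To make steps 4 and 5 more concrete, here is a minimal, self-contained sketch of the registration handshake. The names below (RegisterExecutor, RegisteredExecutor, driverReceive) are illustrative toy definitions, not Spark's actual classes; in Spark this exchange happens over the RPC layer between the executor backend (CoarseGrainedExecutorBackend) and the driver endpoint.

{code:scala}
// Illustrative sketch of steps 4 and 5 only; not Spark's real RPC code.
import scala.collection.mutable

// Step 4: the freshly launched executor asks the driver to register it.
case class RegisterExecutor(executorId: String, hostname: String, cores: Int)

// Step 5: the driver's reply; only after a positive reply can tasks be sent to the executor.
sealed trait Reply
case object RegisteredExecutor extends Reply
case class RegisterExecutorFailed(reason: String) extends Reply

object HandshakeDemo extends App {
  private val registeredIds = mutable.Set.empty[String]

  // What the driver conceptually does when a register request arrives.
  def driverReceive(msg: RegisterExecutor): Reply =
    if (registeredIds.add(msg.executorId)) RegisteredExecutor
    else RegisterExecutorFailed(s"Duplicate executor ID: ${msg.executorId}")

  // A healthy executor completes step 4 and receives step 5.
  println(driverReceive(RegisterExecutor("1", "host-a", cores = 4))) // RegisteredExecutor
  // If the request is lost (step 4 fails), driverReceive is never called and
  // the driver never learns about the executor.
}
{code}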
In YarnAllocator, once step 3 has finished, the variable numExecutorsRunning is incremented, as shown in the following code.

{code:java}
def updateInternalState(): Unit = synchronized {
        numExecutorsRunning += 1
        executorIdToContainer(executorId) = container
        containerIdToExecutorId(container.getId) = executorId

        val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
          new HashSet[ContainerId])
        containerSet += containerId
        allocatedContainerToHostMap.put(containerId, executorHostname)
      }

      if (numExecutorsRunning < targetNumExecutors) {
        if (launchContainers) {
          launcherPool.execute(new Runnable {
            override def run(): Unit = {
              try {
                new ExecutorRunnable(
                  Some(container),
                  conf,
                  sparkConf,
                  driverUrl,
                  executorId,
                  executorHostname,
                  executorMemory,
                  executorCores,
                  appAttemptId.getApplicationId.toString,
                  securityMgr,
                  localResources
                ).run()
                updateInternalState()
              } catch {
                case NonFatal(e) =>
                  logError(s"Failed to launch executor $executorId on container $containerId", e)
                  // Assigned container should be released immediately to avoid unnecessary resource
                  // occupation.
                  amClient.releaseAssignedContainer(containerId)
              }
            }
          })
        } else {
          // For test only
          updateInternalState()
        }
      } else {
        logInfo(("Skip launching executorRunnable as runnning Excecutors count: %d " +
          "reached target Executors count: %d.").format(numExecutorsRunning, targetNumExecutors))
      }
{code}

Imagine that step 4 fails for some reason (for example, a network fluctuation). By that point ExecutorRunnable.run() has already returned and updateInternalState() has incremented numExecutorsRunning, so YarnAllocator believes the target number of executors is already running and never requests a replacement container. At the same time, the driver has no registered executor to schedule tasks on. The job is therefore stuck forever.
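
To make the accounting gap concrete, the following is a toy, self-contained sketch (illustrative names, not Spark's implementation) of how a counter that is incremented at launch time can diverge from the number of executors the driver can actually use. The ToyAllocator mimics the behaviour of the excerpt above: numExecutorsRunning goes up as soon as the container launch succeeds, regardless of whether step 4 later succeeds.

{code:scala}
// Toy model only; class and method names are illustrative, not Spark's API.
import scala.collection.mutable

class ToyDriver {
  private val registered = mutable.Set.empty[String]
  def register(executorId: String): Unit = synchronized { registered += executorId }
  def registeredExecutors: Int = synchronized(registered.size)
}

class ToyAllocator(targetNumExecutors: Int, driver: ToyDriver) {
  private var numExecutorsRunning = 0

  // Mirrors the excerpt above: the counter goes up when the container launch
  // succeeds (step 3), not when the executor registers with the driver (step 4).
  def launchExecutor(executorId: String, registrationSucceeds: Boolean): Unit = {
    numExecutorsRunning += 1                                // updateInternalState()
    if (registrationSucceeds) driver.register(executorId)   // step 4 can silently fail
  }

  // The allocator uses its own (possibly stale) counter to decide whether to ask YARN for more.
  def needsMoreExecutors: Boolean = numExecutorsRunning < targetNumExecutors
}

object StuckJobDemo extends App {
  val driver = new ToyDriver
  val allocator = new ToyAllocator(targetNumExecutors = 1, driver = driver)

  // The only executor is launched, but its registration with the driver fails.
  allocator.launchExecutor("1", registrationSucceeds = false)

  println(s"executors registered with driver: ${driver.registeredExecutors}")   // 0 -> no tasks can run
  println(s"allocator wants more executors:   ${allocator.needsMoreExecutors}") // false -> no replacement requested
}
{code}

The two numbers diverge exactly as in our job: the allocator's counter says the target is already met, so it never asks YARN for a replacement container, while the driver has zero executors to run tasks on.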

> Spark is stuck when the only one executor fails to register with driver
> -----------------------------------------------------------------------
>
>                 Key: SPARK-22958
>                 URL: https://issues.apache.org/jira/browse/SPARK-22958
>             Project: Spark
>          Issue Type: Bug
>          Components: YARN
>    Affects Versions: 2.1.0
>            Reporter: Shaoquan Zhang
>         Attachments: How new executor is registered.png
>
>


