[ 
https://issues.apache.org/jira/browse/SPARK-21562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcelo Vanzin resolved SPARK-21562.
------------------------------------
    Resolution: Duplicate

> Spark may request extra containers if the rpc between YARN and spark is too 
> fast
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-21562
>                 URL: https://issues.apache.org/jira/browse/SPARK-21562
>             Project: Spark
>          Issue Type: Bug
>          Components: YARN
>    Affects Versions: 2.2.0
>            Reporter: Wei Chen
>              Labels: YARN
>
> Hi guys,
> I found an interesting problem when Spark tries to request containers from YARN.
> Here is the case:
> In YarnAllocator.scala
> 1. This function requests containers from YARN only if there are executors that have not yet been requested:
> {code:java}
> def updateResourceRequests(): Unit = {
>     val pendingAllocate = getPendingAllocate
>     val numPendingAllocate = pendingAllocate.size
>     val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
>
>     if (missing > 0) {
>       ......
>     }
>     .....
> }
> {code}
> 2. After the requested containers are allocated (granted through RPC), it updates the pending queue:
> {code:java}
> private def matchContainerToRequest(
>     allocatedContainer: Container,
>     location: String,
>     containersToUse: ArrayBuffer[Container],
>     remaining: ArrayBuffer[Container]): Unit = {
>   .....
>
>   amClient.removeContainerRequest(containerRequest) // update pending queue
>
>   .....
> }
> {code}
> 3. After the allocated containers are launched, it updates the running queue:
> {code:java}
> private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
>   for (container <- containersToUse) {
>     ....
>     launcherPool.execute(new Runnable {
>       override def run(): Unit = {
>         try {
>           new ExecutorRunnable(
>             Some(container),
>             conf,
>             sparkConf,
>             driverUrl,
>             executorId,
>             executorHostname,
>             executorMemory,
>             executorCores,
>             appAttemptId.getApplicationId.toString,
>             securityMgr,
>             localResources
>           ).run()
>           logInfo(s"has launched $containerId")
>           updateInternalState()   // update running queue
>           ....
> }
> {code}
> However, step 3 launches a thread that first runs ExecutorRunnable and only
> then updates the running queue. We found it can take almost 1 second before
> updateInternalState() is called. This creates an inconsistent window: the
> pending queue has already been updated, but the running queue has not,
> because the launcher thread has not yet reached updateInternalState(). If an
> RPC call to amClient.allocate() happens within this window, more executors
> than targetNumExecutors will be requested.
> {noformat}
> Here is an example:
> Initial:
> targetNumExecutors    numPendingAllocate        numExecutorsRunning
> 1                     0                         0
> After the first RPC call to amClient.allocate():
> targetNumExecutors    numPendingAllocate        numExecutorsRunning
> 1                     1                         0
> After the first allocated container is granted by YARN:
> targetNumExecutors    numPendingAllocate        numExecutorsRunning
> 1                     0 (removed in step 2)     0
> =====> If an RPC call to amClient.allocate() happens here, extra containers
> are requested; this is caused by the inconsistent state.
> After the container is launched in step 3:
> targetNumExecutors    numPendingAllocate        numExecutorsRunning
> 1                     0                         1
> {noformat}
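The race in the table above can be sketched in a few lines of self-contained Scala. This is a hypothetical, simplified model, not Spark's actual code: the counters and the `updateResourceRequests` logic mirror the names in YarnAllocator, but the granting and launching steps are plain assignments standing in for the RPC response and the launcher thread, so the "inconsistent window" can be exercised deterministically.

```scala
// Minimal sketch (hypothetical, simplified) of the accounting race:
// "missing" is computed from pending + running, but between the moment a
// granted container is removed from pending (step 2) and the moment the
// launcher thread increments running (step 3), an allocate() round sees
// both counters too low and requests an extra container.
object AllocatorRaceSketch {
  val targetNumExecutors = 1
  var numPendingAllocate = 0
  var numExecutorsRunning = 0
  var totalRequested = 0

  // Mirrors updateResourceRequests(): request whatever appears missing.
  def updateResourceRequests(): Unit = {
    val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
    if (missing > 0) {
      numPendingAllocate += missing
      totalRequested += missing
    }
  }

  def main(args: Array[String]): Unit = {
    updateResourceRequests()  // first allocate(): pending = 1
    numPendingAllocate -= 1   // step 2: container granted, pending queue updated
    // Step 3 (running += 1) has NOT happened yet -- the inconsistent window.
    updateResourceRequests()  // allocate() inside the window: missing = 1 again
    numExecutorsRunning += 1  // step 3: launcher thread finally updates running
    println(s"requested $totalRequested containers for target $targetNumExecutors")
  }
}
```

Running this prints `requested 2 containers for target 1`: the second `updateResourceRequests()` call, arriving inside the window, over-requests exactly as the example above describes.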
> =======================================================================
> I found this problem because I was changing the requestType to test some
> features with YARN's opportunistic containers (allocation takes ~100 ms),
> which are much faster to allocate than guaranteed containers (allocation
> takes almost 1 s).
> I am not sure whether my understanding is correct.
> I would appreciate anyone's help with this issue (please correct me if I
> have misunderstood anything).
> Wei



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
