[ 
https://issues.apache.org/jira/browse/SPARK-21562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei Chen updated SPARK-21562:
-----------------------------
    Description: 
Hi guys,

I found an interesting problem when Spark requests containers from YARN.
Here is the case:

In YarnAllocator.scala

1. This function requests containers from YARN only when there are executors that
have not yet been requested or started:

{code:java}
def updateResourceRequests(): Unit = {
    val pendingAllocate = getPendingAllocate
    val numPendingAllocate = pendingAllocate.size
    val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning

    if (missing > 0) {
      ......
    }
    .....
}
{code}


2. After the requested containers are allocated (granted through the RPC response),
the pending queue is updated:
  
{code:java}
private def matchContainerToRequest(
      allocatedContainer: Container,
      location: String,
      containersToUse: ArrayBuffer[Container],
      remaining: ArrayBuffer[Container]): Unit = {
    .....
    amClient.removeContainerRequest(containerRequest)  // update pending queues
    .....
}
{code}

3. After an allocated container is launched, the running queue is updated:
{code:java}
private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
    for (container <- containersToUse) {
      ....
      launcherPool.execute(new Runnable {
        override def run(): Unit = {
          try {
            new ExecutorRunnable(
              Some(container),
              conf,
              sparkConf,
              driverUrl,
              executorId,
              executorHostname,
              executorMemory,
              executorCores,
              appAttemptId.getApplicationId.toString,
              securityMgr,
              localResources
            ).run()
            logInfo(s"has launched $containerId")
            updateInternalState()   // update running queues
            ....
}
{code}
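
Putting the three steps together, the call order as I understand it (simplified from
my reading of YarnAllocator; not the exact code) is roughly:

{code:java}
// Rough call order (my reading, simplified):
//
// allocateResources()                          // runs on each allocation heartbeat
//   updateResourceRequests()                   // step 1: asks YARN for `missing` containers
//   amClient.allocate(progressIndicator)       // RPC to the ResourceManager
//   handleAllocatedContainers(allocated)
//     matchContainerToRequest(...)             // step 2: removes the pending request
//     runAllocatedContainers(containersToUse)  // step 3: hands off to launcher threads
{code}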



However, in step 3 a thread is submitted to the launcher pool that first launches the
ExecutorRunnable and only then updates the running queue. We found it can take almost
1 second before updateInternalState() is called. So there is an inconsistent window:
the pending queue has already been updated, but the running queue has not, because the
launcher thread has not yet reached updateInternalState(). If an RPC call to
amClient.allocate() happens inside this window, more executors than targetNumExecutors
will be requested.


{noformat}
Here is an example:
Initial:
targetNumExecutors      numPendingAllocate         numExecutorsRunning
1                              0                            0



After first RPC call to amClient.allocate:
targetNumExecutors      numPendingAllocate         numExecutorsRunning
1                              1                             0



After the first allocated container is granted by YARN:
targetNumExecutors      numPendingAllocate         numExecutorsRunning
1                              0 (removed in step 2)        0


=====> If an RPC call to amClient.allocate() happens at this point, more
containers are requested; this is caused by the inconsistent state.


After the container is launched in step 3
targetNumExecutors      numPendingAllocate         numExecutorsRunning
1                               0                            1


{noformat}
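
To convince myself, here is a tiny standalone toy (plain Scala, not Spark code; it only
mimics the ordering above with made-up counters) that reproduces the window:

{code:java}
import java.util.concurrent.Executors

object RaceWindowDemo {
  val targetNumExecutors = 1
  @volatile var numPendingAllocate = 0
  @volatile var numExecutorsRunning = 0

  def missing: Int = targetNumExecutors - numPendingAllocate - numExecutorsRunning

  def main(args: Array[String]): Unit = {
    val launcherPool = Executors.newFixedThreadPool(1)

    numPendingAllocate += 1            // step 1: container requested
    numPendingAllocate -= 1            // step 2: container granted, pending request removed
    launcherPool.execute(new Runnable {
      override def run(): Unit = {
        Thread.sleep(1000)             // step 3: launching the executor takes ~1s
        numExecutorsRunning += 1       // updateInternalState()
      }
    })

    println(s"missing during the window: $missing")  // prints 1 -> an extra container would be requested
    Thread.sleep(1500)
    println(s"missing after the launch:  $missing")  // prints 0
    launcherPool.shutdown()
  }
}
{code}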
=======================================================================
I found this problem because I am changing the requestType to test some features with
YARN's opportunistic containers (allocation takes ~100ms), which is much faster than
guaranteed containers (allocation takes almost 1s).
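
If my reading is correct, one way to close the window (just a sketch with a made-up
counter name, not a proposed patch) would be to also count containers that have been
handed to the launcher pool but are not running yet, and subtract that count when
computing missing:

{code:java}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical counter (name made up for illustration): containers whose pending
// request has already been removed but whose launcher thread has not reached
// updateInternalState() yet.
private val numExecutorsStarting = new AtomicInteger(0)

// In runAllocatedContainers(), before submitting the Runnable to launcherPool:
//   numExecutorsStarting.incrementAndGet()
// Inside the Runnable, right after updateInternalState() (and on failure paths):
//   numExecutorsStarting.decrementAndGet()

// updateResourceRequests() would then compute:
//   val missing = targetNumExecutors - numPendingAllocate -
//     numExecutorsStarting.get() - numExecutorsRunning
{code}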


I am not sure whether my understanding is correct.
I would appreciate anyone's help with this issue (please correct me if I have misunderstood anything).


Wei



> Spark may request extra containers if the rpc between YARN and spark is too fast
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-21562
>                 URL: https://issues.apache.org/jira/browse/SPARK-21562
>             Project: Spark
>          Issue Type: Bug
>          Components: YARN
>    Affects Versions: 2.2.0
>            Reporter: Wei Chen
>              Labels: YARN
>