[ https://issues.apache.org/jira/browse/SPARK-21562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Marcelo Vanzin resolved SPARK-21562.
------------------------------------
Resolution: Duplicate
> Spark may request extra containers if the rpc between YARN and spark is too fast
> --------------------------------------------------------------------------------
>
> Key: SPARK-21562
> URL: https://issues.apache.org/jira/browse/SPARK-21562
> Project: Spark
> Issue Type: Bug
> Components: YARN
> Affects Versions: 2.2.0
> Reporter: Wei Chen
> Labels: YARN
>
> Hi guys,
> I found an interesting problem when Spark requests containers from YARN.
> Here is the case:
> In YarnAllocator.scala:
> 1. This function requests containers from YARN only if some executors have
> not been requested yet:
> {code:java}
> def updateResourceRequests(): Unit = {
>   val pendingAllocate = getPendingAllocate
>   val numPendingAllocate = pendingAllocate.size
>   // executors still to request = target - already requested - already running
>   val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
>
>   if (missing > 0) {
>     ......
>   }
>   .....
> }
> {code}
> 2. After the requested containers are allocated (granted through RPC), it
> updates the pending queue:
>
> {code:java}
> private def matchContainerToRequest(
>     allocatedContainer: Container,
>     location: String,
>     containersToUse: ArrayBuffer[Container],
>     remaining: ArrayBuffer[Container]): Unit = {
>   .....
>
>   amClient.removeContainerRequest(containerRequest) // update the pending queue
>
>   .....
> }
> {code}
> 3. After the allocated containers are launched, it updates the running
> queue:
> {code:java}
> private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
>   for (container <- containersToUse) {
>     ....
>     launcherPool.execute(new Runnable {
>       override def run(): Unit = {
>         try {
>           // launching the executor takes a while (almost 1 s observed)
>           new ExecutorRunnable(
>             Some(container),
>             conf,
>             sparkConf,
>             driverUrl,
>             executorId,
>             executorHostname,
>             executorMemory,
>             executorCores,
>             appAttemptId.getApplicationId.toString,
>             securityMgr,
>             localResources
>           ).run()
>           logInfo(s"has launched $containerId")
>           updateInternalState() // update the running queue
>           ....
>         }
>         ....
>       }
>     })
>   }
> }
> {code}
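The check in step 1 boils down to arithmetic over three counters. A minimal sketch, assuming plain `Int` counters; the helper names `missingExecutors` and `shouldRequest` are hypothetical and the elided request logic of the real YarnAllocator is not reproduced:

```scala
object MissingExecutors {
  // Hypothetical helper mirroring the `missing` computation in step 1:
  // executors still needed = target - pending requests - running executors.
  def missingExecutors(
      targetNumExecutors: Int,
      numPendingAllocate: Int,
      numExecutorsRunning: Int): Int =
    targetNumExecutors - numPendingAllocate - numExecutorsRunning

  // Step 1 only issues new container requests when this is true.
  def shouldRequest(target: Int, pending: Int, running: Int): Boolean =
    missingExecutors(target, pending, running) > 0
}
```

Note that a container which has been granted (removed from pending in step 2) but not yet counted as running (step 3) contributes to neither term, so it looks "missing" to this check.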
> However, step 3 hands the launch to a separate thread, which first runs
> ExecutorRunnable and only then updates the running queue. We found that it
> takes almost 1 second before updateInternalState(), the function that updates
> the running queue, is called. This opens an inconsistent window: the pending
> queue has already been updated, but the running queue has not, because the
> launching thread has not yet reached updateInternalState(). If an RPC call to
> amClient.allocate() happens during this window, more executors than
> targetNumExecutors are requested.
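The race described above can be reproduced outside Spark with two counters and a launcher thread. This is a standalone simulation, not Spark code: `LaunchRaceDemo`, its counters, and the latches are hypothetical stand-ins, and a latch replaces the ~1 s spent in `ExecutorRunnable(...).run()` so the window is deterministic.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object LaunchRaceDemo {
  // Hypothetical stand-ins for the allocator's bookkeeping.
  val target = 1
  val pending = new AtomicInteger(0)
  val running = new AtomicInteger(0)

  def missing(): Int = target - pending.get - running.get

  // Returns the `missing` value an allocate() round would see while the
  // launcher thread has not yet reached the running-queue update.
  def observeMissingDuringLaunch(): Int = {
    pending.set(0); running.set(0)
    pending.incrementAndGet()              // step 1: one container requested
    val launcherPool = Executors.newSingleThreadExecutor()
    val launching = new CountDownLatch(1)  // signals that the launch started
    val release = new CountDownLatch(1)    // lets the slow launch finish

    pending.decrementAndGet()              // step 2: granted, request removed
    launcherPool.execute(new Runnable {
      override def run(): Unit = {
        launching.countDown()
        release.await()                    // stands in for the slow executor launch
        running.incrementAndGet()          // step 3: updateInternalState()
      }
    })
    launching.await()
    val seen = missing()                   // observed inside the window
    release.countDown()
    launcherPool.shutdown()
    launcherPool.awaitTermination(5, TimeUnit.SECONDS)
    seen
  }
}
```

Inside the window the simulation reports one missing executor even though the only requested container is already being launched; once the launcher thread finishes, `missing()` drops back to 0.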
> {noformat}
> Here is an example:
>
> Initial:
>   targetNumExecutors  numPendingAllocate      numExecutorsRunning
>   1                   0                       0
>
> After the first RPC call to amClient.allocate():
>   targetNumExecutors  numPendingAllocate      numExecutorsRunning
>   1                   1                       0
>
> After the first allocated container is granted by YARN:
>   targetNumExecutors  numPendingAllocate      numExecutorsRunning
>   1                   0 (removed in step 2)   0
>   ====> If there is an RPC call to amClient.allocate() here, more containers
>         are requested, although this is caused only by the inconsistent state.
>
> After the container is launched in step 3:
>   targetNumExecutors  numPendingAllocate      numExecutorsRunning
>   1                   0                       1
> {noformat}
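The four rows of the example can be replayed to show where `missing` goes wrong. A sketch under stated assumptions: `AllocatorStateReplay`, `State`, and `missingWithStarting` are hypothetical names, and the "starting" counter is only an illustrative mitigation idea (counting executors already handed to the launcher pool but not yet running), not the change made in Spark.

```scala
object AllocatorStateReplay {
  // One row of the table: targetNumExecutors, numPendingAllocate, numExecutorsRunning.
  final case class State(target: Int, pending: Int, running: Int) {
    def missing: Int = target - pending - running
  }

  // The four rows of the example, in order.
  val timeline: Seq[(String, State)] = Seq(
    "initial"                  -> State(1, 0, 0),
    "after first allocate()"   -> State(1, 1, 0),
    "after container granted"  -> State(1, 0, 0), // pending removed in step 2
    "after container launched" -> State(1, 0, 1)
  )

  // Hypothetical mitigation: also subtract executors whose launch thread has
  // been submitted but has not yet reached updateInternalState().
  def missingWithStarting(s: State, starting: Int): Int =
    s.target - s.pending - starting - s.running
}
```

Replaying the timeline gives `missing` values 1, 0, 1, 0: the third row asks for a second executor. With one executor counted as starting, the same row yields 0.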
> =======================================================================
> I found this problem because I am changing requestType to test some features
> of YARN's opportunistic containers (e.g., allocation takes about 100 ms), which
> are allocated much faster than guaranteed containers (e.g., allocation takes
> almost 1 s).
> I am not sure whether my understanding is correct.
> I'd appreciate anyone's help with this issue (please correct me if I have
> misunderstood anything).
> Wei
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)