[ https://issues.apache.org/jira/browse/SPARK-31664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-31664:
------------------------------------

    Assignee:     (was: Apache Spark)

> Race in YARN scheduler shutdown leads to uncaught SparkException "Could not find CoarseGrainedScheduler"
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-31664
>                 URL: https://issues.apache.org/jira/browse/SPARK-31664
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler, YARN
>    Affects Versions: 3.0.0, 3.0.1, 3.1.0
>            Reporter: Baohe Zhang
>            Priority: Minor
>
> I used this command to run SparkPi on a YARN cluster with dynamicAllocation
> enabled: "$SPARK_HOME/bin/spark-submit --master yarn --deploy-mode cluster
> --class org.apache.spark.examples.SparkPi ./spark-examples.jar 1000" and
> received the error log below every time.
>  
> {code:java}
> 20/05/06 16:31:44 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
> org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.
>       at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:169)
>       at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:150)
>       at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:684)
>       at org.apache.spark.network.server.AbstractAuthRpcHandler.receive(AbstractAuthRpcHandler.java:66)
>       at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:253)
>       at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:111)
>       at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
>       at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
>       at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>       at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>       at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
>       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>       at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>       at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
>       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>       at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>       at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
>       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>       at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>       at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
>       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>       at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>       at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
>       at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
>       at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
>       at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
>       at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
>       at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>       at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>       at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>       at java.lang.Thread.run(Thread.java:748)
> 20/05/06 16:31:45 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
> 20/05/06 16:31:45 INFO MemoryStore: MemoryStore cleared
> 20/05/06 16:31:45 INFO BlockManager: BlockManager stopped
> {code}
>  
> After some investigation, I found that this issue might have been
> introduced in [https://github.com/apache/spark/pull/25964]. There is a
> race between the driver backend and the executor backend that can happen
> when the driver shuts down.
>  
> PR#25964 added a new message type, LaunchedExecutor, and changed the
> handshake between the executor and the driver when launching an executor
> to the following (a toy sketch follows this list):
>  # The executor backend sends "RegisterExecutor" to the driver backend.
>  # The driver backend replies "true".
>  # The executor backend instantiates the executor once it receives "true"
> from the driver backend.
>  # After the executor is instantiated, the executor backend sends
> "LaunchedExecutor" to the driver backend.
>  # The driver backend makes offers for the executor when it receives
> "LaunchedExecutor".
> The issue occurs between steps 3 and 4. If the driver backend is stopped
> (and hence the driver endpoint is removed from the dispatcher) during
> step 3, then in step 4, when the executor backend tries to send
> "LaunchedExecutor" to the driver backend, the RPC dispatcher throws a
> SparkException: "Could not find CoarseGrainedScheduler". These exception
> logs are verbose and somewhat misleading.
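>  
> For context, the one-way delivery path that fails looks roughly like this
> (a simplified stand-in for Dispatcher.postMessage, Dispatcher.scala:169 in
> the trace above; not the actual Spark source):
> {code:scala}
> import scala.collection.concurrent.TrieMap
> 
> object DispatcherSketch {
>   final class SparkException(msg: String) extends Exception(msg)
> 
>   // "CoarseGrainedScheduler" is removed from this map when the driver
>   // endpoint is unregistered during shutdown.
>   private val endpoints = TrieMap.empty[String, Any]
> 
>   def postOneWayMessage(endpointName: String, message: Any): Unit =
>     endpoints.get(endpointName) match {
>       case Some(inbox) => () // deliver the message to the endpoint's inbox
>       case None =>
>         // A late "LaunchedExecutor" from the executor backend lands here.
>         throw new SparkException(s"Could not find $endpointName.")
>     }
> }
> {code}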
>  
> This race can be fixed, or greatly alleviated, through these changes (a
> sketch follows the lists below).
> When stop() in CoarseGrainedSchedulerBackend is called:
>  # A stopping boolean variable is set to true.
>  # driverEndpoint is not stopped at this point (the dispatcher will stop
> it at the end).
> And while stopping is true, the driver backend:
>  # replies with sendFailure to the executor backend when it receives
> "RegisterExecutor".
>  # replies "StopExecutor" to the executor backend (or "RemoveExecutor" to
> itself) when it receives "LaunchedExecutor".
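>  
> A toy sketch of those guards (hypothetical code condensed from the lists
> above; in Spark the flag and the handlers would live in
> CoarseGrainedSchedulerBackend and its DriverEndpoint):
> {code:scala}
> object ShutdownGuardSketch extends App {
>   sealed trait Message
>   case class RegisterExecutor(executorId: String) extends Message
>   case class LaunchedExecutor(executorId: String) extends Message
>   case object StopExecutor extends Message
> 
>   @volatile private var stopping = false
> 
>   // stop() only flips the flag; the driver endpoint itself stays
>   // registered so late executor messages still reach a live endpoint
>   // (the dispatcher stops it at the very end).
>   def stop(): Unit = stopping = true
> 
>   // Returns the message the driver backend would send back, if any.
>   def receive(m: Message): Option[Message] = m match {
>     case RegisterExecutor(id) if stopping =>
>       // sendFailure instead of "true": the executor never reaches step 3.
>       println(s"driver: sendFailure to $id, shutting down"); None
>     case RegisterExecutor(id) =>
>       println(s"driver: registered $id, replying true"); None
>     case LaunchedExecutor(_) if stopping =>
>       Some(StopExecutor) // or send RemoveExecutor to self
>     case LaunchedExecutor(id) =>
>       println(s"driver: making offers for $id"); None
>     case StopExecutor => None
>   }
> 
>   stop()
>   assert(receive(LaunchedExecutor("exec-1")).contains(StopExecutor))
> }
> {code}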


