[ 
https://issues.apache.org/jira/browse/SPARK-38856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

weixiuli updated SPARK-38856:
-----------------------------
    Description: 
When enabled push-based shuffle in our production, there will be a 
rejectedExecutionException error, this is because that the shuffle pusher pool 
has been shutdowned before using it.

This is the rejectedExecutionException error :

{{FetchFailed(BlockManagerId(26,xxxxx.hadoop.jd.local, 7337, None), 
shuffleId=0, mapIndex=6424, mapId=4177, reduceId=1031, message=
org.apache.spark.shuffle.FetchFailedException
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1181)
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:919)
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:81)
        at 
org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
        at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown
 Source)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown
 Source)
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:815)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at 
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
        at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:133)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.RejectedExecutionException: Task 
org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2$$Lambda$1045/583658475@3492bd6f
 rejected from java.util.concurrent.ThreadPoolExecutor@2e63bad5[Terminated, 
pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 243134]
        at 
java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
        at 
java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
        at 
java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
        at 
org.apache.spark.shuffle.ShuffleBlockPusher.submitTask(ShuffleBlockPusher.scala:147)
        at 
org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2.handleResult(ShuffleBlockPusher.scala:235)
        at 
org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2.onBlockPushSuccess(ShuffleBlockPusher.scala:245)
        at 
org.apache.spark.network.shuffle.BlockPushingListener.onBlockTransferSuccess(BlockPushingListener.java:42)
        at 
org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2.onBlockTransferSuccess(ShuffleBlockPusher.scala:224)
        at 
org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.handleBlockTransferSuccess(RetryingBlockTransferor.java:258)
        at 
org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.onBlockPushSuccess(RetryingBlockTransferor.java:304)
        at 
org.apache.spark.network.shuffle.OneForOneBlockPusher$BlockPushCallback.onSuccess(OneForOneBlockPusher.java:97)
        at 
org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:197)
        at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:142)
        at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
        at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        ... 1 more

)}}

> Fix a rejectedExecutionException error when push-based shuffle is enabled
> -------------------------------------------------------------------------
>
>                 Key: SPARK-38856
>                 URL: https://issues.apache.org/jira/browse/SPARK-38856
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle
>    Affects Versions: 3.2.0, 3.2.1
>            Reporter: weixiuli
>            Assignee: weixiuli
>            Priority: Major
>
> When enabled push-based shuffle in our production, there will be a 
> rejectedExecutionException error, this is because that the shuffle pusher 
> pool has been shutdowned before using it.
> This is the rejectedExecutionException error :
> {{FetchFailed(BlockManagerId(26,xxxxx.hadoop.jd.local, 7337, None), 
> shuffleId=0, mapIndex=6424, mapId=4177, reduceId=1031, message=
> org.apache.spark.shuffle.FetchFailedException
>       at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1181)
>       at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:919)
>       at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:81)
>       at 
> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
>       at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
>       at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>       at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
>       at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>       at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>       at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown
>  Source)
>       at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown
>  Source)
>       at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>       at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:815)
>       at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>       at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
>       at 
> org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
>       at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>       at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
>       at org.apache.spark.scheduler.Task.run(Task.scala:133)
>       at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.RejectedExecutionException: Task 
> org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2$$Lambda$1045/583658475@3492bd6f
>  rejected from java.util.concurrent.ThreadPoolExecutor@2e63bad5[Terminated, 
> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 243134]
>       at 
> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
>       at 
> java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
>       at 
> java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
>       at 
> org.apache.spark.shuffle.ShuffleBlockPusher.submitTask(ShuffleBlockPusher.scala:147)
>       at 
> org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2.handleResult(ShuffleBlockPusher.scala:235)
>       at 
> org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2.onBlockPushSuccess(ShuffleBlockPusher.scala:245)
>       at 
> org.apache.spark.network.shuffle.BlockPushingListener.onBlockTransferSuccess(BlockPushingListener.java:42)
>       at 
> org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2.onBlockTransferSuccess(ShuffleBlockPusher.scala:224)
>       at 
> org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.handleBlockTransferSuccess(RetryingBlockTransferor.java:258)
>       at 
> org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.onBlockPushSuccess(RetryingBlockTransferor.java:304)
>       at 
> org.apache.spark.network.shuffle.OneForOneBlockPusher$BlockPushCallback.onSuccess(OneForOneBlockPusher.java:97)
>       at 
> org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:197)
>       at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:142)
>       at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
>       at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>       at 
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>       at 
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>       at 
> org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>       at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>       at 
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>       at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
>       at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
>       at 
> io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
>       at 
> io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>       at 
> io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>       ... 1 more
> )}}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to