[ https://issues.apache.org/jira/browse/SPARK-38856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
weixiuli updated SPARK-38856: ----------------------------- Description: When enabled push-based shuffle in our production, there will be a rejectedExecutionException error, this is because that the shuffle pusher pool has been shutdowned before using it. This is the rejectedExecutionException error : {{FetchFailed(BlockManagerId(26,xxxxx.hadoop.jd.local, 7337, None), shuffleId=0, mapIndex=6424, mapId=4177, reduceId=1031, message= org.apache.spark.shuffle.FetchFailedException at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1181) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:919) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:81) at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29) at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:815) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) at org.apache.spark.scheduler.Task.run(Task.scala:133) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.util.concurrent.RejectedExecutionException: Task org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2$$Lambda$1045/583658475@3492bd6f rejected from java.util.concurrent.ThreadPoolExecutor@2e63bad5[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 243134] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) at org.apache.spark.shuffle.ShuffleBlockPusher.submitTask(ShuffleBlockPusher.scala:147) at org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2.handleResult(ShuffleBlockPusher.scala:235) at org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2.onBlockPushSuccess(ShuffleBlockPusher.scala:245) at org.apache.spark.network.shuffle.BlockPushingListener.onBlockTransferSuccess(BlockPushingListener.java:42) at org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2.onBlockTransferSuccess(ShuffleBlockPusher.scala:224) at org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.handleBlockTransferSuccess(RetryingBlockTransferor.java:258) at org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.onBlockPushSuccess(RetryingBlockTransferor.java:304) at org.apache.spark.network.shuffle.OneForOneBlockPusher$BlockPushCallback.onSuccess(OneForOneBlockPusher.java:97) at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:197) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:142) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ... 1 more )}} > Fix a rejectedExecutionException error when push-based shuffle is enabled > ------------------------------------------------------------------------- > > Key: SPARK-38856 > URL: https://issues.apache.org/jira/browse/SPARK-38856 > Project: Spark > Issue Type: Bug > Components: Shuffle > Affects Versions: 3.2.0, 3.2.1 > Reporter: weixiuli > Assignee: weixiuli > Priority: Major > > When enabled push-based shuffle in our production, there will be a > rejectedExecutionException error, this is because that the shuffle pusher > pool has been shutdowned before using it. > This is the rejectedExecutionException error : > {{FetchFailed(BlockManagerId(26,xxxxx.hadoop.jd.local, 7337, None), > shuffleId=0, mapIndex=6424, mapId=4177, reduceId=1031, message= > org.apache.spark.shuffle.FetchFailedException > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1181) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:919) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:81) > at > org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29) > at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) > at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) > at > org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31) > at > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) > at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:815) > at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) > at > org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179) > at > org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) > at org.apache.spark.scheduler.Task.run(Task.scala:133) > at > org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.util.concurrent.RejectedExecutionException: Task > org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2$$Lambda$1045/583658475@3492bd6f > rejected from java.util.concurrent.ThreadPoolExecutor@2e63bad5[Terminated, > pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 243134] > at > java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) > at > java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) > at > java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) > at > org.apache.spark.shuffle.ShuffleBlockPusher.submitTask(ShuffleBlockPusher.scala:147) > at > org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2.handleResult(ShuffleBlockPusher.scala:235) > at > org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2.onBlockPushSuccess(ShuffleBlockPusher.scala:245) > at > org.apache.spark.network.shuffle.BlockPushingListener.onBlockTransferSuccess(BlockPushingListener.java:42) > at > org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2.onBlockTransferSuccess(ShuffleBlockPusher.scala:224) > at > org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.handleBlockTransferSuccess(RetryingBlockTransferor.java:258) > at > org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.onBlockPushSuccess(RetryingBlockTransferor.java:304) > at > org.apache.spark.network.shuffle.OneForOneBlockPusher$BlockPushCallback.onSuccess(OneForOneBlockPusher.java:97) > at > org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:197) > at > org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:142) > at > org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53) > at > io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) > at > io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) > at > io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) > at > org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) > at > io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) > at > io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) > at > io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) > at > io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) > at > io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) > at > io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) > ... 1 more > )}} -- This message was sent by Atlassian Jira (v8.20.1#820001) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org