[
https://issues.apache.org/jira/browse/SPARK-38856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
weixiuli updated SPARK-38856:
-----------------------------
Description:
When enabled push-based shuffle in our production, there will be a
rejectedExecutionException error, this is because that the shuffle pusher pool
has been shutdowned before using it.
This is the rejectedExecutionException error :
{{FetchFailed(BlockManagerId(26,xxxxx.hadoop.jd.local, 7337, None),
shuffleId=0, mapIndex=6424, mapId=4177, reduceId=1031, message=
org.apache.spark.shuffle.FetchFailedException
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1181)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:919)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:81)
at
org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown
Source)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:815)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
at
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:133)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.RejectedExecutionException: Task
org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2$$Lambda$1045/583658475@3492bd6f
rejected from java.util.concurrent.ThreadPoolExecutor@2e63bad5[Terminated,
pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 243134]
at
java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at
java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at
java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at
org.apache.spark.shuffle.ShuffleBlockPusher.submitTask(ShuffleBlockPusher.scala:147)
at
org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2.handleResult(ShuffleBlockPusher.scala:235)
at
org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2.onBlockPushSuccess(ShuffleBlockPusher.scala:245)
at
org.apache.spark.network.shuffle.BlockPushingListener.onBlockTransferSuccess(BlockPushingListener.java:42)
at
org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2.onBlockTransferSuccess(ShuffleBlockPusher.scala:224)
at
org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.handleBlockTransferSuccess(RetryingBlockTransferor.java:258)
at
org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.onBlockPushSuccess(RetryingBlockTransferor.java:304)
at
org.apache.spark.network.shuffle.OneForOneBlockPusher$BlockPushCallback.onSuccess(OneForOneBlockPusher.java:97)
at
org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:197)
at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:142)
at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
at
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 more
)}}
> Fix a rejectedExecutionException error when push-based shuffle is enabled
> -------------------------------------------------------------------------
>
> Key: SPARK-38856
> URL: https://issues.apache.org/jira/browse/SPARK-38856
> Project: Spark
> Issue Type: Bug
> Components: Shuffle
> Affects Versions: 3.2.0, 3.2.1
> Reporter: weixiuli
> Assignee: weixiuli
> Priority: Major
>
> When enabled push-based shuffle in our production, there will be a
> rejectedExecutionException error, this is because that the shuffle pusher
> pool has been shutdowned before using it.
> This is the rejectedExecutionException error :
> {{FetchFailed(BlockManagerId(26,xxxxx.hadoop.jd.local, 7337, None),
> shuffleId=0, mapIndex=6424, mapId=4177, reduceId=1031, message=
> org.apache.spark.shuffle.FetchFailedException
> at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1181)
> at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:919)
> at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:81)
> at
> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
> at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
> at
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
> at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown
> Source)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown
> Source)
> at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:815)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
> at
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
> at
> org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
> at org.apache.spark.scheduler.Task.run(Task.scala:133)
> at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.RejectedExecutionException: Task
> org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2$$Lambda$1045/583658475@3492bd6f
> rejected from java.util.concurrent.ThreadPoolExecutor@2e63bad5[Terminated,
> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 243134]
> at
> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
> at
> java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
> at
> java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
> at
> org.apache.spark.shuffle.ShuffleBlockPusher.submitTask(ShuffleBlockPusher.scala:147)
> at
> org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2.handleResult(ShuffleBlockPusher.scala:235)
> at
> org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2.onBlockPushSuccess(ShuffleBlockPusher.scala:245)
> at
> org.apache.spark.network.shuffle.BlockPushingListener.onBlockTransferSuccess(BlockPushingListener.java:42)
> at
> org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2.onBlockTransferSuccess(ShuffleBlockPusher.scala:224)
> at
> org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.handleBlockTransferSuccess(RetryingBlockTransferor.java:258)
> at
> org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.onBlockPushSuccess(RetryingBlockTransferor.java:304)
> at
> org.apache.spark.network.shuffle.OneForOneBlockPusher$BlockPushCallback.onSuccess(OneForOneBlockPusher.java:97)
> at
> org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:197)
> at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:142)
> at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
> at
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> at
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> at
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> at
> org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> at
> io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> at
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
> at
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
> at
> io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> at
> io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
> ... 1 more
> )}}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]