XiDuo You created SPARK-39291:
---------------------------------
Summary: Fetch blocks and open stream should not respond a closed
channel
Key: SPARK-39291
URL: https://issues.apache.org/jira/browse/SPARK-39291
Project: Spark
Issue Type: Improvement
Components: Spark Core
Affects Versions: 3.4.0
Reporter: XiDuo You
If a user cancels and interrupts a reduce task that is fetching shuffle blocks,
the channel is closed. However, some ChunkFetchRequests may still be in flight,
so the server-side TransportRequestHandler still tries to respond to them and
fails because the channel is already closed. The problem gets worse when the
reduce stage is large.
{code:java}
22/05/24 21:29:30 ERROR ChunkFetchRequestHandler: Error sending result ChunkFetchFailure[streamChunkId=StreamChunkId[streamId=736493140719,chunkIndex=6],errorString=java.lang.IllegalStateException: Requested chunk not available since streamId 736493140719 is closed
    at org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:92)
    at org.apache.spark.network.server.ChunkFetchRequestHandler.processFetchRequest(ChunkFetchRequestHandler.java:103)
    at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:82)
    at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:51)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61)
    at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:370)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:750)
] to /ip:port; closing connection
java.nio.channels.ClosedChannelException
    at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
    at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:110)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
    at io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
    at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:750)
{code}
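
One possible shape for the improvement is to check whether the channel is still active before doing any work for a request. The following is only a minimal, self-contained sketch of that idea and not Spark's actual code: the Channel interface, FakeChannel, and this processFetchRequest signature are hypothetical stand-ins invented for illustration, loosely modelled on the isActive() and writeAndFlush() methods of a Netty channel.

```java
import java.util.ArrayList;
import java.util.List;

public class ClosedChannelGuardSketch {

    /** Hypothetical stand-in for the small part of a network channel used here. */
    interface Channel {
        boolean isActive();
        void writeAndFlush(Object msg);
    }

    /** In-memory channel for the sketch; records every message written to it. */
    static final class FakeChannel implements Channel {
        private final boolean active;
        final List<Object> written = new ArrayList<>();

        FakeChannel(boolean active) {
            this.active = active;
        }

        @Override
        public boolean isActive() {
            return active;
        }

        @Override
        public void writeAndFlush(Object msg) {
            // Mimics the failure in the log: writing to a closed channel is an error.
            if (!active) {
                throw new IllegalStateException("write on closed channel");
            }
            written.add(msg);
        }
    }

    /**
     * Handles one fetch request (hypothetical signature). Returns true if a
     * response was written, false if the request was skipped because the
     * channel was already closed.
     */
    static boolean processFetchRequest(Channel channel, long streamId, int chunkIndex) {
        if (!channel.isActive()) {
            // The client has gone away; skip the lookup and the response entirely.
            return false;
        }
        channel.writeAndFlush("chunk[streamId=" + streamId + ",chunkIndex=" + chunkIndex + "]");
        return true;
    }

    public static void main(String[] args) {
        FakeChannel open = new FakeChannel(true);
        FakeChannel closed = new FakeChannel(false);
        System.out.println(processFetchRequest(open, 736493140719L, 6));   // prints "true"
        System.out.println(processFetchRequest(closed, 736493140719L, 6)); // prints "false"
    }
}
```

With a guard of this kind, an in-flight request that arrives after the client has closed the channel is dropped quietly instead of producing the ClosedChannelException shown in the log above.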