SteNicholas opened a new pull request, #2356:
URL: https://github.com/apache/incubator-celeborn/pull/2356
### What changes were proposed in this pull request?
Fix the `NullPointerException` thrown by `FetchHandler#handleEndStreamFromClient` after a stream has been recycled, because recycling removes the corresponding `streamId` from `streams`.
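The kind of guard this calls for can be sketched as a null-safe lookup. This is a minimal, hypothetical sketch: `StreamManagerSketch`, `EndStreamHandlerSketch`, `StreamState`, `registerStream` and `recordedShuffleKeys` are illustrative names, and the `Optional`-returning signature is an assumption, not Celeborn's actual `CreditStreamManager` API or the exact change in this PR.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stream manager. Names echo the PR text
// (streams, recycleStream, getStreamShuffleKey); signatures are assumptions.
class StreamManagerSketch {
  // Per-stream state; only the shuffle key matters for this sketch.
  static final class StreamState {
    final String shuffleKey;
    StreamState(String shuffleKey) { this.shuffleKey = shuffleKey; }
  }

  private final Map<Long, StreamState> streams = new ConcurrentHashMap<>();

  void registerStream(long streamId, String shuffleKey) {
    streams.put(streamId, new StreamState(shuffleKey));
  }

  // May run on another thread (e.g. a recycler); drops the stream's entry.
  void recycleStream(long streamId) {
    streams.remove(streamId);
  }

  // Null-safe lookup: a recycled stream yields Optional.empty() instead of
  // dereferencing a null StreamState.
  Optional<String> getStreamShuffleKey(long streamId) {
    return Optional.ofNullable(streams.get(streamId)).map(state -> state.shuffleKey);
  }
}

// Hypothetical end-stream handler: records the shuffle key (standing in for
// "application active connection" bookkeeping) only if the stream still exists.
class EndStreamHandlerSketch {
  private final StreamManagerSketch manager;
  final Set<String> recordedShuffleKeys = ConcurrentHashMap.newKeySet();

  EndStreamHandlerSketch(StreamManagerSketch manager) {
    this.manager = manager;
  }

  void handleEndStreamFromClient(long streamId) {
    manager.getStreamShuffleKey(streamId).ifPresent(recordedShuffleKeys::add);
    // ... ending the stream itself is omitted from this sketch ...
  }
}
```

Reading the map once with a single `get` (rather than `containsKey` followed by `get`) keeps the check and the use on the same value, so a concurrent `recycleStream` cannot slip in between them.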
### Why are the changes needed?
`FetchHandler#handleEndStreamFromClient` needs the stream's shuffle key to record the application's active connection. However, `recycleStream` may be invoked from `MapPartitionDataReader`, which removes the corresponding `streamId` from `streams` before `FetchHandler#handleEndStreamFromClient` runs. The subsequent lookup in `CreditStreamManager#getStreamShuffleKey` then throws a `NullPointerException`:
```
24/03/05 13:27:14,522 DEBUG [worker-credit-stream-manager-recycler] MapDataPartition: release all for stream: 990159671000
24/03/05 13:27:14,524 DEBUG [worker-credit-stream-manager-recycler] MapDataPartition: release map data partition FileInfo{file=/data00/home/guoyangze/data/celeborn-worker/shuffle_data/1709616425086-343fe33c97559405b474412efc0d9ce5/0/0-0-0, chunkOffsets=0, userIdentifier=`default`.`default`, partitionType=MAP}
24/03/05 13:27:14,531 ERROR [fetch-server-11-1] TransportRequestHandler: Error while invoking handler#receive() on RPC id 18
java.lang.NullPointerException
	at org.apache.celeborn.service.deploy.worker.storage.CreditStreamManager.getStreamShuffleKey(CreditStreamManager.java:189)
	at org.apache.celeborn.service.deploy.worker.FetchHandler.handleEndStreamFromClient(FetchHandler.scala:369)
	at org.apache.celeborn.service.deploy.worker.FetchHandler.handleRpcRequest(FetchHandler.scala:143)
	at org.apache.celeborn.service.deploy.worker.FetchHandler.receive(FetchHandler.scala:97)
	at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:96)
	at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84)
	at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at org.apache.celeborn.common.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:74)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:829)
```
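The ordering in the log (the recycler releases stream `990159671000`, then the fetch-server thread handles the end-stream RPC) can be replayed deterministically in a small, hypothetical sketch. `RecycleRaceRepro`, `StreamState` and `getStreamShuffleKeyUnguarded` are illustrative names, and the unguarded `streams.get(streamId).shuffleKey` dereference is an assumption about the failing pattern, not a copy of `CreditStreamManager.java:189`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical replay of the event order seen in the log above.
// Names are illustrative assumptions, not Celeborn classes.
class RecycleRaceRepro {
  static final class StreamState {
    final String shuffleKey;
    StreamState(String shuffleKey) { this.shuffleKey = shuffleKey; }
  }

  final Map<Long, StreamState> streams = new ConcurrentHashMap<>();

  // Unguarded lookup: dereferences whatever get() returns.
  String getStreamShuffleKeyUnguarded(long streamId) {
    return streams.get(streamId).shuffleKey; // NPE once the stream is recycled
  }

  // Replays "recycle, then end stream" and reports whether the
  // end-stream lookup failed with a NullPointerException.
  static boolean endStreamAfterRecycleThrowsNpe() {
    RecycleRaceRepro repro = new RecycleRaceRepro();
    long streamId = 990159671000L; // stream id from the DEBUG log line
    repro.streams.put(streamId, new StreamState("hypothetical-shuffle-key"));
    // 1. Recycler side ("release all for stream"): the entry is removed.
    repro.streams.remove(streamId);
    // 2. Fetch-server side: the end-stream handler looks up the shuffle key.
    try {
      repro.getStreamShuffleKeyUnguarded(streamId);
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(endStreamAfterRecycleThrowsNpe()); // prints "true"
  }
}
```

In the worker the two steps run on different threads (`worker-credit-stream-manager-recycler` and `fetch-server-11-1`), so the failure only appears when the recycler wins the race; replaying the steps in a fixed order makes it reproducible.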
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- `ChunkStreamManagerSuiteJ#testStreamRegisterAndCleanup`