SteNicholas opened a new pull request, #2356:
URL: https://github.com/apache/incubator-celeborn/pull/2356

   ### What changes were proposed in this pull request?
   
   Fix the `NullPointerException` thrown by `FetchHandler#handleEndStreamFromClient` after a stream has been recycled, since recycling removes the corresponding `streamId` from `streams`.
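
A minimal sketch of the kind of guard this fix implies, assuming a map from `streamId` to per-stream state as the description suggests. `StreamRegistry`, `StreamState`, `endStream`, and the connection counter are hypothetical names for illustration, not Celeborn's actual `CreditStreamManager` or `FetchHandler` API, and the real patch may handle the missing stream differently. The point is only that the shuffle key lookup tolerates a `streamId` the recycler has already removed.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for worker-side stream bookkeeping.
// None of these names are Celeborn's real API.
class StreamRegistry {
  // Per-stream metadata; only the shuffle key matters for this sketch.
  static final class StreamState {
    final String shuffleKey;

    StreamState(String shuffleKey) {
      this.shuffleKey = shuffleKey;
    }
  }

  private final Map<Long, StreamState> streams = new ConcurrentHashMap<>();
  // Hypothetical stand-in for "record application active connection".
  private final Map<String, Integer> activeConnections = new ConcurrentHashMap<>();

  void registerStream(long streamId, String shuffleKey) {
    streams.put(streamId, new StreamState(shuffleKey));
  }

  // Called by the recycler: drops the streamId from the map.
  void recycleStream(long streamId) {
    streams.remove(streamId);
  }

  // Null-safe lookup: returns null for an already-recycled stream
  // instead of dereferencing a missing map entry.
  String getStreamShuffleKey(long streamId) {
    StreamState state = streams.get(streamId);
    return state == null ? null : state.shuffleKey;
  }

  // End-of-stream handler: records the connection only if the stream still exists.
  boolean endStream(long streamId) {
    String shuffleKey = getStreamShuffleKey(streamId);
    if (shuffleKey == null) {
      return false; // already recycled: nothing to record, no NPE
    }
    activeConnections.merge(shuffleKey, 1, Integer::sum);
    return true;
  }

  int activeConnectionCount(String shuffleKey) {
    return activeConnections.getOrDefault(shuffleKey, 0);
  }

  public static void main(String[] args) {
    StreamRegistry registry = new StreamRegistry();
    registry.registerStream(990159671000L, "app-1-0");
    registry.recycleStream(990159671000L); // the recycler wins the race
    System.out.println(registry.endStream(990159671000L)); // prints "false"
  }
}
```

Returning `null` (rather than throwing) keeps the end-of-stream path a no-op for streams that are already gone, which matches the intent of skipping the connection bookkeeping once the stream has been released.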
   
   ### Why are the changes needed?
   
   `FetchHandler#handleEndStreamFromClient` needs to look up the shuffle key to record the application's active connection. However, `recycleStream` may be invoked from `MapPartitionDataReader`, which removes the corresponding `streamId` before `FetchHandler#handleEndStreamFromClient` runs. In that case the lookup fails with a `NullPointerException`, as in the following log:
   ```
   24/03/05 13:27:14,522 DEBUG [worker-credit-stream-manager-recycler] MapDataPartition: release all for stream: 990159671000
   24/03/05 13:27:14,524 DEBUG [worker-credit-stream-manager-recycler] MapDataPartition: release map data partition FileInfo{file=/data00/home/guoyangze/data/celeborn-worker/shuffle_data/1709616425086-343fe33c97559405b474412efc0d9ce5/0/0-0-0, chunkOffsets=0, userIdentifier=`default`.`default`, partitionType=MAP}
   24/03/05 13:27:14,531 ERROR [fetch-server-11-1] TransportRequestHandler: Error while invoking handler#receive() on RPC id 18
   java.lang.NullPointerException
           at org.apache.celeborn.service.deploy.worker.storage.CreditStreamManager.getStreamShuffleKey(CreditStreamManager.java:189)
           at org.apache.celeborn.service.deploy.worker.FetchHandler.handleEndStreamFromClient(FetchHandler.scala:369)
           at org.apache.celeborn.service.deploy.worker.FetchHandler.handleRpcRequest(FetchHandler.scala:143)
           at org.apache.celeborn.service.deploy.worker.FetchHandler.receive(FetchHandler.scala:97)
           at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:96)
           at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84)
           at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156)
           at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
           at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
           at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
           at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
           at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
           at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
           at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
           at org.apache.celeborn.common.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:74)
           at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
           at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
           at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
           at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
           at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
           at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
           at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
           at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
           at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
           at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
           at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
           at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
           at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
           at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
           at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
           at java.base/java.lang.Thread.run(Thread.java:829)
   ```
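
The ordering behind this log can be reproduced in isolation. The hypothetical Java sketch below (class and method names are illustrative, not Celeborn code) forces a recycler thread to remove the map entry before a fetch thread looks up the shuffle key with an unguarded `streams.get(streamId).shuffleKey` dereference, which throws `NullPointerException`. A `CountDownLatch` makes the interleaving deterministic instead of timing-dependent.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical reproduction of the recycle-then-end-stream ordering.
class RecycleRaceDemo {
  static final class StreamState {
    final String shuffleKey;

    StreamState(String shuffleKey) {
      this.shuffleKey = shuffleKey;
    }
  }

  static final Map<Long, StreamState> streams = new ConcurrentHashMap<>();

  // Unguarded lookup: dereferences the entry directly, so it throws
  // NullPointerException once the entry has been removed.
  static String unguardedShuffleKey(long streamId) {
    return streams.get(streamId).shuffleKey;
  }

  // Runs the recycler before the end-stream lookup and returns whatever
  // the fetch thread threw (null if nothing was thrown).
  static Throwable runInterleaving(long streamId) {
    streams.put(streamId, new StreamState("app-1-0"));
    CountDownLatch recycled = new CountDownLatch(1);
    AtomicReference<Throwable> failure = new AtomicReference<>();

    Thread recycler = new Thread(() -> {
      streams.remove(streamId); // stands in for recycleStream
      recycled.countDown();
    }, "recycler");

    Thread fetch = new Thread(() -> {
      try {
        recycled.await(); // wait until the recycler has removed the entry
        unguardedShuffleKey(streamId);
      } catch (Throwable t) {
        failure.set(t);
      }
    }, "fetch");

    fetch.start();
    recycler.start();
    try {
      recycler.join();
      fetch.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return failure.get();
  }

  public static void main(String[] args) {
    Throwable t = runInterleaving(990159671000L);
    // prints "NullPointerException"
    System.out.println(t == null ? "no exception" : t.getClass().getSimpleName());
  }
}
```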
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   - `ChunkStreamManagerSuiteJ#testStreamRegisterAndCleanup`


-- 
This is an automated message from the Apache Git Service.