This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new fa4327e09 [CELEBORN-1885] Fix nullptr exceptions in FetchChunk after 
worker restart
fa4327e09 is described below

commit fa4327e0931923483cabf4a513efb207ca68d687
Author: Sanskar Modi <[email protected]>
AuthorDate: Tue Mar 4 22:26:38 2025 +0800

    [CELEBORN-1885] Fix nullptr exceptions in FetchChunk after worker restart
    
    ### What changes were proposed in this pull request?
    
    Handle the NullPointerException thrown in FetchHandler after a worker restart.
    
    ### Why are the changes needed?
    
    The current code throws a NullPointerException in handleChunkFetchRequest:
    
    ```
    25/02/27 09:23:25 WARN [data-client-5-1] TransportResponseHandler: Ignoring 
response for RPC 425133 from phx61-u46.prod.uber.internal/10.154.141.80:19103 
(java.lang.NullPointerException: Cannot read field "shuffleKey" because "state" 
is null
            at 
org.apache.celeborn.service.deploy.worker.storage.ChunkStreamManager.getShuffleKeyAndFileName(ChunkStreamManager.java:199)
            at 
org.apache.celeborn.service.deploy.worker.FetchHandler.handleChunkFetchRequest(FetchHandler.scala:493)
            at 
org.apache.celeborn.service.deploy.worker.FetchHandler.handleRpcRequest(FetchHandler.scala:181)
            at 
org.apache.celeborn.service.deploy.worker.FetchHandler.receive(FetchHandler.scala:101)
            at 
org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:100)
            at 
org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84)
            at 
org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
            at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
            at 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
            at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
            at 
org.apache.celeborn.common.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:74)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
            at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
            at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
            at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
            at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
            at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
            at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
            at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
            at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
            at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
            at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
            at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
            at java.base/java.lang.Thread.run(Thread.java:840)
    ) since it is not outstanding
    ```
    and in handleEndStreamFromClient:
    
    ```
    2025-02-27T17:31:03+05:30 WARN [data-client-5-3] TransportResponseHandler: 
Ignoring response for RPC 461011 from 
phx61-hkr.prod.uber.internal/10.154.133.115:19103 
(java.lang.NullPointerException: Cannot read field "shuffleKey" because 
"streamState" is null
            at 
org.apache.celeborn.service.deploy.worker.FetchHandler.handleEndStreamFromClient(FetchHandler.scala:458)
            at 
org.apache.celeborn.service.deploy.worker.FetchHandler.handleRpcRequest(FetchHandler.scala:174)
            at 
org.apache.celeborn.service.deploy.worker.FetchHandler.receive(FetchHandler.scala:101)
            at 
org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:100)
            at 
org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84)
            at 
org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
            at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
            at 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
            at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
            at 
org.apache.celeborn.common.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:74)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
            at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
            at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
            at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
            at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
            at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
            at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
            at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
            at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
            at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
            at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
            at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
            at java.base/java.lang.Thread.run(Thread.java:840)
    ) since it is not outstanding
    ```
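    As a result, the error is not reported to the client's
    `TransportResponseHandler` as a `ChunkFetchFailure`. Instead, it is treated
    as an `RpcFailure`, which the client ignores because no RPC with that ID is
    outstanding (hence the "since it is not outstanding" warnings above).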
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Tested in our staging cluster.
    
    Closes #3128 from s0nskar/fix_nullptr_restart.
    
    Authored-by: Sanskar Modi <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../deploy/worker/storage/ChunkStreamManager.java  |  7 -------
 .../service/deploy/worker/FetchHandler.scala       | 22 ++++++++++++++++------
 2 files changed, 16 insertions(+), 13 deletions(-)

diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java
index 8b0caa211..91852ab0c 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java
@@ -22,8 +22,6 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
-import scala.Tuple2;
-
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -198,11 +196,6 @@ public class ChunkStreamManager {
     return streams.get(streamId);
   }
 
-  public Tuple2<String, String> getShuffleKeyAndFileName(long streamId) {
-    StreamState state = streams.get(streamId);
-    return new Tuple2<>(state.shuffleKey, state.fileName);
-  }
-
   public int getStreamsCount() {
     return streams.size();
   }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index c58c535fc..2e9895beb 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -469,9 +469,11 @@ class FetchHandler(
     streamType match {
       case StreamType.ChunkStream =>
         val streamState = chunkStreamManager.getStreamState(streamId)
-        val (shuffleKey, fileName) = (streamState.shuffleKey, 
streamState.fileName)
-        workerSource.recordAppActiveConnection(client, shuffleKey)
-        getRawFileInfo(shuffleKey, fileName).closeStream(streamId)
+        if (streamState != null) {
+          val (shuffleKey, fileName) = (streamState.shuffleKey, 
streamState.fileName)
+          workerSource.recordAppActiveConnection(client, shuffleKey)
+          getRawFileInfo(shuffleKey, fileName).closeStream(streamId)
+        }
       case StreamType.CreditStream =>
         val shuffleKey = creditStreamManager.getStreamShuffleKey(streamId)
         if (shuffleKey != null) {
@@ -533,9 +535,15 @@ class FetchHandler(
     logDebug(s"Received req from ${remoteAddr}" +
       s" to fetch block $streamChunkSlice")
 
-    workerSource.recordAppActiveConnection(
-      client,
-      
chunkStreamManager.getShuffleKeyAndFileName(streamChunkSlice.streamId)._1)
+    val streamState = 
chunkStreamManager.getStreamState(streamChunkSlice.streamId)
+    if (streamState == null) {
+      val message = s"Stream ${streamChunkSlice.streamId} is not registered 
with worker. " +
+        "This can happen if the worker was restart recently."
+      logError(message)
+      workerSource.incCounter(WorkerSource.FETCH_CHUNK_FAIL_COUNT)
+      client.getChannel.writeAndFlush(new ChunkFetchFailure(streamChunkSlice, 
message))
+      return
+    }
 
     maxChunkBeingTransferred.foreach { threshold =>
       val chunksBeingTransferred = chunkStreamManager.chunksBeingTransferred 
// take high cpu usage
@@ -550,6 +558,8 @@ class FetchHandler(
       }
     }
 
+    workerSource.recordAppActiveConnection(client, streamState.shuffleKey)
+
     val reqStr = req.toString
     workerSource.startTimer(WorkerSource.FETCH_CHUNK_TIME, reqStr)
     val fetchTimeMetric = 
chunkStreamManager.getFetchTimeMetric(streamChunkSlice.streamId)

Reply via email to