This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new fa4327e09 [CELEBORN-1885] Fix nullptr exceptions in FetchChunk after
worker restart
fa4327e09 is described below
commit fa4327e0931923483cabf4a513efb207ca68d687
Author: Sanskar Modi <[email protected]>
AuthorDate: Tue Mar 4 22:26:38 2025 +0800
[CELEBORN-1885] Fix nullptr exceptions in FetchChunk after worker restart
### What changes were proposed in this pull request?
Handle the NullPointerException thrown in FetchHandler after a worker restarts.
### Why are the changes needed?
The current code throws a NullPointerException during handleChunkFetchRequest –
```
25/02/27 09:23:25 WARN [data-client-5-1] TransportResponseHandler: Ignoring
response for RPC 425133 from phx61-u46.prod.uber.internal/10.154.141.80:19103
(java.lang.NullPointerException: Cannot read field "shuffleKey" because "state"
is null
at
org.apache.celeborn.service.deploy.worker.storage.ChunkStreamManager.getShuffleKeyAndFileName(ChunkStreamManager.java:199)
at
org.apache.celeborn.service.deploy.worker.FetchHandler.handleChunkFetchRequest(FetchHandler.scala:493)
at
org.apache.celeborn.service.deploy.worker.FetchHandler.handleRpcRequest(FetchHandler.scala:181)
at
org.apache.celeborn.service.deploy.worker.FetchHandler.receive(FetchHandler.scala:101)
at
org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:100)
at
org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84)
at
org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at
org.apache.celeborn.common.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:74)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:840)
) since it is not outstanding
```
and during handleEndStreamFromClient –
```
2025-02-27T17:31:03+05:30 WARN [data-client-5-3] TransportResponseHandler:
Ignoring response for RPC 461011 from
phx61-hkr.prod.uber.internal/10.154.133.115:19103
(java.lang.NullPointerException: Cannot read field "shuffleKey" because
"streamState" is null
at
org.apache.celeborn.service.deploy.worker.FetchHandler.handleEndStreamFromClient(FetchHandler.scala:458)
at
org.apache.celeborn.service.deploy.worker.FetchHandler.handleRpcRequest(FetchHandler.scala:174)
at
org.apache.celeborn.service.deploy.worker.FetchHandler.receive(FetchHandler.scala:101)
at
org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:100)
at
org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84)
at
org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at
org.apache.celeborn.common.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:74)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:840)
) since it is not outstanding
```
As a result, this error is not handled properly in the
`TransportResponseHandler` as a `ChunkFetchFailure`. Instead, the failure is
treated as an `RpcFailure`.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tested in our staging cluster.
Closes #3128 from s0nskar/fix_nullptr_restart.
Authored-by: Sanskar Modi <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../deploy/worker/storage/ChunkStreamManager.java | 7 -------
.../service/deploy/worker/FetchHandler.scala | 22 ++++++++++++++++------
2 files changed, 16 insertions(+), 13 deletions(-)
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java
index 8b0caa211..91852ab0c 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java
@@ -22,8 +22,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
-import scala.Tuple2;
-
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -198,11 +196,6 @@ public class ChunkStreamManager {
return streams.get(streamId);
}
- public Tuple2<String, String> getShuffleKeyAndFileName(long streamId) {
- StreamState state = streams.get(streamId);
- return new Tuple2<>(state.shuffleKey, state.fileName);
- }
-
public int getStreamsCount() {
return streams.size();
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index c58c535fc..2e9895beb 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -469,9 +469,11 @@ class FetchHandler(
streamType match {
case StreamType.ChunkStream =>
val streamState = chunkStreamManager.getStreamState(streamId)
- val (shuffleKey, fileName) = (streamState.shuffleKey,
streamState.fileName)
- workerSource.recordAppActiveConnection(client, shuffleKey)
- getRawFileInfo(shuffleKey, fileName).closeStream(streamId)
+ if (streamState != null) {
+ val (shuffleKey, fileName) = (streamState.shuffleKey,
streamState.fileName)
+ workerSource.recordAppActiveConnection(client, shuffleKey)
+ getRawFileInfo(shuffleKey, fileName).closeStream(streamId)
+ }
case StreamType.CreditStream =>
val shuffleKey = creditStreamManager.getStreamShuffleKey(streamId)
if (shuffleKey != null) {
@@ -533,9 +535,15 @@ class FetchHandler(
logDebug(s"Received req from ${remoteAddr}" +
s" to fetch block $streamChunkSlice")
- workerSource.recordAppActiveConnection(
- client,
-
chunkStreamManager.getShuffleKeyAndFileName(streamChunkSlice.streamId)._1)
+ val streamState =
chunkStreamManager.getStreamState(streamChunkSlice.streamId)
+ if (streamState == null) {
+ val message = s"Stream ${streamChunkSlice.streamId} is not registered
with worker. " +
+ "This can happen if the worker was restart recently."
+ logError(message)
+ workerSource.incCounter(WorkerSource.FETCH_CHUNK_FAIL_COUNT)
+ client.getChannel.writeAndFlush(new ChunkFetchFailure(streamChunkSlice,
message))
+ return
+ }
maxChunkBeingTransferred.foreach { threshold =>
val chunksBeingTransferred = chunkStreamManager.chunksBeingTransferred
// take high cpu usage
@@ -550,6 +558,8 @@ class FetchHandler(
}
}
+ workerSource.recordAppActiveConnection(client, streamState.shuffleKey)
+
val reqStr = req.toString
workerSource.startTimer(WorkerSource.FETCH_CHUNK_TIME, reqStr)
val fetchTimeMetric =
chunkStreamManager.getFetchTimeMetric(streamChunkSlice.streamId)