This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new cbaef742e [CELEBORN-1424] Fix getChunk NPE when enable local read
cbaef742e is described below
commit cbaef742efec136cf4ed550223ad6d2d72f9301d
Author: onebox-li <[email protected]>
AuthorDate: Thu May 16 09:32:52 2024 +0800
[CELEBORN-1424] Fix getChunk NPE when enable local read
### What changes were proposed in this pull request?
When building the open stream request, additionally check whether the
partition location's host equals the local host to decide whether to read locally.
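The check can be sketched in isolation as a small predicate. This is only an illustration: `LocalReadDecision` and `shouldReadLocal` are hypothetical names that do not appear in the patch, and the null guard is an added assumption (the patch calls `equals` directly on the location host).

```java
/** Hypothetical helper, not part of Celeborn; it only mirrors the condition used in the patch. */
final class LocalReadDecision {
    private LocalReadDecision() {}

    /**
     * Returns true only when local read is enabled and the partition
     * location is on the same host as the reader.
     */
    static boolean shouldReadLocal(
            boolean localFetchEnabled, String locationHost, String localHostAddress) {
        // Assumption: a missing host is treated as "not local" rather than throwing.
        return localFetchEnabled
                && locationHost != null
                && locationHost.equals(localHostAddress);
    }

    public static void main(String[] args) {
        // Same host with local read enabled: read locally.
        System.out.println(shouldReadLocal(true, "worker-1", "worker-1"));
        // Different host: must be a remote read even though local read is enabled.
        System.out.println(shouldReadLocal(true, "worker-2", "worker-1"));
    }
}
```

The flag alone is not enough: before this change the batch open stream request passed only `localFetchEnabled`, which corresponds to dropping the host comparison above.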
### Why are the changes needed?
When local read is enabled, the batch open stream request did not check whether
the hosts are equal, so the worker treated every stream as a local read
(the stream state's `buffers` is null). When the read is actually remote,
getChunk throws an NPE, as shown in the stack trace below, and a remote read
retry then occurs.
```
2024/05/14 11:07:43,933 WARN [data-client-5-7] TransportResponseHandler: Receive ChunkFetchFailure, errorMsg java.lang.NullPointerException
at org.apache.celeborn.service.deploy.worker.storage.ChunkStreamManager.getChunk(ChunkStreamManager.java:85)
at org.apache.celeborn.service.deploy.worker.FetchHandler.handleChunkFetchRequest(FetchHandler.scala:503)
at org.apache.celeborn.service.deploy.worker.FetchHandler.handleRpcRequest(FetchHandler.scala:181)
at org.apache.celeborn.service.deploy.worker.FetchHandler.receive(FetchHandler.scala:101)
at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:96)
at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84)
at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at org.apache.celeborn.common.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:74)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.concurrent.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
```
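The failure mode can be reproduced with a toy model. The sketch below is not Celeborn's `ChunkStreamManager`; `ToyChunkStreams`, `StreamState`, and `openStream` are hypothetical names, and the only behavior taken from the description above is that a stream opened as a local read has null `buffers`, so a later chunk fetch on it dereferences null.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy stand-in for a worker-side stream registry; all names here are hypothetical. */
final class ToyChunkStreams {
    /** Per-stream state. `buffers` stays null for streams opened as local reads. */
    static final class StreamState {
        final List<String> buffers;

        StreamState(List<String> buffers) {
            this.buffers = buffers;
        }
    }

    private final Map<Long, StreamState> streams = new HashMap<>();
    private long nextStreamId = 0;

    /** Registers a stream; chunk buffers are only kept when the client did not ask for a local read. */
    long openStream(boolean readLocalShuffle, List<String> chunks) {
        long streamId = nextStreamId++;
        streams.put(streamId, new StreamState(readLocalShuffle ? null : chunks));
        return streamId;
    }

    /** Serves a chunk for a remote read; throws NullPointerException if the stream was opened as local. */
    String getChunk(long streamId, int chunkIndex) {
        StreamState state = streams.get(streamId);
        return state.buffers.get(chunkIndex);
    }
}
```

In this model, a client that opens the stream with `readLocalShuffle = true` and then fetches chunks over the network hits the null dereference, which matches the sequence described above.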
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manually tested; the exception no longer occurs.
Closes #2510 from onebox-li/fix-local-read-npe.
Authored-by: onebox-li <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala | 4 +++-
.../java/org/apache/celeborn/client/read/CelebornInputStream.java | 2 +-
2 files changed, 4 insertions(+), 2 deletions(-)
diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index bb66a3cef..b18733e77 100644
--- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -117,6 +117,7 @@ class CelebornShuffleReader[K, C](
val startTime = System.currentTimeMillis()
val fetchTimeoutMs = conf.clientFetchTimeoutMs
val localFetchEnabled = conf.enableReadLocalShuffleFile
+ val localHostAddress = Utils.localHostName(conf)
val shuffleKey = Utils.makeShuffleKey(handle.appUniqueId, shuffleId)
// startPartition is irrelevant
val fileGroups = shuffleClient.updateFileGroup(shuffleId, startPartition)
@@ -148,7 +149,8 @@ class CelebornShuffleReader[K, C](
pbOpenStreamListBuilder.addFileName(location.getFileName)
.addStartIndex(startMapIndex)
.addEndIndex(endMapIndex)
- pbOpenStreamListBuilder.addReadLocalShuffle(localFetchEnabled)
+ pbOpenStreamListBuilder.addReadLocalShuffle(
+ localFetchEnabled && location.getHost.equals(localHostAddress))
}
}
}
diff --git a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
index 798128d2f..99b131a48 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
@@ -444,7 +444,7 @@ public abstract class CelebornInputStream extends InputStream {
switch (storageInfo.getType()) {
case HDD:
case SSD:
- if (enabledReadLocalShuffle && location.getWorker().host().equals(localHostAddress)) {
+ if (enabledReadLocalShuffle && location.getHost().equals(localHostAddress)) {
logger.debug("Read local shuffle file {}", localHostAddress);
containLocalRead = true;
return new LocalPartitionReader(