This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new cbaef742e [CELEBORN-1424] Fix getChunk NPE when enable local read
cbaef742e is described below

commit cbaef742efec136cf4ed550223ad6d2d72f9301d
Author: onebox-li <[email protected]>
AuthorDate: Thu May 16 09:32:52 2024 +0800

    [CELEBORN-1424] Fix getChunk NPE when enable local read
    
    ### What changes were proposed in this pull request?
    When building the open stream request, additionally check whether the 
location's host equals the local host to decide whether to read locally.
    
    ### Why are the changes needed?
    When local read is enabled, the batch open stream path did not check 
whether the hosts are equal, causing the worker to treat the stream as a local 
read (the stream state's `buffers` is null). When the read is actually remote, 
an NPE is thrown during getChunk, as shown in the stack trace below, and a 
remote read retry then occurs.
    ```
    2024/05/14 11:07:43,933 WARN [data-client-5-7] TransportResponseHandler: 
Receive ChunkFetchFailure, errorMsg java.lang.NullPointerException
            at 
org.apache.celeborn.service.deploy.worker.storage.ChunkStreamManager.getChunk(ChunkStreamManager.java:85)
            at 
org.apache.celeborn.service.deploy.worker.FetchHandler.handleChunkFetchRequest(FetchHandler.scala:503)
            at 
org.apache.celeborn.service.deploy.worker.FetchHandler.handleRpcRequest(FetchHandler.scala:181)
            at 
org.apache.celeborn.service.deploy.worker.FetchHandler.receive(FetchHandler.scala:101)
            at 
org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:96)
            at 
org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84)
            at 
org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
            at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
            at 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
            at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
            at 
org.apache.celeborn.common.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:74)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
            at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
            at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
            at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
            at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
            at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
            at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
            at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
            at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
            at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
            at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
            at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
            at java.lang.Thread.run(Thread.java:748)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Tested manually; the exception no longer occurs.
    
    Closes #2510 from onebox-li/fix-local-read-npe.
    
    Authored-by: onebox-li <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala     | 4 +++-
 .../java/org/apache/celeborn/client/read/CelebornInputStream.java     | 2 +-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index bb66a3cef..b18733e77 100644
--- 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++ 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -117,6 +117,7 @@ class CelebornShuffleReader[K, C](
     val startTime = System.currentTimeMillis()
     val fetchTimeoutMs = conf.clientFetchTimeoutMs
     val localFetchEnabled = conf.enableReadLocalShuffleFile
+    val localHostAddress = Utils.localHostName(conf)
     val shuffleKey = Utils.makeShuffleKey(handle.appUniqueId, shuffleId)
     // startPartition is irrelevant
     val fileGroups = shuffleClient.updateFileGroup(shuffleId, startPartition)
@@ -148,7 +149,8 @@ class CelebornShuffleReader[K, C](
           pbOpenStreamListBuilder.addFileName(location.getFileName)
             .addStartIndex(startMapIndex)
             .addEndIndex(endMapIndex)
-          pbOpenStreamListBuilder.addReadLocalShuffle(localFetchEnabled)
+          pbOpenStreamListBuilder.addReadLocalShuffle(
+            localFetchEnabled && location.getHost.equals(localHostAddress))
         }
       }
     }
diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java 
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
index 798128d2f..99b131a48 100644
--- 
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
+++ 
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
@@ -444,7 +444,7 @@ public abstract class CelebornInputStream extends 
InputStream {
       switch (storageInfo.getType()) {
         case HDD:
         case SSD:
-          if (enabledReadLocalShuffle && 
location.getWorker().host().equals(localHostAddress)) {
+          if (enabledReadLocalShuffle && 
location.getHost().equals(localHostAddress)) {
             logger.debug("Read local shuffle file {}", localHostAddress);
             containLocalRead = true;
             return new LocalPartitionReader(

Reply via email to