This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 660cf24de [CELEBORN-1319][FOLLOWUP] Fix IndexOutOfBoundsException when 
using old celeborn client
660cf24de is described below

commit 660cf24debca6eba7674bcfe260e75395b104204
Author: Wang, Fei <[email protected]>
AuthorDate: Fri Feb 28 11:34:14 2025 +0800

    [CELEBORN-1319][FOLLOWUP] Fix IndexOutOfBoundsException when using old 
celeborn client
    
    ### What changes were proposed in this pull request?
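    Follow-up to https://github.com/apache/celeborn/pull/2373: fix the
    `IndexOutOfBoundsException` thrown when the worker deserializes packed
    partition locations sent by an old Celeborn client. When either the
    `fileSizes` or the `chunksOffsets` list is empty, the `StorageInfo` is now
    built without those fields instead of indexing into the empty lists.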
    
    ### Why are the changes needed?
    
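    I hit the exception below when using an old Celeborn client; it appears to
    be a compatibility issue. The old client sends no file sizes, so the worker's
    unconditional `getFileSizes(index)` call fails with `Index:0, Size:0`.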
    
    Error log on the worker side:
    ```
    25/02/27 14:18:24,182 ERROR [rpc_service-server-4-6] NettyRpcHandler: Error 
while invoking NettyRpcHandler#receive() on RPC id 4330
    java.lang.IndexOutOfBoundsException: Index:0, Size:0
            at 
com.google.protobuf.LongArrayList.ensureIndexInRange(LongArrayList.java:265)
            at com.google.protobuf.LongArrayList.getLong(LongArrayList.java:113)
            at 
org.apache.celeborn.common.protocol.PbPackedPartitionLocations.getFileSizes(PbPackedPartitionLocations.java:465)
            at 
org.apache.celeborn.common.util.PbSerDeUtils$.fromPackedPartitionLocations(PbSerDeUtils.scala:653)
            at 
org.apache.celeborn.common.util.PbSerDeUtils$.fromPbPackedPartitionLocationsPair(PbSerDeUtils.scala:591)
            at 
org.apache.celeborn.common.protocol.message.ControlMessages$.fromTransportMessage(ControlMessages.scala:1312)
            at 
org.apache.celeborn.common.util.Utils$.fromTransportMessage(Utils.scala:1056)
            at 
org.apache.celeborn.common.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:110)
            at 
org.apache.celeborn.common.rpc.netty.NettyRpcEnv.$anonfun$deserialize$2(NettyRpcEnv.scala:313)
            at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
            at 
org.apache.celeborn.common.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:363)
            at 
org.apache.celeborn.common.rpc.netty.NettyRpcEnv.$anonfun$deserialize$1(NettyRpcEnv.scala:312)
            at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
            at 
org.apache.celeborn.common.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:312)
            at 
org.apache.celeborn.common.rpc.netty.RequestMessage$.apply(NettyRpcEnv.scala:555)
            at 
org.apache.celeborn.common.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:619)
            at 
org.apache.celeborn.common.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:605)
            at 
org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:101)
            at 
org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:85)
            at 
org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
            at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
            at 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
            at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
            at 
org.apache.celeborn.common.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:74)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
            at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
            at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
            at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)
            at 
io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
            at 
io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:501)
            at 
io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:399)
            at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998)
            at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
            at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
            at java.base/java.lang.Thread.run(Thread.java:833)
    25/02/27 14:18:24,234 WARN [celeborn-dispatcher-81] Controller: Shuffle 
application_1739172886147_152994_1-0 not registered!
    25/02/27 14:18:24,284 WARN [celeborn-dispatcher-82] Controller: Shuffle 
application_1739172886147_152994_1-0 not registered!
    25/02/27 14:18:24,335 WARN [celeborn-dispatcher-83] Controller: Shuffle 
application_1739172886147_152994_1-0 not registered!
    ```
    
    Error log on the client side:
    ```
    org.apache.celeborn.common.exception.CelebornIOException: Register shuffle 
failed for shuffle 0, reason: RESERVE_SLOTS_FAILED
            at 
org.apache.celeborn.client.ShuffleClientImpl.registerShuffleInternal(ShuffleClientImpl.java:710)
            at 
org.apache.celeborn.client.ShuffleClientImpl.registerShuffle(ShuffleClientImpl.java:519)
            at 
org.apache.celeborn.client.ShuffleClientImpl.lambda$getPartitionLocation$4(ShuffleClientImpl.java:579)
            at 
java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1708)
            at 
org.apache.celeborn.client.ShuffleClientImpl.getPartitionLocation(ShuffleClientImpl.java:575)
            at 
org.apache.celeborn.client.write.DataPushQueue.takePushTasks(DataPushQueue.java:92)
            at 
org.apache.celeborn.client.write.DataPusher$1.run(DataPusher.java:124)
            at java.base/java.lang.Thread.run(Thread.java:833)
    
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing unit tests (UT) and integration tests (IT).
    
    Closes #3122 from turboFei/index_outof.
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../apache/celeborn/common/util/PbSerDeUtils.scala | 32 ++++++++++++++++------
 1 file changed, 23 insertions(+), 9 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index 553a4b9f9..0f7895947 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -633,6 +633,28 @@ object PbSerDeUtils {
         Mode.REPLICA
       }
 
+    val storageInfo =
+      if (pbPackedPartitionLocations.getFileSizesList.isEmpty ||
+        pbPackedPartitionLocations.getChunksOffsetsList.isEmpty) {
+        new StorageInfo(
+          StorageInfo.typesMap.get(pbPackedPartitionLocations.getTypes(index)),
+          pbPackedPartitionLocations.getMountPointsSet(
+            pbPackedPartitionLocations.getMountPoints(index)),
+          pbPackedPartitionLocations.getFinalResult(index),
+          filePath,
+          pbPackedPartitionLocations.getAvailableStorageTypes(index))
+      } else {
+        new StorageInfo(
+          StorageInfo.typesMap.get(pbPackedPartitionLocations.getTypes(index)),
+          pbPackedPartitionLocations.getMountPointsSet(
+            pbPackedPartitionLocations.getMountPoints(index)),
+          pbPackedPartitionLocations.getFinalResult(index),
+          filePath,
+          pbPackedPartitionLocations.getAvailableStorageTypes(index),
+          pbPackedPartitionLocations.getFileSizes(index),
+          
pbPackedPartitionLocations.getChunksOffsets(index).getChunkOffsetList)
+      }
+
     new PartitionLocation(
       pbPackedPartitionLocations.getIds(index),
       pbPackedPartitionLocations.getEpoches(index),
@@ -643,15 +665,7 @@ object PbSerDeUtils {
       workerIdParts(4).toInt,
       mode,
       null,
-      new StorageInfo(
-        StorageInfo.typesMap.get(pbPackedPartitionLocations.getTypes(index)),
-        pbPackedPartitionLocations.getMountPointsSet(
-          pbPackedPartitionLocations.getMountPoints(index)),
-        pbPackedPartitionLocations.getFinalResult(index),
-        filePath,
-        pbPackedPartitionLocations.getAvailableStorageTypes(index),
-        pbPackedPartitionLocations.getFileSizes(index),
-        pbPackedPartitionLocations.getChunksOffsets(index).getChunkOffsetList),
+      storageInfo,
       
Utils.byteStringToRoaringBitmap(pbPackedPartitionLocations.getMapIdBitMap(index)))
   }
 

Reply via email to