This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 660cf24de [CELEBORN-1319][FOLLOWUP] Fix IndexOutOfBoundsException when
using old celeborn client
660cf24de is described below
commit 660cf24debca6eba7674bcfe260e75395b104204
Author: Wang, Fei <[email protected]>
AuthorDate: Fri Feb 28 11:34:14 2025 +0800
[CELEBORN-1319][FOLLOWUP] Fix IndexOutOfBoundsException when using old
celeborn client
### What changes were proposed in this pull request?
Follow-up to https://github.com/apache/celeborn/pull/2373:
fix the `IndexOutOfBoundsException` thrown when using an old Celeborn client.
### Why are the changes needed?
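I encountered the exception below when using an old Celeborn client; it appears
to be a compatibility issue. As the stack trace shows, the worker fails in
`PbPackedPartitionLocations.getFileSizes(index)` with `Index:0, Size:0`, i.e. the
file-sizes list in the message received from the old client is empty.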
Error log on the worker side:
```
25/02/27 14:18:24,182 ERROR [rpc_service-server-4-6] NettyRpcHandler: Error
while invoking NettyRpcHandler#receive() on RPC id 4330
java.lang.IndexOutOfBoundsException: Index:0, Size:0
at
com.google.protobuf.LongArrayList.ensureIndexInRange(LongArrayList.java:265)
at com.google.protobuf.LongArrayList.getLong(LongArrayList.java:113)
at
org.apache.celeborn.common.protocol.PbPackedPartitionLocations.getFileSizes(PbPackedPartitionLocations.java:465)
at
org.apache.celeborn.common.util.PbSerDeUtils$.fromPackedPartitionLocations(PbSerDeUtils.scala:653)
at
org.apache.celeborn.common.util.PbSerDeUtils$.fromPbPackedPartitionLocationsPair(PbSerDeUtils.scala:591)
at
org.apache.celeborn.common.protocol.message.ControlMessages$.fromTransportMessage(ControlMessages.scala:1312)
at
org.apache.celeborn.common.util.Utils$.fromTransportMessage(Utils.scala:1056)
at
org.apache.celeborn.common.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:110)
at
org.apache.celeborn.common.rpc.netty.NettyRpcEnv.$anonfun$deserialize$2(NettyRpcEnv.scala:313)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at
org.apache.celeborn.common.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:363)
at
org.apache.celeborn.common.rpc.netty.NettyRpcEnv.$anonfun$deserialize$1(NettyRpcEnv.scala:312)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at
org.apache.celeborn.common.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:312)
at
org.apache.celeborn.common.rpc.netty.RequestMessage$.apply(NettyRpcEnv.scala:555)
at
org.apache.celeborn.common.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:619)
at
org.apache.celeborn.common.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:605)
at
org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:101)
at
org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:85)
at
org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at
org.apache.celeborn.common.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:74)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)
at
io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
at
io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:501)
at
io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:399)
at
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998)
at
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)
25/02/27 14:18:24,234 WARN [celeborn-dispatcher-81] Controller: Shuffle
application_1739172886147_152994_1-0 not registered!
25/02/27 14:18:24,284 WARN [celeborn-dispatcher-82] Controller: Shuffle
application_1739172886147_152994_1-0 not registered!
25/02/27 14:18:24,335 WARN [celeborn-dispatcher-83] Controller: Shuffle
application_1739172886147_152994_1-0 not registered!
```
Error log on the client side:
```
org.apache.celeborn.common.exception.CelebornIOException: Register shuffle
failed for shuffle 0, reason: RESERVE_SLOTS_FAILED
at
org.apache.celeborn.client.ShuffleClientImpl.registerShuffleInternal(ShuffleClientImpl.java:710)
at
org.apache.celeborn.client.ShuffleClientImpl.registerShuffle(ShuffleClientImpl.java:519)
at
org.apache.celeborn.client.ShuffleClientImpl.lambda$getPartitionLocation$4(ShuffleClientImpl.java:579)
at
java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1708)
at
org.apache.celeborn.client.ShuffleClientImpl.getPartitionLocation(ShuffleClientImpl.java:575)
at
org.apache.celeborn.client.write.DataPushQueue.takePushTasks(DataPushQueue.java:92)
at
org.apache.celeborn.client.write.DataPusher$1.run(DataPusher.java:124)
at java.base/java.lang.Thread.run(Thread.java:833)
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UT and IT.
Closes #3122 from turboFei/index_outof.
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../apache/celeborn/common/util/PbSerDeUtils.scala | 32 ++++++++++++++++------
1 file changed, 23 insertions(+), 9 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index 553a4b9f9..0f7895947 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -633,6 +633,28 @@ object PbSerDeUtils {
Mode.REPLICA
}
+ val storageInfo =
+ if (pbPackedPartitionLocations.getFileSizesList.isEmpty ||
+ pbPackedPartitionLocations.getChunksOffsetsList.isEmpty) {
+ new StorageInfo(
+ StorageInfo.typesMap.get(pbPackedPartitionLocations.getTypes(index)),
+ pbPackedPartitionLocations.getMountPointsSet(
+ pbPackedPartitionLocations.getMountPoints(index)),
+ pbPackedPartitionLocations.getFinalResult(index),
+ filePath,
+ pbPackedPartitionLocations.getAvailableStorageTypes(index))
+ } else {
+ new StorageInfo(
+ StorageInfo.typesMap.get(pbPackedPartitionLocations.getTypes(index)),
+ pbPackedPartitionLocations.getMountPointsSet(
+ pbPackedPartitionLocations.getMountPoints(index)),
+ pbPackedPartitionLocations.getFinalResult(index),
+ filePath,
+ pbPackedPartitionLocations.getAvailableStorageTypes(index),
+ pbPackedPartitionLocations.getFileSizes(index),
+ pbPackedPartitionLocations.getChunksOffsets(index).getChunkOffsetList)
+ }
+
new PartitionLocation(
pbPackedPartitionLocations.getIds(index),
pbPackedPartitionLocations.getEpoches(index),
@@ -643,15 +665,7 @@ object PbSerDeUtils {
workerIdParts(4).toInt,
mode,
null,
- new StorageInfo(
- StorageInfo.typesMap.get(pbPackedPartitionLocations.getTypes(index)),
- pbPackedPartitionLocations.getMountPointsSet(
- pbPackedPartitionLocations.getMountPoints(index)),
- pbPackedPartitionLocations.getFinalResult(index),
- filePath,
- pbPackedPartitionLocations.getAvailableStorageTypes(index),
- pbPackedPartitionLocations.getFileSizes(index),
- pbPackedPartitionLocations.getChunksOffsets(index).getChunkOffsetList),
+ storageInfo,
Utils.byteStringToRoaringBitmap(pbPackedPartitionLocations.getMapIdBitMap(index)))
}