turboFei commented on code in PR #2373:
URL: https://github.com/apache/celeborn/pull/2373#discussion_r1974364168
##########
common/src/main/proto/TransportMessages.proto:
##########
@@ -850,6 +865,8 @@ message PbPackedPartitionLocations {
repeated string filePaths = 10;
repeated int32 availableStorageTypes = 11;
repeated int32 modes = 12;
+ repeated int64 fileSizes = 13;
+ repeated PbChunkOffsets chunksOffsets = 14;
}
Review Comment:
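I hit the exception below when using the Celeborn client; it looks like a compatibility issue. The worker fails in `PbPackedPartitionLocations.getFileSizes` because the incoming message carries an empty `fileSizes` list (`Size:0`), presumably because the sender does not populate the new field 13.
Worker-side error log: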
```
25/02/27 14:18:24,182 ERROR [rpc_service-server-4-6] NettyRpcHandler: Error while invoking NettyRpcHandler#receive() on RPC id 4330
java.lang.IndexOutOfBoundsException: Index:0, Size:0
	at com.google.protobuf.LongArrayList.ensureIndexInRange(LongArrayList.java:265)
	at com.google.protobuf.LongArrayList.getLong(LongArrayList.java:113)
	at org.apache.celeborn.common.protocol.PbPackedPartitionLocations.getFileSizes(PbPackedPartitionLocations.java:465)
	at org.apache.celeborn.common.util.PbSerDeUtils$.fromPackedPartitionLocations(PbSerDeUtils.scala:653)
	at org.apache.celeborn.common.util.PbSerDeUtils$.fromPbPackedPartitionLocationsPair(PbSerDeUtils.scala:591)
	at org.apache.celeborn.common.protocol.message.ControlMessages$.fromTransportMessage(ControlMessages.scala:1312)
	at org.apache.celeborn.common.util.Utils$.fromTransportMessage(Utils.scala:1056)
	at org.apache.celeborn.common.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:110)
	at org.apache.celeborn.common.rpc.netty.NettyRpcEnv.$anonfun$deserialize$2(NettyRpcEnv.scala:313)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.celeborn.common.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:363)
	at org.apache.celeborn.common.rpc.netty.NettyRpcEnv.$anonfun$deserialize$1(NettyRpcEnv.scala:312)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.celeborn.common.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:312)
	at org.apache.celeborn.common.rpc.netty.RequestMessage$.apply(NettyRpcEnv.scala:555)
	at org.apache.celeborn.common.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:619)
	at org.apache.celeborn.common.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:605)
	at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:101)
	at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:85)
	at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at org.apache.celeborn.common.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:74)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:501)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:399)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:833)
25/02/27 14:18:24,234 WARN [celeborn-dispatcher-81] Controller: Shuffle application_1739172886147_152994_1-0 not registered!
25/02/27 14:18:24,284 WARN [celeborn-dispatcher-82] Controller: Shuffle application_1739172886147_152994_1-0 not registered!
25/02/27 14:18:24,335 WARN [celeborn-dispatcher-83] Controller: Shuffle application_1739172886147_152994_1-0 not registered!
```
Client-side log:
```
org.apache.celeborn.common.exception.CelebornIOException: Register shuffle failed for shuffle 0, reason: RESERVE_SLOTS_FAILED
	at org.apache.celeborn.client.ShuffleClientImpl.registerShuffleInternal(ShuffleClientImpl.java:710)
	at org.apache.celeborn.client.ShuffleClientImpl.registerShuffle(ShuffleClientImpl.java:519)
	at org.apache.celeborn.client.ShuffleClientImpl.lambda$getPartitionLocation$4(ShuffleClientImpl.java:579)
	at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1708)
	at org.apache.celeborn.client.ShuffleClientImpl.getPartitionLocation(ShuffleClientImpl.java:575)
	at org.apache.celeborn.client.write.DataPushQueue.takePushTasks(DataPushQueue.java:92)
	at org.apache.celeborn.client.write.DataPusher$1.run(DataPusher.java:124)
	at java.base/java.lang.Thread.run(Thread.java:833)
```
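Below is a minimal sketch of a reader-side guard. It assumes the worker should tolerate senders that never set field 13. `PackedFieldCompat` and `fileSizeOrDefault` are hypothetical names, not Celeborn code. The `List<Long>` stands in for the generated `getFileSizesCount()`/`getFileSizes(int)` accessors, and falling back to 0 when the size is missing is also an assumption.

```java
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical helper (not Celeborn code): reads the i-th entry of a parallel
 * repeated field that an older sender may have left empty. The List<Long>
 * models what PbPackedPartitionLocations exposes via getFileSizesCount() and
 * getFileSizes(int).
 */
public final class PackedFieldCompat {
    private PackedFieldCompat() {}

    // Returns fileSizes.get(index) when the sender populated the field;
    // otherwise returns defaultValue instead of throwing
    // IndexOutOfBoundsException. The default of 0 used below is an assumption.
    public static long fileSizeOrDefault(List<Long> fileSizes, int index, long defaultValue) {
        if (index >= 0 && index < fileSizes.size()) {
            return fileSizes.get(index);
        }
        return defaultValue;
    }

    public static void main(String[] args) {
        List<Long> fromNewSender = List.of(1024L, 2048L);
        List<Long> fromOldSender = Collections.emptyList(); // field 13 never written

        System.out.println(fileSizeOrDefault(fromNewSender, 1, 0L)); // 2048
        System.out.println(fileSizeOrDefault(fromOldSender, 0, 0L)); // 0, no exception
    }
}
```

With the generated protobuf-java class, the equivalent check would be `i < pb.getFileSizesCount()` before calling `pb.getFileSizes(i)`. The same applies to `getChunksOffsetsCount()`/`getChunksOffsets(i)`. The scalar `int64 fileSize = 6` added to `PbStorageInfo` does not throw when absent, because proto3 returns 0 for an unset scalar. As a consequence, an unset value cannot be told apart from a real size of 0.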
##########
common/src/main/proto/TransportMessages.proto:
##########
@@ -134,6 +134,8 @@ message PbStorageInfo {
bool finalResult = 3;
string filePath = 4;
int32 availableStorageTypes = 5;
+ int64 fileSize = 6;
+ repeated int64 chunkOffsets = 7;
}
Review Comment:
Same exception and logs as reported on the `PbPackedPartitionLocations` change above.
--
This is an automated message from the Apache Git Service.