This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new fa80c346e [#2684] fix: Infinite memory data reading due to duplicate
blockId (#2685)
fa80c346e is described below
commit fa80c346e9eafc69ef7d752cee029e06913e8ad5
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Dec 1 10:29:08 2025 +0800
[#2684] fix: Infinite memory data reading due to duplicate blockId (#2685)
### What changes were proposed in this pull request?
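This PR introduces an additional `isEnd` flag in the in-memory shuffle data
read response to indicate whether the end of the in-memory data has been reached.
For simplicity, this flag does not guarantee precise end-of-data detection:
the server sets it when the total length of the blocks returned by a read is
smaller than the requested read buffer size. The client should therefore still
rely on its existing logic to determine completion. However, this flag should
be sufficient to cover most cases.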
Note that after this PR, clients should be upgraded first, followed by the
shuffle servers: the Netty server now replies with a new
`GET_MEMORY_SHUFFLE_DATA_V2_RESPONSE` message type that older clients do not
recognize.
### Why are the changes needed?
For #2684.
Reading in-memory data is driven by `last_block_id`, which determines the
starting position of each read. However, when duplicate `block_id`s appear in
the sequence, the client may receive the same block more than once. This is
acceptable because the client relies on its `processed_block_ids` mechanism to
skip duplicates.
The issue arises when a duplicate block appears at the end of the in-memory
data sequence. The next read resumes after the *first* occurrence of that block
ID, so it returns the same tail again, which once more ends with the duplicate.
The client therefore keeps receiving the same last blocks, and the read
operation never terminates, as illustrated below.
First round of blockIds:
```
703702683883796
844440172239124
703702683884548
985177660594452
1125915148949780 (duplicate)
844440172239876
985177660595204
1125915148950532
1266652637305860
985177660594452
1125915148949780 (duplicate)
```
Second round of blockIds:
```
844440172239876
985177660595204
1125915148950532
1266652637305860
985177660594452
1125915148949780 (duplicate)
```
At this point, every subsequent round returns exactly the same blocks as the
second round, so the client repeatedly reads the same tail of the sequence,
resulting in an infinite loop.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests.
---
.../protocol/GetMemoryShuffleDataV2Response.java | 112 +++++++++++++++++++++
.../uniffle/common/netty/protocol/Message.java | 5 +
.../client/impl/grpc/ShuffleServerGrpcClient.java | 4 +-
.../impl/grpc/ShuffleServerGrpcNettyClient.java | 8 +-
.../RssGetInMemoryShuffleDataResponse.java | 15 ++-
proto/src/main/proto/Rss.proto | 1 +
.../uniffle/server/ShuffleServerGrpcService.java | 16 ++-
.../server/buffer/AbstractShuffleBuffer.java | 8 +-
.../server/buffer/MemoryShuffleDataResult.java | 37 +++----
.../server/netty/ShuffleServerNettyHandler.java | 47 ++++++---
.../handler/impl/MemoryClientReadHandler.java | 7 ++
11 files changed, 209 insertions(+), 51 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetMemoryShuffleDataV2Response.java
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetMemoryShuffleDataV2Response.java
new file mode 100644
index 000000000..eeeef8ef2
--- /dev/null
+++
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetMemoryShuffleDataV2Response.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.netty.protocol;
+
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import org.apache.uniffle.common.BufferSegment;
+import org.apache.uniffle.common.netty.buffer.ManagedBuffer;
+import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer;
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.ByteBufUtils;
+
+public class GetMemoryShuffleDataV2Response extends
GetMemoryShuffleDataResponse {
+ private boolean isEnd = false;
+
+ public GetMemoryShuffleDataV2Response(
+ long requestId, StatusCode statusCode, List<BufferSegment>
bufferSegments, byte[] data) {
+ super(requestId, statusCode, null, bufferSegments, data);
+ }
+
+ public GetMemoryShuffleDataV2Response(
+ long requestId,
+ StatusCode statusCode,
+ String retMessage,
+ List<BufferSegment> bufferSegments,
+ byte[] data,
+ boolean isEnd) {
+ super(requestId, statusCode, retMessage, bufferSegments,
Unpooled.wrappedBuffer(data));
+ this.isEnd = isEnd;
+ }
+
+ public GetMemoryShuffleDataV2Response(
+ long requestId,
+ StatusCode statusCode,
+ String retMessage,
+ List<BufferSegment> bufferSegments,
+ ByteBuf data,
+ boolean isEnd) {
+ super(requestId, statusCode, retMessage, bufferSegments, new
NettyManagedBuffer(data));
+ this.isEnd = isEnd;
+ }
+
+ public GetMemoryShuffleDataV2Response(
+ long requestId,
+ StatusCode statusCode,
+ String retMessage,
+ List<BufferSegment> bufferSegments,
+ ManagedBuffer managedBuffer,
+ boolean isEnd) {
+ super(requestId, statusCode, retMessage, bufferSegments, managedBuffer);
+ this.isEnd = isEnd;
+ }
+
+ @Override
+ public int encodedLength() {
+ return super.encodedLength() + 1;
+ }
+
+ @Override
+ public void encode(ByteBuf buf) {
+ super.encode(buf);
+ buf.writeByte(isEnd ? 1 : 0);
+ }
+
+ public static GetMemoryShuffleDataResponse decode(ByteBuf byteBuf, boolean
decodeBody) {
+ long requestId = byteBuf.readLong();
+ StatusCode statusCode = StatusCode.fromCode(byteBuf.readInt());
+ String retMessage = ByteBufUtils.readLengthAndString(byteBuf);
+ List<BufferSegment> bufferSegments =
Decoders.decodeBufferSegments(byteBuf);
+ boolean isEnd = byteBuf.readByte() == 1;
+ if (decodeBody) {
+ NettyManagedBuffer nettyManagedBuffer = new NettyManagedBuffer(byteBuf);
+ return new GetMemoryShuffleDataV2Response(
+ requestId, statusCode, retMessage, bufferSegments,
nettyManagedBuffer, isEnd);
+ } else {
+ return new GetMemoryShuffleDataV2Response(
+ requestId,
+ statusCode,
+ retMessage,
+ bufferSegments,
+ NettyManagedBuffer.EMPTY_BUFFER,
+ isEnd);
+ }
+ }
+
+ @Override
+ public Type type() {
+ return Type.GET_MEMORY_SHUFFLE_DATA_V2_RESPONSE;
+ }
+
+ public boolean isEnd() {
+ return isEnd;
+ }
+}
diff --git
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/Message.java
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/Message.java
index 5fcb47bd0..82aba41fb 100644
--- a/common/src/main/java/org/apache/uniffle/common/netty/protocol/Message.java
+++ b/common/src/main/java/org/apache/uniffle/common/netty/protocol/Message.java
@@ -68,6 +68,7 @@ public abstract class Message implements Encodable {
GET_LOCAL_SHUFFLE_INDEX_V2_RESPONSE(23),
GET_LOCAL_SHUFFLE_DATA_V2_REQUEST(24),
GET_LOCAL_SHUFFLE_DATA_V3_REQUEST(25),
+ GET_MEMORY_SHUFFLE_DATA_V2_RESPONSE(26),
;
private final byte id;
@@ -146,6 +147,8 @@ public abstract class Message implements Encodable {
return GET_LOCAL_SHUFFLE_INDEX_V2_RESPONSE;
case 24:
return GET_LOCAL_SHUFFLE_DATA_V2_REQUEST;
+ case 26:
+ return GET_MEMORY_SHUFFLE_DATA_V2_RESPONSE;
case -1:
throw new IllegalArgumentException("User type messages cannot be
decoded.");
default:
@@ -176,6 +179,8 @@ public abstract class Message implements Encodable {
return GetMemoryShuffleDataRequest.decode(in);
case GET_MEMORY_SHUFFLE_DATA_RESPONSE:
return GetMemoryShuffleDataResponse.decode(in, true);
+ case GET_MEMORY_SHUFFLE_DATA_V2_RESPONSE:
+ return GetMemoryShuffleDataV2Response.decode(in, true);
case GET_SORTED_SHUFFLE_DATA_REQUEST:
return GetSortedShuffleDataRequest.decode(in);
case GET_SORTED_SHUFFLE_DATA_RESPONSE:
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 366866d06..2ad920d04 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -1126,11 +1126,13 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
requestInfo,
System.currentTimeMillis() - start,
data.length);
+ boolean isEnd = rpcResponse.hasIsEnd() ?
rpcResponse.getIsEnd().getValue() : false;
response =
new RssGetInMemoryShuffleDataResponse(
StatusCode.SUCCESS,
ByteBuffer.wrap(data),
-
toBufferSegments(rpcResponse.getShuffleDataBlockSegmentsList()));
+
toBufferSegments(rpcResponse.getShuffleDataBlockSegmentsList()),
+ isEnd);
break;
default:
String msg =
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
index b2e59fe34..bae742283 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
@@ -63,6 +63,7 @@ import
org.apache.uniffle.common.netty.protocol.GetLocalShuffleIndexRequest;
import org.apache.uniffle.common.netty.protocol.GetLocalShuffleIndexResponse;
import org.apache.uniffle.common.netty.protocol.GetMemoryShuffleDataRequest;
import org.apache.uniffle.common.netty.protocol.GetMemoryShuffleDataResponse;
+import org.apache.uniffle.common.netty.protocol.GetMemoryShuffleDataV2Response;
import org.apache.uniffle.common.netty.protocol.GetSortedShuffleDataRequest;
import org.apache.uniffle.common.netty.protocol.GetSortedShuffleDataResponse;
import org.apache.uniffle.common.netty.protocol.RpcResponse;
@@ -305,10 +306,15 @@ public class ShuffleServerGrpcNettyClient extends
ShuffleServerGrpcClient {
nettyPort,
requestInfo,
System.currentTimeMillis() - start);
+ boolean isEnd = false;
+ if (getMemoryShuffleDataResponse instanceof
GetMemoryShuffleDataV2Response) {
+ isEnd = ((GetMemoryShuffleDataV2Response)
getMemoryShuffleDataResponse).isEnd();
+ }
return new RssGetInMemoryShuffleDataResponse(
StatusCode.SUCCESS,
getMemoryShuffleDataResponse.body(),
- getMemoryShuffleDataResponse.getBufferSegments());
+ getMemoryShuffleDataResponse.getBufferSegments(),
+ isEnd);
default:
String msg =
"Can't get shuffle in memory data from "
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetInMemoryShuffleDataResponse.java
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetInMemoryShuffleDataResponse.java
index bbf3738cb..8117e9867 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetInMemoryShuffleDataResponse.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetInMemoryShuffleDataResponse.java
@@ -31,17 +31,26 @@ public class RssGetInMemoryShuffleDataResponse extends
ClientResponse {
private final ManagedBuffer data;
private final List<BufferSegment> bufferSegments;
+ private final boolean isEnd;
public RssGetInMemoryShuffleDataResponse(
- StatusCode statusCode, ByteBuffer data, List<BufferSegment>
bufferSegments) {
- this(statusCode, new NettyManagedBuffer(Unpooled.wrappedBuffer(data)),
bufferSegments);
+ StatusCode statusCode, ByteBuffer data, List<BufferSegment>
bufferSegments, boolean isEnd) {
+ this(statusCode, new NettyManagedBuffer(Unpooled.wrappedBuffer(data)),
bufferSegments, isEnd);
}
public RssGetInMemoryShuffleDataResponse(
- StatusCode statusCode, ManagedBuffer data, List<BufferSegment>
bufferSegments) {
+ StatusCode statusCode,
+ ManagedBuffer data,
+ List<BufferSegment> bufferSegments,
+ boolean isEnd) {
super(statusCode);
this.bufferSegments = bufferSegments;
this.data = data;
+ this.isEnd = isEnd;
+ }
+
+ public boolean isEnd() {
+ return isEnd;
}
public ManagedBuffer getData() {
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 89845fe18..ec8f3f278 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -116,6 +116,7 @@ message GetMemoryShuffleDataResponse {
bytes data = 2;
StatusCode status = 3;
string retMsg = 4;
+ google.protobuf.BoolValue is_end = 5;
}
message GetLocalShuffleIndexRequest {
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 979761f9e..bd35750d3 100644
---
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -29,6 +29,7 @@ import java.util.stream.Collectors;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import com.google.protobuf.BoolValue;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import io.grpc.Context;
@@ -97,6 +98,7 @@ import
org.apache.uniffle.proto.RssProtos.ShuffleRegisterRequest;
import org.apache.uniffle.proto.RssProtos.ShuffleRegisterResponse;
import org.apache.uniffle.proto.ShuffleServerGrpc.ShuffleServerImplBase;
import org.apache.uniffle.server.audit.ServerRpcAuditContext;
+import org.apache.uniffle.server.buffer.MemoryShuffleDataResult;
import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
import org.apache.uniffle.server.merge.MergeStatus;
import org.apache.uniffle.storage.common.Storage;
@@ -1356,7 +1358,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
// todo: if can get the exact memory size?
if
(shuffleServer.getShuffleBufferManager().requireReadMemory(readBufferSize)) {
- ShuffleDataResult shuffleDataResult = null;
+ MemoryShuffleDataResult shuffleDataResult = null;
try {
final long start = System.currentTimeMillis();
Roaring64NavigableMap expectedTaskIds = null;
@@ -1367,12 +1369,14 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
request.getSerializedExpectedTaskIdsBitmap().toByteArray());
}
shuffleDataResult =
- shuffleServer
- .getShuffleTaskManager()
- .getInMemoryShuffleData(
- appId, shuffleId, partitionId, blockId, readBufferSize,
expectedTaskIds);
+ (MemoryShuffleDataResult)
+ shuffleServer
+ .getShuffleTaskManager()
+ .getInMemoryShuffleData(
+ appId, shuffleId, partitionId, blockId,
readBufferSize, expectedTaskIds);
byte[] data = new byte[] {};
List<BufferSegment> bufferSegments = Lists.newArrayList();
+ boolean isEnd = false;
if (shuffleDataResult != null) {
data = shuffleDataResult.getData();
bufferSegments = shuffleDataResult.getBufferSegments();
@@ -1380,6 +1384,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
ShuffleServerMetrics.counterTotalReadMemoryDataSize.inc(data.length);
ShuffleServerMetrics.gaugeReadMemoryDataThreadNum.inc();
ShuffleServerMetrics.gaugeReadMemoryDataBufferSize.inc(readBufferSize);
+ isEnd = shuffleDataResult.isEnd();
}
long costTime = System.currentTimeMillis() - start;
shuffleServer
@@ -1397,6 +1402,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
.setStatus(status.toProto())
.setRetMsg(msg)
.setData(UnsafeByteOperations.unsafeWrap(data))
+ .setIsEnd(BoolValue.of(isEnd))
.addAllShuffleDataBlockSegments(toShuffleDataBlockSegments(bufferSegments))
.build();
} catch (Exception e) {
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
index 41911e00d..184074545 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
@@ -104,14 +104,18 @@ public abstract class AbstractShuffleBuffer implements
ShuffleBuffer {
NettyUtils.getSharedUnpooledByteBufAllocator(true),
true,
Constants.COMPOSITE_BYTE_BUF_MAX_COMPONENTS);
+ // Use isEnd to indicate whether the end has been reached.
+ // Although this flag is not perfectly accurate, it is sufficient for
most cases.
+ int readLength = readBlocks.stream().map(x ->
x.getDataLength()).reduce(0, Integer::sum);
+ boolean isEnd = readLength < readBufferSize;
// copy result data
updateShuffleData(readBlocks, byteBuf);
- return new ShuffleDataResult(byteBuf, bufferSegments);
+ return new MemoryShuffleDataResult(byteBuf, bufferSegments, isEnd);
}
} catch (Exception e) {
LOG.error("Exception happened when getShuffleData in buffer", e);
}
- return new ShuffleDataResult();
+ return new MemoryShuffleDataResult();
}
// here is the rule to read data in memory:
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetInMemoryShuffleDataResponse.java
b/server/src/main/java/org/apache/uniffle/server/buffer/MemoryShuffleDataResult.java
similarity index 50%
copy from
internal-client/src/main/java/org/apache/uniffle/client/response/RssGetInMemoryShuffleDataResponse.java
copy to
server/src/main/java/org/apache/uniffle/server/buffer/MemoryShuffleDataResult.java
index bbf3738cb..67909dd06 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssGetInMemoryShuffleDataResponse.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/MemoryShuffleDataResult.java
@@ -15,40 +15,31 @@
* limitations under the License.
*/
-package org.apache.uniffle.client.response;
+package org.apache.uniffle.server.buffer;
-import java.nio.ByteBuffer;
import java.util.List;
-import io.netty.buffer.Unpooled;
+import io.netty.buffer.ByteBuf;
import org.apache.uniffle.common.BufferSegment;
-import org.apache.uniffle.common.netty.buffer.ManagedBuffer;
+import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer;
-import org.apache.uniffle.common.rpc.StatusCode;
-public class RssGetInMemoryShuffleDataResponse extends ClientResponse {
+public class MemoryShuffleDataResult extends ShuffleDataResult {
+ private final boolean isEnd;
- private final ManagedBuffer data;
- private final List<BufferSegment> bufferSegments;
-
- public RssGetInMemoryShuffleDataResponse(
- StatusCode statusCode, ByteBuffer data, List<BufferSegment>
bufferSegments) {
- this(statusCode, new NettyManagedBuffer(Unpooled.wrappedBuffer(data)),
bufferSegments);
- }
-
- public RssGetInMemoryShuffleDataResponse(
- StatusCode statusCode, ManagedBuffer data, List<BufferSegment>
bufferSegments) {
- super(statusCode);
- this.bufferSegments = bufferSegments;
- this.data = data;
+ public MemoryShuffleDataResult() {
+ super();
+ // empty
+ this.isEnd = true;
}
- public ManagedBuffer getData() {
- return data;
+ public MemoryShuffleDataResult(ByteBuf data, List<BufferSegment>
bufferSegments, boolean isEnd) {
+ super(new NettyManagedBuffer(data), bufferSegments);
+ this.isEnd = isEnd;
}
- public List<BufferSegment> getBufferSegments() {
- return bufferSegments;
+ public boolean isEnd() {
+ return isEnd;
}
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index c7f438d34..0b9b5f83c 100644
---
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -56,6 +56,7 @@ import
org.apache.uniffle.common.netty.protocol.GetLocalShuffleIndexResponse;
import org.apache.uniffle.common.netty.protocol.GetLocalShuffleIndexV2Response;
import org.apache.uniffle.common.netty.protocol.GetMemoryShuffleDataRequest;
import org.apache.uniffle.common.netty.protocol.GetMemoryShuffleDataResponse;
+import org.apache.uniffle.common.netty.protocol.GetMemoryShuffleDataV2Response;
import org.apache.uniffle.common.netty.protocol.GetSortedShuffleDataRequest;
import org.apache.uniffle.common.netty.protocol.GetSortedShuffleDataResponse;
import org.apache.uniffle.common.netty.protocol.RequestMessage;
@@ -70,6 +71,7 @@ import org.apache.uniffle.server.ShuffleServerMetrics;
import org.apache.uniffle.server.ShuffleTaskInfo;
import org.apache.uniffle.server.ShuffleTaskManager;
import org.apache.uniffle.server.audit.ServerRpcAuditContext;
+import org.apache.uniffle.server.buffer.MemoryShuffleDataResult;
import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
import org.apache.uniffle.server.merge.MergeStatus;
@@ -458,21 +460,23 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
// todo: if can get the exact memory size?
if
(shuffleServer.getShuffleBufferManager().requireReadMemory(readBufferSize)) {
- ShuffleDataResult shuffleDataResult = null;
+ MemoryShuffleDataResult shuffleDataResult = null;
try {
final long start = System.currentTimeMillis();
shuffleDataResult =
- shuffleServer
- .getShuffleTaskManager()
- .getInMemoryShuffleData(
- appId,
- shuffleId,
- partitionId,
- blockId,
- readBufferSize,
- req.getExpectedTaskIdsBitmap());
+ (MemoryShuffleDataResult)
+ shuffleServer
+ .getShuffleTaskManager()
+ .getInMemoryShuffleData(
+ appId,
+ shuffleId,
+ partitionId,
+ blockId,
+ readBufferSize,
+ req.getExpectedTaskIdsBitmap());
ManagedBuffer data = NettyManagedBuffer.EMPTY_BUFFER;
List<BufferSegment> bufferSegments = Lists.newArrayList();
+ boolean isEnd = false;
if (shuffleDataResult != null) {
data = shuffleDataResult.getManagedBuffer();
bufferSegments = shuffleDataResult.getBufferSegments();
@@ -480,13 +484,14 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
ShuffleServerMetrics.counterTotalReadMemoryDataSize.inc(data.size());
ShuffleServerMetrics.gaugeReadMemoryDataThreadNum.inc();
ShuffleServerMetrics.gaugeReadMemoryDataBufferSize.inc(readBufferSize);
+ isEnd = shuffleDataResult.isEnd();
}
auditContext.withStatusCode(status);
auditContext.withReturnValue(
"len=" + data.size() + ", bufferSegments=" +
bufferSegments.size());
response =
- new GetMemoryShuffleDataResponse(
- req.getRequestId(), status, msg, bufferSegments, data);
+ new GetMemoryShuffleDataV2Response(
+ req.getRequestId(), status, msg, bufferSegments, data,
isEnd);
ReleaseMemoryAndRecordReadTimeListener listener =
new ReleaseMemoryAndRecordReadTimeListener(
start, readBufferSize, data.size(), requestInfo, req,
response, client);
@@ -505,16 +510,26 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
+ e.getMessage();
LOG.error(msg, e);
response =
- new GetMemoryShuffleDataResponse(
- req.getRequestId(), status, msg, Lists.newArrayList(),
Unpooled.EMPTY_BUFFER);
+ new GetMemoryShuffleDataV2Response(
+ req.getRequestId(),
+ status,
+ msg,
+ Lists.newArrayList(),
+ Unpooled.EMPTY_BUFFER,
+ false);
}
} else {
status = StatusCode.NO_BUFFER;
msg = "Can't require memory to get in memory shuffle data";
LOG.warn("{} for {}", msg, requestInfo);
response =
- new GetMemoryShuffleDataResponse(
- req.getRequestId(), status, msg, Lists.newArrayList(),
Unpooled.EMPTY_BUFFER);
+ new GetMemoryShuffleDataV2Response(
+ req.getRequestId(),
+ status,
+ msg,
+ Lists.newArrayList(),
+ Unpooled.EMPTY_BUFFER,
+ false);
}
auditContext.withStatusCode(response.getStatusCode());
client.getChannel().writeAndFlush(response);
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
index b2f15e472..3de6c2a84 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
@@ -44,6 +44,7 @@ public class MemoryClientReadHandler extends
PrefetchableClientReadHandler {
private int retryMax;
private long retryIntervalMax;
private ShuffleServerReadCostTracker readCostTracker;
+ private boolean isEnd = false;
public MemoryClientReadHandler(
String appId,
@@ -91,6 +92,9 @@ public class MemoryClientReadHandler extends
PrefetchableClientReadHandler {
@Override
public ShuffleDataResult doReadShuffleData() {
+ if (isEnd) {
+ return null;
+ }
ShuffleDataResult result = null;
RssGetInMemoryShuffleDataRequest request =
@@ -108,6 +112,9 @@ public class MemoryClientReadHandler extends
PrefetchableClientReadHandler {
long start = System.currentTimeMillis();
RssGetInMemoryShuffleDataResponse response =
shuffleServerClient.getInMemoryShuffleData(request);
+ if (response.isEnd()) {
+ isEnd = true;
+ }
result = new ShuffleDataResult(response.getData(),
response.getBufferSegments());
ClientInfo clientInfo = shuffleServerClient.getClientInfo();
if (readCostTracker != null && clientInfo != null) {