This is an automated email from the ASF dual-hosted git repository.
rickyma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 77319985d [MINOR] fix(client/netty): ShuffleServerGrpcNettyClient
missing to send shuffleId and partitionIds for requirePreAllocation request
(#1913)
77319985d is described below
commit 77319985d8fe9f63fe230dbb440e99b82bc35b12
Author: maobaolong <[email protected]>
AuthorDate: Tue Jul 16 14:21:12 2024 +0800
[MINOR] fix(client/netty): ShuffleServerGrpcNettyClient missing to send
shuffleId and partitionIds for requirePreAllocation request (#1913)
### What changes were proposed in this pull request?
Add partitionIds and shuffleId to `RequireBufferRequest`.
### Why are the changes needed?
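Without this change, the Netty client always sends shuffleId 0 and an empty
partitionIds list, so the loop in the server-side `requireBuffer` below never
runs and the server cannot check limitHugePartition.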
```java
/**
 * Checks the app registration and the huge-partition limit for every requested
 * partition, then delegates to {@code requireBuffer(appId, requireSize)}.
 *
 * @param appId application id used to look up the registered shuffle task info
 * @param shuffleId shuffle id the partitions belong to
 * @param partitionIds partitions that will be written; an empty list skips the
 *     huge-partition check entirely
 * @param requireSize size passed through to the two-argument requireBuffer
 *     (presumably bytes - confirm against the caller)
 * @return whatever the two-argument requireBuffer returns (presumably the
 *     require-buffer id - confirm)
 * @throws NoRegisterException if no shuffle task info exists for appId
 * @throws NoBufferForHugePartitionException if limitHugePartition reports true
 *     for any of the given partitions
 */
public long requireBuffer(
String appId, int shuffleId, List<Integer> partitionIds, int
requireSize) {
ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.get(appId);
// Reject requests from apps that have no registered task info.
if (null == shuffleTaskInfo) {
LOG.error("No such app is registered. appId: {}, shuffleId: {}",
appId, shuffleId);
throw new NoRegisterException("No such app is registered. appId: " +
appId);
}
// The check runs once per partition id, so it only takes effect when the
// client actually sends the partition ids it is about to write.
for (int partitionId : partitionIds) {
long partitionUsedDataSize = getPartitionDataSize(appId, shuffleId,
partitionId);
// Fail the whole request as soon as one partition is limited.
if (shuffleBufferManager.limitHugePartition(
appId, shuffleId, partitionId, partitionUsedDataSize)) {
String errorMessage =
String.format(
"Huge partition is limited to writing. appId: %s,
shuffleId: %s, partitionIds: %s, partitionUsedDataSize: %s",
appId, shuffleId, partitionIds, partitionUsedDataSize);
LOG.error(errorMessage);
throw new NoBufferForHugePartitionException(errorMessage);
}
}
// No partition was limited; fall through to the two-argument overload.
return requireBuffer(appId, requireSize);
}
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested locally: start an RSS cluster with Netty enabled and configure a small
huge-partition size limit; you can then see NoBufferForHugePartitionException.
---
.../uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
index a05d94b51..26e53851d 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
@@ -17,7 +17,7 @@
package org.apache.uniffle.client.impl.grpc;
-import java.util.Collections;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
@@ -148,11 +148,13 @@ public class ShuffleServerGrpcNettyClient extends
ShuffleServerGrpcClient {
int shuffleId = stb.getKey();
int size = 0;
int blockNum = 0;
+ List<Integer> partitionIds = new ArrayList<>();
for (Map.Entry<Integer, List<ShuffleBlockInfo>> ptb :
stb.getValue().entrySet()) {
for (ShuffleBlockInfo sbi : ptb.getValue()) {
size += sbi.getSize();
blockNum++;
}
+ partitionIds.add(ptb.getKey());
}
SendShuffleDataRequest sendShuffleDataRequest =
@@ -173,8 +175,8 @@ public class ShuffleServerGrpcNettyClient extends
ShuffleServerGrpcClient {
long requireId =
requirePreAllocation(
request.getAppId(),
- 0,
- Collections.emptyList(),
+ shuffleId,
+ partitionIds,
allocateSize,
request.getRetryMax(),
request.getRetryIntervalMax(),