This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new fd64d9dcc [#1757] feat(server): Add block number check on getting
shuffle result (#1758)
fd64d9dcc is described below
commit fd64d9dcc2e4eb26aa4da1a6fd4a5f07db95845f
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Jun 3 10:14:54 2024 +0800
[#1757] feat(server): Add block number check on getting shuffle result
(#1758)
### What changes were proposed in this pull request?
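Add a block-number check when getting the shuffle result: the server counts the distinct blockIds reported per partition and, on read, verifies that count against the cardinality of the returned blockId bitmap.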
### Why are the changes needed?
Data validation: it ensures that the shuffle result returned to the reader is stable and correct.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
---
.../apache/uniffle/test/ShuffleServerGrpcTest.java | 13 +++----
.../uniffle/server/ShuffleServerGrpcService.java | 23 ++++++++----
.../org/apache/uniffle/server/ShuffleTaskInfo.java | 22 +++++++++++
.../apache/uniffle/server/ShuffleTaskManager.java | 43 +++++++++++++++++++++-
4 files changed, 85 insertions(+), 16 deletions(-)
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index 9b486629c..c7a17b9b4 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -671,10 +671,10 @@ public class ShuffleServerGrpcTest extends
IntegrationTestBase {
@ParameterizedTest
@MethodSource("testBlockIdLayouts")
public void multipleShuffleResultTest(BlockIdLayout layout) throws Exception
{
+ String appId = "multipleShuffleResultTest_" + layout.sequenceNoBits;
Set<Long> expectedBlockIds = Sets.newConcurrentHashSet();
RssRegisterShuffleRequest rrsr =
- new RssRegisterShuffleRequest(
- "multipleShuffleResultTest", 100, Lists.newArrayList(new
PartitionRange(0, 1)), "");
+ new RssRegisterShuffleRequest(appId, 100, Lists.newArrayList(new
PartitionRange(0, 1)), "");
grpcShuffleServerClient.registerShuffle(rrsr);
Runnable r1 =
@@ -687,7 +687,7 @@ public class ShuffleServerGrpcTest extends
IntegrationTestBase {
blockIds.add(blockId);
ptbs.put(1, blockIds);
RssReportShuffleResultRequest req1 =
- new RssReportShuffleResultRequest("multipleShuffleResultTest",
1, 0, ptbs, 1);
+ new RssReportShuffleResultRequest(appId, 1, 0, ptbs, 1);
grpcShuffleServerClient.reportShuffleResult(req1);
}
};
@@ -701,7 +701,7 @@ public class ShuffleServerGrpcTest extends
IntegrationTestBase {
blockIds.add(blockId);
ptbs.put(1, blockIds);
RssReportShuffleResultRequest req1 =
- new RssReportShuffleResultRequest("multipleShuffleResultTest",
1, 1, ptbs, 1);
+ new RssReportShuffleResultRequest(appId, 1, 1, ptbs, 1);
grpcShuffleServerClient.reportShuffleResult(req1);
}
};
@@ -715,7 +715,7 @@ public class ShuffleServerGrpcTest extends
IntegrationTestBase {
blockIds.add(blockId);
ptbs.put(1, blockIds);
RssReportShuffleResultRequest req1 =
- new RssReportShuffleResultRequest("multipleShuffleResultTest",
1, 2, ptbs, 1);
+ new RssReportShuffleResultRequest(appId, 1, 2, ptbs, 1);
grpcShuffleServerClient.reportShuffleResult(req1);
}
};
@@ -734,8 +734,7 @@ public class ShuffleServerGrpcTest extends
IntegrationTestBase {
blockIdBitmap.addLong(blockId);
}
- RssGetShuffleResultRequest req =
- new RssGetShuffleResultRequest("multipleShuffleResultTest", 1, 1,
layout);
+ RssGetShuffleResultRequest req = new RssGetShuffleResultRequest(appId, 1,
1, layout);
RssGetShuffleResultResponse result =
grpcShuffleServerClient.getShuffleResult(req);
Roaring64NavigableMap actualBlockIdBitmap = result.getBlockIdBitmap();
assertEquals(blockIdBitmap, actualBlockIdBitmap);
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 7b14d981d..7e1eb88cf 100644
---
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -506,14 +506,23 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
"appId[" + appId + "], shuffleId[" + shuffleId + "], taskAttemptId[" +
taskAttemptId + "]";
try {
+ int expectedBlockCount =
partitionToBlockIds.values().stream().mapToInt(x -> x.length).sum();
LOG.info(
- "Report "
- + partitionToBlockIds.size()
- + " blocks as shuffle result for the task of "
- + requestInfo);
- shuffleServer
- .getShuffleTaskManager()
- .addFinishedBlockIds(appId, shuffleId, partitionToBlockIds,
bitmapNum);
+ "Accepted blockIds report for {} blocks across {} partitions as
shuffle result for task {}",
+ expectedBlockCount,
+ partitionToBlockIds.size(),
+ request);
+ int updatedBlockCount =
+ shuffleServer
+ .getShuffleTaskManager()
+ .addFinishedBlockIds(appId, shuffleId, partitionToBlockIds,
bitmapNum);
+ if (expectedBlockCount != updatedBlockCount) {
+ LOG.warn(
+ "Existing {} duplicated blockIds on blockId report for appId: {},
shuffleId: {}",
+ expectedBlockCount - updatedBlockCount,
+ appId,
+ shuffleId);
+ }
} catch (Exception e) {
status = StatusCode.INTERNAL_ERROR;
msg = "error happened when report shuffle result, check shuffle server
for detail";
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index f45b1be94..b6806f634 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -64,6 +64,8 @@ public class ShuffleTaskInfo {
private final AtomicReference<ShuffleSpecification> specification;
+ private final Map<Integer, Map<Integer, AtomicLong>> partitionBlockCounters;
+
public ShuffleTaskInfo(String appId) {
this.appId = appId;
this.currentTimes = System.currentTimeMillis();
@@ -75,6 +77,7 @@ public class ShuffleTaskInfo {
this.hugePartitionTags = JavaUtils.newConcurrentMap();
this.existHugePartition = new AtomicBoolean(false);
this.specification = new AtomicReference<>();
+ this.partitionBlockCounters = JavaUtils.newConcurrentMap();
}
public Long getCurrentTimes() {
@@ -198,6 +201,25 @@ public class ShuffleTaskInfo {
return partitionDataSizes.keySet();
}
+ public void incBlockNumber(int shuffleId, int partitionId, int delta) {
+ this.partitionBlockCounters
+ .computeIfAbsent(shuffleId, x -> JavaUtils.newConcurrentMap())
+ .computeIfAbsent(partitionId, x -> new AtomicLong())
+ .addAndGet(delta);
+ }
+
+ public long getBlockNumber(int shuffleId, int partitionId) {
+ Map<Integer, AtomicLong> partitionBlockCounters =
this.partitionBlockCounters.get(shuffleId);
+ if (partitionBlockCounters == null) {
+ return 0L;
+ }
+ AtomicLong counter = partitionBlockCounters.get(partitionId);
+ if (counter == null) {
+ return 0L;
+ }
+ return counter.get();
+ }
+
@Override
public String toString() {
return "ShuffleTaskInfo{"
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 0a07f70f2..a98eac26c 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -387,7 +387,16 @@ public class ShuffleTaskManager {
return StatusCode.SUCCESS;
}
- public void addFinishedBlockIds(
+ /**
+ * Add finished blockIds from client
+ *
+ * @param appId
+ * @param shuffleId
+ * @param partitionToBlockIds
+ * @param bitmapNum
+ * @return the number of added blockIds
+ */
+ public int addFinishedBlockIds(
String appId, Integer shuffleId, Map<Integer, long[]>
partitionToBlockIds, int bitmapNum) {
refreshAppId(appId);
Map<Integer, Roaring64NavigableMap[]> shuffleIdToPartitions =
partitionsToBlockIds.get(appId);
@@ -413,15 +422,28 @@ public class ShuffleTaskManager {
+ " bitmaps!");
}
+ ShuffleTaskInfo taskInfo = getShuffleTaskInfo(appId);
+ if (taskInfo == null) {
+ throw new InvalidRequestException(
+ "ShuffleTaskInfo is not found that should not happen for appId: " +
appId);
+ }
+ int totalUpdatedBlockCount = 0;
for (Map.Entry<Integer, long[]> entry : partitionToBlockIds.entrySet()) {
Integer partitionId = entry.getKey();
Roaring64NavigableMap bitmap = blockIds[partitionId % bitmapNum];
+ int updatedBlockCount = 0;
synchronized (bitmap) {
for (long blockId : entry.getValue()) {
- bitmap.addLong(blockId);
+ if (!bitmap.contains(blockId)) {
+ bitmap.addLong(blockId);
+ updatedBlockCount++;
+ totalUpdatedBlockCount++;
+ }
}
}
+ taskInfo.incBlockNumber(shuffleId, partitionId, updatedBlockCount);
}
+ return totalUpdatedBlockCount;
}
public int updateAndGetCommitCount(String appId, int shuffleId) {
@@ -553,13 +575,18 @@ public class ShuffleTaskManager {
}
Map<Integer, Roaring64NavigableMap[]> shuffleIdToPartitions =
partitionsToBlockIds.get(appId);
if (shuffleIdToPartitions == null) {
+ LOG.warn("Empty blockIds for app: {}. This should not happen", appId);
return null;
}
Roaring64NavigableMap[] blockIds = shuffleIdToPartitions.get(shuffleId);
if (blockIds == null) {
+ LOG.warn("Empty blockIds for app: {}, shuffleId: {}", appId, shuffleId);
return new byte[] {};
}
+
+ ShuffleTaskInfo taskInfo = getShuffleTaskInfo(appId);
+ long expectedBlockNumber = 0;
Map<Integer, Set<Integer>> bitmapIndexToPartitions = Maps.newHashMap();
for (int partitionId : partitions) {
int bitmapIndex = partitionId % blockIds.length;
@@ -569,6 +596,7 @@ public class ShuffleTaskManager {
HashSet<Integer> newHashSet = Sets.newHashSet(partitionId);
bitmapIndexToPartitions.put(bitmapIndex, newHashSet);
}
+ expectedBlockNumber += taskInfo.getBlockNumber(shuffleId, partitionId);
}
Roaring64NavigableMap res = Roaring64NavigableMap.bitmapOf();
@@ -577,6 +605,17 @@ public class ShuffleTaskManager {
Roaring64NavigableMap bitmap = blockIds[entry.getKey()];
getBlockIdsByPartitionId(requestPartitions, bitmap, res, blockIdLayout);
}
+
+ if (res.getLongCardinality() != expectedBlockNumber) {
+ throw new RssException(
+ "Inconsistent block number for partitions: "
+ + partitions
+ + ". Excepted: "
+ + expectedBlockNumber
+ + ", actual: "
+ + res.getLongCardinality());
+ }
+
return RssUtils.serializeBitMap(res);
}