danny0405 commented on code in PR #10898:
URL: https://github.com/apache/hudi/pull/10898#discussion_r1590494018
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java:
##########
@@ -61,14 +76,66 @@ public Map<Integer, HoodieRecordLocation>
loadBucketIdToFileIdMappingForPartitio
if (!bucketIdToFileIdMapping.containsKey(bucketId)) {
bucketIdToFileIdMapping.put(bucketId, new
HoodieRecordLocation(commitTime, fileId));
} else {
+ // Finding the instants which conflict with the bucket id
+ Set<String> instants =
findTheConflictBucketIdInPartition(hoodieTable, partition, bucketId);
+
// Check if bucket data is valid
throw new HoodieIOException("Find multiple files at partition
path="
- + partition + " belongs to the same bucket id = " + bucketId);
+ + partition + " belongs to the same bucket id = " + bucketId
+ + ", these instants need to rollback: " + instants.toString()
+ + ", you can use rollback_to_instant procedure to recovery");
}
});
return bucketIdToFileIdMapping;
}
+ /**
+ * Find out the conflict files in bucket partition with bucket id
+ */
+ public HashSet<String> findTheConflictBucketIdInPartition(HoodieTable
hoodieTable, String partition, int bucketId) {
+ HashSet<String> instants = new HashSet<>();
+ HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
+ StoragePath basePath = metaClient.getBasePathV2();
+ StoragePath partitionPath = new StoragePath(basePath.toString(),
partition);
+
+ Stream<FileSlice> latestFileSlicesIncludingInflight =
hoodieTable.getSliceView().getLatestFileSlicesIncludingInflight(partition);
+ List<String> pendingInstants =
latestFileSlicesIncludingInflight.map(fileSlice1 ->
fileSlice1.getBaseInstantTime()).collect(Collectors.toList());
+
+ for (String i : pendingInstants) {
+ if (judgeInstantInPath(metaClient, partitionPath, i, bucketId)) {
+ instants.add(i);
+ // error out director and stop circulate when find out conflict instant
+ break;
+ }
+ }
+ return instants;
+ }
+
+ public Boolean judgeInstantInPath(HoodieTableMetaClient metaClient,
StoragePath path, String instant, int bucketId) {
+ Boolean ret = false;
+ try {
+ List<StoragePathInfo> fileStatuses =
metaClient.getStorage().listFiles(path);
Review Comment:
still we should limit the file listing to be only once for each partition.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]