This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 91aa38712e8 [HUDI-7522] Support find out the conflict instants in
bucket partition when bucket id multiple (#10898)
91aa38712e8 is described below
commit 91aa38712e8e4f2e100b4c6f456d71f509edbef4
Author: xuzifu666 <[email protected]>
AuthorDate: Wed May 8 12:03:09 2024 +0800
[HUDI-7522] Support find out the conflict instants in bucket partition when
bucket id multiple (#10898)
---------
Co-authored-by: danny0405 <[email protected]>
---
.../hudi/index/bucket/HoodieSimpleBucketIndex.java | 77 +++++++++++++++++++++-
1 file changed, 75 insertions(+), 2 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
index a38fa489a2a..4bbe12675fe 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
@@ -21,18 +21,35 @@ package org.apache.hudi.index.bucket;
import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.HoodieTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
+
import static org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded;
@@ -41,6 +58,8 @@ import static
org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded;
*/
public class HoodieSimpleBucketIndex extends HoodieBucketIndex {
+ private static final Logger LOG =
LoggerFactory.getLogger(HoodieSimpleBucketIndex.class);
+
/**
 * Instantiates a simple bucket index driven by the given write config
 * (bucket number and index key fields come from the config via the superclass).
 *
 * @param config the write config carrying the bucket index options
 */
public HoodieSimpleBucketIndex(HoodieWriteConfig config) {
super(config);
}
@@ -50,7 +69,9 @@ public class HoodieSimpleBucketIndex extends
HoodieBucketIndex {
String partition) {
// bucketId -> fileIds
Map<Integer, HoodieRecordLocation> bucketIdToFileIdMapping = new
HashMap<>();
- hoodieTable.getMetaClient().reloadActiveTimeline();
+ HoodieActiveTimeline hoodieActiveTimeline =
hoodieTable.getMetaClient().reloadActiveTimeline();
+ Set<String> pendingInstants =
hoodieActiveTimeline.filterInflights().getInstantsAsStream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
+
HoodieIndexUtils
.getLatestFileSlicesForPartition(partition, hoodieTable)
.forEach(fileSlice -> {
@@ -61,14 +82,66 @@ public class HoodieSimpleBucketIndex extends
HoodieBucketIndex {
if (!bucketIdToFileIdMapping.containsKey(bucketId)) {
bucketIdToFileIdMapping.put(bucketId, new
HoodieRecordLocation(commitTime, fileId));
} else {
+ // Finding the instants which conflict with the bucket id
+ List<String> instants =
findConflictInstantsInPartition(hoodieTable, partition, bucketId,
pendingInstants);
+
// Check if bucket data is valid
throw new HoodieIOException("Find multiple files at partition
path="
- + partition + " belongs to the same bucket id = " + bucketId);
+ + partition + " that belong to the same bucket id = " +
bucketId
+ + ", these instants need to rollback: " + instants.toString()
+ + ", you can use 'rollback_to_instant' procedure to revert the
conflicts.");
}
});
return bucketIdToFileIdMapping;
}
+ /**
+ * Find out the conflict files in bucket partition with bucket id.
+ */
+ public List<String> findConflictInstantsInPartition(HoodieTable hoodieTable,
String partition, int bucketId, Set<String> pendingInstants) {
+ List<String> instants = new ArrayList<>();
+ HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
+ StoragePath partitionPath = new StoragePath(metaClient.getBasePathV2(),
partition);
+
+ List<StoragePathInfo> filesInPartition =
listFilesFromPartition(metaClient, partitionPath);
+
+ Stream<FileSlice> latestFileSlicesIncludingInflight =
hoodieTable.getSliceView().getLatestFileSlicesIncludingInflight(partition);
+ List<String> candidates =
latestFileSlicesIncludingInflight.map(FileSlice::getLatestInstantTime)
+ .filter(pendingInstants::contains)
+ .collect(Collectors.toList());
+
+ for (String i : candidates) {
+ if (hasPendingDataFilesForInstant(filesInPartition, i, bucketId)) {
+ instants.add(i);
+ }
+ }
+ return instants;
+ }
+
+ private static List<StoragePathInfo>
listFilesFromPartition(HoodieTableMetaClient metaClient, StoragePath
partitionPath) {
+ try {
+ return metaClient.getStorage().listFiles(partitionPath);
+ } catch (IOException e) {
+ // ignore the exception though
+ return Collections.emptyList();
+ }
+ }
+
+ public Boolean hasPendingDataFilesForInstant(List<StoragePathInfo>
filesInPartition, String instant, int bucketId) {
+ for (StoragePathInfo status : filesInPartition) {
+ String fileName = status.getPath().getName();
+
+ try {
+ if (status.isFile() && BucketIdentifier.bucketIdFromFileId(fileName)
== bucketId && fileName.contains(instant)) {
+ return true;
+ }
+ } catch (NumberFormatException e) {
+ LOG.warn("File is not bucket file");
+ }
+ }
+ return false;
+ }
+
/**
 * Resolves the bucket id for the given record key from the configured
 * index key fields and the static bucket number.
 */
public int getBucketID(HoodieKey key) {
return BucketIdentifier.getBucketId(key, indexKeyFields, numBuckets);
}