This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 91aa38712e8 [HUDI-7522] Support find out the conflict instants in bucket partition when bucket id multiple (#10898)
91aa38712e8 is described below

commit 91aa38712e8e4f2e100b4c6f456d71f509edbef4
Author: xuzifu666 <[email protected]>
AuthorDate: Wed May 8 12:03:09 2024 +0800

    [HUDI-7522] Support find out the conflict instants in bucket partition when bucket id multiple (#10898)
    
    ---------
    
    Co-authored-by: danny0405 <[email protected]>
---
 .../hudi/index/bucket/HoodieSimpleBucketIndex.java | 77 +++++++++++++++++++++-
 1 file changed, 75 insertions(+), 2 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
index a38fa489a2a..4bbe12675fe 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
@@ -21,18 +21,35 @@ package org.apache.hudi.index.bucket;
 import org.apache.hudi.client.utils.LazyIterableIterator;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.table.HoodieTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+
 
 import static org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded;
 
@@ -41,6 +58,8 @@ import static org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded;
  */
 public class HoodieSimpleBucketIndex extends HoodieBucketIndex {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieSimpleBucketIndex.class);
+
   public HoodieSimpleBucketIndex(HoodieWriteConfig config) {
     super(config);
   }
@@ -50,7 +69,9 @@ public class HoodieSimpleBucketIndex extends HoodieBucketIndex {
       String partition) {
     // bucketId -> fileIds
     Map<Integer, HoodieRecordLocation> bucketIdToFileIdMapping = new 
HashMap<>();
-    hoodieTable.getMetaClient().reloadActiveTimeline();
+    HoodieActiveTimeline hoodieActiveTimeline = 
hoodieTable.getMetaClient().reloadActiveTimeline();
+    Set<String> pendingInstants = 
hoodieActiveTimeline.filterInflights().getInstantsAsStream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
+
     HoodieIndexUtils
         .getLatestFileSlicesForPartition(partition, hoodieTable)
         .forEach(fileSlice -> {
@@ -61,14 +82,66 @@ public class HoodieSimpleBucketIndex extends HoodieBucketIndex {
           if (!bucketIdToFileIdMapping.containsKey(bucketId)) {
             bucketIdToFileIdMapping.put(bucketId, new 
HoodieRecordLocation(commitTime, fileId));
           } else {
+            // Finding the instants which conflict with the bucket id
+            List<String> instants = 
findConflictInstantsInPartition(hoodieTable, partition, bucketId, 
pendingInstants);
+
             // Check if bucket data is valid
             throw new HoodieIOException("Find multiple files at partition 
path="
-                + partition + " belongs to the same bucket id = " + bucketId);
+                + partition + " that belong to the same bucket id = " + 
bucketId
+                + ", these instants need to rollback: " + instants.toString()
+                + ", you can use 'rollback_to_instant' procedure to revert the 
conflicts.");
           }
         });
     return bucketIdToFileIdMapping;
   }
 
+  /**
+   * Find out the conflict files in bucket partition with bucket id.
+   */
+  public List<String> findConflictInstantsInPartition(HoodieTable hoodieTable, 
String partition, int bucketId, Set<String> pendingInstants) {
+    List<String> instants = new ArrayList<>();
+    HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
+    StoragePath partitionPath = new StoragePath(metaClient.getBasePathV2(), 
partition);
+
+    List<StoragePathInfo> filesInPartition = 
listFilesFromPartition(metaClient, partitionPath);
+
+    Stream<FileSlice> latestFileSlicesIncludingInflight = 
hoodieTable.getSliceView().getLatestFileSlicesIncludingInflight(partition);
+    List<String> candidates = 
latestFileSlicesIncludingInflight.map(FileSlice::getLatestInstantTime)
+        .filter(pendingInstants::contains)
+        .collect(Collectors.toList());
+
+    for (String i : candidates) {
+      if (hasPendingDataFilesForInstant(filesInPartition, i, bucketId)) {
+        instants.add(i);
+      }
+    }
+    return instants;
+  }
+
+  private static List<StoragePathInfo> 
listFilesFromPartition(HoodieTableMetaClient metaClient, StoragePath 
partitionPath) {
+    try {
+      return metaClient.getStorage().listFiles(partitionPath);
+    } catch (IOException e) {
+      // ignore the exception though
+      return Collections.emptyList();
+    }
+  }
+
+  public Boolean hasPendingDataFilesForInstant(List<StoragePathInfo> 
filesInPartition, String instant, int bucketId) {
+    for (StoragePathInfo status : filesInPartition) {
+      String fileName = status.getPath().getName();
+
+      try {
+        if (status.isFile() && BucketIdentifier.bucketIdFromFileId(fileName) 
== bucketId && fileName.contains(instant)) {
+          return true;
+        }
+      } catch (NumberFormatException e) {
+        LOG.warn("File is not bucket file");
+      }
+    }
+    return false;
+  }
+
   public int getBucketID(HoodieKey key) {
     return BucketIdentifier.getBucketId(key, indexKeyFields, numBuckets);
   }

Reply via email to