This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a5bddaca8 [HUDI-4808] Fix HoodieSimpleBucketIndex not consider bucket 
num in lo… (#6630)
3a5bddaca8 is described below

commit 3a5bddaca89e24ae815c16c4377a980e2c631327
Author: FocusComputing <[email protected]>
AuthorDate: Mon Sep 19 14:16:24 2022 +0800

    [HUDI-4808] Fix HoodieSimpleBucketIndex not consider bucket num in lo… 
(#6630)
    
    * [HUDI-4808] Fix HoodieSimpleBucketIndex not consider bucket num in log 
file issue
    
    Co-authored-by: xiaoxingstack <[email protected]>
---
 .../org/apache/hudi/index/HoodieIndexUtils.java    | 21 ++++++++++++++++++
 .../hudi/index/bucket/HoodieSimpleBucketIndex.java | 10 ++++-----
 .../hudi/testutils/HoodieWriteableTestTable.java   | 25 ++++++++--------------
 .../index/bucket/TestHoodieSimpleBucketIndex.java  | 21 +++++++++++++-----
 .../testutils/HoodieSparkWriteableTestTable.java   | 11 ++++++++++
 5 files changed, 62 insertions(+), 26 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 9b3dc8df00..61be856d36 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -72,6 +73,26 @@ public class HoodieIndexUtils {
     return Collections.emptyList();
   }
 
+  /**
+   * Fetches the latest {@link FileSlice}s for the given partition.
+   *
+   * @param partition   Partition of interest
+   * @param hoodieTable Instance of {@link HoodieTable} of interest
+   * @return the list of {@link FileSlice}
+   */
+  public static List<FileSlice> getLatestFileSlicesForPartition(
+          final String partition,
+          final HoodieTable hoodieTable) {
+    Option<HoodieInstant> latestCommitTime = 
hoodieTable.getMetaClient().getCommitsTimeline()
+            .filterCompletedInstants().lastInstant();
+    if (latestCommitTime.isPresent()) {
+      return hoodieTable.getHoodieView()
+              .getLatestFileSlicesBeforeOrOn(partition, 
latestCommitTime.get().getTimestamp(), true)
+              .collect(toList());
+    }
+    return Collections.emptyList();
+  }
+
   /**
    * Fetches Pair of partition path and {@link HoodieBaseFile}s for interested 
partitions.
    *
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
index 2ccebb472f..aae50e1f95 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
@@ -25,7 +25,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.table.HoodieTable;
-
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -52,10 +51,11 @@ public class HoodieSimpleBucketIndex extends 
HoodieBucketIndex {
     Map<Integer, HoodieRecordLocation> bucketIdToFileIdMapping = new 
HashMap<>();
     hoodieTable.getMetaClient().reloadActiveTimeline();
     HoodieIndexUtils
-        .getLatestBaseFilesForPartition(partition, hoodieTable)
-        .forEach(file -> {
-          String fileId = file.getFileId();
-          String commitTime = file.getCommitTime();
+        .getLatestFileSlicesForPartition(partition, hoodieTable)
+        .forEach(fileSlice -> {
+          String fileId = fileSlice.getFileId();
+          String commitTime = fileSlice.getBaseInstantTime();
+
           int bucketId = BucketIdentifier.bucketIdFromFileId(fileId);
           if (!bucketIdToFileIdMapping.containsKey(bucketId)) {
             bucketIdToFileIdMapping.put(bucketId, new 
HoodieRecordLocation(commitTime, fileId));
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
index 2f00b82772..8e7df833cc 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
@@ -32,7 +32,6 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -45,9 +44,9 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
-import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.io.storage.HoodieOrcConfig;
 import org.apache.hudi.io.storage.HoodieOrcWriter;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -152,27 +151,21 @@ public class HoodieWriteableTestTable extends 
HoodieMetadataTestTable {
     return baseFilePath;
   }
 
-  public Map<String, List<HoodieLogFile>> withLogAppends(List<HoodieRecord> 
records) throws Exception {
+  public Map<String, List<HoodieLogFile>> withLogAppends(String partition, 
String fileId, List<HoodieRecord> records) throws Exception {
     Map<String, List<HoodieLogFile>> partitionToLogfilesMap = new HashMap<>();
-    for (List<HoodieRecord> groupedRecords : records.stream()
-        
.collect(Collectors.groupingBy(HoodieRecord::getCurrentLocation)).values()) {
-      final Pair<String, HoodieLogFile> appendedLogFile = 
appendRecordsToLogFile(groupedRecords);
-      partitionToLogfilesMap.computeIfAbsent(
-          appendedLogFile.getKey(), k -> new 
ArrayList<>()).add(appendedLogFile.getValue());
-    }
+    final Pair<String, HoodieLogFile> appendedLogFile = 
appendRecordsToLogFile(partition, fileId, records);
+    partitionToLogfilesMap.computeIfAbsent(appendedLogFile.getKey(), k -> new 
ArrayList<>()).add(appendedLogFile.getValue());
     return partitionToLogfilesMap;
   }
 
-  private Pair<String, HoodieLogFile> 
appendRecordsToLogFile(List<HoodieRecord> groupedRecords) throws Exception {
-    String partitionPath = groupedRecords.get(0).getPartitionPath();
-    HoodieRecordLocation location = groupedRecords.get(0).getCurrentLocation();
+  private Pair<String, HoodieLogFile> appendRecordsToLogFile(String 
partitionPath, String fileId, List<HoodieRecord> records) throws Exception {
     try (HoodieLogFormat.Writer logWriter = 
HoodieLogFormat.newWriterBuilder().onParentPath(new Path(basePath, 
partitionPath))
-        
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(location.getFileId())
-        .overBaseCommit(location.getInstantTime()).withFs(fs).build()) {
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId)
+        .overBaseCommit(currentInstantTime).withFs(fs).build()) {
       Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
-      header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, 
location.getInstantTime());
+      header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, 
currentInstantTime);
       header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-      logWriter.appendBlock(new 
HoodieAvroDataBlock(groupedRecords.stream().map(r -> {
+      logWriter.appendBlock(new HoodieAvroDataBlock(records.stream().map(r -> {
         try {
           GenericRecord val = (GenericRecord) ((HoodieRecordPayload) 
r.getData()).getInsertValue(schema).get();
           HoodieAvroUtils.addHoodieKeyToRecord(val, r.getRecordKey(), 
r.getPartitionPath(), "");
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieSimpleBucketIndex.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieSimpleBucketIndex.java
index a96ce04077..ea6418696c 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieSimpleBucketIndex.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieSimpleBucketIndex.java
@@ -42,6 +42,8 @@ import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Arrays;
 import java.util.Properties;
@@ -89,8 +91,9 @@ public class TestHoodieSimpleBucketIndex extends 
HoodieClientTestHarness {
         .withBucketNum("8").build();
   }
 
-  @Test
-  public void testTagLocation() throws Exception {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testTagLocation(boolean isInsert) throws Exception {
     String rowKey1 = UUID.randomUUID().toString();
     String rowKey2 = UUID.randomUUID().toString();
     String rowKey3 = UUID.randomUUID().toString();
@@ -119,9 +122,17 @@ public class TestHoodieSimpleBucketIndex extends 
HoodieClientTestHarness {
     assertFalse(taggedRecordRDD.collectAsList().stream().anyMatch(r -> 
r.isCurrentLocationKnown()));
 
     HoodieSparkWriteableTestTable testTable = 
HoodieSparkWriteableTestTable.of(table, SCHEMA);
-    testTable.addCommit("001").withInserts("2016/01/31", 
getRecordFileId(record1), record1);
-    testTable.addCommit("002").withInserts("2016/01/31", 
getRecordFileId(record2), record2);
-    testTable.addCommit("003").withInserts("2016/01/31", 
getRecordFileId(record3), record3);
+
+    if (isInsert) {
+      testTable.addCommit("001").withInserts("2016/01/31", 
getRecordFileId(record1), record1);
+      testTable.addCommit("002").withInserts("2016/01/31", 
getRecordFileId(record2), record2);
+      testTable.addCommit("003").withInserts("2016/01/31", 
getRecordFileId(record3), record3);
+    } else {
+      testTable.addCommit("001").withLogAppends("2016/01/31", 
getRecordFileId(record1), record1);
+      testTable.addCommit("002").withLogAppends("2016/01/31", 
getRecordFileId(record2), record2);
+      testTable.addCommit("003").withLogAppends("2016/01/31", 
getRecordFileId(record3), record3);
+    }
+
     taggedRecordRDD = bucketIndex.tagLocation(HoodieJavaRDD.of(recordRDD), 
context,
         HoodieSparkTable.create(config, context, metaClient));
     assertFalse(taggedRecordRDD.collectAsList().stream().filter(r -> 
r.isCurrentLocationKnown())
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkWriteableTestTable.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkWriteableTestTable.java
index 8940223926..3b50d1b29b 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkWriteableTestTable.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkWriteableTestTable.java
@@ -23,6 +23,7 @@ import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
@@ -36,6 +37,7 @@ import org.apache.log4j.Logger;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 public class HoodieSparkWriteableTestTable extends HoodieWriteableTestTable {
@@ -116,4 +118,13 @@ public class HoodieSparkWriteableTestTable extends 
HoodieWriteableTestTable {
   public Path withInserts(String partition, String fileId, List<HoodieRecord> 
records) throws Exception {
     return super.withInserts(partition, fileId, records, new 
SparkTaskContextSupplier());
   }
+
+  public HoodieSparkWriteableTestTable withLogAppends(String partition, String 
fileId, HoodieRecord... records) throws Exception {
+    withLogAppends(partition, fileId, Arrays.asList(records));
+    return this;
+  }
+
+  public Map<String, List<HoodieLogFile>> withLogAppends(String partition, 
String fileId, List<HoodieRecord> records) throws Exception {
+    return super.withLogAppends(partition, fileId, records);
+  }
 }

Reply via email to