This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/branch-0.x by this push:
     new d8f51e0f7a7 [HUDI-7778] Fixing global index for duplicate updates 
(#11305)
d8f51e0f7a7 is described below

commit d8f51e0f7a73cf8aaef0a125640435ad3f4e66d2
Author: Y Ethan Guo <[email protected]>
AuthorDate: Sat May 25 20:22:13 2024 -0700

    [HUDI-7778] Fixing global index for duplicate updates (#11305)
    
    Co-authored-by: sivabalan <[email protected]>
---
 .../org/apache/hudi/index/HoodieIndexUtils.java    |  8 +--
 .../TestGlobalIndexEnableUpdatePartitions.java     | 62 +++++++++++++++++++++-
 2 files changed, 64 insertions(+), 6 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 580fcdd85e0..5751dbbf0b5 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -237,7 +237,7 @@ public class HoodieIndexUtils {
    * @return {@link HoodieRecord}s that have the current location being set.
    */
   private static <R> HoodieData<HoodieRecord<R>> getExistingRecords(
-      HoodieData<HoodieRecordGlobalLocation> partitionLocations, 
HoodieWriteConfig config, HoodieTable hoodieTable) {
+      HoodieData<Pair<String, String>> partitionLocations, HoodieWriteConfig 
config, HoodieTable hoodieTable) {
     final Option<String> instantTime = hoodieTable
         .getMetaClient()
         .getCommitsTimeline()
@@ -245,7 +245,7 @@ public class HoodieIndexUtils {
         .lastInstant()
         .map(HoodieInstant::getTimestamp);
     return partitionLocations.flatMap(p
-        -> new HoodieMergedReadHandle(config, instantTime, hoodieTable, 
Pair.of(p.getPartitionPath(), p.getFileId()))
+        -> new HoodieMergedReadHandle(config, instantTime, hoodieTable, 
Pair.of(p.getKey(), p.getValue()))
         .getMergedRecords().iterator());
   }
 
@@ -351,9 +351,9 @@ public class HoodieIndexUtils {
     HoodieData<HoodieRecord<R>> untaggedUpdatingRecords = 
incomingRecordsAndLocations.filter(p -> 
p.getRight().isPresent()).map(Pair::getLeft)
         .distinctWithKey(HoodieRecord::getRecordKey, 
config.getGlobalIndexReconcileParallelism());
     // the tagging partitions and locations
-    HoodieData<HoodieRecordGlobalLocation> globalLocations = 
incomingRecordsAndLocations
+    HoodieData<Pair<String, String>> globalLocations = 
incomingRecordsAndLocations
         .filter(p -> p.getRight().isPresent())
-        .map(p -> p.getRight().get())
+        .map(p -> Pair.of(p.getRight().get().getPartitionPath(), 
p.getRight().get().getFileId()))
         .distinct(config.getGlobalIndexReconcileParallelism());
     // merged existing records with current locations being set
     HoodieData<HoodieRecord<R>> existingRecords = 
getExistingRecords(globalLocations,
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
index b0454f7f2aa..f37ec8462ed 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
@@ -38,7 +38,10 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
@@ -124,7 +127,6 @@ public class TestGlobalIndexEnableUpdatePartitions extends 
SparkClientFunctional
       assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch9, 2), 
commitTimeAtEpoch9).collect());
       readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p1, 9);
     }
-
   }
 
   @ParameterizedTest
@@ -180,8 +182,64 @@ public class TestGlobalIndexEnableUpdatePartitions extends 
SparkClientFunctional
       readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p1, 9);
     }
   }
+  
+  @ParameterizedTest
+  @MethodSource("getTableTypeAndIndexType")
+  public void testUdpateSubsetOfRecUpdates(HoodieTableType tableType, 
IndexType indexType) throws IOException {
+    final Class<?> payloadClass = DefaultHoodieRecordPayload.class;
+    HoodieWriteConfig writeConfig = getWriteConfig(payloadClass, indexType);
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(tableType, 
writeConfig.getProps());
+    try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
+      final int totalRecords = 4;
+      final String p1 = "p1";
+      final String p2 = "p2";
+
+      List<HoodieRecord> allInserts = getInserts(totalRecords, p1, 0, 
payloadClass);
+
+      // 1st batch: insert 1,2
+      String commitTimeAtEpoch0 = getCommitTimeAtUTC(0);
+      client.startCommitWithTime(commitTimeAtEpoch0);
+      
assertNoWriteErrors(client.upsert(jsc().parallelize(allInserts.subList(0,2), 
2), commitTimeAtEpoch0).collect());
+      readTableAndValidate(metaClient, new int[] {0, 1}, p1, 0L);
+
+      // 2nd batch: update records 1,2 and insert 3
+      String commitTimeAtEpoch5 = getCommitTimeAtUTC(5);
+      List<HoodieRecord> updatesAtEpoch5 = getUpdates(allInserts.subList(0,3), 
5, payloadClass);
+      client.startCommitWithTime(commitTimeAtEpoch5);
+      assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch5, 2), 
commitTimeAtEpoch5).collect());
+      readTableAndValidate(metaClient, new int[] {0, 1, 2}, p1, 
getExpectedTsMap(new int[] {0, 1, 2}, new Long[] {5L, 5L, 5L}));
+
+      // 3rd batch: update records 1,2,3 and insert 4
+      String commitTimeAtEpoch10 = getCommitTimeAtUTC(10);
+      List<HoodieRecord> updatesAtEpoch10 = getUpdates(allInserts, 10, 
payloadClass);
+      client.startCommitWithTime(commitTimeAtEpoch10);
+      assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch10, 
2), commitTimeAtEpoch10).collect());
+      readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p1, 
getExpectedTsMap(new int[] {0, 1, 2, 3}, new Long[] {10L, 10L, 10L, 10L}));
+
+      // 4th batch: update all from p1 to p2
+      String commitTimeAtEpoch20 = getCommitTimeAtUTC(20);
+      List<HoodieRecord> updatesAtEpoch20 = getUpdates(allInserts, p2, 20, 
payloadClass);
+      client.startCommitWithTime(commitTimeAtEpoch20);
+      assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch20, 
2), commitTimeAtEpoch20).collect());
+      readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p2, 20);
+    }
+  }
+
+  private Map<String, Long> getExpectedTsMap(int[] recordKeys, Long[] 
expectedTses) {
+    Map<String, Long> expectedTsMap = new HashMap<>();
+    for (int i = 0; i < recordKeys.length; i++) {
+      expectedTsMap.put(String.valueOf(recordKeys[i]), expectedTses[i]);
+    }
+    return expectedTsMap;
+  }
 
   private void readTableAndValidate(HoodieTableMetaClient metaClient, int[] 
expectedIds, String expectedPartition, long expectedTs) {
+    Map<String, Long> expectedTsMap = new HashMap<>();
+    Arrays.stream(expectedIds).forEach(entry -> 
expectedTsMap.put(String.valueOf(entry), expectedTs));
+    readTableAndValidate(metaClient, expectedIds, expectedPartition, 
expectedTsMap);
+  }
+
+  private void readTableAndValidate(HoodieTableMetaClient metaClient, int[] 
expectedIds, String expectedPartition, Map<String, Long> expectedTsMap) {
     Dataset<Row> df = spark().read().format("hudi")
         .load(metaClient.getBasePathV2().toString())
         .sort("id")
@@ -198,7 +256,7 @@ public class TestGlobalIndexEnableUpdatePartitions extends 
SparkClientFunctional
       assertEquals(expectedPartition, r.getString(1));
       assertEquals(expectedId, r.getInt(2));
       assertEquals(expectedPartition, r.getString(3));
-      assertEquals(expectedTs, r.getLong(4));
+      assertEquals(expectedTsMap.get(String.valueOf(expectedId)), 
r.getLong(4));
     }
     df.unpersist();
   }

Reply via email to