This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.15.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 86552da1832d55bbcb2040e6757b0ac609bf9432 Author: Y Ethan Guo <[email protected]> AuthorDate: Sat May 25 20:22:13 2024 -0700 [HUDI-7778] Fixing global index for duplicate updates (#11305) Co-authored-by: sivabalan <[email protected]> --- .../org/apache/hudi/index/HoodieIndexUtils.java | 8 +-- .../TestGlobalIndexEnableUpdatePartitions.java | 62 +++++++++++++++++++++- 2 files changed, 64 insertions(+), 6 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java index 580fcdd85e0..5751dbbf0b5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java @@ -237,7 +237,7 @@ public class HoodieIndexUtils { * @return {@link HoodieRecord}s that have the current location being set. */ private static <R> HoodieData<HoodieRecord<R>> getExistingRecords( - HoodieData<HoodieRecordGlobalLocation> partitionLocations, HoodieWriteConfig config, HoodieTable hoodieTable) { + HoodieData<Pair<String, String>> partitionLocations, HoodieWriteConfig config, HoodieTable hoodieTable) { final Option<String> instantTime = hoodieTable .getMetaClient() .getCommitsTimeline() @@ -245,7 +245,7 @@ public class HoodieIndexUtils { .lastInstant() .map(HoodieInstant::getTimestamp); return partitionLocations.flatMap(p - -> new HoodieMergedReadHandle(config, instantTime, hoodieTable, Pair.of(p.getPartitionPath(), p.getFileId())) + -> new HoodieMergedReadHandle(config, instantTime, hoodieTable, Pair.of(p.getKey(), p.getValue())) .getMergedRecords().iterator()); } @@ -351,9 +351,9 @@ public class HoodieIndexUtils { HoodieData<HoodieRecord<R>> untaggedUpdatingRecords = incomingRecordsAndLocations.filter(p -> p.getRight().isPresent()).map(Pair::getLeft) .distinctWithKey(HoodieRecord::getRecordKey, 
config.getGlobalIndexReconcileParallelism()); // the tagging partitions and locations - HoodieData<HoodieRecordGlobalLocation> globalLocations = incomingRecordsAndLocations + HoodieData<Pair<String, String>> globalLocations = incomingRecordsAndLocations .filter(p -> p.getRight().isPresent()) - .map(p -> p.getRight().get()) + .map(p -> Pair.of(p.getRight().get().getPartitionPath(), p.getRight().get().getFileId())) .distinct(config.getGlobalIndexReconcileParallelism()); // merged existing records with current locations being set HoodieData<HoodieRecord<R>> existingRecords = getExistingRecords(globalLocations, diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java index b0454f7f2aa..f37ec8462ed 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java @@ -38,7 +38,10 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Stream; import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE; @@ -124,7 +127,6 @@ public class TestGlobalIndexEnableUpdatePartitions extends SparkClientFunctional assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch9, 2), commitTimeAtEpoch9).collect()); readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p1, 9); } - } @ParameterizedTest @@ -180,8 +182,64 @@ public class TestGlobalIndexEnableUpdatePartitions extends SparkClientFunctional readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p1, 9); } } + + @ParameterizedTest + 
@MethodSource("getTableTypeAndIndexType") + public void testUdpateSubsetOfRecUpdates(HoodieTableType tableType, IndexType indexType) throws IOException { + final Class<?> payloadClass = DefaultHoodieRecordPayload.class; + HoodieWriteConfig writeConfig = getWriteConfig(payloadClass, indexType); + HoodieTableMetaClient metaClient = getHoodieMetaClient(tableType, writeConfig.getProps()); + try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) { + final int totalRecords = 4; + final String p1 = "p1"; + final String p2 = "p2"; + + List<HoodieRecord> allInserts = getInserts(totalRecords, p1, 0, payloadClass); + + // 1st batch: insert 1,2 + String commitTimeAtEpoch0 = getCommitTimeAtUTC(0); + client.startCommitWithTime(commitTimeAtEpoch0); + assertNoWriteErrors(client.upsert(jsc().parallelize(allInserts.subList(0,2), 2), commitTimeAtEpoch0).collect()); + readTableAndValidate(metaClient, new int[] {0, 1}, p1, 0L); + + // 2nd batch: update records 1,2 and insert 3 + String commitTimeAtEpoch5 = getCommitTimeAtUTC(5); + List<HoodieRecord> updatesAtEpoch5 = getUpdates(allInserts.subList(0,3), 5, payloadClass); + client.startCommitWithTime(commitTimeAtEpoch5); + assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch5, 2), commitTimeAtEpoch5).collect()); + readTableAndValidate(metaClient, new int[] {0, 1, 2}, p1, getExpectedTsMap(new int[] {0, 1, 2}, new Long[] {5L, 5L, 5L})); + + // 3rd batch: update records 1,2,3 and insert 4 + String commitTimeAtEpoch10 = getCommitTimeAtUTC(10); + List<HoodieRecord> updatesAtEpoch10 = getUpdates(allInserts, 10, payloadClass); + client.startCommitWithTime(commitTimeAtEpoch10); + assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch10, 2), commitTimeAtEpoch10).collect()); + readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p1, getExpectedTsMap(new int[] {0, 1, 2, 3}, new Long[] {10L, 10L, 10L, 10L})); + + // 4th batch: update all from p1 to p2 + String commitTimeAtEpoch20 = getCommitTimeAtUTC(20); 
+ List<HoodieRecord> updatesAtEpoch20 = getUpdates(allInserts, p2, 20, payloadClass); + client.startCommitWithTime(commitTimeAtEpoch20); + assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch20, 2), commitTimeAtEpoch20).collect()); + readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p2, 20); + } + } + + private Map<String, Long> getExpectedTsMap(int[] recordKeys, Long[] expectedTses) { + Map<String, Long> expectedTsMap = new HashMap<>(); + for (int i = 0; i < recordKeys.length; i++) { + expectedTsMap.put(String.valueOf(recordKeys[i]), expectedTses[i]); + } + return expectedTsMap; + } private void readTableAndValidate(HoodieTableMetaClient metaClient, int[] expectedIds, String expectedPartition, long expectedTs) { + Map<String, Long> expectedTsMap = new HashMap<>(); + Arrays.stream(expectedIds).forEach(entry -> expectedTsMap.put(String.valueOf(entry), expectedTs)); + readTableAndValidate(metaClient, expectedIds, expectedPartition, expectedTsMap); + } + + private void readTableAndValidate(HoodieTableMetaClient metaClient, int[] expectedIds, String expectedPartition, Map<String, Long> expectedTsMap) { Dataset<Row> df = spark().read().format("hudi") .load(metaClient.getBasePathV2().toString()) .sort("id") @@ -198,7 +256,7 @@ public class TestGlobalIndexEnableUpdatePartitions extends SparkClientFunctional assertEquals(expectedPartition, r.getString(1)); assertEquals(expectedId, r.getInt(2)); assertEquals(expectedPartition, r.getString(3)); - assertEquals(expectedTs, r.getLong(4)); + assertEquals(expectedTsMap.get(String.valueOf(expectedId)), r.getLong(4)); } df.unpersist(); }
