This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new d8f51e0f7a7 [HUDI-7778] Fixing global index for duplicate updates (#11305)
d8f51e0f7a7 is described below
commit d8f51e0f7a73cf8aaef0a125640435ad3f4e66d2
Author: Y Ethan Guo <[email protected]>
AuthorDate: Sat May 25 20:22:13 2024 -0700
[HUDI-7778] Fixing global index for duplicate updates (#11305)
Co-authored-by: sivabalan <[email protected]>
---
.../org/apache/hudi/index/HoodieIndexUtils.java | 8 +--
.../TestGlobalIndexEnableUpdatePartitions.java | 62 +++++++++++++++++++++-
2 files changed, 64 insertions(+), 6 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 580fcdd85e0..5751dbbf0b5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -237,7 +237,7 @@ public class HoodieIndexUtils {
* @return {@link HoodieRecord}s that have the current location being set.
*/
private static <R> HoodieData<HoodieRecord<R>> getExistingRecords(
- HoodieData<HoodieRecordGlobalLocation> partitionLocations,
HoodieWriteConfig config, HoodieTable hoodieTable) {
+ HoodieData<Pair<String, String>> partitionLocations, HoodieWriteConfig
config, HoodieTable hoodieTable) {
final Option<String> instantTime = hoodieTable
.getMetaClient()
.getCommitsTimeline()
@@ -245,7 +245,7 @@ public class HoodieIndexUtils {
.lastInstant()
.map(HoodieInstant::getTimestamp);
return partitionLocations.flatMap(p
- -> new HoodieMergedReadHandle(config, instantTime, hoodieTable,
Pair.of(p.getPartitionPath(), p.getFileId()))
+ -> new HoodieMergedReadHandle(config, instantTime, hoodieTable,
Pair.of(p.getKey(), p.getValue()))
.getMergedRecords().iterator());
}
@@ -351,9 +351,9 @@ public class HoodieIndexUtils {
HoodieData<HoodieRecord<R>> untaggedUpdatingRecords =
incomingRecordsAndLocations.filter(p ->
p.getRight().isPresent()).map(Pair::getLeft)
.distinctWithKey(HoodieRecord::getRecordKey,
config.getGlobalIndexReconcileParallelism());
// the tagging partitions and locations
- HoodieData<HoodieRecordGlobalLocation> globalLocations =
incomingRecordsAndLocations
+ HoodieData<Pair<String, String>> globalLocations =
incomingRecordsAndLocations
.filter(p -> p.getRight().isPresent())
- .map(p -> p.getRight().get())
+ .map(p -> Pair.of(p.getRight().get().getPartitionPath(),
p.getRight().get().getFileId()))
.distinct(config.getGlobalIndexReconcileParallelism());
// merged existing records with current locations being set
HoodieData<HoodieRecord<R>> existingRecords =
getExistingRecords(globalLocations,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
index b0454f7f2aa..f37ec8462ed 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java
@@ -38,7 +38,10 @@ import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.stream.Stream;
import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
@@ -124,7 +127,6 @@ public class TestGlobalIndexEnableUpdatePartitions extends SparkClientFunctional
assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch9, 2),
commitTimeAtEpoch9).collect());
readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p1, 9);
}
-
}
@ParameterizedTest
@@ -180,8 +182,64 @@ public class TestGlobalIndexEnableUpdatePartitions extends SparkClientFunctional
readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p1, 9);
}
}
+
+ @ParameterizedTest
+ @MethodSource("getTableTypeAndIndexType")
+ public void testUdpateSubsetOfRecUpdates(HoodieTableType tableType,
IndexType indexType) throws IOException {
+ final Class<?> payloadClass = DefaultHoodieRecordPayload.class;
+ HoodieWriteConfig writeConfig = getWriteConfig(payloadClass, indexType);
+ HoodieTableMetaClient metaClient = getHoodieMetaClient(tableType,
writeConfig.getProps());
+ try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
+ final int totalRecords = 4;
+ final String p1 = "p1";
+ final String p2 = "p2";
+
+ List<HoodieRecord> allInserts = getInserts(totalRecords, p1, 0,
payloadClass);
+
+ // 1st batch: insert 1,2
+ String commitTimeAtEpoch0 = getCommitTimeAtUTC(0);
+ client.startCommitWithTime(commitTimeAtEpoch0);
+
assertNoWriteErrors(client.upsert(jsc().parallelize(allInserts.subList(0,2),
2), commitTimeAtEpoch0).collect());
+ readTableAndValidate(metaClient, new int[] {0, 1}, p1, 0L);
+
+ // 2nd batch: update records 1,2 and insert 3
+ String commitTimeAtEpoch5 = getCommitTimeAtUTC(5);
+ List<HoodieRecord> updatesAtEpoch5 = getUpdates(allInserts.subList(0,3),
5, payloadClass);
+ client.startCommitWithTime(commitTimeAtEpoch5);
+ assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch5, 2),
commitTimeAtEpoch5).collect());
+ readTableAndValidate(metaClient, new int[] {0, 1, 2}, p1,
getExpectedTsMap(new int[] {0, 1, 2}, new Long[] {5L, 5L, 5L}));
+
+ // 3rd batch: update records 1,2,3 and insert 4
+ String commitTimeAtEpoch10 = getCommitTimeAtUTC(10);
+ List<HoodieRecord> updatesAtEpoch10 = getUpdates(allInserts, 10,
payloadClass);
+ client.startCommitWithTime(commitTimeAtEpoch10);
+ assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch10,
2), commitTimeAtEpoch10).collect());
+ readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p1,
getExpectedTsMap(new int[] {0, 1, 2, 3}, new Long[] {10L, 10L, 10L, 10L}));
+
+ // 4th batch: update all from p1 to p2
+ String commitTimeAtEpoch20 = getCommitTimeAtUTC(20);
+ List<HoodieRecord> updatesAtEpoch20 = getUpdates(allInserts, p2, 20,
payloadClass);
+ client.startCommitWithTime(commitTimeAtEpoch20);
+ assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch20,
2), commitTimeAtEpoch20).collect());
+ readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p2, 20);
+ }
+ }
+
+ private Map<String, Long> getExpectedTsMap(int[] recordKeys, Long[]
expectedTses) {
+ Map<String, Long> expectedTsMap = new HashMap<>();
+ for (int i = 0; i < recordKeys.length; i++) {
+ expectedTsMap.put(String.valueOf(recordKeys[i]), expectedTses[i]);
+ }
+ return expectedTsMap;
+ }
private void readTableAndValidate(HoodieTableMetaClient metaClient, int[]
expectedIds, String expectedPartition, long expectedTs) {
+ Map<String, Long> expectedTsMap = new HashMap<>();
+ Arrays.stream(expectedIds).forEach(entry ->
expectedTsMap.put(String.valueOf(entry), expectedTs));
+ readTableAndValidate(metaClient, expectedIds, expectedPartition,
expectedTsMap);
+ }
+
+ private void readTableAndValidate(HoodieTableMetaClient metaClient, int[]
expectedIds, String expectedPartition, Map<String, Long> expectedTsMap) {
Dataset<Row> df = spark().read().format("hudi")
.load(metaClient.getBasePathV2().toString())
.sort("id")
@@ -198,7 +256,7 @@ public class TestGlobalIndexEnableUpdatePartitions extends SparkClientFunctional
assertEquals(expectedPartition, r.getString(1));
assertEquals(expectedId, r.getInt(2));
assertEquals(expectedPartition, r.getString(3));
- assertEquals(expectedTs, r.getLong(4));
+ assertEquals(expectedTsMap.get(String.valueOf(expectedId)),
r.getLong(4));
}
df.unpersist();
}