This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d6bb34361546 [HUDI-8358] Add more tests to cover completion time
incremental query (#13725)
d6bb34361546 is described below
commit d6bb34361546661384b4839ea609e49c8de8c621
Author: Shuo Cheng <[email protected]>
AuthorDate: Mon Aug 18 12:09:48 2025 +0800
[HUDI-8358] Add more tests to cover completion time incremental query
(#13725)
---
.../utilities/sources/TestHoodieIncrSource.java | 138 +++++++++++++++++++++
1 file changed, 138 insertions(+)
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index c31b35bac3ca..b34da19b121f 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -657,6 +657,144 @@ public class TestHoodieIncrSource extends
SparkClientFunctionalTestHarness {
});
}
+ @ParameterizedTest
+ @EnumSource(HoodieTableType.class)
+ public void testIncrSourceQueryWithIntersectingCommits(HoodieTableType
tableType) throws IOException {
+ this.tableType = tableType;
+ metaClient = getHoodieMetaClient(storageConf(), basePath());
+ HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient)
+
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(4,
5).build())
+
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(2).build())
+ .withCompactionConfig(
+ HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(false)
+ .withMaxNumDeltaCommitsBeforeCompaction(3)
+ .build())
+
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+ .build();
+
+ try (SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig)) {
+ List<WriteResult> inserts = new ArrayList<>();
+
+ // Create commits following the pattern: start1, start2, end2, [query
time], end1
+ // 0: start1 commit (will be made inflight)
+ // 1: start2 commit
+ // 2: end2
+ // 3: [query time]
+ // 4: end1
+ for (int i = 0; i < 2; i++) {
+ inserts.add(writeRecords(writeClient, tableType, INSERT, null, 100));
+ }
+
+ // Emulates a scenario where an inflight commit intersects with query
time
+ // We'll make the first commit inflight, query time will be between
completion time of 2nd commit
+ // and the later completion time of 1st commit.
+ HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+ HoodieInstant instant0 = activeTimeline
+ .filter(instant ->
instant.requestedTime().equals(inserts.get(0).getInstantTime())).firstInstant().get();
+ HoodieCommitMetadata instant0CommitData =
metaClient.reloadActiveTimeline().readCommitMetadata(instant0);
+ activeTimeline.revertToInflight(instant0);
+ metaClient.reloadActiveTimeline();
+
+ // Complete the inflight commit
+ activeTimeline.reload().saveAsComplete(
+ INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
instant0.getAction(), inserts.get(0).getInstantTime()),
+ Option.of(instant0CommitData));
+
+ instant0 = activeTimeline.reload()
+ .filter(instant ->
instant.requestedTime().equals(inserts.get(0).getInstantTime())).firstInstant().get();
+ assertNotNull(instant0.getCompletionTime());
+ // When querying from instant1 (end2) with READ_UPTO_LATEST_COMMIT,
+ // the query should only read up to the completion time of 1st commit
(end1)
+ readAndAssertWithLatestTableVersion(
+ IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
+ Option.of(inserts.get(1).getInstant()),
+ 100, // Only records from instant 1 (which is inflight but was
committed before becoming inflight)
+ instant0);
+ }
+ }
+
+ @Test
+ void testIncrSourceSameResultWithPendingAndCompletedCompaction() throws
IOException {
+ metaClient = getHoodieMetaClient(storageConf(), basePath(),
getPropertiesForKeyGen(true), MERGE_ON_READ);
+ HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient)
+
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(10,
12).build())
+
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(9).build())
+ .withCompactionConfig(
+ HoodieCompactionConfig.newBuilder()
+ .withScheduleInlineCompaction(true)
+ .withMaxNumDeltaCommitsBeforeCompaction(1)
+ .build())
+
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+ .build();
+
+ try (SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig)) {
+ List<WriteResult> dataBatches = new ArrayList<>();
+ // 0: bulk_insert of 100 records
+ // 1: bulk_insert of 100 records
+ // 2: bulk_insert of 100 records
+ // 3: bulk_insert of 100 records
+ // 4: upsert of 100 records (updates only based on round 3)
+ // schedule compaction
+ // 5: upsert of 100 records (updates only based on round 3)
+ // 6: bulk_insert of 100 records
+ for (int i = 0; i < 6; i++) {
+ WriteOperationType opType = i < 4 ? BULK_INSERT : UPSERT;
+ List<HoodieRecord> recordsForUpdate = i < 4 ? null :
dataBatches.get(3).getRecords();
+ dataBatches.add(writeRecords(writeClient, MERGE_ON_READ, opType,
recordsForUpdate));
+ if (i == 4) {
+ writeClient.scheduleCompaction(Option.empty());
+ }
+ }
+ dataBatches.add(writeRecords(writeClient, MERGE_ON_READ, BULK_INSERT,
null));
+
+ String latestCommitTimestamp = dataBatches.get(dataBatches.size() -
1).getInstantTime();
+ // Pending compaction exists
+ Option<HoodieInstant> compactionInstant =
+
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
+ assertTrue(compactionInstant.isPresent());
+
assertTrue(compactionInstant.get().requestedTime().compareTo(latestCommitTimestamp)
< 0);
+
+ // The pending table services should not block the incremental pulls
+ // Reads everything up to latest
+ readAndAssertWithLatestTableVersion(
+ IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
+ Option.empty(),
+ 500,
+ dataBatches.get(6).getInstant());
+
+ // set start completion timestamp in active timeline
+ readAndAssertWithLatestTableVersion(
+ IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
+ Option.of(dataBatches.get(2).getInstant()),
+ 200,
+ dataBatches.get(6).getInstant());
+
+ // complete the compaction
+ String compactionRequestTime = compactionInstant.get().requestedTime();
+ writeClient.compact(compactionRequestTime, true);
+
+ compactionInstant =
+
metaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants()
+ .filter(instant ->
instant.requestedTime().equals(compactionRequestTime)).firstInstant();
+ assertTrue(compactionInstant.isPresent());
+
assertTrue(compactionInstant.get().getCompletionTime().compareTo(latestCommitTimestamp)
> 0);
+
+ // Reads everything up to latest with completed compaction
+ readAndAssertWithLatestTableVersion(
+ IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
+ Option.empty(),
+ 500,
+ compactionInstant.get());
+ // set start completion timestamp in active timeline
+ readAndAssertWithLatestTableVersion(
+ IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
+ Option.of(dataBatches.get(2).getInstant()),
+ 200,
+ compactionInstant.get());
+ }
+ }
+
private static ExternalSpillableMap<BaseHoodieTableFileIndex.PartitionPath,
List<FileSlice>> getSpillableMap(BaseHoodieTableFileIndex hoodieTableFileIndex)
{
// cachedAllInputFileSlices is a private field, using reflection to assert
the size.
try {