This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d6bb34361546 [HUDI-8358] Add more tests to cover completion time incremental query (#13725)
d6bb34361546 is described below

commit d6bb34361546661384b4839ea609e49c8de8c621
Author: Shuo Cheng <[email protected]>
AuthorDate: Mon Aug 18 12:09:48 2025 +0800

    [HUDI-8358] Add more tests to cover completion time incremental query (#13725)
---
 .../utilities/sources/TestHoodieIncrSource.java    | 138 +++++++++++++++++++++
 1 file changed, 138 insertions(+)

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index c31b35bac3ca..b34da19b121f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -657,6 +657,144 @@ public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {
     });
   }
 
+  /**
+   * Checks an incremental pull when commit completion order differs from request order.
+   *
+   * <p>Two inserts of 100 records each are written. The first one is then reverted to
+   * inflight and completed again with its original commit metadata. Per the inline
+   * comments, this gives it a completion time later than the second commit's. Reading
+   * from the second commit's instant with {@code READ_UPTO_LATEST_COMMIT} is expected
+   * to return 100 records, with the re-completed first instant ({@code instant0})
+   * passed as the expected end instant.
+   *
+   * @param tableType table type under test; the test runs once per {@link HoodieTableType} value
+   * @throws IOException if writing to or reading from the table fails
+   */
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testIncrSourceQueryWithIntersectingCommits(HoodieTableType 
tableType) throws IOException {
+    // NOTE(review): set before creating the meta client; getHoodieMetaClient presumably reads this field — confirm.
+    this.tableType = tableType;
+    metaClient = getHoodieMetaClient(storageConf(), basePath());
+    HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient)
+        
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(4, 
5).build())
+        
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(2).build())
+        .withCompactionConfig(
+            HoodieCompactionConfig.newBuilder()
+                .withInlineCompaction(false)
+                .withMaxNumDeltaCommitsBeforeCompaction(3)
+                .build())
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .build();
+
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig)) {
+      List<WriteResult> inserts = new ArrayList<>();
+
+      // Create commits following the pattern: start1, start2, end2, [query 
time], end1
+      // 0: start1 commit (will be made inflight)
+      // 1: start2 commit
+      // 2: end2
+      // 3: [query time]
+      // 4: end1
+      // Only two commits are written by the loop below (inserts.get(0) and inserts.get(1)).
+      // Items 2-4 above are points in time. "end1" is produced later, when inserts.get(0)
+      // is reverted to inflight and completed again.
+      for (int i = 0; i < 2; i++) {
+        inserts.add(writeRecords(writeClient, tableType, INSERT, null, 100));
+      }
+
+      // Emulates a scenario where an inflight commit intersects with query 
time
+      // We'll make the first commit inflight, query time will be between 
completion time of 2nd commit
+      // and the later completion time of 1st commit.
+      HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+      HoodieInstant instant0 = activeTimeline
+          .filter(instant -> 
instant.requestedTime().equals(inserts.get(0).getInstantTime())).firstInstant().get();
+      // Capture the commit metadata before reverting, so the same metadata can be
+      // written back when the instant is completed again below.
+      HoodieCommitMetadata instant0CommitData = 
metaClient.reloadActiveTimeline().readCommitMetadata(instant0);
+      activeTimeline.revertToInflight(instant0);
+      metaClient.reloadActiveTimeline();
+
+      // Complete the inflight commit
+      // Re-completing it at this point is what moves the first commit's completion time
+      // after the second commit's ("end1" after "end2").
+      activeTimeline.reload().saveAsComplete(
+          INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, 
instant0.getAction(), inserts.get(0).getInstantTime()),
+          Option.of(instant0CommitData));
+
+      // Re-read the first instant from a reloaded timeline to pick up its new completion time.
+      instant0 = activeTimeline.reload()
+          .filter(instant -> 
instant.requestedTime().equals(inserts.get(0).getInstantTime())).firstInstant().get();
+      assertNotNull(instant0.getCompletionTime());
+      // When querying from instant1 (end2) with READ_UPTO_LATEST_COMMIT,
+      // the query should only read up to the completion time of 1st commit 
(end1)
+      // NOTE(review): here "instant1" is 0-based (inserts.get(1)), while "1st commit" is inserts.get(0).
+      readAndAssertWithLatestTableVersion(
+          IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
+          Option.of(inserts.get(1).getInstant()),
+          // NOTE(review): the 100 expected records are presumably those of inserts.get(0), the commit that
+          // was reverted to inflight and completed again above. "instant 1" in the trailing comment
+          // appears to use 1-based numbering for that first commit, not inserts.get(1).
+          100, // Only records from instant 1 (which is inflight but was 
committed before becoming inflight)
+          instant0);
+    }
+  }
+
+  /**
+   * Checks that incremental reads on a MERGE_ON_READ table return the same record counts
+   * whether a scheduled compaction is still pending or has completed.
+   *
+   * <p>Seven write batches are produced: four bulk inserts, two upserts of batch 3's
+   * records, and a final bulk insert. A compaction is scheduled right after batch 4.
+   * Reads with {@code READ_UPTO_LATEST_COMMIT} are asserted twice, once while the
+   * compaction is pending and once after it completes:
+   * <ul>
+   *   <li>with no checkpoint, 500 records are expected;</li>
+   *   <li>starting from batch 2's instant, 200 records are expected.</li>
+   * </ul>
+   * The expected end instant is batch 6 while the compaction is pending, and the
+   * completed compaction instant afterwards.
+   *
+   * @throws IOException if writing to or reading from the table fails
+   */
+  @Test
+  void testIncrSourceSameResultWithPendingAndCompletedCompaction() throws 
IOException {
+    metaClient = getHoodieMetaClient(storageConf(), basePath(), 
getPropertiesForKeyGen(true), MERGE_ON_READ);
+    HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient)
+        
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(10, 
12).build())
+        
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(9).build())
+        .withCompactionConfig(
+            HoodieCompactionConfig.newBuilder()
+                .withScheduleInlineCompaction(true)
+                .withMaxNumDeltaCommitsBeforeCompaction(1)
+                .build())
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .build();
+
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig)) {
+      List<WriteResult> dataBatches = new ArrayList<>();
+      //   0: bulk_insert of 100 records
+      //   1: bulk_insert of 100 records
+      //   2: bulk_insert of 100 records
+      //   3: bulk_insert of 100 records
+      //   4: upsert of 100 records (updates only based on round 3)
+      //      schedule compaction
+      //   5: upsert of 100 records (updates only based on round 3)
+      //   6: bulk_insert of 100 records
+      // The loop writes batches 0-5. Batch 6 is written separately after the loop.
+      for (int i = 0; i < 6; i++) {
+        WriteOperationType opType = i < 4 ? BULK_INSERT : UPSERT;
+        List<HoodieRecord> recordsForUpdate = i < 4 ? null : 
dataBatches.get(3).getRecords();
+        dataBatches.add(writeRecords(writeClient, MERGE_ON_READ, opType, 
recordsForUpdate));
+        // Schedule (but do not run) the compaction right after batch 4, so it is still
+        // pending while batches 5 and 6 are written.
+        if (i == 4) {
+          writeClient.scheduleCompaction(Option.empty());
+        }
+      }
+      dataBatches.add(writeRecords(writeClient, MERGE_ON_READ, BULK_INSERT, 
null));
+
+      // Requested time of batch 6, the last write.
+      String latestCommitTimestamp = dataBatches.get(dataBatches.size() - 
1).getInstantTime();
+      // Pending compaction exists
+      Option<HoodieInstant> compactionInstant =
+          
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
+      assertTrue(compactionInstant.isPresent());
+      // ...and it was requested before the last write.
+      
assertTrue(compactionInstant.get().requestedTime().compareTo(latestCommitTimestamp)
 < 0);
+
+      // The pending table services should not block the incremental pulls
+      // Reads everything up to latest
+      // NOTE(review): 500 is presumably the 400 keys from batches 0-3 plus the 100 from batch 6;
+      // batches 4 and 5 only update batch 3's keys.
+      readAndAssertWithLatestTableVersion(
+          IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
+          Option.empty(),
+          500,
+          dataBatches.get(6).getInstant());
+
+      // set start completion timestamp in active timeline
+      // NOTE(review): starting after batch 2, the 200 records are presumably batch 3's keys
+      // (carrying the updates from batches 4 and 5) plus batch 6's 100 new keys.
+      readAndAssertWithLatestTableVersion(
+          IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
+          Option.of(dataBatches.get(2).getInstant()),
+          200,
+          dataBatches.get(6).getInstant());
+
+      // complete the compaction
+      String compactionRequestTime = compactionInstant.get().requestedTime();
+      writeClient.compact(compactionRequestTime, true);
+
+      // Look the compaction up again on a reloaded timeline. Its completion time must fall after
+      // batch 6's requested time, which is why the expected end instant of the reads below
+      // changes from batch 6 to the compaction instant.
+      compactionInstant =
+          
metaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants()
+              .filter(instant -> 
instant.requestedTime().equals(compactionRequestTime)).firstInstant();
+      assertTrue(compactionInstant.isPresent());
+      
assertTrue(compactionInstant.get().getCompletionTime().compareTo(latestCommitTimestamp)
 > 0);
+
+      // Reads everything up to latest with completed compaction
+      // The record counts must match the pending-compaction reads above.
+      readAndAssertWithLatestTableVersion(
+          IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
+          Option.empty(),
+          500,
+          compactionInstant.get());
+      // set start completion timestamp in active timeline
+      readAndAssertWithLatestTableVersion(
+          IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
+          Option.of(dataBatches.get(2).getInstant()),
+          200,
+          compactionInstant.get());
+    }
+  }
+
   private static ExternalSpillableMap<BaseHoodieTableFileIndex.PartitionPath, 
List<FileSlice>> getSpillableMap(BaseHoodieTableFileIndex hoodieTableFileIndex) 
{
     // cachedAllInputFileSlices is a private field, using reflection to assert 
the size.
     try {

Reply via email to