danny0405 commented on code in PR #18848:
URL: https://github.com/apache/hudi/pull/18848#discussion_r3309060279


##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java:
##########
@@ -714,6 +720,277 @@ void testStreamReadMorTableWithCompactionPlan(boolean 
useSourceV2) throws Except
     assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
   }
 
+  /**
+   * Regression test for HUDI: data loss in stream read from earliest when
+   * {@code read.streaming.skip_compaction = true} on a MOR table with 
completed
+   * compaction commits. Covers the streaming earliest full table scan branch 
in
+   * {@link 
org.apache.hudi.source.IncrementalInputSplits#inputSplits(HoodieTableMetaClient,
 String, boolean)}.
+   *
+   * <p>Triggering condition:
+   * <ul>
+   *   <li>{@code read.start-commit = earliest} (no instant range -> full 
table scan path);</li>
+   *   <li>{@code read.streaming.skip_compaction = true} (active timeline 
filtered out compaction);</li>
+   *   <li>MOR table with at least one completed compaction commit that 
produced a
+   *       new base file from existing log files.</li>
+   * </ul>
+   *
+   * <p>Construction:
+   * <ol>
+   *   <li>Offline write {@code DATA_SET_INSERT} (8 records, ids 1..8) and then
+   *       {@code DATA_SET_UPDATE_INSERT} (8 records, where ids 1..5 update 
existing keys
+   *       and ids 9..11 are new) via {@link TestData#writeDataAsBatch}, which 
deterministically
+   *       triggers an inline compaction once {@code COMPACTION_DELTA_COMMITS 
= 1} +
+   *       {@code COMPACTION_ASYNC_ENABLED = true} are set. After this step 
the table has
+   *       both a base file (from compaction) and log files written by the 
UPDATE batch.</li>
+   *   <li>Streaming read from earliest with {@code skip_compaction = true} 
and wait until
+   *       the expected number of merged rows are received. Without the fix, 
the FS view used
+   *       in the earliest full-table-scan branch is built from a 
compaction-filtered
+   *       timeline, file slice boundaries are wrongly computed, log files are 
missed
+   *       and the read will never reach the expected row count (the test 
would time out).</li>
+   * </ol>
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testStreamReadMorTableWithCompactionFromEarliest(boolean useSourceV2) 
throws Exception {
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.TABLE_NAME, "t1");
+    conf.set(FlinkOptions.TABLE_TYPE, MERGE_ON_READ.name());
+    conf.set(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
+    conf.set(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 2);
+    // mandatory for writeDataAsBatch#inlineCompaction to actually run a 
compaction
+    conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+    conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+
+    // Step 1: offline-write two batches with deterministic inline compaction 
in between.
+    TestData.writeDataAsBatch(TestData.DATA_SET_INSERT, conf);
+    TestData.writeDataAsBatch(TestData.DATA_SET_UPDATE_INSERT, conf);
+
+    // Step 2: streaming read from earliest with skip_compaction = true.
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
+        .option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ)
+        .option(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name())
+        .option(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 2)
+        .option(FlinkOptions.READ_AS_STREAMING, true)
+        .option(FlinkOptions.READ_START_COMMIT, 
FlinkOptions.START_COMMIT_EARLIEST)
+        .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2)
+        .option(FlinkOptions.READ_SOURCE_V2_ENABLED, useSourceV2)
+        // skip compaction instant -> active timeline drops compaction commit
+        .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, true)
+        .end();
+    streamTableEnv.executeSql(hoodieTableDDL);
+
+    // After the UPDATE batch, the merged result must contain all up-to-date 
records:
+    //   - 5 updated records   (id1..id5 from DATA_SET_UPDATE_INSERT)
+    //   - 3 carried-over records (id6, id7, id8 from DATA_SET_INSERT, not 
touched by UPDATE)
+    //   - 3 newly inserted records (id9, id10, id11 from 
DATA_SET_UPDATE_INSERT)
+    // i.e. 11 records in total. Without the fix the streaming read would 
never reach
+    // expectedNum = 11 and the test would time out via the CollectSink.
+    final int expectedNum = 11;
+    List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, "select * 
from t1", expectedNum);
+    assertEquals(expectedNum, rows.size(),
+        "Expect 11 up-to-date records to be visible after earliest streaming 
read"
+            + " with skip_compaction on a MOR table that has a completed 
compaction commit"
+            + ", actual rows: " + rows);
+  }
+
+  /**
+   * Regression test for HUDI: data loss in batch read from earliest when
+   * {@code read.streaming.skip_compaction = true} on a MOR table with 
completed
+   * compaction commits. Covers the batch full-table-scan branch in
+   * {@link 
org.apache.hudi.source.IncrementalInputSplits#inputSplits(HoodieTableMetaClient,
 boolean)}.
+   *
+   * <p>This complements {@link 
#testStreamReadMorTableWithCompactionFromEarliest(boolean)}
+   * which only exercises the streaming code path. Without the fix, building 
the
+   * {@link org.apache.hudi.common.table.view.HoodieTableFileSystemView} with a
+   * compaction-filtered timeline would mis-classify file slice boundaries and
+   * lose log files.
+   */
+  @Test
+  void testBatchReadMorTableWithCompactionFromEarliest() throws Exception {
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.TABLE_NAME, "t1");
+    conf.set(FlinkOptions.TABLE_TYPE, MERGE_ON_READ.name());
+    conf.set(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
+    conf.set(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 2);
+    // mandatory for writeDataAsBatch#inlineCompaction to actually run a 
compaction
+    conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+    conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+
+    // Offline-write two batches against overlapping record keys, the 2nd 
write triggers
+    // an inline compaction that merges existing log files into a new base 
file - exactly
+    // the scenario that exposes the buggy file-slice classification when 
skip_compaction
+    // is enabled.
+    TestData.writeDataAsBatch(TestData.DATA_SET_INSERT, conf);
+    TestData.writeDataAsBatch(TestData.DATA_SET_UPDATE_INSERT, conf);
+
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
+        .option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ)
+        .option(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name())
+        .option(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 2)
+        .option(FlinkOptions.READ_START_COMMIT, 
FlinkOptions.START_COMMIT_EARLIEST)
+        // skip compaction instant -> active timeline drops compaction commit
+        .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, true)
+        .end();
+    batchTableEnv.executeSql(hoodieTableDDL);
+
+    List<Row> result = CollectionUtil.iteratorToList(
+        batchTableEnv.executeSql("select * from t1").collect());
+    // After update, the merged result must contain all up-to-date records:
+    //   - 5 updated records   (id1..id5 from DATA_SET_UPDATE_INSERT)
+    //   - 3 carried-over records (id6, id7, id8 from DATA_SET_INSERT, not 
touched by UPDATE)
+    //   - 3 newly inserted records (id9, id10, id11 from 
DATA_SET_UPDATE_INSERT)
+    // i.e. 11 records in total. Without the fix, log files belonging to the 
file slice prior
+    // to the inline compaction would be silently dropped by the file system 
view because
+    // the active timeline filtered out the compaction commit, and the result 
size would be
+    // smaller than 11.
+    assertEquals(11, result.size(),
+        "Expect all up-to-date records to be visible after earliest + 
skip_compaction batch read"
+            + ", actual rows: " + result);
+  }
+
+  /**
+   * Regression test for HUDI: data loss when the start commit has been 
archived
+   * and {@code read.streaming.skip_compaction = true} on a MOR table.
+   * Covers the batch "fallback to full table scan" branch in
+   * {@link 
org.apache.hudi.source.IncrementalInputSplits#inputSplits(HoodieTableMetaClient,
 boolean)}
+   * which is reached when {@code hasArchivedInstants == true}.
+   *
+   * <p>Construction:
+   * <ol>
+   *   <li>Write 10 delta-commit batches of {@code (id1,id2), (id3,id4), ...} 
on a MOR table
+   *       so that each batch only inserts new keys (clear, predictable 
per-commit semantics).</li>
+   *   <li>Trigger one completed compaction commit by issuing an extra UPDATE 
batch on
+   *       {@code id1..id4} with {@code COMPACTION_DELTA_COMMITS = 1} via
+   *       {@link TestData#writeDataAsBatch} (which explicitly calls {@code 
inlineCompaction()}).
+   *       This creates exactly the file-slice boundary that the buggy FS view 
would
+   *       mis-classify.</li>
+   *   <li>Pick the LAST archived delta-commit as {@code read.start-commit} 
(filtered by
+   *       {@code action = deltacommit} to exclude any archived compaction 
{@code commit}
+   *       instants). This is deterministic regardless of how many delta 
commits were
+   *       archived by the cleaner+archiver and routes the reader through the
+   *       "archived start commit -> fullTableScan" branch.</li>
+   *   <li>Read with {@code skip_compaction = true} and assert on the SET of 
record-keys
+   *       in the result (not just on count). The expected key set is derived 
dynamically
+   *       from the timeline: every delta_commit whose completion time is 
&gt;= the chosen
+   *       start_commit contributes its written ids, plus id1..id4 from the 
UPDATE batch
+   *       are always present because the UPDATE is the latest write. Without 
the fix,
+   *       log files of the file slice that straddles the compaction commit 
are silently
+   *       dropped, so some of these ids would be missing.</li>
+   * </ol>
+   */
+  @Test
+  void testBatchReadMorTableWithCompactionStartCommitArchived() throws 
Exception {
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.TABLE_NAME, "t1");
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+    conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
+    conf.set(FlinkOptions.TABLE_TYPE, MERGE_ON_READ.name());
+    conf.set(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
+    conf.set(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 2);
+    // aggressive archival to force older instants out of the active timeline
+    conf.set(FlinkOptions.ARCHIVE_MIN_COMMITS, 4);
+    conf.set(FlinkOptions.ARCHIVE_MAX_COMMITS, 5);
+    conf.set(FlinkOptions.CLEAN_RETAIN_COMMITS, 3);
+    conf.setString("hoodie.commits.archival.batch", "1");
+
+    // Step 1: write 10 batches of 2 new records each -> 10 delta_commit 
instants, 20 distinct keys.
+    // Remember each batch's instant time so we can later map start_commit -> 
excluded id set.
+    List<String> batchInstantTimes = new ArrayList<>();
+    HoodieTableMetaClient metaClient = null;
+    for (int i = 0; i < 20; i += 2) {
+      List<RowData> dataset = TestData.dataSetInsert(i + 1, i + 2);
+      TestData.writeData(dataset, conf);
+      // Create metaClient lazily after the first write establishes the table 
on disk.
+      if (metaClient == null) {
+        metaClient = HoodieTestUtils.createMetaClient(
+            new 
HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(new 
Configuration())),
+            tempFile.getAbsolutePath());
+      } else {
+        metaClient.reloadActiveTimeline();
+      }
+      batchInstantTimes.add(

Review Comment:
   Can we list the timeline once after the loop, instead of reloading it 10 times (once per batch)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to