This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4e5034d35462 fix(flink): fix data loss in stream read from earliest (#18848)
4e5034d35462 is described below
commit 4e5034d354628bab348a55d694171632a0c7c2a0
Author: fhan <[email protected]>
AuthorDate: Mon Jun 1 17:13:07 2026 +0800
fix(flink): fix data loss in stream read from earliest (#18848)
* fix(flink): fix data loss in stream read from earliest
* fix(flink): optimize UTs and refine de-duplicate full-table-scan timeline comment
---------
Co-authored-by: fhan <[email protected]>
---
.../apache/hudi/source/IncrementalInputSplits.java | 35 ++-
.../apache/hudi/table/ITTestHoodieDataSource.java | 281 +++++++++++++++++++++
2 files changed, 313 insertions(+), 3 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index bc6ebd29da21..344b24bc10c4 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -184,7 +184,12 @@ public class IncrementalInputSplits implements Serializable {
return Result.EMPTY;
}
fileInfoList = fileIndex.getFilesInPartitions();
- List<FileSlice> allFileSlices = getFileSlices(metaClient,
commitTimeline, readPartitions, fileInfoList,
analyzingResult.getMaxCompletionTime(), false);
+ // Use the full commits-and-compaction timeline rather than the
(possibly compaction-filtered)
+ // activeTimeline carried by the QueryContext. Otherwise, on a MOR table
with
+ // 'read.streaming.skip_compaction = true', file slice boundaries would
be wrongly
+ // computed and log files could be missed, causing data loss.
+ List<FileSlice> allFileSlices = getFileSlices(metaClient,
getFullCommitsTimeline(metaClient),
+ readPartitions, fileInfoList,
analyzingResult.getMaxCompletionTime(), false);
fileSlices = fileIndex.filterFileSlices(allFileSlices);
} else {
if (cdcEnabled) {
@@ -217,7 +222,11 @@ public class IncrementalInputSplits implements Serializable {
return Result.EMPTY;
}
fileInfoList = fileIndex.getFilesInPartitions();
- List<FileSlice> allFileSlices = getFileSlices(metaClient,
commitTimeline, readPartitions, fileInfoList,
analyzingResult.getMaxCompletionTime(), false);
+ // Same reason as the full-table-scan branch above: build the
FileSystemView with the
+ // complete commits-and-compaction timeline to avoid losing data when
'skip_compaction'
+ // is enabled.
+ List<FileSlice> allFileSlices = getFileSlices(metaClient,
getFullCommitsTimeline(metaClient),
+ readPartitions, fileInfoList,
analyzingResult.getMaxCompletionTime(), false);
fileSlices = fileIndex.filterFileSlices(allFileSlices);
} else {
fileSlices = getFileSlices(metaClient, commitTimeline, readPartitions,
files, analyzingResult.getMaxCompletionTime(), false);
@@ -295,7 +304,10 @@ public class IncrementalInputSplits implements Serializable {
log.warn("No files found for reading under path: {}", path);
return Result.EMPTY;
}
- List<FileSlice> allFileSlices = getFileSlices(metaClient,
commitTimeline, readPartitions, pathInfoList, offsetToIssue, false);
+ // Same reason as the batch full-table-scan branch:
+ // see getFullCommitsTimeline() for why a compaction-filtered timeline
must not be used here.
+ List<FileSlice> allFileSlices = getFileSlices(metaClient,
getFullCommitsTimeline(metaClient),
+ readPartitions, pathInfoList, offsetToIssue, false);
List<FileSlice> fileSlices = fileIndex.filterFileSlices(allFileSlices);
List<MergeOnReadInputSplit> inputSplits = getInputSplits(fileSlices,
metaClient, endInstant, null);
@@ -391,6 +403,23 @@ public class IncrementalInputSplits implements Serializable {
return getInputSplits(fileSlices, metaClient, endInstant, instantRange);
}
+ /**
+ * Returns the full commit timeline (including completed compaction
instants) for building
+ * a {@link HoodieTableFileSystemView} during full table scan.
+ *
+ * <p>NOTE: when streaming/batch read enables {@code skip_compaction}, the
{@code activeTimeline}
+ * carried by {@link IncrementalQueryAnalyzer.QueryContext} has already
filtered out the
+ * compaction instants. Using such a partial timeline to construct a {@link
HoodieTableFileSystemView}
+ * would mis-classify the file slice boundaries on a MOR table (since file
slice boundaries
+ * are derived from compaction instants), leading to data loss when reading
from the earliest
+ * or after start commit got archived. For full table scan we should always
rely on the
+ * complete commits-and-compaction timeline; the {@code skip_compaction}
semantics is preserved
+ * by the instant range filtering applied later on the generated input
splits.
+ */
+ private static HoodieTimeline getFullCommitsTimeline(HoodieTableMetaClient
metaClient) {
+ return
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
+ }
+
private List<FileSlice> getFileSlices(
HoodieTableMetaClient metaClient,
HoodieTimeline commitTimeline,
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 168c1b893beb..5591d0461b6f 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -28,16 +28,21 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.marker.MarkerType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.bucket.partition.PartitionBucketIndexUtils;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.sink.buffer.BufferMemoryType;
import org.apache.hudi.sink.buffer.BufferType;
import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.table.catalog.HoodieCatalogTestUtils;
import org.apache.hudi.table.catalog.HoodieHiveCatalog;
import org.apache.hudi.util.StreamerUtil;
@@ -89,6 +94,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -712,6 +719,280 @@ public class ITTestHoodieDataSource {
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
}
+ /**
+ * Regression test for HUDI: data loss in stream read from earliest when
+ * {@code read.streaming.skip_compaction = true} on a MOR table with
completed
+ * compaction commits. Covers the streaming earliest full table scan branch
in
+ * {@link
org.apache.hudi.source.IncrementalInputSplits#inputSplits(HoodieTableMetaClient,
String, boolean)}.
+ *
+ * <p>Triggering condition:
+ * <ul>
+ * <li>{@code read.start-commit = earliest} (no instant range -> full
table scan path);</li>
+ * <li>{@code read.streaming.skip_compaction = true} (active timeline
filtered out compaction);</li>
+ * <li>MOR table with at least one completed compaction commit that
produced a
+ * new base file from existing log files.</li>
+ * </ul>
+ *
+ * <p>Construction:
+ * <ol>
+ * <li>Offline write {@code DATA_SET_INSERT} (8 records, ids 1..8) and then
+ * {@code DATA_SET_UPDATE_INSERT} (8 records, where ids 1..5 update
existing keys
+ * and ids 9..11 are new) via {@link TestData#writeDataAsBatch}, which
deterministically
+ * triggers an inline compaction once {@code COMPACTION_DELTA_COMMITS
= 1} +
+ * {@code COMPACTION_ASYNC_ENABLED = true} are set. After this step
the table has
+ * both a base file (from compaction) and log files written by the
UPDATE batch.</li>
+ * <li>Streaming read from earliest with {@code skip_compaction = true}
and wait until
+ * the expected number of merged rows are received. Without the fix,
the FS view used
+ * in the earliest full-table-scan branch is built from a
compaction-filtered
+ * timeline, file slice boundaries are wrongly computed, log files are
missed
+ * and the read will never reach the expected row count (the test
would time out).</li>
+ * </ol>
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testStreamReadMorTableWithCompactionFromEarliest(boolean useSourceV2)
throws Exception {
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.TABLE_NAME, "t1");
+ conf.set(FlinkOptions.TABLE_TYPE, MERGE_ON_READ.name());
+ conf.set(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
+ conf.set(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 2);
+ // mandatory for writeDataAsBatch#inlineCompaction to actually run a
compaction
+ conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+ conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+
+ // Step 1: offline-write two batches with deterministic inline compaction
in between.
+ TestData.writeDataAsBatch(TestData.DATA_SET_INSERT, conf);
+ TestData.writeDataAsBatch(TestData.DATA_SET_UPDATE_INSERT, conf);
+
+ // Step 2: streaming read from earliest with skip_compaction = true.
+ String hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
+ .option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ)
+ .option(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name())
+ .option(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 2)
+ .option(FlinkOptions.READ_AS_STREAMING, true)
+ .option(FlinkOptions.READ_START_COMMIT,
FlinkOptions.START_COMMIT_EARLIEST)
+ .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2)
+ .option(FlinkOptions.READ_SOURCE_V2_ENABLED, useSourceV2)
+ // skip compaction instant -> active timeline drops compaction commit
+ .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, true)
+ .end();
+ streamTableEnv.executeSql(hoodieTableDDL);
+
+ // After the UPDATE batch, the merged result must contain all up-to-date
records:
+ // - 5 updated records (id1..id5 from DATA_SET_UPDATE_INSERT)
+ // - 3 carried-over records (id6, id7, id8 from DATA_SET_INSERT, not
touched by UPDATE)
+ // - 3 newly inserted records (id9, id10, id11 from
DATA_SET_UPDATE_INSERT)
+ // i.e. 11 records in total. Without the fix the streaming read would
never reach
+ // expectedNum = 11 and the test would time out via the CollectSink.
+ final int expectedNum = 11;
+ List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, "select *
from t1", expectedNum);
+ assertEquals(expectedNum, rows.size(),
+ "Expect 11 up-to-date records to be visible after earliest streaming
read"
+ + " with skip_compaction on a MOR table that has a completed
compaction commit"
+ + ", actual rows: " + rows);
+ }
+
+ /**
+ * Regression test for HUDI: data loss in batch read from earliest when
+ * {@code read.streaming.skip_compaction = true} on a MOR table with
completed
+ * compaction commits. Covers the batch full-table-scan branch in
+ * {@link
org.apache.hudi.source.IncrementalInputSplits#inputSplits(HoodieTableMetaClient,
boolean)}.
+ *
+ * <p>This complements {@link
#testStreamReadMorTableWithCompactionFromEarliest(boolean)}
+ * which only exercises the streaming code path. Without the fix, building
the
+ * {@link org.apache.hudi.common.table.view.HoodieTableFileSystemView} with a
+ * compaction-filtered timeline would mis-classify file slice boundaries and
+ * lose log files.
+ */
+ @Test
+ void testBatchReadMorTableWithCompactionFromEarliest() throws Exception {
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.TABLE_NAME, "t1");
+ conf.set(FlinkOptions.TABLE_TYPE, MERGE_ON_READ.name());
+ conf.set(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
+ conf.set(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 2);
+ // mandatory for writeDataAsBatch#inlineCompaction to actually run a
compaction
+ conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+ conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+
+ // Offline-write two batches against overlapping record keys, the 2nd
write triggers
+ // an inline compaction that merges existing log files into a new base
file - exactly
+ // the scenario that exposes the buggy file-slice classification when
skip_compaction
+ // is enabled.
+ TestData.writeDataAsBatch(TestData.DATA_SET_INSERT, conf);
+ TestData.writeDataAsBatch(TestData.DATA_SET_UPDATE_INSERT, conf);
+
+ String hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
+ .option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ)
+ .option(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name())
+ .option(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 2)
+ .option(FlinkOptions.READ_START_COMMIT,
FlinkOptions.START_COMMIT_EARLIEST)
+ // skip compaction instant -> active timeline drops compaction commit
+ .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, true)
+ .end();
+ batchTableEnv.executeSql(hoodieTableDDL);
+
+ List<Row> result = CollectionUtil.iteratorToList(
+ batchTableEnv.executeSql("select * from t1").collect());
+ // After update, the merged result must contain all up-to-date records:
+ // - 5 updated records (id1..id5 from DATA_SET_UPDATE_INSERT)
+ // - 3 carried-over records (id6, id7, id8 from DATA_SET_INSERT, not
touched by UPDATE)
+ // - 3 newly inserted records (id9, id10, id11 from
DATA_SET_UPDATE_INSERT)
+ // i.e. 11 records in total. Without the fix, log files belonging to the
file slice prior
+ // to the inline compaction would be silently dropped by the file system
view because
+ // the active timeline filtered out the compaction commit, and the result
size would be
+ // smaller than 11.
+ assertEquals(11, result.size(),
+ "Expect all up-to-date records to be visible after earliest +
skip_compaction batch read"
+ + ", actual rows: " + result);
+ }
+
+ /**
+ * Regression test for HUDI: data loss when the start commit has been
archived
+ * and {@code read.streaming.skip_compaction = true} on a MOR table.
+ * Covers the batch "fallback to full table scan" branch in
+ * {@link
org.apache.hudi.source.IncrementalInputSplits#inputSplits(HoodieTableMetaClient,
boolean)}
+ * which is reached when {@code hasArchivedInstants == true}.
+ *
+ * <p>Construction:
+ * <ol>
+ * <li>Write 10 delta-commit batches of {@code (id1,id2), (id3,id4), ...}
on a MOR table
+ * so that each batch only inserts new keys (clear, predictable
per-commit semantics).</li>
+ * <li>Trigger one completed compaction commit by issuing an extra UPDATE
batch on
+ * {@code id1..id4} with {@code COMPACTION_DELTA_COMMITS = 1} via
+ * {@link TestData#writeDataAsBatch} (which explicitly calls {@code
inlineCompaction()}).
+ * This creates exactly the file-slice boundary that the buggy FS view
would
+ * mis-classify.</li>
+ * <li>Pick the LAST archived delta-commit as {@code read.start-commit}
(filtered by
+ * {@code action = deltacommit} to exclude any archived compaction
{@code commit}
+ * instants). This is deterministic regardless of how many delta
commits were
+ * archived by the cleaner+archiver and routes the reader through the
+ * "archived start commit -> fullTableScan" branch.</li>
+ * <li>Read with {@code skip_compaction = true} and assert on the SET of
record-keys
+ * in the result (not just on count). The expected key set is derived
dynamically
+ * from the timeline: every delta_commit whose completion time is
>= the chosen
+ * start_commit contributes its written ids, plus id1..id4 from the
UPDATE batch
+ * are always present because the UPDATE is the latest write. Without
the fix,
+ * log files of the file slice that straddles the compaction commit
are silently
+ * dropped, so some of these ids would be missing.</li>
+ * </ol>
+ */
+ @Test
+ void testBatchReadMorTableWithCompactionStartCommitArchived() throws
Exception {
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.TABLE_NAME, "t1");
+ conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+ conf.set(FlinkOptions.ORDERING_FIELDS, "ts");
+ conf.set(FlinkOptions.TABLE_TYPE, MERGE_ON_READ.name());
+ conf.set(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
+ conf.set(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 2);
+ // aggressive archival to force older instants out of the active timeline
+ conf.set(FlinkOptions.ARCHIVE_MIN_COMMITS, 4);
+ conf.set(FlinkOptions.ARCHIVE_MAX_COMMITS, 5);
+ conf.set(FlinkOptions.CLEAN_RETAIN_COMMITS, 3);
+ conf.setString("hoodie.commits.archival.batch", "1");
+
+ // Step 1: write 10 batches of 2 new records each -> 10 delta_commit
instants, 20 distinct keys.
+ for (int i = 0; i < 20; i += 2) {
+ List<RowData> dataset = TestData.dataSetInsert(i + 1, i + 2);
+ TestData.writeData(dataset, conf);
+ }
+
+ // Step 2: trigger at least one completed compaction commit by issuing one
more delta_commit
+ // that UPDATES the very first record keys (id1..id4) and enabling
COMPACTION_DELTA_COMMITS=1.
+ // The update writes new log files for the file group that contains
id1..id4, and the inline
+ // compaction merges them into a new base file -> a real compaction
file-slice boundary.
+ // NOTE: use writeDataAsBatch (which explicitly calls inlineCompaction()),
since the plain
+ // writeData helper does not run the compaction even with
COMPACTION_DELTA_COMMITS=1.
+ conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+ conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+ TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2, 3, 4), conf);
+
+ // Step 3: list the full timeline in one shot to map start_commit ->
expected id set.
+ // Delta_commit instants are strictly monotonically increasing, so the
sorted list of all
+ // delta_commits across active + archived timelines gives a 1:1 mapping to
the 10 batches
+ // written in Step 1: the k-th delta_commit wrote id_{2k+1} and id_{2k+2}.
+ HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(
+ new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(new
Configuration())),
+ tempFile.getAbsolutePath());
+ // Use the merged (archived + active) timeline to capture all
delta_commits,
+ // even those that may have been archived by the aggressive archival
settings.
+ List<String> batchInstantTimes = TimelineUtils.getTimeline(metaClient,
true)
+ .getCommitsTimeline().filterCompletedInstants()
+ .filter(instant ->
HoodieTimeline.DELTA_COMMIT_ACTION.equals(instant.getAction()))
+
.getInstantsAsStream().map(HoodieInstant::requestedTime).collect(Collectors.toList());
+ // Step 1 produced exactly 10 delta_commits; the 11th (if present) is from
Step 2 UPDATE.
+ // Keep only the first 10 to build the batch-index -> id mapping.
+ assertTrue(batchInstantTimes.size() >= 10,
+ "Expected at least 10 delta_commits from Step 1, got " +
batchInstantTimes.size());
+ batchInstantTimes = batchInstantTimes.subList(0, 10);
+
+ // Step 4: pick the LAST archived delta_commit that belongs to Step 1's
batches as
+ // start commit. This avoids any drift caused by archival ordering or by
compaction
+ // `commit` instants being interleaved with delta_commits in the archived
timeline,
+ // and also ignores the Step 2 UPDATE batch in case it also got archived.
+ Set<String> step1InstantTimeSet = new TreeSet<>(batchInstantTimes);
+ List<HoodieInstant> archivedDeltaCommits =
metaClient.getArchivedTimeline().getCommitsTimeline()
+ .filterCompletedInstants()
+ .filter(instant ->
HoodieTimeline.DELTA_COMMIT_ACTION.equals(instant.getAction()))
+ .filter(instant ->
step1InstantTimeSet.contains(instant.requestedTime()))
+ .getInstants();
+ // make sure archival actually happened on Step 1's batches, otherwise the
test premise
+ // (the reader hits the archived start commit + fullTableScan branch) does
not hold.
+ assertTrue(!archivedDeltaCommits.isEmpty(),
+ "archival did not happen as expected on Step 1's batches, archived
delta commits = "
+ + archivedDeltaCommits + ", Step 1 batch instant times = " +
batchInstantTimes);
+ HoodieInstant startInstant =
archivedDeltaCommits.get(archivedDeltaCommits.size() - 1);
+ String archivedStartInstant = startInstant.requestedTime();
+
+ // The expected key set: every Step 1 batch whose instant time is >=
start_commit contributes
+ // its 2 ids; plus id1..id4 from the Step 2 UPDATE batch (always the
latest write, never
+ // excluded since its completion time is the largest).
+ int firstIncludedBatchIdx =
batchInstantTimes.indexOf(archivedStartInstant);
+ assertTrue(firstIncludedBatchIdx >= 0,
+ "chosen start_commit " + archivedStartInstant + " is not one of the
Step 1 batch instant times " + batchInstantTimes);
+ Set<String> expectedIds = new TreeSet<>();
+ for (int i = firstIncludedBatchIdx; i < batchInstantTimes.size(); i++) {
+ expectedIds.add("id" + (2 * i + 1));
+ expectedIds.add("id" + (2 * i + 2));
+ }
+ // UPDATE batch ids — always present in the merged view because the UPDATE
is the latest write.
+ expectedIds.add("id1");
+ expectedIds.add("id2");
+ expectedIds.add("id3");
+ expectedIds.add("id4");
+
+ String hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
+ .option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ)
+ .option(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name())
+ .option(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 2)
+ .option(FlinkOptions.READ_START_COMMIT, archivedStartInstant)
+ .option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, true)
+ .end();
+ batchTableEnv.executeSql(hoodieTableDDL);
+
+ List<Row> result = CollectionUtil.iteratorToList(
+ batchTableEnv.executeSql("select uuid from t1").collect());
+ Set<String> actualIds = new TreeSet<>();
+ for (Row r : result) {
+ actualIds.add(r.getField(0).toString());
+ }
+ // Without the fix, the FS view used to construct file slices for the
fallback full-table-scan
+ // branch is built from a compaction-filtered timeline, so log files of
the file slice that
+ // straddles the compaction commit are silently dropped and some ids would
be missing from
+ // {@code actualIds}. With the fix, every expected id must be present.
+ assertEquals(expectedIds, actualIds,
+ "Expected id set " + expectedIds + " but got " + actualIds
+ + " when reading from archived start commit " +
archivedStartInstant
+ + " with skip_compaction = true on a MOR table that has a
completed compaction commit");
+ }
+
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testStreamReadMorTableWithBucketIndex(boolean partitioned) throws
Exception {