fhan688 opened a new pull request, #18848: URL: https://github.com/apache/hudi/pull/18848
### Describe the issue this Pull Request addresses

Closes https://github.com/apache/hudi/issues/18847

Flink streaming/batch read against a MOR table loses data when the reader hits the **full-table-scan** code path while `read.streaming.skip_compaction = true`.

Triggering condition (any of the following enters the full-table-scan branch):

- `read.start-commit = earliest`, OR
- the configured `read.start-commit` has already been archived, OR
- the reader falls back to full table scan because `WriteProfiles.getFilesFromMetadata` returned `null` (deleted files in commit metadata).

Combined with `read.streaming.skip_compaction = true` on a MOR table that has at least one completed compaction commit, the reader silently drops log files and returns fewer rows than expected.

### Summary and Changelog

Use the full commits-and-compaction timeline (`metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants()`) when constructing the `HoodieTableFileSystemView` in every full-table-scan branch of `IncrementalInputSplits`. The `skip_compaction` semantics is preserved by the existing instant-range filter applied to the generated input splits afterwards.

Production code changes (1 file, +34 / -3) in `hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java`:

- Introduce a small helper `getFullCommitsTimeline(HoodieTableMetaClient)` with Javadoc explaining why a full timeline is required for full-table-scan FS views.
- Streaming `inputSplits(metaClient, issuedOffset, cdcEnabled)`, earliest branch (`instantRange.isEmpty()`): switch to the full timeline.
- Batch `inputSplits(metaClient, cdcEnabled)`, `fullTableScan` branch (`startFromEarliest || hasArchivedInstants`): switch to the full timeline.
- Batch `inputSplits(metaClient, cdcEnabled)`, "fallback to full table scan" branch (deleted files in commit metadata): switch to the full timeline.
- Other call sites (`getIncInputSplits` / `WriteProfiles.getCommitMetadata` paths) intentionally keep using the filtered `activeTimeline`, since they rely on `skip_compaction` filtering for correct incremental semantics.

### Impact

- **User-facing behavior**: Flink readers on MOR tables with `read.streaming.skip_compaction = true` will no longer drop log files when the reader enters a full-table-scan path (earliest start commit, archived start commit, or fallback after deleted files in commit metadata). No new configs, no schema or storage format changes.
- **Public API**: none.
- **Performance**: negligible. The helper resolves to the same timeline that other full-table-scan paths already use; no additional file/IO work.

### Risk Level

low

The fix is local to three call sites in a single Flink reader class and is structurally equivalent: it broadens the timeline used for FS-view construction in branches that do **not** depend on `skip_compaction` for correctness. Untouched call sites (`getIncInputSplits`, `WriteProfiles.getCommitMetadata`) continue to use the filtered timeline, so streaming/incremental `skip_compaction` semantics is unchanged. Three regression tests are added covering each patched branch.

### Documentation Update

none

No new configs are introduced and no existing config behavior changes from the user's perspective; `read.streaming.skip_compaction` continues to mean "skip compaction commits in the consumed instant stream".

### Contributor's checklist

- [ ] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)
- [ ] Enough context is provided in the sections above
- [ ] Adequate tests were added if applicable
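
For readers skimming the change, the branch selection described under "Summary and Changelog" can be illustrated with a toy model. This is only a sketch under stated assumptions: `FullScanTimelineSketch`, `Instant`, `Action`, `entersFullTableScan`, `filteredTimeline`, and `timelineForFsView` are hypothetical names invented for illustration and are not Hudi classes or methods. The sketch models only which instants each branch would hand to the file-system view; it does not model file slices or the instant-range filter applied afterwards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: these types are hypothetical stand-ins, not Hudi classes.
final class FullScanTimelineSketch {

    enum Action { DELTA_COMMIT, COMPACTION }

    static final class Instant {
        final String time;
        final Action action;

        Instant(String time, Action action) {
            this.time = time;
            this.action = action;
        }
    }

    // Any one of the three triggering conditions from the description
    // enters the full-table-scan branch.
    static boolean entersFullTableScan(boolean startFromEarliest,
                                       boolean startCommitArchived,
                                       boolean filesFromMetadataMissing) {
        return startFromEarliest || startCommitArchived || filesFromMetadataMissing;
    }

    // Stand-in for the filtered timeline: drops compaction instants
    // when skipCompaction is set.
    static List<Instant> filteredTimeline(List<Instant> all, boolean skipCompaction) {
        List<Instant> result = new ArrayList<>();
        for (Instant instant : all) {
            if (!(skipCompaction && instant.action == Action.COMPACTION)) {
                result.add(instant);
            }
        }
        return result;
    }

    // Full-table-scan branches get every completed instant;
    // incremental branches keep the filtered timeline.
    static List<Instant> timelineForFsView(List<Instant> all,
                                           boolean skipCompaction,
                                           boolean fullTableScan) {
        return fullTableScan ? new ArrayList<>(all) : filteredTimeline(all, skipCompaction);
    }

    public static void main(String[] args) {
        List<Instant> all = Arrays.asList(
            new Instant("001", Action.DELTA_COMMIT),
            new Instant("002", Action.COMPACTION),
            new Instant("003", Action.DELTA_COMMIT));

        boolean fullScan = entersFullTableScan(true, false, false);
        System.out.println(timelineForFsView(all, true, fullScan).size()); // prints 3
        System.out.println(timelineForFsView(all, true, false).size());    // prints 2
    }
}
```

In this model the full-table-scan branch sees the compaction instant `002` even with `skipCompaction` set, while the incremental branch does not, which mirrors the split between the three patched call sites and the untouched ones.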
