sauliusvl opened a new pull request, #16580:
URL: https://github.com/apache/iceberg/pull/16580

   # Flink: Lazy initial bulk scan for `TABLE_SCAN_THEN_INCREMENTAL`
   
   Closes (or references) #14463.
   
   ## Problem & proposal
   
   `ContinuousSplitPlannerImpl` materialises the entire initial scan into a single
   `List<IcebergSourceSplit>` and hands it to the enumerator, which then serialises everything into
   one `byte[]` on its first checkpoint. The combined-task JSON is dominated by the duplicated schema
   (~74% of the bytes per split on a 19-field schema; deduplicating that in
   `IcebergSourceSplitSerializer` would be a worthwhile follow-up PR independent of this one), and
   the resulting checkpoint blob hits Java's 2 GB array cap somewhere around 800k splits at typical
   Iceberg schema widths. The threshold scales inversely with per-split JSON size, so narrower or
   wider schemas shift it, but the failure mode is the same: first-checkpoint crash on tables above
   the threshold. Reproduced against a ~1.87M-file production table.
   
   This PR adds an opt-in `LazyContinuousSplitPlanner` that pages through the initial scan one
   bounded batch at a time. The enumerator checkpoint stays bounded regardless of table size
   (measured ~108 MB on the 1.87M-file table, with `pageSize=10000`).
   
   This PR only touches `flink/v2.1/`. I can either add `flink/v2.0/` and `flink/v1.20/` backports
   to the same PR or follow up with separate ones, whichever reviewers prefer.
   
   ## How it works
   
   Three phases of enumerator state:
   
   ```
   Phase 1 (bulk in progress)   position = empty() sentinel    cursor = (snapshot, count, hash)
   Phase 2 (bulk just finished) position = real snapshot id    cursor = null (cleared atomically)
   Phase 3 (steady state)       position advances on commits   cursor = null forever
   ```
   
   The cursor (`LazyBulkScanCursor`: `bulkSnapshotId`, `combinedTasksEnumerated`, `rollingHash`)
   travels in `ContinuousEnumerationResult` and is committed by the enumerator atomically with the
   assigner update, so a checkpoint racing with `planSplits` cannot capture a cursor ahead of the
   assigner state.
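
A minimal sketch of the commit rule above, assuming a simplified enumerator whose pending splits, position, and cursor are guarded by one lock. `EnumeratorStateSketch`, `PageResult`, and `Snapshot` are hypothetical names for illustration, not classes from this PR, and the real enumerator may rely on Flink's coordinator thread rather than `synchronized`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of enumerator state; not the PR's classes. */
final class EnumeratorStateSketch {
  /** Cursor fields named after the PR description. */
  record Cursor(long bulkSnapshotId, long combinedTasksEnumerated, long rollingHash) {}

  /** One planning result: new splits plus the position and cursor that describe them. */
  record PageResult(List<String> splits, Long position, Cursor cursor) {}

  /** What a checkpoint would capture. */
  record Snapshot(List<String> pendingSplits, Long position, Cursor cursor) {}

  private final List<String> pendingSplits = new ArrayList<>();
  private Long position = null; // null stands in for the empty() sentinel
  private Cursor cursor = null;

  /** Applies splits, position, and cursor in one step so no checkpoint sees a partial update. */
  synchronized void commit(PageResult result) {
    pendingSplits.addAll(result.splits());
    position = result.position();
    cursor = result.cursor();
  }

  /** Reads all three fields under the same lock used by commit(). */
  synchronized Snapshot snapshotState() {
    return new Snapshot(List.copyOf(pendingSplits), position, cursor);
  }

  /** Phase 1 is the only phase in which a cursor is present. */
  synchronized boolean isBulkInProgress() {
    return cursor != null;
  }

  public static void main(String[] args) {
    EnumeratorStateSketch state = new EnumeratorStateSketch();
    state.commit(new PageResult(List.of("split-1", "split-2"), null, new Cursor(7L, 2L, 99L)));
    System.out.println(state.snapshotState());
  }
}
```

Because `commit` and `snapshotState` share a lock, a snapshot in this sketch always pairs a cursor count with the splits that count describes.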
   
   On recovery, the planner reopens
   `table.newScan().useSnapshot(cursor.bulkSnapshotId()).planTasks()`, skips
   `combinedTasksEnumerated` entries while recomputing the rolling hash, and asserts it matches the
   committed value. A mismatch triggers a loud abort, not a silent re-scan.
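
The skip-and-verify recovery can be sketched as follows. This is an illustration under stated assumptions, not the PR's code: plain string keys stand in for `CombinedScanTask`s, and `CursorRecoverySketch`, `advance`, and `resume` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

/** Hypothetical sketch of count-plus-hash cursor recovery; task keys stand in for combined tasks. */
final class CursorRecoverySketch {
  private static final long FNV_OFFSET_BASIS = 0xcbf29ce484222325L;
  private static final long FNV_PRIME = 0x100000001b3L;

  record Cursor(long tasksEnumerated, long rollingHash) {
    static Cursor start() {
      return new Cursor(0L, FNV_OFFSET_BASIS);
    }
  }

  /** Folds one task key into the hash, FNV-1a style. */
  static long fold(long hash, String taskKey) {
    for (byte b : taskKey.getBytes(StandardCharsets.UTF_8)) {
      hash ^= (b & 0xffL);
      hash *= FNV_PRIME;
    }
    return hash;
  }

  /** Cursor after emitting one more task during normal planning. */
  static Cursor advance(Cursor cursor, String taskKey) {
    return new Cursor(cursor.tasksEnumerated() + 1, fold(cursor.rollingHash(), taskKey));
  }

  /** Skips already-emitted tasks and verifies the replay against the committed hash. */
  static Iterator<String> resume(Iterator<String> replayedTasks, Cursor committed) {
    long hash = FNV_OFFSET_BASIS;
    for (long i = 0; i < committed.tasksEnumerated(); i++) {
      if (!replayedTasks.hasNext()) {
        throw new IllegalStateException("Replay ended after " + i + " tasks");
      }
      hash = fold(hash, replayedTasks.next());
    }
    if (hash != committed.rollingHash()) {
      throw new IllegalStateException("Rolling hash mismatch: task sequence drifted");
    }
    return replayedTasks; // positioned at the first task not yet emitted
  }

  public static void main(String[] args) {
    Cursor cursor = advance(advance(Cursor.start(), "task-a"), "task-b");
    Iterator<String> rest = resume(java.util.List.of("task-a", "task-b", "task-c").iterator(), cursor);
    System.out.println(rest.next());
  }
}
```

Throwing on mismatch, rather than restarting from zero, mirrors the loud-abort behaviour described above.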
   
   ## Determinism story (please read this part carefully)
   
   Count-based cursor recovery only works when `planTasks()` produces the same `CombinedScanTask`
   sequence across job incarnations: the same combined tasks in the same order, each combined task
   containing the same files with the same `start`/`length` and the same attached delete files.
   **This is not a contract of the Iceberg API.** It is an implementation-level property of the
   current Iceberg core (manifest-list order, deterministic bin-packing) that this PR relies on.
   
   Two layers defend it:
   
   1. The bulk-scan worker pool is hard-pinned to a single thread regardless of
      `ScanContext#planParallelism()`, so parallel-manifest-read non-determinism cannot leak in.
   2. The cursor carries a 64-bit rolling FNV-1a hash that folds in, for each emitted file in
      iteration order: the file's location (UTF-8 bytes), its `start` offset, its `length`, and the
      location of every attached delete file (also in iteration order). On recovery the planner
      re-iterates the first N `CombinedScanTask`s and recomputes the hash; any drift in
      combined-task order, file-within-task order, split offsets, delete-file attachment, or
      scan-context options like `splitSize` between incarnations changes the hash and triggers a
      loud abort. The hash is non-cryptographic and is purely a drift detector, **not** a
      tampering defence; a forged cursor with a matching synthetic hash is not defended against
      (and isn't a realistic threat in any checkpoint-restoration model I care about here).
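
A rough sketch of the rolling hash in item 2, using the standard 64-bit FNV-1a offset basis and prime. The `FileEntry` record and the big-endian byte order used for `start` and `length` are assumptions made for this illustration; the PR's actual encoding may differ.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

/** Hypothetical sketch of a rolling FNV-1a drift detector over emitted files. */
final class RollingHashSketch {
  static final long FNV_OFFSET_BASIS = 0xcbf29ce484222325L;
  static final long FNV_PRIME = 0x100000001b3L;

  /** Stand-in for one emitted file: location, split range, and attached delete files. */
  record FileEntry(String location, long start, long length, List<String> deleteLocations) {}

  /** FNV-1a step: XOR each byte into the hash, then multiply by the prime. */
  static long foldBytes(long hash, byte[] bytes) {
    for (byte b : bytes) {
      hash ^= (b & 0xffL);
      hash *= FNV_PRIME;
    }
    return hash;
  }

  /** Folds a long as 8 bytes, most significant first (byte order is an assumption). */
  static long foldLong(long hash, long value) {
    for (int shift = 56; shift >= 0; shift -= 8) {
      hash ^= (value >>> shift) & 0xffL;
      hash *= FNV_PRIME;
    }
    return hash;
  }

  /** Folds one file: location, start, length, then delete-file locations in iteration order. */
  static long fold(long hash, FileEntry file) {
    hash = foldBytes(hash, file.location().getBytes(StandardCharsets.UTF_8));
    hash = foldLong(hash, file.start());
    hash = foldLong(hash, file.length());
    for (String deleteLocation : file.deleteLocations()) {
      hash = foldBytes(hash, deleteLocation.getBytes(StandardCharsets.UTF_8));
    }
    return hash;
  }

  /** Hash of a whole sequence, folding files in the order given. */
  static long hashOf(List<FileEntry> files) {
    long hash = FNV_OFFSET_BASIS;
    for (FileEntry file : files) {
      hash = fold(hash, file);
    }
    return hash;
  }

  public static void main(String[] args) {
    FileEntry file = new FileEntry("s3://bucket/data/f1.parquet", 0L, 128L, List.of());
    System.out.println(Long.toHexString(hashOf(List.of(file))));
  }
}
```

Since each step depends on the running value, reordering files or changing a split offset changes the result, which is what makes the hash usable as a drift detector.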
   
   The cost of a hash mismatch is a full re-scan of the bulk snapshot (~30 min for a ~1.87M-file
   table in the empirical run). A manifest-position cursor walking the manifest list directly would
   turn this into a contract-level guarantee instead of an implementation-level one. Happy to take
   that on as a follow-up; it conflicts non-trivially with split combining and would expand this
   diff substantially.
   
   ## Reactive paging + single-flight latch
   
   Without reactive triggering, lazy mode paces at `monitorInterval`: a 1.87M-file table with
   `pageSize=10000` and a 10s interval would take ~30 min just to drain bulk while readers idle. The
   enumerator listens for `SplitRequestEvent`, and when `assigner.pendingSplitCount()` drops below
   `pageSize/2` it schedules an extra `planSplits` call without waiting for the next tick. After
   bulk completes the watermark returns to `0` (purely periodic) so the incremental delegate isn't
   spammed.
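
The watermark rule and the drain-time estimate can be sketched as below. The class and method names are hypothetical, and the drain arithmetic assumes roughly one split per file and one page per monitor tick.

```java
/** Hypothetical sketch of the reactive low-watermark rule; names are illustrative. */
final class ReactivePagingSketch {
  /** Watermark is pageSize/2 while bulk is in progress and 0 (purely periodic) afterwards. */
  static int lowWatermark(int pageSize, boolean bulkInProgress) {
    return bulkInProgress ? pageSize / 2 : 0;
  }

  /** True when a split request should schedule an extra planning call right away. */
  static boolean shouldTriggerNow(
      int pendingSplits, int pageSize, boolean bulkInProgress, boolean planningInFlight) {
    return !planningInFlight && pendingSplits < lowWatermark(pageSize, bulkInProgress);
  }

  /** Seconds to drain the bulk scan at one page per monitor tick, with no reactive trigger. */
  static long periodicDrainSeconds(long totalSplits, int pageSize, long monitorIntervalSeconds) {
    long pages = (totalSplits + pageSize - 1) / pageSize; // ceiling division
    return pages * monitorIntervalSeconds;
  }

  public static void main(String[] args) {
    // 1.87M splits, pageSize=10000, 10s interval: 187 pages, 1870 s (about 31 min).
    System.out.println(periodicDrainSeconds(1_870_000L, 10_000, 10L));
  }
}
```

With a watermark of `0` the comparison can never succeed, since the pending count is never negative, so the post-bulk behaviour falls back to the periodic timer alone.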
   
   To prevent N parallel readers all dispatching N `callAsync` tasks on the same burst of
   `SplitRequestEvent`s, `handleSourceEvent` pre-checks the single-flight latch and skips the
   dispatch when one is already in flight. The CAS inside `discoverSplits` is still the actual
   gatekeeper; the pre-check just suppresses the common case.
   
   An `AtomicBoolean` single-flight latch and an identity sentinel `LATCH_HELD_NOOP` ensure only one
   `planSplits` callable + handler pair is ever in flight, even when the periodic timer and reactive
   trigger fire close together.
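
A minimal sketch of such a latch, assuming the callable claims it with a CAS and the matching handler releases it. `SingleFlightSketch`, `discover`, and `handle` are hypothetical names, and the error handling assumes a `callAsync`-style handler that receives either a result or a throwable.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Hypothetical sketch of a single-flight latch spanning a callable and its handler. */
final class SingleFlightSketch {
  /** Identity sentinel: returned when another callable already holds the latch. */
  static final Object LATCH_HELD_NOOP = new Object();

  private final AtomicBoolean inFlight = new AtomicBoolean(false);

  /** Cheap pre-check used to skip a dispatch; the CAS in discover() remains the real gate. */
  boolean isInFlight() {
    return inFlight.get();
  }

  /** Callable side: claims the latch via CAS, or returns the sentinel without planning. */
  Object discover(Supplier<Object> planner) {
    if (!inFlight.compareAndSet(false, true)) {
      return LATCH_HELD_NOOP;
    }
    return planner.get(); // latch stays held until handle(...) runs
  }

  /** Handler side: only the latch owner releases; sentinel results are ignored. */
  void handle(Object result, Throwable error, Runnable onSuccess) {
    if (result == LATCH_HELD_NOOP) {
      return; // another pair owns the latch, so it must not be released here
    }
    try {
      if (error == null) {
        onSuccess.run();
      }
    } finally {
      inFlight.set(false); // released on both the success and the failure path
    }
  }

  public static void main(String[] args) {
    SingleFlightSketch latch = new SingleFlightSketch();
    Object first = latch.discover(() -> "page-1");
    Object second = latch.discover(() -> "page-2");
    System.out.println(second == LATCH_HELD_NOOP);
    latch.handle(first, null, () -> {});
  }
}
```

Comparing by identity (`==`) keeps the sentinel distinguishable from any real planning result, including `null`.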
   
   ## Compatibility
   
   - Opt-in via builder: `.lazyInitialBulkScanPageSize(N)`. Default `0` keeps existing eager behaviour.
   - Connector config: `connector.iceberg.lazy-initial-bulk-scan-page-size=N`.
   - Only affects streaming + `TABLE_SCAN_THEN_INCREMENTAL`. Batch and other starting strategies pass
     through unchanged.
   - Enumerator state serializer bumped from v2 → v3 with the cursor appended. v1/v2 checkpoints
     deserialize cleanly with `lazyBulkScanCursor=null`.
   - `IcebergSource.createEnumerator` validates the (page-size config, restored state) pair up front
     with two recovery-footgun checks and one general config check:
     - **Mid-bulk state + lazy disabled** (cursor present, `lazyInitialBulkScanPageSize == 0`):
       would drop the cursor and overflow the 2 GB checkpoint on the next eager planning call.
     - **Mid-bulk state + different starting strategy**: would silently drop the cursor when the
       lazy planner short-circuits to the incremental delegate for non-`TABLE_SCAN_THEN_INCREMENTAL`
       strategies.
     - **Invalid page size** (negative): general config check that runs whether or not state is
       being restored; zero is valid (means "eager").
   
     Note: `isInLazyBulkPhase()` is deliberately narrow: it returns true only when a cursor is
     present, not when `lastEnumeratedPosition` is the empty sentinel. The latter is also produced
     by the eager planner on empty-initial tables, and rejecting it would regress pre-existing
     eager → eager restart on empty-initial checkpoints. The 2 GB-overflow risk for the
     empty-table-then-grew case is pre-existing eager behaviour that this PR doesn't make worse.
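
The three up-front checks might look roughly like this. It is a sketch only: the `Strategy` enum is a two-value stand-in for the source's starting strategies, and the exception types and messages are assumptions rather than what the PR throws.

```java
/** Hypothetical sketch of the up-front (config, restored state) validation. */
final class LazyScanValidationSketch {
  /** Stand-in for the source's starting strategies; only two are modelled here. */
  enum Strategy {
    TABLE_SCAN_THEN_INCREMENTAL,
    INCREMENTAL_FROM_LATEST_SNAPSHOT
  }

  /** Rejects invalid config first, then the two mid-bulk recovery footguns. */
  static void validate(int pageSize, Strategy strategy, boolean restoredCursorPresent) {
    if (pageSize < 0) {
      // General config check: runs whether or not state is being restored.
      throw new IllegalArgumentException("page size must be >= 0 (0 means eager): " + pageSize);
    }
    if (restoredCursorPresent && pageSize == 0) {
      // Continuing eagerly would drop the cursor and plan the whole table at once.
      throw new IllegalStateException("restored state is mid-bulk but lazy scan is disabled");
    }
    if (restoredCursorPresent && strategy != Strategy.TABLE_SCAN_THEN_INCREMENTAL) {
      // Other strategies bypass the bulk phase, so the cursor would be silently lost.
      throw new IllegalStateException("restored state is mid-bulk but strategy is " + strategy);
    }
  }

  public static void main(String[] args) {
    validate(10_000, Strategy.TABLE_SCAN_THEN_INCREMENTAL, true); // accepted
    validate(0, Strategy.TABLE_SCAN_THEN_INCREMENTAL, false); // eager, no cursor: accepted
    System.out.println("ok");
  }
}
```

Note that a missing cursor never fails the state checks here, matching the deliberately narrow `isInLazyBulkPhase()` described above.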
   
   ## Trade-offs / known limitations
   
   - **Determinism is implementation-level, not contract-level.** See the "Determinism story"
     section above. The rolling hash catches drift; the cost is a full re-scan.
   - **Two table-loader clones per planner.** The lazy planner constructs an internal
     `ContinuousSplitPlannerImpl` for the incremental tail, which clones its own table loader.
     Avoiding the second clone would need a refactor that expands the diff substantially.
   - **Single-thread bulk planning.** Slower than parallel (60s vs 10s on 1.87M files in my testing),
     but a tractable cost for cursor correctness.
   
   ## Tests
   
   All passing. Highlights:
   
   - `TestLazyContinuousSplitPlanner` covers the planner in isolation, cursor recovery, hash
     mismatch / matching, snapshot expiration, reopen-after-transient-failure, iteration-order
     determinism lock-in, the `IcebergSource.restoreEnumerator` rejection paths, and an end-to-end
     real-planner + real-enumerator + `HadoopTableExtension` integration test that verifies
     `assignedSplits + pendingSplits == fileCount` across the bulk → incremental transition.
   - `TestLazyEnumerator` covers the enumerator + cursor commit semantics with a fake planner,
     including the CAS-relaxation regression test and the eager-result-doesn't-touch-cursor
     invariant.
   - `TestContinuousIcebergEnumeratorLatch` uses real `ExecutorService` threads to verify the
     single-flight latch under contention, including the exception path that protects against the
     one race I came across mid-implementation.
   - `TestIcebergEnumeratorStateSerializer` covers v1/v2/v3 round-trips, the cursor round-trip,
     and v2-as-v3 forward compat.
   
   ---
   
   *AI tooling disclosure: parts of this PR (some implementation, tests, and this PR body) were
   drafted with AI assistance. All code has been reviewed end-to-end and validated with the test
   suite above plus live empirical runs against the ~1.87M-file production table.*
   

