sauliusvl opened a new pull request, #16580:
URL: https://github.com/apache/iceberg/pull/16580
# Flink: Lazy initial bulk scan for `TABLE_SCAN_THEN_INCREMENTAL`
Closes (or references) #14463.
## Problem & proposal
`ContinuousSplitPlannerImpl` materialises the entire initial scan into a single `List<IcebergSourceSplit>` and hands it to the enumerator, which then serialises everything into one `byte[]` on its first checkpoint. The combined-task JSON is dominated by the duplicated schema (~74% of the bytes per split on a 19-field schema; deduplicating that in `IcebergSourceSplitSerializer` would be a worthwhile follow-up PR independent of this one), and the resulting checkpoint blob hits Java's 2 GB array cap somewhere around 800k splits at typical Iceberg schema widths. The threshold scales inversely with per-split JSON size, so narrower or wider schemas shift it, but the failure mode is the same: a first-checkpoint crash on any table above the threshold. Reproduced against a ~1.87M-file production table.
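The ~800k figure corresponds to roughly 2.7 KB of serialized JSON per split (2^31 - 1 bytes / 800,000 ≈ 2,684 bytes). A minimal sketch of that arithmetic follows; `CheckpointCapEstimate` is a hypothetical helper, not part of this PR, and the per-split sizes are illustrative assumptions rather than measurements:

```java
/**
 * Hypothetical back-of-the-envelope helper (not part of this PR): how many splits fit in the
 * single byte[] the enumerator checkpoint is serialized into.
 */
final class CheckpointCapEstimate {
  // Java arrays are int-indexed, so a byte[] holds at most Integer.MAX_VALUE bytes (~2 GiB).
  // Some JVMs cap array length a few elements lower; that does not matter at this scale.
  static final long MAX_ARRAY_BYTES = Integer.MAX_VALUE;

  private CheckpointCapEstimate() {}

  /** Largest split count whose serialized form still fits, given an assumed per-split size. */
  static long maxSplits(long bytesPerSplit) {
    if (bytesPerSplit <= 0) {
      throw new IllegalArgumentException("bytesPerSplit must be positive: " + bytesPerSplit);
    }
    return MAX_ARRAY_BYTES / bytesPerSplit;
  }

  /** Serialized size of the whole initial scan if it were checkpointed eagerly. */
  static long eagerCheckpointBytes(long splitCount, long bytesPerSplit) {
    return Math.multiplyExact(splitCount, bytesPerSplit);
  }
}
```

With ~2,700 bytes per split, `maxSplits(2_700)` is 795,364; halving the per-split size doubles the cap, which is the inverse scaling described above. At the same assumed size, 1.87M splits would need about 5 GB, well past the cap.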
This PR adds an opt-in `LazyContinuousSplitPlanner` that pages through the initial scan one bounded batch at a time. The enumerator checkpoint stays bounded regardless of table size (measured at ~108 MB on the 1.87M-file table with `pageSize=10000`).
This PR only touches `flink/v2.1/`. I can either add `flink/v2.0/` and `flink/v1.20/` backports to this PR or follow up with separate ones, whichever reviewers prefer.
## How it works
Three phases of enumerator state:
```
Phase 1 (bulk in progress)    position = empty() sentinel    cursor = (snapshot, count, hash)
Phase 2 (bulk just finished)  position = real snapshot id    cursor = null (cleared atomically)
Phase 3 (steady state)        position advances on commits   cursor = null forever
```
The cursor (`LazyBulkScanCursor`: `bulkSnapshotId`, `combinedTasksEnumerated`, `rollingHash`) travels in `ContinuousEnumerationResult` and is committed by the enumerator atomically with the assigner update, so a checkpoint racing with `planSplits` cannot capture a cursor ahead of the assigner state.
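To make the atomicity claim concrete, here is a minimal sketch under simplifying assumptions: splits are plain strings, and a `synchronized` method stands in for however the real enumerator serializes the commit against its checkpoint. Only the three cursor field names come from this description; `EnumeratorStateSketch`, `commit`, and `snapshotState` as written here are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Stand-in for LazyBulkScanCursor; only the three field names come from the PR description. */
final class LazyBulkScanCursor {
  final long bulkSnapshotId;
  final long combinedTasksEnumerated;
  final long rollingHash;

  LazyBulkScanCursor(long bulkSnapshotId, long combinedTasksEnumerated, long rollingHash) {
    this.bulkSnapshotId = bulkSnapshotId;
    this.combinedTasksEnumerated = combinedTasksEnumerated;
    this.rollingHash = rollingHash;
  }
}

/** Hypothetical, simplified enumerator state: pending splits (as strings) plus the cursor. */
final class EnumeratorStateSketch {
  /** Immutable checkpoint view; the cursor is null outside the bulk phase. */
  static final class Checkpoint {
    final List<String> pendingSplits;
    final LazyBulkScanCursor cursor;

    Checkpoint(List<String> pendingSplits, LazyBulkScanCursor cursor) {
      this.pendingSplits = Collections.unmodifiableList(new ArrayList<>(pendingSplits));
      this.cursor = cursor;
    }
  }

  private final List<String> pendingSplits = new ArrayList<>();
  private LazyBulkScanCursor cursor;

  /**
   * Applies one planner result. The splits and the cursor that follows them change together,
   * so no checkpoint can observe a cursor that counts splits the assigner does not hold yet.
   */
  synchronized void commit(List<String> newSplits, LazyBulkScanCursor nextCursor) {
    pendingSplits.addAll(newSplits);
    cursor = nextCursor; // null once the bulk scan is finished (phase 2 onwards)
  }

  /** Takes a consistent copy of splits and cursor for a checkpoint. */
  synchronized Checkpoint snapshotState() {
    return new Checkpoint(pendingSplits, cursor);
  }
}
```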
On recovery, the planner reopens `table.useSnapshot(cursor.bulkSnapshotId()).planTasks()`, skips the first `combinedTasksEnumerated` entries while recomputing the rolling hash, and asserts that it matches the committed value. A mismatch triggers a loud abort, not a silent re-scan.
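A sketch of that recovery step, assuming the reopened scan is exposed as a plain `Iterator` over combined tasks and that the hash fold is passed in (the real fold is the FNV-1a hash described in the determinism section). `CursorRecovery`, `resume`, and `HashFold` are hypothetical names:

```java
import java.util.Iterator;

/** Hypothetical sketch of the cursor-recovery step; not the PR's actual code. */
final class CursorRecovery {
  /** Folds one combined task into the rolling hash. */
  @FunctionalInterface
  interface HashFold<T> {
    long fold(long hash, T task);
  }

  private CursorRecovery() {}

  /**
   * Re-iterates the first {@code alreadyEnumerated} tasks of a freshly reopened scan, recomputing
   * the rolling hash, and aborts loudly on any mismatch. On success the returned iterator is
   * positioned at the first task that has not been emitted yet.
   */
  static <T> Iterator<T> resume(
      Iterator<T> tasks, long alreadyEnumerated, long seed, long expectedHash, HashFold<T> fold) {
    long hash = seed;
    for (long i = 0; i < alreadyEnumerated; i++) {
      if (!tasks.hasNext()) {
        throw new IllegalStateException(
            "Bulk scan is shorter than the cursor: expected at least "
                + alreadyEnumerated + " tasks, found " + i);
      }
      hash = fold.fold(hash, tasks.next());
    }
    if (hash != expectedHash) {
      throw new IllegalStateException(
          String.format(
              "planTasks() drifted: recomputed hash %016x, committed %016x", hash, expectedHash));
    }
    return tasks;
  }
}
```

Failing with an exception rather than falling back to a fresh scan is what keeps drift visible to the operator.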
## Determinism story (please read this part carefully)
Count-based cursor recovery only works when `planTasks()` produces the same `CombinedScanTask` sequence across job incarnations: the same combined tasks in the same order, each combined task containing the same files with the same `start`/`length` and the same attached delete files.
**This is not a contract of the Iceberg API.** It is an implementation-level property of the current Iceberg core (manifest-list order, deterministic bin-packing) that this PR relies on.
Two layers defend it:
1. The bulk-scan worker pool is hard-pinned to a single thread regardless of `ScanContext#planParallelism()`, so non-determinism from parallel manifest reads cannot leak in.
2. The cursor carries a 64-bit rolling FNV-1a hash that folds in, for each emitted file in iteration order: the file's location (UTF-8 bytes), its `start` offset, its `length`, and the location of every attached delete file (also in iteration order). On recovery the planner re-iterates the first N (`combinedTasksEnumerated`) `CombinedScanTask`s and recomputes the hash; any drift between incarnations in combined-task order, file-within-task order, split offsets, delete-file attachment, or scan-context options like `splitSize` changes the hash (barring a 64-bit collision) and triggers a loud abort. The hash is non-cryptographic and purely a drift detector, **not** a tampering defence: a forged cursor with a matching synthetic hash is not defended against (and isn't a realistic threat in any checkpoint-restoration model I care about here).
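For reference, a minimal 64-bit FNV-1a sketch of the fold described in item 2. The offset basis and prime are the standard 64-bit FNV constants; everything else is an assumption: the class name is hypothetical, longs are folded as 8 big-endian bytes, and strings and the delete-file list are length-prefixed to keep field boundaries unambiguous, which this description does not say the real implementation does.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * Minimal 64-bit FNV-1a rolling hash over the per-file fields listed above. Hypothetical sketch:
 * the byte encoding of longs (8 bytes, big-endian) and the length prefixes are assumptions.
 */
final class RollingFnv1a {
  static final long OFFSET_BASIS = 0xcbf29ce484222325L;
  static final long PRIME = 0x100000001b3L;

  private long hash = OFFSET_BASIS;

  long value() {
    return hash;
  }

  /** Plain FNV-1a: xor each byte in, then multiply by the prime (mod 2^64). */
  void foldBytes(byte[] bytes) {
    for (byte b : bytes) {
      hash ^= (b & 0xffL);
      hash *= PRIME;
    }
  }

  /** Folds a long as 8 big-endian bytes. */
  void foldLong(long v) {
    byte[] be = new byte[8];
    for (int i = 0; i < 8; i++) {
      be[i] = (byte) (v >>> (56 - 8 * i));
    }
    foldBytes(be);
  }

  /** Length-prefixed so that ("ab", "c") and ("a", "bc") hash differently. */
  void foldString(String s) {
    byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
    foldLong(utf8.length);
    foldBytes(utf8);
  }

  /** One emitted file, in iteration order: location, start, length, then delete-file locations. */
  void foldFile(String location, long start, long length, List<String> deleteFileLocations) {
    foldString(location);
    foldLong(start);
    foldLong(length);
    foldLong(deleteFileLocations.size());
    for (String deleteLocation : deleteFileLocations) {
      foldString(deleteLocation);
    }
  }
}
```

Each step `h -> (h ^ b) * PRIME` is a bijection on 64-bit values (the prime is odd), so two equal-length inputs that differ in exactly one byte always hash differently; larger drift, such as reordered files, is caught with overwhelming but not absolute probability.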
The cost of a hash mismatch is a full re-scan of the bulk snapshot (~30 min for a ~1.87M-file table in the empirical run). A manifest-position cursor that walks the manifest list directly would turn this into a contract-level guarantee instead of an implementation-level one. I'm happy to take that on as a follow-up, but it conflicts non-trivially with split combining and would expand this diff substantially.
## Reactive paging + single-flight latch
Without reactive triggering, lazy mode paces itself at `monitorInterval`: a 1.87M-file table with `pageSize=10000` and a 10s interval would take ~30 min just to drain the bulk scan while readers sit idle. To avoid that, the enumerator listens for `SplitRequestEvent`, and when `assigner.pendingSplitCount()` drops below `pageSize/2` it schedules an extra `planSplits` call without waiting for the next tick. After the bulk scan completes, the watermark returns to `0` (purely periodic planning) so the incremental delegate isn't spammed.
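The trigger rule and the periodic-only pacing estimate can be sketched as follows, assuming one split per data file (which is what the ~30 min figure implies: 187 pages at one page per 10s tick is 1,870 s). `ReactivePaging` and its methods are hypothetical names, not the PR's code:

```java
/** Hypothetical helper mirroring the reactive-paging rule; not the PR's actual code. */
final class ReactivePaging {
  private ReactivePaging() {}

  /** Below this many pending splits, a SplitRequestEvent triggers an extra planSplits call. */
  static int lowWatermark(int pageSize, boolean bulkInProgress) {
    // Once the bulk scan is done the watermark is 0, so only the periodic timer plans.
    return bulkInProgress ? pageSize / 2 : 0;
  }

  static boolean shouldPlanNow(int pendingSplitCount, int pageSize, boolean bulkInProgress) {
    return pendingSplitCount < lowWatermark(pageSize, bulkInProgress);
  }

  /** Time to drain the bulk scan if only the periodic timer pages, one page per tick. */
  static long periodicOnlyDrainSeconds(long splitCount, int pageSize, long intervalSeconds) {
    long pages = (splitCount + pageSize - 1) / pageSize; // ceiling division
    return pages * intervalSeconds;
  }
}
```

Because a pending count can never be negative, a watermark of `0` disables the reactive path entirely.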
To prevent N parallel readers from dispatching N `callAsync` tasks on the same burst of `SplitRequestEvent`s, `handleSourceEvent` pre-checks the single-flight latch and skips the dispatch when a planning call is already in flight. The CAS inside `discoverSplits` is still the actual gatekeeper; the pre-check just suppresses the common case.
An `AtomicBoolean` single-flight latch plus an identity sentinel, `LATCH_HELD_NOOP`, ensures that only one `planSplits` callable + handler pair is ever in flight, even when the periodic timer and the reactive trigger fire close together.
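The callable/handler split of the latch might look like the minimal sketch below. It assumes the callable and the handler are invoked separately, as with an async callable followed by a result handler; `SingleFlightLatch` and its method names are hypothetical, and only `LATCH_HELD_NOOP` comes from this description.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * Hypothetical single-flight latch modelled on the description: the async callable acquires the
 * latch with a CAS and the handler releases it. Not the PR's actual code.
 */
final class SingleFlightLatch<R> {
  // Identity sentinel returned by a callable that lost the CAS; compared with ==, never equals().
  private static final Object LATCH_HELD_NOOP = new Object();

  private final AtomicBoolean inFlight = new AtomicBoolean(false);

  /** Cheap, racy pre-check for the SplitRequestEvent path; the CAS below is the real gate. */
  boolean isInFlight() {
    return inFlight.get();
  }

  /** Callable body, run on a worker thread. */
  Object callable(Supplier<R> planSplits) {
    if (!inFlight.compareAndSet(false, true)) {
      return LATCH_HELD_NOOP; // another pair owns the latch: do no work
    }
    // If planSplits throws here, the handler receives the error and still releases the latch.
    return planSplits.get();
  }

  /** Handler body, invoked with (result, error) once the callable finishes. */
  @SuppressWarnings("unchecked")
  void handler(Object result, Throwable error, Consumer<R> applyResult) {
    if (result == LATCH_HELD_NOOP) {
      return; // this pair never acquired the latch, so it must not release it
    }
    try {
      if (error == null) {
        applyResult.accept((R) result);
      }
    } finally {
      inFlight.set(false);
    }
  }
}
```

Releasing in `finally` is what keeps a failed planning call from wedging the latch permanently.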
## Compatibility
- Opt-in via builder: `.lazyInitialBulkScanPageSize(N)`. The default `0` keeps the existing eager behaviour.
- Connector config: `connector.iceberg.lazy-initial-bulk-scan-page-size=N`.
- Only affects streaming + `TABLE_SCAN_THEN_INCREMENTAL`. Batch and other starting strategies pass through unchanged.
- Enumerator state serializer bumped from v2 → v3 with the cursor appended. v1/v2 checkpoints deserialize cleanly with `lazyBulkScanCursor=null`.
- `IcebergSource.createEnumerator` validates the (page-size config, restored state) pair up front with two recovery-footgun checks and one general config check:
  - **Mid-bulk state + lazy disabled** (cursor present, `lazyInitialBulkScanPageSize == 0`): would drop the cursor and overflow the 2 GB checkpoint on the next eager planning call.
  - **Mid-bulk state + different starting strategy**: would silently drop the cursor when the lazy planner short-circuits to the incremental delegate for non-`TABLE_SCAN_THEN_INCREMENTAL` strategies.
  - **Invalid page size** (negative): a general config check that runs whether or not state is being restored; zero is valid (it means "eager").
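One way to append the cursor so that older checkpoints still decode is a presence-flagged tail after the v2 payload. A minimal sketch using `java.io.DataOutputStream`; `CursorTailCodec` and this tail layout are assumptions, not the PR's serializer:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical sketch of a v3 cursor tail appended after the v2 payload. */
final class CursorTailCodec {
  static final int VERSION_WITH_CURSOR = 3;

  /** Minimal stand-in for LazyBulkScanCursor. */
  static final class Cursor {
    final long bulkSnapshotId;
    final long combinedTasksEnumerated;
    final long rollingHash;

    Cursor(long bulkSnapshotId, long combinedTasksEnumerated, long rollingHash) {
      this.bulkSnapshotId = bulkSnapshotId;
      this.combinedTasksEnumerated = combinedTasksEnumerated;
      this.rollingHash = rollingHash;
    }
  }

  private CursorTailCodec() {}

  /** Encodes the tail: a presence flag, then the three longs when a cursor exists. */
  static byte[] encode(Cursor cursor) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeBoolean(cursor != null);
      if (cursor != null) {
        out.writeLong(cursor.bulkSnapshotId);
        out.writeLong(cursor.combinedTasksEnumerated);
        out.writeLong(cursor.rollingHash);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  /** v1/v2 payloads have no tail, so they always decode to a null cursor. */
  static Cursor decode(int version, byte[] tail) {
    if (version < VERSION_WITH_CURSOR) {
      return null;
    }
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(tail))) {
      if (!in.readBoolean()) {
        return null;
      }
      // Java evaluates arguments left to right, matching the write order above.
      return new Cursor(in.readLong(), in.readLong(), in.readLong());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```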
Note: `isInLazyBulkPhase()` is deliberately narrow; it returns true only when a cursor is present, not when `lastEnumeratedPosition` is the empty sentinel. The latter is also produced by the eager planner on empty-initial tables, and rejecting it would regress the pre-existing eager → eager restart on empty-initial checkpoints. The 2 GB-overflow risk for the empty-table-then-grew case is pre-existing eager behaviour that this PR doesn't make worse.
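Taken together, the three checks and the narrow `isInLazyBulkPhase()` reduce to a small decision table. A minimal sketch, assuming the restored state is summarized as a single "cursor present" flag; `LazyRestoreValidation` and its local `StartingStrategy` enum are hypothetical stand-ins, not Iceberg types:

```java
/** Hypothetical sketch of the up-front (page size, restored state) validation. */
final class LazyRestoreValidation {
  /** Local stand-in for the streaming starting strategy; only two values are modelled. */
  enum StartingStrategy {
    TABLE_SCAN_THEN_INCREMENTAL,
    INCREMENTAL_FROM_LATEST_SNAPSHOT
  }

  private LazyRestoreValidation() {}

  /**
   * @param pageSize the lazy bulk-scan page size; 0 means eager
   * @param cursorPresent whether restored state carries a bulk-scan cursor (false on fresh start)
   */
  static void validate(int pageSize, StartingStrategy strategy, boolean cursorPresent) {
    // General config check, whether or not state is being restored.
    if (pageSize < 0) {
      throw new IllegalArgumentException("Page size must be >= 0 (0 = eager), got " + pageSize);
    }
    // Mirrors the narrow isInLazyBulkPhase(): keyed on the cursor alone, so an eager checkpoint
    // whose position is the empty sentinel still restores with lazy mode off.
    if (!cursorPresent) {
      return;
    }
    if (pageSize == 0) {
      throw new IllegalStateException(
          "Restored state is mid-bulk but lazy bulk scan is disabled; refusing to drop the cursor");
    }
    if (strategy != StartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) {
      throw new IllegalStateException(
          "Restored state is mid-bulk but the starting strategy is " + strategy);
    }
  }
}
```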
## Trade-offs / known limitations
- **Determinism is implementation-level, not contract-level.** See the "Determinism story" section above. The rolling hash catches drift; the cost is a full re-scan.
- **Two table-loader clones per planner.** The lazy planner constructs an internal `ContinuousSplitPlannerImpl` for the incremental tail, which clones its own table loader, so each planner ends up holding two clones; refactoring that away would expand the diff substantially.
- **Single-thread bulk planning.** Slower than parallel planning (60s vs 10s on 1.87M files in my testing), but a tractable cost for cursor correctness.
## Tests
All passing. Highlights:
- `TestLazyContinuousSplitPlanner` covers the planner in isolation, cursor recovery, hash mismatch/match, snapshot expiration, reopen-after-transient-failure, iteration-order determinism lock-in, the `IcebergSource.restoreEnumerator` rejection paths, and an end-to-end real-planner + real-enumerator + `HadoopTableExtension` integration test that verifies `assignedSplits + pendingSplits == fileCount` across the bulk → incremental transition.
- `TestLazyEnumerator` covers the enumerator + cursor commit semantics with a fake planner, including the CAS-relaxation regression test and the eager-result-doesn't-touch-cursor invariant.
- `TestContinuousIcebergEnumeratorLatch` uses real `ExecutorService` threads to verify the single-flight latch under contention, including the exception path that protects against the one race I came across mid-implementation.
- `TestIcebergEnumeratorStateSerializer` covers v1/v2/v3 round-trips, the cursor round-trip, and v2-as-v3 forward compat.
---
*AI tooling disclosure: parts of this PR (some implementation, tests, and this PR body) were drafted with AI assistance. All code has been reviewed end-to-end and validated with the test suite above plus live empirical runs against the ~1.87M-file production table.*