tub opened a new pull request, #7424:
URL: https://github.com/apache/paimon/pull/7424
## What does this PR do?
Adds continuous streaming read support to `paimon-python` via two new
classes and a new table entry point:
- **`AsyncStreamingTableScan`** — async generator that continuously polls
for new snapshots and yields `Plan` objects as new data arrives. Python
equivalent of Java's `DataTableStreamScan`.
- **`StreamReadBuilder`** — fluent builder (analogous to `ReadBuilder`) for
configuring and constructing a streaming scan.
- **`Table.new_stream_read_builder()`** — entry point on the table interface.
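The following is a minimal, self-contained sketch of the polling pattern that an async-generator scan like `AsyncStreamingTableScan` follows. It emits one full-scan plan, then one plan per new snapshot, and sleeps `poll_interval_ms` whenever no new snapshot exists. `Plan`, `stream_plans`, the callables passed in, and the `max_plans` cut-off are hypothetical stand-ins, not the pypaimon API. The real class also chooses between delta and changelog scanners and handles diff catch-up and prefetching.

```python
import asyncio
from dataclasses import dataclass, field
from typing import AsyncIterator, Callable, List, Optional


@dataclass
class Plan:
    """Hypothetical stand-in for pypaimon's Plan: just a list of split names."""
    snapshot_id: int
    split_names: List[str] = field(default_factory=list)

    def splits(self) -> List[str]:
        return self.split_names


async def stream_plans(
    latest_snapshot_id: Callable[[], Optional[int]],
    full_scan: Callable[[int], Plan],
    delta_scan: Callable[[int], Plan],
    poll_interval_ms: int = 500,
    max_plans: Optional[int] = None,  # hypothetical cut-off, handy for tests
) -> AsyncIterator[Plan]:
    """Yield one full-scan plan, then one delta plan per new snapshot."""
    emitted = 0
    next_id: Optional[int] = None
    while max_plans is None or emitted < max_plans:
        latest = latest_snapshot_id()
        if next_id is None:
            if latest is None:
                # Table has no snapshots yet: wait and retry.
                await asyncio.sleep(poll_interval_ms / 1000)
                continue
            yield full_scan(latest)  # initial scan of the latest snapshot
            next_id = latest + 1
            emitted += 1
        elif latest is not None and next_id <= latest:
            yield delta_scan(next_id)  # follow-up: only data added in next_id
            next_id += 1
            emitted += 1
        else:
            # No new snapshot yet; let other tasks run until the next poll.
            await asyncio.sleep(poll_interval_ms / 1000)
```

Because the generator waits with `await asyncio.sleep(...)` rather than `time.sleep(...)`, other tasks on the same event loop keep running between polls.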
## Key features
- **Initial scan**: on first call, performs a full scan of the latest
snapshot and yields a `Plan`
- **Delta / changelog follow-up**: subsequent iterations yield only new data
per snapshot, using `DeltaFollowUpScanner` (changelog-producer=none) or
`ChangelogFollowUpScanner` (input/lookup/full-compaction)
- **Diff catch-up**: when starting from a historical snapshot with a large
gap to the latest, uses `IncrementalDiffScanner` to collapse the catch-up
into a single plan instead of replaying every intermediate snapshot
- **Batch lookahead**: `SnapshotManager.find_next_scannable()` fetches
snapshot metadata for a batch of IDs in parallel to skip non-scannable
snapshots (e.g. compaction-only) with fewer sequential S3 round-trips
- **Prefetching**: optional background thread pre-fetches the next scannable
snapshot while the caller processes the current plan
- **Bucket/shard filtering**: `with_buckets([0,1])` /
`with_bucket_filter(fn)` for parallel consumer sharding
- **Synchronous wrapper**: `stream_sync()` for non-async usage
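The batch-lookahead idea behind `SnapshotManager.find_next_scannable()` can be sketched as follows. The sketch loads metadata for several candidate snapshot IDs concurrently, then returns the first one worth scanning. `find_next_scannable_sketch`, `load_commit_kind`, `is_scannable`, and the `"COMPACT"`/`"APPEND"` kind strings in the test are assumptions for illustration. The real method's signature and its scannability rules may differ.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Optional


def find_next_scannable_sketch(
    start_id: int,
    latest_id: int,
    load_commit_kind: Callable[[int], Optional[str]],  # hypothetical loader
    is_scannable: Callable[[str], bool],
    batch_size: int = 8,
) -> Optional[int]:
    """Return the first scannable snapshot ID in [start_id, latest_id], or None."""
    with ThreadPoolExecutor(max_workers=batch_size) as pool:
        batch_start = start_id
        while batch_start <= latest_id:
            ids = list(range(batch_start, min(batch_start + batch_size, latest_id + 1)))
            # Fetch the whole batch concurrently: one round of parallel reads
            # instead of one sequential read per snapshot.
            kinds = list(pool.map(load_commit_kind, ids))  # map keeps ID order
            for snapshot_id, kind in zip(ids, kinds):
                # A missing snapshot (None) or a non-scannable one is skipped.
                if kind is not None and is_scannable(kind):
                    return snapshot_id
            batch_start += batch_size
    return None
```

Some reads past the first scannable snapshot may turn out to be unnecessary. Accepting that cost in exchange for fewer sequential round-trips is the point of the lookahead, and `batch_size` bounds the waste.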
## Builder API
```python
async def consume(table, predicate):
    # Reuse one builder so the scan and the reader share filter and projection.
    builder = (
        table.new_stream_read_builder()
        .with_filter(predicate)
        .with_projection(["col1", "col2"])
        .with_poll_interval_ms(500)
    )
    scan = builder.new_streaming_scan()
    reader = builder.new_read()
    async for plan in scan.stream():
        arrow_table = reader.to_arrow(plan.splits())
        process(arrow_table)  # user-supplied handler
```
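`stream_sync()` is described only as a synchronous wrapper. One common way to build such a wrapper is to step an async generator on a private event loop, and the following sketch shows that technique with a hypothetical `iterate_sync` helper. It is not necessarily how the PR implements `stream_sync()`. It must be called from a thread that has no running event loop, because `run_until_complete` refuses to run inside one.

```python
import asyncio
from typing import AsyncIterator, Iterator, TypeVar

T = TypeVar("T")


def iterate_sync(agen: AsyncIterator[T]) -> Iterator[T]:
    """Drive an async iterator from synchronous code, one item at a time."""
    loop = asyncio.new_event_loop()
    try:
        while True:
            try:
                # Run the private loop just long enough to produce the next item.
                yield loop.run_until_complete(agen.__anext__())
            except StopAsyncIteration:
                return
    finally:
        # On exhaustion or early exit, close the async generator, then the loop.
        aclose = getattr(agen, "aclose", None)
        if aclose is not None:
            loop.run_until_complete(aclose())
        loop.close()
```

With this helper, `for plan in iterate_sync(scan.stream()): ...` would mirror the `async for` loop in the Builder API example. Breaking out of the loop early still closes the underlying generator.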
## Part of streaming read PR stack
This is PR 3 of the streaming read stack. Prerequisites already merged:
- #7347 LRU caching, `ChangelogProducer` enum
- #7348 `FollowUpScanner` hierarchy, `IncrementalDiffScanner`
- #7394 `TableRead` row kind support
- #7349 Consumer registration
- #7415 `ConsumerManager` API
- #7417 `ChangelogProducer` config option
- #7418 `SnapshotManager` batch lookahead
Tracking issue: #7152
## Test plan
- `pypaimon/tests/streaming_table_scan_test.py` — 12 unit tests covering
initial scan, delta/changelog follow-up, diff catch-up, prefetch, bucket
filtering, and `stream_sync()`
- `pypaimon/tests/stream_read_builder_test.py` — 8 unit tests covering
builder configuration and scan/read construction
```bash
python -m pytest pypaimon/tests/streaming_table_scan_test.py \
    pypaimon/tests/stream_read_builder_test.py -v
```
🤖 Generated with [Claude Code](https://claude.com/claude-code)
--
This is an automated message from the Apache Git Service.