tub opened a new pull request, #7424:
URL: https://github.com/apache/paimon/pull/7424

   ## What does this PR do?
   
   Adds continuous streaming read support to `paimon-python` via two new 
classes:
   
   - **`AsyncStreamingTableScan`** — async generator that continuously polls 
for new snapshots and yields `Plan` objects as new data arrives. Python 
equivalent of Java's `DataTableStreamScan`.
   - **`StreamReadBuilder`** — fluent builder (analogous to `ReadBuilder`) for 
configuring and constructing a streaming scan.
   - **`Table.new_stream_read_builder()`** — entry point on the table interface.
   
   ## Key features
   
   - **Initial scan**: on first call, performs a full scan of the latest 
snapshot and yields a `Plan`
   - **Delta / changelog follow-up**: subsequent iterations yield only new data 
per snapshot, using `DeltaFollowUpScanner` (changelog-producer=none) or 
`ChangelogFollowUpScanner` (input/lookup/full-compaction)
   - **Diff catch-up**: when starting from a historical snapshot with a large gap to the latest, uses `IncrementalDiffScanner` to collapse the catch-up into a single efficient plan instead of replaying every intermediate snapshot
   - **Batch lookahead**: `SnapshotManager.find_next_scannable()` fetches a batch of snapshot IDs in parallel to skip non-scannable snapshots (e.g. compaction-only) with minimal S3 round-trips
   - **Prefetching**: optional background thread pre-fetches the next scannable snapshot while the caller processes the current plan
   - **Bucket/shard filtering**: `with_buckets([0, 1])` / `with_bucket_filter(fn)` for parallel consumer sharding
   - **Synchronous wrapper**: `stream_sync()` for non-async usage
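
   The initial-scan / follow-up loop described above can be sketched with a plain asyncio generator. Everything here (`Plan`, `FakeSnapshotSource`, `stream_plans`) is an illustrative stand-in for the pattern, not the actual paimon-python classes:

   ```python
   import asyncio
   from dataclasses import dataclass

   @dataclass
   class Plan:
       """Stand-in for a scan plan: the snapshot it covers and its splits."""
       snapshot_id: int
       splits: list

   class FakeSnapshotSource:
       """Stand-in for a snapshot manager: tracks the latest committed snapshot."""
       def __init__(self, latest: int):
           self.latest = latest

       def latest_snapshot_id(self) -> int:
           return self.latest

   async def stream_plans(source, poll_interval_ms=500, max_polls=3):
       # Initial scan: full read of the latest snapshot.
       next_id = source.latest_snapshot_id()
       yield Plan(next_id, splits=[f"full-scan-of-{next_id}"])
       next_id += 1
       # Follow-up: poll for new snapshots, yielding only each snapshot's delta.
       for _ in range(max_polls):
           if source.latest_snapshot_id() >= next_id:
               yield Plan(next_id, splits=[f"delta-of-{next_id}"])
               next_id += 1
           else:
               await asyncio.sleep(poll_interval_ms / 1000)

   async def main():
       source = FakeSnapshotSource(latest=5)
       plans = []
       async for plan in stream_plans(source, poll_interval_ms=10):
           plans.append(plan.snapshot_id)
           source.latest += 1  # simulate writers committing new snapshots
       return plans

   print(asyncio.run(main()))  # → [5, 6, 7, 8]
   ```

   The real scan additionally decides per snapshot whether to read delta files or changelog files, which is what the follow-up scanners abstract over.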
   
   ## Builder API
   
   ```python
   scan = (
       table.new_stream_read_builder()
       .with_filter(predicate)
       .with_projection(["col1", "col2"])
       .with_poll_interval_ms(500)
       .new_streaming_scan()
   )
   reader = table.new_stream_read_builder().new_read()

   async for plan in scan.stream():
       arrow_table = reader.to_arrow(plan.splits())
       process(arrow_table)
   ```
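
   For parallel consumers, each process can claim a disjoint share of buckets; one simple approach is modulo sharding in the callable passed to `with_bucket_filter`. The helper below is illustrative (consumer index and count are assumptions, not part of the API):

   ```python
   def make_bucket_filter(consumer_index: int, num_consumers: int):
       """Return a predicate that assigns each bucket to exactly one consumer."""
       def bucket_filter(bucket: int) -> bool:
           return bucket % num_consumers == consumer_index
       return bucket_filter

   # Consumer 1 of 3 reads buckets 1, 4, 7, ...
   f = make_bucket_filter(consumer_index=1, num_consumers=3)
   print([b for b in range(8) if f(b)])  # → [1, 4, 7]
   ```

   Passed as `.with_bucket_filter(make_bucket_filter(i, n))`, this guarantees every bucket is read by exactly one consumer and no bucket is read twice.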
   
   ## Part of streaming read PR stack
   
   This is PR 3 of the streaming read stack. Prerequisites already merged:
   - #7347 LRU caching, `ChangelogProducer` enum
   - #7348 `FollowUpScanner` hierarchy, `IncrementalDiffScanner`
   - #7394 `TableRead` row kind support
   - #7349 Consumer registration
   - #7415 `ConsumerManager` API
   - #7417 `ChangelogProducer` config option
   - #7418 `SnapshotManager` batch lookahead
   
   Tracking issue: #7152
   
   ## Test plan
   
   - `pypaimon/tests/streaming_table_scan_test.py` — 12 unit tests covering initial scan, delta/changelog follow-up, diff catch-up, prefetch, bucket filtering, `stream_sync()`
   - `pypaimon/tests/stream_read_builder_test.py` — 8 unit tests covering builder configuration and scan/read construction

   ```bash
   python -m pytest pypaimon/tests/streaming_table_scan_test.py pypaimon/tests/stream_read_builder_test.py -v
   ```
   
   🤖 Generated with [Claude Code](https://claude.com/claude-code)

