This is an automated email from the ASF dual-hosted git repository. yuxia pushed a commit to branch allow-batch-poll in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
commit 5b63be186fa7256e84c456420c44240e316a0d9c Author: luoyuxia <[email protected]> AuthorDate: Sat Jan 17 13:55:06 2026 +0800 allow scan batch --- crates/fluss/src/client/table/scanner.rs | 29 ++++++++++++++++++++--------- crates/fluss/src/record/mod.rs | 26 ++++++++++++++++++++++++++ crates/fluss/tests/integration/table.rs | 12 ++++++------ 3 files changed, 52 insertions(+), 15 deletions(-) diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 7d22324..6989719 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -37,7 +37,7 @@ use crate::client::table::remote_log::{ use crate::error::{Error, Result, RpcError}; use crate::metadata::{TableBucket, TableInfo, TablePath}; use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable}; -use crate::record::{LogRecordsBatches, ReadContext, ScanRecord, ScanRecords, to_arrow_schema}; +use crate::record::{LogRecordsBatches, ReadContext, ScanRecord, ScanRecords, to_arrow_schema, ScanRecordBatches}; use crate::rpc::{RpcClient, message}; use crate::util::FairBucketStatusMap; @@ -339,6 +339,13 @@ impl LogScannerInner { Ok(()) } + async fn unsubscribe(&self, bucket: i32) -> Result<()> { + let table_bucket = TableBucket::new(self.table_id, bucket); + self.log_scanner_status + .unassign_scan_buckets( &[table_bucket]); + Ok(()) + } + async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> { self.metadata .check_and_update_table_metadata(from_ref(&self.table_path)) @@ -374,7 +381,7 @@ impl LogScannerInner { self.log_fetcher.collect_fetches() } - async fn poll_batches(&self, timeout: Duration) -> Result<Vec<RecordBatch>> { + async fn poll_batches(&self, timeout: Duration) -> Result<ScanRecordBatches> { let start = std::time::Instant::now(); let deadline = start + timeout; @@ -388,7 +395,7 @@ impl LogScannerInner { let now = std::time::Instant::now(); if now >= deadline { - return Ok(Vec::new()); + return Ok(ScanRecordBatches::new()); } let remaining = deadline - now; @@ -399,12 +406,12 @@ impl LogScannerInner { .await; if !has_data { - return Ok(Vec::new()); + return Ok(ScanRecordBatches::new()); } } } - async fn poll_for_batches(&self) -> Result<Vec<RecordBatch>> { + async fn poll_for_batches(&self) -> Result<ScanRecordBatches> { let result = self.log_fetcher.collect_batches()?; if !result.is_empty() { return Ok(result); @@ -432,7 +439,7 @@ impl LogScanner { // Implementation for RecordBatchLogScanner (batches mode) impl RecordBatchLogScanner { - pub async fn poll(&self, timeout: Duration) -> Result<Vec<RecordBatch>> { + pub async fn poll(&self, timeout: Duration) -> Result<ScanRecordBatches> { self.inner.poll_batches(timeout).await } @@ -443,6 +450,10 @@ impl RecordBatchLogScanner { pub async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> { self.inner.subscribe_batch(bucket_offsets).await } + + pub async fn unsubscribe(&self, bucket: i32) -> Result<()> { + self.inner.unsubscribe(bucket).await + } } struct LogFetcher { @@ -906,12 +917,12 @@ impl LogFetcher { } /// Collect completed fetches as RecordBatches - fn collect_batches(&self) -> Result<Vec<RecordBatch>> { + fn collect_batches(&self) -> Result<ScanRecordBatches> { // Limit memory usage with both batch count and byte size constraints. // Max 100 batches per poll, but also check total bytes (soft cap ~64MB). const MAX_BATCHES: usize = 100; const MAX_BYTES: usize = 64 * 1024 * 1024; // 64MB soft cap - let mut result: Vec<RecordBatch> = Vec::new(); + let mut result: ScanRecordBatches = ScanRecordBatches::new(); let mut batches_remaining = MAX_BATCHES; let mut bytes_consumed: usize = 0; @@ -930,7 +941,7 @@ impl LogFetcher { batches.iter().map(|b| b.get_array_memory_size()).sum(); bytes_consumed += batch_bytes; - result.extend(batches); + result.insert(next_fetch.table_bucket().clone(), batches); batches_remaining = batches_remaining.saturating_sub(batch_count); } diff --git a/crates/fluss/src/record/mod.rs b/crates/fluss/src/record/mod.rs index 35928ea..d770bb4 100644 --- a/crates/fluss/src/record/mod.rs +++ b/crates/fluss/src/record/mod.rs @@ -19,6 +19,7 @@ use crate::metadata::TableBucket; use crate::row::ColumnarRow; use core::fmt; use std::collections::HashMap; +use ::arrow::array::RecordBatch; mod arrow; mod error; @@ -137,6 +138,31 @@ pub struct ScanRecords { records: HashMap<TableBucket, Vec<ScanRecord>>, } +pub struct ScanRecordBatches { + pub record_batches: HashMap<TableBucket, Vec<RecordBatch>>, +} + + +impl ScanRecordBatches { + pub fn new() -> Self { + Self { + record_batches: HashMap::new(), + } + } + + pub fn insert(&mut self, table_bucket: TableBucket, records: Vec<RecordBatch>) { + self.record_batches.entry(table_bucket).or_default().extend(records); + } + + pub fn is_empty(&self) -> bool { + self.record_batches.is_empty() + } + + pub fn values(&self) -> Vec<&RecordBatch> { + self.record_batches.values().flatten().collect() + } +} + impl ScanRecords { pub fn empty() -> Self { Self { diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs index 4cba469..c6701b5 100644 --- a/crates/fluss/tests/integration/table.rs +++ b/crates/fluss/tests/integration/table.rs @@ -528,7 +528,7 @@ mod table_test { use arrow::array::Int32Array; let batches = scanner.poll(Duration::from_secs(10)).await.unwrap(); let mut all_ids: Vec<i32> = batches - .iter() + .values().iter() .flat_map(|b| { (0..b.num_rows()).map(|i| { b.column(0) @@ -553,8 +553,8 @@ mod table_test { let more = scanner.poll(Duration::from_secs(10)).await.unwrap(); let new_ids: Vec<i32> = more - .iter() - .flat_map(|b| { + .values() + .iter().flat_map(|b| { (0..b.num_rows()).map(|i| { b.column(0) .as_any() @@ -574,8 +574,8 @@ mod table_test { trunc_scanner.subscribe(0, 3).await.unwrap(); let trunc_batches = trunc_scanner.poll(Duration::from_secs(10)).await.unwrap(); let trunc_ids: Vec<i32> = trunc_batches - .iter() - .flat_map(|b| { + .values() + .iter().flat_map(|b| { (0..b.num_rows()).map(|i| { b.column(0) .as_any() @@ -600,6 +600,6 @@ mod table_test { let proj_batches = proj.poll(Duration::from_secs(10)).await.unwrap(); // Projected batch should have 1 column (id), not 2 (id, name) - assert_eq!(proj_batches[0].num_columns(), 1); + assert_eq!(proj_batches.values()[0].num_columns(), 1); } }
