This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch allow-batch-poll
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git

commit 5b63be186fa7256e84c456420c44240e316a0d9c
Author: luoyuxia <[email protected]>
AuthorDate: Sat Jan 17 13:55:06 2026 +0800

    Allow polling scanned log records as Arrow record batches grouped by bucket
---
 crates/fluss/src/client/table/scanner.rs | 29 ++++++++++++++++++++---------
 crates/fluss/src/record/mod.rs           | 26 ++++++++++++++++++++++++++
 crates/fluss/tests/integration/table.rs  | 12 ++++++------
 3 files changed, 52 insertions(+), 15 deletions(-)

diff --git a/crates/fluss/src/client/table/scanner.rs 
b/crates/fluss/src/client/table/scanner.rs
index 7d22324..6989719 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -37,7 +37,7 @@ use crate::client::table::remote_log::{
 use crate::error::{Error, Result, RpcError};
 use crate::metadata::{TableBucket, TableInfo, TablePath};
 use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket, 
PbFetchLogReqForTable};
-use crate::record::{LogRecordsBatches, ReadContext, ScanRecord, ScanRecords, 
to_arrow_schema};
+use crate::record::{LogRecordsBatches, ReadContext, ScanRecord, ScanRecords, 
to_arrow_schema, ScanRecordBatches};
 use crate::rpc::{RpcClient, message};
 use crate::util::FairBucketStatusMap;
 
@@ -339,6 +339,13 @@ impl LogScannerInner {
         Ok(())
     }
 
+    async fn unsubscribe(&self, bucket: i32) -> Result<()> {
+        let table_bucket = TableBucket::new(self.table_id, bucket);
+        self.log_scanner_status
+            .unassign_scan_buckets( &[table_bucket]);
+        Ok(())
+    }
+
     async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> 
Result<()> {
         self.metadata
             .check_and_update_table_metadata(from_ref(&self.table_path))
@@ -374,7 +381,7 @@ impl LogScannerInner {
         self.log_fetcher.collect_fetches()
     }
 
-    async fn poll_batches(&self, timeout: Duration) -> 
Result<Vec<RecordBatch>> {
+    async fn poll_batches(&self, timeout: Duration) -> 
Result<ScanRecordBatches> {
         let start = std::time::Instant::now();
         let deadline = start + timeout;
 
@@ -388,7 +395,7 @@ impl LogScannerInner {
 
             let now = std::time::Instant::now();
             if now >= deadline {
-                return Ok(Vec::new());
+                return Ok(ScanRecordBatches::new());
             }
 
             let remaining = deadline - now;
@@ -399,12 +406,12 @@ impl LogScannerInner {
                 .await;
 
             if !has_data {
-                return Ok(Vec::new());
+                return Ok(ScanRecordBatches::new());
             }
         }
     }
 
-    async fn poll_for_batches(&self) -> Result<Vec<RecordBatch>> {
+    async fn poll_for_batches(&self) -> Result<ScanRecordBatches> {
         let result = self.log_fetcher.collect_batches()?;
         if !result.is_empty() {
             return Ok(result);
@@ -432,7 +439,7 @@ impl LogScanner {
 
 // Implementation for RecordBatchLogScanner (batches mode)
 impl RecordBatchLogScanner {
-    pub async fn poll(&self, timeout: Duration) -> Result<Vec<RecordBatch>> {
+    pub async fn poll(&self, timeout: Duration) -> Result<ScanRecordBatches> {
         self.inner.poll_batches(timeout).await
     }
 
@@ -443,6 +450,10 @@ impl RecordBatchLogScanner {
     pub async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> 
Result<()> {
         self.inner.subscribe_batch(bucket_offsets).await
     }
+
+    pub async fn unsubscribe(&self, bucket: i32) -> Result<()> {
+       self.inner.unsubscribe(bucket).await
+    }
 }
 
 struct LogFetcher {
@@ -906,12 +917,12 @@ impl LogFetcher {
     }
 
     /// Collect completed fetches as RecordBatches
-    fn collect_batches(&self) -> Result<Vec<RecordBatch>> {
+    fn collect_batches(&self) -> Result<ScanRecordBatches> {
         // Limit memory usage with both batch count and byte size constraints.
         // Max 100 batches per poll, but also check total bytes (soft cap 
~64MB).
         const MAX_BATCHES: usize = 100;
         const MAX_BYTES: usize = 64 * 1024 * 1024; // 64MB soft cap
-        let mut result: Vec<RecordBatch> = Vec::new();
+        let mut result: ScanRecordBatches = ScanRecordBatches::new();
         let mut batches_remaining = MAX_BATCHES;
         let mut bytes_consumed: usize = 0;
 
@@ -930,7 +941,7 @@ impl LogFetcher {
                             batches.iter().map(|b| 
b.get_array_memory_size()).sum();
                         bytes_consumed += batch_bytes;
 
-                        result.extend(batches);
+                        result.insert(next_fetch.table_bucket().clone(), 
batches);
                         batches_remaining = 
batches_remaining.saturating_sub(batch_count);
                     }
 
diff --git a/crates/fluss/src/record/mod.rs b/crates/fluss/src/record/mod.rs
index 35928ea..d770bb4 100644
--- a/crates/fluss/src/record/mod.rs
+++ b/crates/fluss/src/record/mod.rs
@@ -19,6 +19,7 @@ use crate::metadata::TableBucket;
 use crate::row::ColumnarRow;
 use core::fmt;
 use std::collections::HashMap;
+use ::arrow::array::RecordBatch;
 
 mod arrow;
 mod error;
@@ -137,6 +138,31 @@ pub struct ScanRecords {
     records: HashMap<TableBucket, Vec<ScanRecord>>,
 }
 
+pub struct ScanRecordBatches {
+    pub record_batches: HashMap<TableBucket, Vec<RecordBatch>>,
+}
+
+
+impl ScanRecordBatches {
+    pub fn new() -> Self {
+        Self {
+            record_batches: HashMap::new(),
+        }
+    }
+    
+    pub fn insert(&mut self, table_bucket: TableBucket, records: 
Vec<RecordBatch>) {
+        self.record_batches.entry(table_bucket).or_default().extend(records);
+    }
+    
+    pub fn is_empty(&self) -> bool {
+        self.record_batches.is_empty()
+    }
+    
+    pub fn values(&self) -> Vec<&RecordBatch> {
+        self.record_batches.values().flatten().collect()
+    }
+}
+
 impl ScanRecords {
     pub fn empty() -> Self {
         Self {
diff --git a/crates/fluss/tests/integration/table.rs 
b/crates/fluss/tests/integration/table.rs
index 4cba469..c6701b5 100644
--- a/crates/fluss/tests/integration/table.rs
+++ b/crates/fluss/tests/integration/table.rs
@@ -528,7 +528,7 @@ mod table_test {
         use arrow::array::Int32Array;
         let batches = scanner.poll(Duration::from_secs(10)).await.unwrap();
         let mut all_ids: Vec<i32> = batches
-            .iter()
+            .values().iter()
             .flat_map(|b| {
                 (0..b.num_rows()).map(|i| {
                     b.column(0)
@@ -553,8 +553,8 @@ mod table_test {
 
         let more = scanner.poll(Duration::from_secs(10)).await.unwrap();
         let new_ids: Vec<i32> = more
-            .iter()
-            .flat_map(|b| {
+            .values()
+            .iter().flat_map(|b| {
                 (0..b.num_rows()).map(|i| {
                     b.column(0)
                         .as_any()
@@ -574,8 +574,8 @@ mod table_test {
         trunc_scanner.subscribe(0, 3).await.unwrap();
         let trunc_batches = 
trunc_scanner.poll(Duration::from_secs(10)).await.unwrap();
         let trunc_ids: Vec<i32> = trunc_batches
-            .iter()
-            .flat_map(|b| {
+            .values()
+            .iter().flat_map(|b| {
                 (0..b.num_rows()).map(|i| {
                     b.column(0)
                         .as_any()
@@ -600,6 +600,6 @@ mod table_test {
         let proj_batches = proj.poll(Duration::from_secs(10)).await.unwrap();
 
         // Projected batch should have 1 column (id), not 2 (id, name)
-        assert_eq!(proj_batches[0].num_columns(), 1);
+        assert_eq!(proj_batches.values()[0].num_columns(), 1);
     }
 }

Reply via email to