This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 6962c73  feat: log scanner support poll record batch directly
6962c73 is described below

commit 6962c73337abce351c62f9f1d7f8d23f50fe4286
Author: Anton Borisov <[email protected]>
AuthorDate: Sat Jan 10 02:00:12 2026 +0000

    feat: log scanner support poll record batch directly
---
 crates/fluss/src/client/table/log_fetch_buffer.rs |  54 ++++-
 crates/fluss/src/client/table/mod.rs              |   2 +-
 crates/fluss/src/client/table/scanner.rs          | 236 ++++++++++++++++++++--
 crates/fluss/src/record/arrow.rs                  |  22 ++
 crates/fluss/tests/integration/table.rs           | 133 ++++++++++++
 5 files changed, 430 insertions(+), 17 deletions(-)

diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs 
b/crates/fluss/src/client/table/log_fetch_buffer.rs
index cee104e..e9bac53 100644
--- a/crates/fluss/src/client/table/log_fetch_buffer.rs
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -15,12 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use arrow::array::RecordBatch;
+use parking_lot::Mutex;
+
 use crate::error::Result;
 use crate::metadata::TableBucket;
 use crate::record::{
     LogRecordBatch, LogRecordIterator, LogRecordsBatches, ReadContext, 
ScanRecord,
 };
-use parking_lot::Mutex;
 use std::collections::{HashMap, VecDeque};
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
@@ -31,6 +33,7 @@ use tokio::sync::Notify;
 pub trait CompletedFetch: Send + Sync {
     fn table_bucket(&self) -> &TableBucket;
     fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>>;
+    fn fetch_batches(&mut self, max_batches: usize) -> 
Result<Vec<RecordBatch>>;
     fn is_consumed(&self) -> bool;
     fn drain(&mut self);
     fn size_in_bytes(&self) -> usize;
@@ -318,6 +321,38 @@ impl DefaultCompletedFetch {
             }
         }
     }
+
+    /// Get the next batch directly without row iteration
+    fn next_fetched_batch(&mut self) -> Result<Option<RecordBatch>> {
+        loop {
+            let Some(log_batch) = self.log_record_batch.next() else {
+                self.drain();
+                return Ok(None);
+            };
+
+            let mut record_batch = log_batch.record_batch(&self.read_context)?;
+
+            // Skip empty batches
+            if record_batch.num_rows() == 0 {
+                continue;
+            }
+
+            // Truncate batch
+            let base_offset = log_batch.base_log_offset();
+            if self.next_fetch_offset > base_offset {
+                let skip_count = (self.next_fetch_offset - base_offset) as 
usize;
+                if skip_count >= record_batch.num_rows() {
+                    continue;
+                }
+                // Slice the batch to skip the first skip_count rows
+                record_batch = record_batch.slice(skip_count, 
record_batch.num_rows() - skip_count);
+            }
+
+            self.next_fetch_offset = log_batch.next_log_offset();
+            self.records_read += record_batch.num_rows();
+            return Ok(Some(record_batch));
+        }
+    }
 }
 
 impl CompletedFetch for DefaultCompletedFetch {
@@ -346,6 +381,23 @@ impl CompletedFetch for DefaultCompletedFetch {
         Ok(scan_records)
     }
 
+    fn fetch_batches(&mut self, max_batches: usize) -> 
Result<Vec<RecordBatch>> {
+        if self.consumed {
+            return Ok(Vec::new());
+        }
+
+        let mut batches = Vec::with_capacity(max_batches.min(16));
+
+        for _ in 0..max_batches {
+            match self.next_fetched_batch()? {
+                Some(batch) => batches.push(batch),
+                None => break,
+            }
+        }
+
+        Ok(batches)
+    }
+
     fn is_consumed(&self) -> bool {
         self.consumed
     }
diff --git a/crates/fluss/src/client/table/mod.rs 
b/crates/fluss/src/client/table/mod.rs
index e2cf9e6..26341d7 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -32,7 +32,7 @@ mod scanner;
 mod writer;
 
 pub use append::{AppendWriter, TableAppend};
-pub use scanner::{LogScanner, TableScan};
+pub use scanner::{LogScanner, RecordBatchLogScanner, TableScan};
 
 #[allow(dead_code)]
 pub struct FlussTable<'a> {
diff --git a/crates/fluss/src/client/table/scanner.rs 
b/crates/fluss/src/client/table/scanner.rs
index 4255bb6..7d22324 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -15,6 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use arrow::array::RecordBatch;
+use arrow_schema::SchemaRef;
+use log::{debug, error, warn};
+use parking_lot::{Mutex, RwLock};
+use std::collections::{HashMap, HashSet};
+use std::slice::from_ref;
+use std::sync::Arc;
+use std::time::Duration;
+use tempfile::TempDir;
+
 use crate::client::connection::FlussConnection;
 use crate::client::credentials::CredentialsCache;
 use crate::client::metadata::Metadata;
@@ -30,14 +40,6 @@ use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket, 
PbFetchLogReqForTabl
 use crate::record::{LogRecordsBatches, ReadContext, ScanRecord, ScanRecords, 
to_arrow_schema};
 use crate::rpc::{RpcClient, message};
 use crate::util::FairBucketStatusMap;
-use arrow_schema::SchemaRef;
-use log::{debug, error, warn};
-use parking_lot::{Mutex, RwLock};
-use std::collections::{HashMap, HashSet};
-use std::slice::from_ref;
-use std::sync::Arc;
-use std::time::Duration;
-use tempfile::TempDir;
 
 const LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024;
 #[allow(dead_code)]
@@ -216,16 +218,48 @@ impl<'a> TableScan<'a> {
     }
 
     pub fn create_log_scanner(self) -> Result<LogScanner> {
-        LogScanner::new(
+        let inner = LogScannerInner::new(
             &self.table_info,
             self.metadata.clone(),
             self.conn.get_connections(),
             self.projected_fields,
-        )
+        )?;
+        Ok(LogScanner {
+            inner: Arc::new(inner),
+        })
+    }
+
+    pub fn create_record_batch_log_scanner(self) -> 
Result<RecordBatchLogScanner> {
+        let inner = LogScannerInner::new(
+            &self.table_info,
+            self.metadata.clone(),
+            self.conn.get_connections(),
+            self.projected_fields,
+        )?;
+        Ok(RecordBatchLogScanner {
+            inner: Arc::new(inner),
+        })
     }
 }
 
+/// Scanner for reading log records one at a time with per-record metadata.
+///
+/// Use this scanner when you need access to individual record offsets and 
timestamps.
+/// For batch-level access, use [`RecordBatchLogScanner`] instead.
 pub struct LogScanner {
+    inner: Arc<LogScannerInner>,
+}
+
+/// Scanner for reading log data as Arrow RecordBatches.
+///
+/// More efficient than [`LogScanner`] for batch-level analytics where 
per-record
+/// metadata (offsets, timestamps) is not needed.
+pub struct RecordBatchLogScanner {
+    inner: Arc<LogScannerInner>,
+}
+
+/// Private shared implementation for both scanner types
+struct LogScannerInner {
     table_path: TablePath,
     table_id: i64,
     metadata: Arc<Metadata>,
@@ -233,8 +267,8 @@ pub struct LogScanner {
     log_fetcher: LogFetcher,
 }
 
-impl LogScanner {
-    pub fn new(
+impl LogScannerInner {
+    fn new(
         table_info: &TableInfo,
         metadata: Arc<Metadata>,
         connections: Arc<RpcClient>,
@@ -256,7 +290,7 @@ impl LogScanner {
         })
     }
 
-    pub async fn poll(&self, timeout: Duration) -> Result<ScanRecords> {
+    async fn poll_records(&self, timeout: Duration) -> Result<ScanRecords> {
         let start = std::time::Instant::now();
         let deadline = start + timeout;
 
@@ -295,7 +329,7 @@ impl LogScanner {
         }
     }
 
-    pub async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
+    async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
         let table_bucket = TableBucket::new(self.table_id, bucket);
         self.metadata
             .check_and_update_table_metadata(from_ref(&self.table_path))
@@ -305,7 +339,7 @@ impl LogScanner {
         Ok(())
     }
 
-    pub async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> 
Result<()> {
+    async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> 
Result<()> {
         self.metadata
             .check_and_update_table_metadata(from_ref(&self.table_path))
             .await?;
@@ -339,6 +373,76 @@ impl LogScanner {
         // Collect completed fetches from buffer
         self.log_fetcher.collect_fetches()
     }
+
+    async fn poll_batches(&self, timeout: Duration) -> 
Result<Vec<RecordBatch>> {
+        let start = std::time::Instant::now();
+        let deadline = start + timeout;
+
+        loop {
+            let batches = self.poll_for_batches().await?;
+
+            if !batches.is_empty() {
+                self.log_fetcher.send_fetches().await?;
+                return Ok(batches);
+            }
+
+            let now = std::time::Instant::now();
+            if now >= deadline {
+                return Ok(Vec::new());
+            }
+
+            let remaining = deadline - now;
+            let has_data = self
+                .log_fetcher
+                .log_fetch_buffer
+                .await_not_empty(remaining)
+                .await;
+
+            if !has_data {
+                return Ok(Vec::new());
+            }
+        }
+    }
+
+    async fn poll_for_batches(&self) -> Result<Vec<RecordBatch>> {
+        let result = self.log_fetcher.collect_batches()?;
+        if !result.is_empty() {
+            return Ok(result);
+        }
+
+        self.log_fetcher.send_fetches().await?;
+        self.log_fetcher.collect_batches()
+    }
+}
+
+// Implementation for LogScanner (records mode)
+impl LogScanner {
+    pub async fn poll(&self, timeout: Duration) -> Result<ScanRecords> {
+        self.inner.poll_records(timeout).await
+    }
+
+    pub async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
+        self.inner.subscribe(bucket, offset).await
+    }
+
+    pub async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> 
Result<()> {
+        self.inner.subscribe_batch(bucket_offsets).await
+    }
+}
+
+// Implementation for RecordBatchLogScanner (batches mode)
+impl RecordBatchLogScanner {
+    pub async fn poll(&self, timeout: Duration) -> Result<Vec<RecordBatch>> {
+        self.inner.poll_batches(timeout).await
+    }
+
+    pub async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
+        self.inner.subscribe(bucket, offset).await
+    }
+
+    pub async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> 
Result<()> {
+        self.inner.subscribe_batch(bucket_offsets).await
+    }
 }
 
 struct LogFetcher {
@@ -801,6 +905,108 @@ impl LogFetcher {
         }
     }
 
+    /// Collect completed fetches as RecordBatches
+    fn collect_batches(&self) -> Result<Vec<RecordBatch>> {
+        // Limit memory usage with both batch count and byte size constraints.
+        // Max 100 batches per poll, but also check total bytes (soft cap 
~64MB).
+        const MAX_BATCHES: usize = 100;
+        const MAX_BYTES: usize = 64 * 1024 * 1024; // 64MB soft cap
+        let mut result: Vec<RecordBatch> = Vec::new();
+        let mut batches_remaining = MAX_BATCHES;
+        let mut bytes_consumed: usize = 0;
+
+        while batches_remaining > 0 && bytes_consumed < MAX_BYTES {
+            let next_in_line = self.log_fetch_buffer.next_in_line_fetch();
+
+            match next_in_line {
+                Some(mut next_fetch) if !next_fetch.is_consumed() => {
+                    let batches =
+                        self.fetch_batches_from_fetch(&mut next_fetch, 
batches_remaining)?;
+                    let batch_count = batches.len();
+
+                    if !batches.is_empty() {
+                        // Track bytes consumed (soft cap - may exceed by one 
fetch)
+                        let batch_bytes: usize =
+                            batches.iter().map(|b| 
b.get_array_memory_size()).sum();
+                        bytes_consumed += batch_bytes;
+
+                        result.extend(batches);
+                        batches_remaining = 
batches_remaining.saturating_sub(batch_count);
+                    }
+
+                    if !next_fetch.is_consumed() {
+                        self.log_fetch_buffer
+                            .set_next_in_line_fetch(Some(next_fetch));
+                    }
+                }
+                _ => {
+                    if let Some(completed_fetch) = 
self.log_fetch_buffer.poll() {
+                        if !completed_fetch.is_initialized() {
+                            let size_in_bytes = 
completed_fetch.size_in_bytes();
+                            match self.initialize_fetch(completed_fetch) {
+                                Ok(initialized) => {
+                                    
self.log_fetch_buffer.set_next_in_line_fetch(initialized);
+                                    continue;
+                                }
+                                Err(e) => {
+                                    if result.is_empty() && size_in_bytes == 0 
{
+                                        continue;
+                                    }
+                                    return Err(e);
+                                }
+                            }
+                        } else {
+                            self.log_fetch_buffer
+                                .set_next_in_line_fetch(Some(completed_fetch));
+                        }
+                    } else {
+                        break;
+                    }
+                }
+            }
+        }
+
+        Ok(result)
+    }
+
+    fn fetch_batches_from_fetch(
+        &self,
+        next_in_line_fetch: &mut Box<dyn CompletedFetch>,
+        max_batches: usize,
+    ) -> Result<Vec<RecordBatch>> {
+        let table_bucket = next_in_line_fetch.table_bucket().clone();
+        let current_offset = 
self.log_scanner_status.get_bucket_offset(&table_bucket);
+
+        if current_offset.is_none() {
+            warn!(
+                "Ignoring fetched batches for {table_bucket:?} since the 
bucket has been unsubscribed"
+            );
+            next_in_line_fetch.drain();
+            return Ok(Vec::new());
+        }
+
+        let current_offset = current_offset.unwrap();
+        let fetch_offset = next_in_line_fetch.next_fetch_offset();
+
+        if fetch_offset == current_offset {
+            let batches = next_in_line_fetch.fetch_batches(max_batches)?;
+            let next_fetch_offset = next_in_line_fetch.next_fetch_offset();
+
+            if next_fetch_offset > current_offset {
+                self.log_scanner_status
+                    .update_offset(&table_bucket, next_fetch_offset);
+            }
+
+            Ok(batches)
+        } else {
+            warn!(
+                "Ignoring fetched batches for {table_bucket:?} at offset 
{fetch_offset} since the current offset is {current_offset}"
+            );
+            next_in_line_fetch.drain();
+            Ok(Vec::new())
+        }
+    }
+
     async fn prepare_fetch_log_requests(&self) -> HashMap<i32, 
FetchLogRequest> {
         let mut fetch_log_req_for_buckets = HashMap::new();
         let mut table_id = None;
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 5a5115e..89fb7b9 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -546,6 +546,28 @@ impl LogRecordBatch {
         };
         Ok(log_record_iterator)
     }
+
+    /// Returns the record batch directly without creating an iterator.
+    /// This is more efficient when you need the entire batch rather than
+    /// iterating row-by-row.
+    pub fn record_batch(&self, read_context: &ReadContext) -> 
Result<RecordBatch> {
+        if self.record_count() == 0 {
+            // Return empty batch with correct schema
+            return 
Ok(RecordBatch::new_empty(read_context.target_schema.clone()));
+        }
+
+        let data = self.data.get(RECORDS_OFFSET..).ok_or_else(|| {
+            crate::error::Error::UnexpectedError {
+                message: format!(
+                    "Corrupt log record batch: data length {} is less than 
RECORDS_OFFSET {}",
+                    self.data.len(),
+                    RECORDS_OFFSET
+                ),
+                source: None,
+            }
+        })?;
+        read_context.record_batch(data)
+    }
 }
 
 /// Parse an Arrow IPC message from a byte slice.
diff --git a/crates/fluss/tests/integration/table.rs 
b/crates/fluss/tests/integration/table.rs
index 0ac34c7..4cba469 100644
--- a/crates/fluss/tests/integration/table.rs
+++ b/crates/fluss/tests/integration/table.rs
@@ -469,4 +469,137 @@ mod table_test {
         records.sort_by_key(|r| r.offset());
         records
     }
+
+    #[tokio::test]
+    async fn test_poll_batches() {
+        let cluster = get_fluss_cluster();
+        let connection = cluster.get_fluss_connection().await;
+        let admin = connection.get_admin().await.expect("Failed to get admin");
+
+        let table_path = TablePath::new("fluss".to_string(), 
"test_poll_batches".to_string());
+        let schema = Schema::builder()
+            .column("id", DataTypes::int())
+            .column("name", DataTypes::string())
+            .build()
+            .unwrap();
+
+        create_table(
+            &admin,
+            &table_path,
+            &TableDescriptor::builder().schema(schema).build().unwrap(),
+        )
+        .await;
+        tokio::time::sleep(Duration::from_secs(1)).await;
+
+        let table = connection.get_table(&table_path).await.unwrap();
+        let scanner = 
table.new_scan().create_record_batch_log_scanner().unwrap();
+        scanner.subscribe(0, 0).await.unwrap();
+
+        // Test 1: Empty table should return empty result
+        assert!(
+            scanner
+                .poll(Duration::from_millis(500))
+                .await
+                .unwrap()
+                .is_empty()
+        );
+
+        let writer = table.new_append().unwrap().create_writer();
+        writer
+            .append_arrow_batch(
+                record_batch!(("id", Int32, [1, 2]), ("name", Utf8, ["a", 
"b"])).unwrap(),
+            )
+            .await
+            .unwrap();
+        writer
+            .append_arrow_batch(
+                record_batch!(("id", Int32, [3, 4]), ("name", Utf8, ["c", 
"d"])).unwrap(),
+            )
+            .await
+            .unwrap();
+        writer
+            .append_arrow_batch(
+                record_batch!(("id", Int32, [5, 6]), ("name", Utf8, ["e", 
"f"])).unwrap(),
+            )
+            .await
+            .unwrap();
+        writer.flush().await.unwrap();
+
+        use arrow::array::Int32Array;
+        let batches = scanner.poll(Duration::from_secs(10)).await.unwrap();
+        let mut all_ids: Vec<i32> = batches
+            .iter()
+            .flat_map(|b| {
+                (0..b.num_rows()).map(|i| {
+                    b.column(0)
+                        .as_any()
+                        .downcast_ref::<Int32Array>()
+                        .unwrap()
+                        .value(i)
+                })
+            })
+            .collect();
+
+        // Test 2: Order should be preserved across multiple batches
+        assert_eq!(all_ids, vec![1, 2, 3, 4, 5, 6]);
+
+        writer
+            .append_arrow_batch(
+                record_batch!(("id", Int32, [7, 8]), ("name", Utf8, ["g", 
"h"])).unwrap(),
+            )
+            .await
+            .unwrap();
+        writer.flush().await.unwrap();
+
+        let more = scanner.poll(Duration::from_secs(10)).await.unwrap();
+        let new_ids: Vec<i32> = more
+            .iter()
+            .flat_map(|b| {
+                (0..b.num_rows()).map(|i| {
+                    b.column(0)
+                        .as_any()
+                        .downcast_ref::<Int32Array>()
+                        .unwrap()
+                        .value(i)
+                })
+            })
+            .collect();
+
+        // Test 3: Subsequent polls should not return duplicate data (offset 
continuation)
+        assert_eq!(new_ids, vec![7, 8]);
+
+        // Test 4: Subscribing from mid-offset should truncate batch (Arrow 
batch slicing)
+        // Server returns all records from start of batch, but client 
truncates to subscription offset
+        let trunc_scanner = 
table.new_scan().create_record_batch_log_scanner().unwrap();
+        trunc_scanner.subscribe(0, 3).await.unwrap();
+        let trunc_batches = 
trunc_scanner.poll(Duration::from_secs(10)).await.unwrap();
+        let trunc_ids: Vec<i32> = trunc_batches
+            .iter()
+            .flat_map(|b| {
+                (0..b.num_rows()).map(|i| {
+                    b.column(0)
+                        .as_any()
+                        .downcast_ref::<Int32Array>()
+                        .unwrap()
+                        .value(i)
+                })
+            })
+            .collect();
+
+        // Subscribing from offset 3 should return [4,5,6,7,8], not 
[1,2,3,4,5,6,7,8]
+        assert_eq!(trunc_ids, vec![4, 5, 6, 7, 8]);
+
+        // Test 5: Projection should only return requested columns
+        let proj = table
+            .new_scan()
+            .project_by_name(&["id"])
+            .unwrap()
+            .create_record_batch_log_scanner()
+            .unwrap();
+        proj.subscribe(0, 0).await.unwrap();
+        let proj_batches = proj.poll(Duration::from_secs(10)).await.unwrap();
+
+        // Projected batch should have 1 column (id), not 2 (id, name)
+        assert_eq!(proj_batches[0].num_columns(), 1);
+    }
 }

Reply via email to