This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b5fc64  chore: add more info for RecordBatches (#211)
2b5fc64 is described below

commit 2b5fc64d95967aae374e6de17c4c1c9dd61a1cb9
Author: Anton Borisov <[email protected]>
AuthorDate: Sun Jan 25 08:31:08 2026 +0000

    chore: add more info for RecordBatches (#211)
---
 crates/fluss/src/client/table/log_fetch_buffer.rs | 28 +++++----
 crates/fluss/src/client/table/scanner.rs          | 44 ++++++++-----
 crates/fluss/src/record/mod.rs                    | 77 +++++++++++++++++++++++
 crates/fluss/tests/integration/table.rs           | 20 +++---
 4 files changed, 133 insertions(+), 36 deletions(-)

diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs
index 4a64eda..b529806 100644
--- a/crates/fluss/src/client/table/log_fetch_buffer.rs
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -65,7 +65,7 @@ pub trait CompletedFetch: Send + Sync {
     fn fetch_error_context(&self) -> Option<&FetchErrorContext>;
     fn take_error(&mut self) -> Option<Error>;
     fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>>;
-    fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<RecordBatch>>;
+    fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<(RecordBatch, i64)>>;
     fn is_consumed(&self) -> bool;
     fn records_read(&self) -> usize;
     fn drain(&mut self);
@@ -476,8 +476,9 @@ impl DefaultCompletedFetch {
             source: None,
         }
     }
-    /// Get the next batch directly without row iteration
-    fn next_fetched_batch(&mut self) -> Result<Option<RecordBatch>> {
+    /// Get the next batch with its base offset.
+    /// Returns (RecordBatch, base_offset) where base_offset is the offset of the first record.
+    fn next_fetched_batch(&mut self) -> Result<Option<(RecordBatch, i64)>> {
         loop {
             let Some(log_batch_result) = self.log_record_batch.next() else {
                 self.drain();
@@ -492,20 +493,23 @@ impl DefaultCompletedFetch {
                 continue;
             }
 
-            // Truncate batch
-            let base_offset = log_batch.base_log_offset();
-            if self.next_fetch_offset > base_offset {
-                let skip_count = (self.next_fetch_offset - base_offset) as usize;
+            // Calculate the effective base offset for this batch
+            let log_base_offset = log_batch.base_log_offset();
+            let effective_base_offset = if self.next_fetch_offset > log_base_offset {
+                let skip_count = (self.next_fetch_offset - log_base_offset) as usize;
                 if skip_count >= record_batch.num_rows() {
                     continue;
                 }
                 // Slice the batch to skip the first skip_count rows
                 record_batch = record_batch.slice(skip_count, record_batch.num_rows() - skip_count);
-            }
+                self.next_fetch_offset
+            } else {
+                log_base_offset
+            };
 
             self.next_fetch_offset = log_batch.next_log_offset();
             self.records_read += record_batch.num_rows();
-            return Ok(Some(record_batch));
+            return Ok(Some((record_batch, effective_base_offset)));
         }
     }
 }
@@ -585,7 +589,7 @@ impl CompletedFetch for DefaultCompletedFetch {
         Ok(scan_records)
     }
 
-    fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<RecordBatch>> {
+    fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<(RecordBatch, i64)>> {
         if let Some(error) = self.error.take() {
             return Err(error);
         }
@@ -607,7 +611,7 @@ impl CompletedFetch for DefaultCompletedFetch {
 
         for _ in 0..max_batches {
             match self.next_fetched_batch()? {
-                Some(batch) => batches.push(batch),
+                Some(batch_with_offset) => batches.push(batch_with_offset),
                 None => break,
             }
         }
@@ -692,7 +696,7 @@ impl CompletedFetch for RemoteCompletedFetch {
         self.inner.fetch_records(max_records)
     }
 
-    fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<RecordBatch>> {
+    fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<(RecordBatch, i64)>> {
         self.inner.fetch_batches(max_batches)
     }
 
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index 61ed56e..dbebe1a 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::array::RecordBatch;
 use arrow_schema::SchemaRef;
 use log::{debug, warn};
 use parking_lot::{Mutex, RwLock};
@@ -39,7 +38,9 @@ use crate::client::table::remote_log::{RemoteLogDownloader, RemoteLogFetchInfo};
 use crate::error::{ApiError, Error, FlussError, Result};
 use crate::metadata::{PhysicalTablePath, TableBucket, TableInfo, TablePath};
 use crate::proto::{ErrorResponse, FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable};
-use crate::record::{LogRecordsBatches, ReadContext, ScanRecord, ScanRecords, to_arrow_schema};
+use crate::record::{
+    LogRecordsBatches, ReadContext, ScanBatch, ScanRecord, ScanRecords, to_arrow_schema,
+};
 use crate::rpc::{RpcClient, RpcError, message};
 use crate::util::FairBucketStatusMap;
 
@@ -380,7 +381,7 @@ impl LogScannerInner {
         self.log_fetcher.collect_fetches()
     }
 
-    async fn poll_batches(&self, timeout: Duration) -> Result<Vec<RecordBatch>> {
+    async fn poll_batches(&self, timeout: Duration) -> Result<Vec<ScanBatch>> {
         let start = Instant::now();
         let deadline = start + timeout;
 
@@ -410,7 +411,7 @@ impl LogScannerInner {
         }
     }
 
-    async fn poll_for_batches(&self) -> Result<Vec<RecordBatch>> {
+    async fn poll_for_batches(&self) -> Result<Vec<ScanBatch>> {
         let result = self.log_fetcher.collect_batches()?;
         if !result.is_empty() {
             return Ok(result);
@@ -438,7 +439,8 @@ impl LogScanner {
 
 // Implementation for RecordBatchLogScanner (batches mode)
 impl RecordBatchLogScanner {
-    pub async fn poll(&self, timeout: Duration) -> Result<Vec<RecordBatch>> {
+    /// Poll for batches with metadata (bucket and offset information).
+    pub async fn poll(&self, timeout: Duration) -> Result<Vec<ScanBatch>> {
         self.inner.poll_batches(timeout).await
     }
 
@@ -1134,13 +1136,13 @@ impl LogFetcher {
         }
     }
 
-    /// Collect completed fetches as RecordBatches
-    fn collect_batches(&self) -> Result<Vec<RecordBatch>> {
+    /// Collect completed fetches as ScanBatches (with bucket and offset metadata)
+    fn collect_batches(&self) -> Result<Vec<ScanBatch>> {
         // Limit memory usage with both batch count and byte size constraints.
         // Max 100 batches per poll, but also check total bytes (soft cap ~64MB).
         const MAX_BATCHES: usize = 100;
         const MAX_BYTES: usize = 64 * 1024 * 1024; // 64MB soft cap
-        let mut result: Vec<RecordBatch> = Vec::new();
+        let mut result: Vec<ScanBatch> = Vec::new();
         let mut batches_remaining = MAX_BATCHES;
         let mut bytes_consumed: usize = 0;
 
@@ -1150,17 +1152,19 @@ impl LogFetcher {
 
                 match next_in_line {
                     Some(mut next_fetch) if !next_fetch.is_consumed() => {
-                        let batches =
+                        let scan_batches =
                             self.fetch_batches_from_fetch(&mut next_fetch, batches_remaining)?;
-                        let batch_count = batches.len();
+                        let batch_count = scan_batches.len();
 
-                        if !batches.is_empty() {
+                        if !scan_batches.is_empty() {
                             // Track bytes consumed (soft cap - may exceed by one fetch)
-                            let batch_bytes: usize =
-                                batches.iter().map(|b| b.get_array_memory_size()).sum();
+                            let batch_bytes: usize = scan_batches
+                                .iter()
+                                .map(|sb| sb.batch().get_array_memory_size())
+                                .sum();
                             bytes_consumed += batch_bytes;
 
-                            result.extend(batches);
+                            result.extend(scan_batches);
                             batches_remaining = batches_remaining.saturating_sub(batch_count);
                         }
 
@@ -1214,7 +1218,7 @@ impl LogFetcher {
         &self,
         next_in_line_fetch: &mut Box<dyn CompletedFetch>,
         max_batches: usize,
-    ) -> Result<Vec<RecordBatch>> {
+    ) -> Result<Vec<ScanBatch>> {
         let table_bucket = next_in_line_fetch.table_bucket().clone();
         let current_offset = self.log_scanner_status.get_bucket_offset(&table_bucket);
 
@@ -1230,7 +1234,7 @@ impl LogFetcher {
         let fetch_offset = next_in_line_fetch.next_fetch_offset();
 
         if fetch_offset == current_offset {
-            let batches = next_in_line_fetch.fetch_batches(max_batches)?;
+            let batches_with_offsets = next_in_line_fetch.fetch_batches(max_batches)?;
             let next_fetch_offset = next_in_line_fetch.next_fetch_offset();
 
             if next_fetch_offset > current_offset {
@@ -1238,7 +1242,13 @@ impl LogFetcher {
                     .update_offset(&table_bucket, next_fetch_offset);
             }
 
-            Ok(batches)
+            // Convert to ScanBatch with bucket info
+            Ok(batches_with_offsets
+                .into_iter()
+                .map(|(batch, base_offset)| {
+                    ScanBatch::new(table_bucket.clone(), batch, base_offset)
+                })
+                .collect())
         } else {
             warn!(
                 "Ignoring fetched batches for {table_bucket:?} at offset {fetch_offset} since the current offset is {current_offset}"
diff --git a/crates/fluss/src/record/mod.rs b/crates/fluss/src/record/mod.rs
index 94997e8..8438b16 100644
--- a/crates/fluss/src/record/mod.rs
+++ b/crates/fluss/src/record/mod.rs
@@ -17,6 +17,7 @@
 
 use crate::metadata::TableBucket;
 use crate::row::ColumnarRow;
+use ::arrow::array::RecordBatch;
 use core::fmt;
 use std::collections::HashMap;
 
@@ -170,6 +171,60 @@ impl ScanRecords {
     }
 }
 
+/// A batch of records with metadata about bucket and offsets.
+///
+/// This is the batch-level equivalent of [`ScanRecord`], providing efficient
+/// access to Arrow RecordBatches while preserving the bucket and offset information
+/// needed for tracking consumption progress.
+#[derive(Debug, Clone)]
+pub struct ScanBatch {
+    /// The bucket this batch belongs to
+    bucket: TableBucket,
+    /// The Arrow RecordBatch containing the data
+    batch: RecordBatch,
+    /// Offset of the first record in this batch
+    base_offset: i64,
+}
+
+impl ScanBatch {
+    pub fn new(bucket: TableBucket, batch: RecordBatch, base_offset: i64) -> Self {
+        Self {
+            bucket,
+            batch,
+            base_offset,
+        }
+    }
+
+    pub fn bucket(&self) -> &TableBucket {
+        &self.bucket
+    }
+
+    pub fn batch(&self) -> &RecordBatch {
+        &self.batch
+    }
+
+    pub fn into_batch(self) -> RecordBatch {
+        self.batch
+    }
+
+    pub fn base_offset(&self) -> i64 {
+        self.base_offset
+    }
+
+    pub fn num_records(&self) -> usize {
+        self.batch.num_rows()
+    }
+
+    /// Returns the offset of the last record in this batch.
+    pub fn last_offset(&self) -> i64 {
+        if self.batch.num_rows() == 0 {
+            self.base_offset - 1
+        } else {
+            self.base_offset + self.batch.num_rows() as i64 - 1
+        }
+    }
+}
+
 impl IntoIterator for ScanRecords {
     type Item = ScanRecord;
     type IntoIter = std::vec::IntoIter<ScanRecord>;
@@ -243,4 +298,26 @@ mod tests {
         assert_eq!(record.timestamp(), -1);
         assert_eq!(record.change_type(), &ChangeType::Insert);
     }
+
+    #[test]
+    fn scan_batch_last_offset() {
+        let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
+        let bucket = TableBucket::new(1, 0);
+
+        // Batch with 3 records starting at offset 100 -> last_offset = 102
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
+        )
+        .unwrap();
+        let scan_batch = ScanBatch::new(bucket.clone(), batch, 100);
+        assert_eq!(scan_batch.num_records(), 3);
+        assert_eq!(scan_batch.last_offset(), 102);
+
+        // Empty batch -> last_offset = base_offset - 1
+        let empty_batch = RecordBatch::new_empty(schema);
+        let empty_scan_batch = ScanBatch::new(bucket, empty_batch, 100);
+        assert_eq!(empty_scan_batch.num_records(), 0);
+        assert_eq!(empty_scan_batch.last_offset(), 99);
+    }
 }
diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs
index ef73b56..046ec02 100644
--- a/crates/fluss/tests/integration/table.rs
+++ b/crates/fluss/tests/integration/table.rs
@@ -498,8 +498,10 @@ mod table_test {
         let all_ids: Vec<i32> = batches
             .iter()
             .flat_map(|b| {
-                (0..b.num_rows()).map(|i| {
-                    b.column(0)
+                let batch = b.batch();
+                (0..batch.num_rows()).map(move |i| {
+                    batch
+                        .column(0)
                         .as_any()
                         .downcast_ref::<Int32Array>()
                         .unwrap()
@@ -523,8 +525,10 @@ mod table_test {
         let new_ids: Vec<i32> = more
             .iter()
             .flat_map(|b| {
-                (0..b.num_rows()).map(|i| {
-                    b.column(0)
+                let batch = b.batch();
+                (0..batch.num_rows()).map(move |i| {
+                    batch
+                        .column(0)
                         .as_any()
                         .downcast_ref::<Int32Array>()
                         .unwrap()
@@ -544,8 +548,10 @@ mod table_test {
         let trunc_ids: Vec<i32> = trunc_batches
             .iter()
             .flat_map(|b| {
-                (0..b.num_rows()).map(|i| {
-                    b.column(0)
+                let batch = b.batch();
+                (0..batch.num_rows()).map(move |i| {
+                    batch
+                        .column(0)
                         .as_any()
                         .downcast_ref::<Int32Array>()
                         .unwrap()
@@ -568,6 +574,6 @@ mod table_test {
         let proj_batches = proj.poll(Duration::from_secs(10)).await.unwrap();
 
         // Projected batch should have 1 column (id), not 2 (id, name)
-        assert_eq!(proj_batches[0].num_columns(), 1);
+        assert_eq!(proj_batches[0].batch().num_columns(), 1);
     }
 }

Reply via email to