This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 2b5fc64 chore: add more info for RecordBatches (#211)
2b5fc64 is described below
commit 2b5fc64d95967aae374e6de17c4c1c9dd61a1cb9
Author: Anton Borisov <[email protected]>
AuthorDate: Sun Jan 25 08:31:08 2026 +0000
chore: add more info for RecordBatches (#211)
---
crates/fluss/src/client/table/log_fetch_buffer.rs | 28 +++++----
crates/fluss/src/client/table/scanner.rs | 44 ++++++++-----
crates/fluss/src/record/mod.rs | 77 +++++++++++++++++++++++
crates/fluss/tests/integration/table.rs | 20 +++---
4 files changed, 133 insertions(+), 36 deletions(-)
diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs
index 4a64eda..b529806 100644
--- a/crates/fluss/src/client/table/log_fetch_buffer.rs
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -65,7 +65,7 @@ pub trait CompletedFetch: Send + Sync {
fn fetch_error_context(&self) -> Option<&FetchErrorContext>;
fn take_error(&mut self) -> Option<Error>;
fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>>;
- fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<RecordBatch>>;
+ fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<(RecordBatch, i64)>>;
fn is_consumed(&self) -> bool;
fn records_read(&self) -> usize;
fn drain(&mut self);
@@ -476,8 +476,9 @@ impl DefaultCompletedFetch {
source: None,
}
}
- /// Get the next batch directly without row iteration
- fn next_fetched_batch(&mut self) -> Result<Option<RecordBatch>> {
+ /// Get the next batch with its base offset.
+ /// Returns (RecordBatch, base_offset) where base_offset is the offset of the first record.
+ fn next_fetched_batch(&mut self) -> Result<Option<(RecordBatch, i64)>> {
loop {
let Some(log_batch_result) = self.log_record_batch.next() else {
self.drain();
@@ -492,20 +493,23 @@ impl DefaultCompletedFetch {
continue;
}
- // Truncate batch
- let base_offset = log_batch.base_log_offset();
- if self.next_fetch_offset > base_offset {
- let skip_count = (self.next_fetch_offset - base_offset) as usize;
+ // Calculate the effective base offset for this batch
+ let log_base_offset = log_batch.base_log_offset();
+ let effective_base_offset = if self.next_fetch_offset > log_base_offset {
+ let skip_count = (self.next_fetch_offset - log_base_offset) as usize;
if skip_count >= record_batch.num_rows() {
continue;
}
// Slice the batch to skip the first skip_count rows
record_batch = record_batch.slice(skip_count, record_batch.num_rows() - skip_count);
- }
+ self.next_fetch_offset
+ } else {
+ log_base_offset
+ };
self.next_fetch_offset = log_batch.next_log_offset();
self.records_read += record_batch.num_rows();
- return Ok(Some(record_batch));
+ return Ok(Some((record_batch, effective_base_offset)));
}
}
}
@@ -585,7 +589,7 @@ impl CompletedFetch for DefaultCompletedFetch {
Ok(scan_records)
}
- fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<RecordBatch>> {
+ fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<(RecordBatch, i64)>> {
if let Some(error) = self.error.take() {
return Err(error);
}
@@ -607,7 +611,7 @@ impl CompletedFetch for DefaultCompletedFetch {
for _ in 0..max_batches {
match self.next_fetched_batch()? {
- Some(batch) => batches.push(batch),
+ Some(batch_with_offset) => batches.push(batch_with_offset),
None => break,
}
}
@@ -692,7 +696,7 @@ impl CompletedFetch for RemoteCompletedFetch {
self.inner.fetch_records(max_records)
}
- fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<RecordBatch>> {
+ fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<(RecordBatch, i64)>> {
self.inner.fetch_batches(max_batches)
}
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index 61ed56e..dbebe1a 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use arrow::array::RecordBatch;
use arrow_schema::SchemaRef;
use log::{debug, warn};
use parking_lot::{Mutex, RwLock};
@@ -39,7 +38,9 @@ use crate::client::table::remote_log::{RemoteLogDownloader, RemoteLogFetchInfo};
use crate::error::{ApiError, Error, FlussError, Result};
use crate::metadata::{PhysicalTablePath, TableBucket, TableInfo, TablePath};
use crate::proto::{ErrorResponse, FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable};
-use crate::record::{LogRecordsBatches, ReadContext, ScanRecord, ScanRecords, to_arrow_schema};
+use crate::record::{
+ LogRecordsBatches, ReadContext, ScanBatch, ScanRecord, ScanRecords, to_arrow_schema,
+};
use crate::rpc::{RpcClient, RpcError, message};
use crate::util::FairBucketStatusMap;
@@ -380,7 +381,7 @@ impl LogScannerInner {
self.log_fetcher.collect_fetches()
}
- async fn poll_batches(&self, timeout: Duration) -> Result<Vec<RecordBatch>> {
+ async fn poll_batches(&self, timeout: Duration) -> Result<Vec<ScanBatch>> {
let start = Instant::now();
let deadline = start + timeout;
@@ -410,7 +411,7 @@ impl LogScannerInner {
}
}
- async fn poll_for_batches(&self) -> Result<Vec<RecordBatch>> {
+ async fn poll_for_batches(&self) -> Result<Vec<ScanBatch>> {
let result = self.log_fetcher.collect_batches()?;
if !result.is_empty() {
return Ok(result);
@@ -438,7 +439,8 @@ impl LogScanner {
// Implementation for RecordBatchLogScanner (batches mode)
impl RecordBatchLogScanner {
- pub async fn poll(&self, timeout: Duration) -> Result<Vec<RecordBatch>> {
+ /// Poll for batches with metadata (bucket and offset information).
+ pub async fn poll(&self, timeout: Duration) -> Result<Vec<ScanBatch>> {
self.inner.poll_batches(timeout).await
}
@@ -1134,13 +1136,13 @@ impl LogFetcher {
}
}
- /// Collect completed fetches as RecordBatches
- fn collect_batches(&self) -> Result<Vec<RecordBatch>> {
+ /// Collect completed fetches as ScanBatches (with bucket and offset metadata)
+ fn collect_batches(&self) -> Result<Vec<ScanBatch>> {
// Limit memory usage with both batch count and byte size constraints.
// Max 100 batches per poll, but also check total bytes (soft cap ~64MB).
const MAX_BATCHES: usize = 100;
const MAX_BYTES: usize = 64 * 1024 * 1024; // 64MB soft cap
- let mut result: Vec<RecordBatch> = Vec::new();
+ let mut result: Vec<ScanBatch> = Vec::new();
let mut batches_remaining = MAX_BATCHES;
let mut bytes_consumed: usize = 0;
@@ -1150,17 +1152,19 @@ impl LogFetcher {
match next_in_line {
Some(mut next_fetch) if !next_fetch.is_consumed() => {
- let batches =
+ let scan_batches =
self.fetch_batches_from_fetch(&mut next_fetch, batches_remaining)?;
- let batch_count = batches.len();
+ let batch_count = scan_batches.len();
- if !batches.is_empty() {
+ if !scan_batches.is_empty() {
// Track bytes consumed (soft cap - may exceed by one fetch)
- let batch_bytes: usize =
- batches.iter().map(|b| b.get_array_memory_size()).sum();
+ let batch_bytes: usize = scan_batches
+ .iter()
+ .map(|sb| sb.batch().get_array_memory_size())
+ .sum();
bytes_consumed += batch_bytes;
- result.extend(batches);
+ result.extend(scan_batches);
batches_remaining = batches_remaining.saturating_sub(batch_count);
}
@@ -1214,7 +1218,7 @@ impl LogFetcher {
&self,
next_in_line_fetch: &mut Box<dyn CompletedFetch>,
max_batches: usize,
- ) -> Result<Vec<RecordBatch>> {
+ ) -> Result<Vec<ScanBatch>> {
let table_bucket = next_in_line_fetch.table_bucket().clone();
let current_offset =
self.log_scanner_status.get_bucket_offset(&table_bucket);
@@ -1230,7 +1234,7 @@ impl LogFetcher {
let fetch_offset = next_in_line_fetch.next_fetch_offset();
if fetch_offset == current_offset {
- let batches = next_in_line_fetch.fetch_batches(max_batches)?;
+ let batches_with_offsets =
next_in_line_fetch.fetch_batches(max_batches)?;
let next_fetch_offset = next_in_line_fetch.next_fetch_offset();
if next_fetch_offset > current_offset {
@@ -1238,7 +1242,13 @@ impl LogFetcher {
.update_offset(&table_bucket, next_fetch_offset);
}
- Ok(batches)
+ // Convert to ScanBatch with bucket info
+ Ok(batches_with_offsets
+ .into_iter()
+ .map(|(batch, base_offset)| {
+ ScanBatch::new(table_bucket.clone(), batch, base_offset)
+ })
+ .collect())
} else {
warn!(
"Ignoring fetched batches for {table_bucket:?} at offset {fetch_offset} since the current offset is {current_offset}"
diff --git a/crates/fluss/src/record/mod.rs b/crates/fluss/src/record/mod.rs
index 94997e8..8438b16 100644
--- a/crates/fluss/src/record/mod.rs
+++ b/crates/fluss/src/record/mod.rs
@@ -17,6 +17,7 @@
use crate::metadata::TableBucket;
use crate::row::ColumnarRow;
+use ::arrow::array::RecordBatch;
use core::fmt;
use std::collections::HashMap;
@@ -170,6 +171,60 @@ impl ScanRecords {
}
}
+/// A batch of records with metadata about bucket and offsets.
+///
+/// This is the batch-level equivalent of [`ScanRecord`], providing efficient
+/// access to Arrow RecordBatches while preserving the bucket and offset information
+/// needed for tracking consumption progress.
+#[derive(Debug, Clone)]
+pub struct ScanBatch {
+ /// The bucket this batch belongs to
+ bucket: TableBucket,
+ /// The Arrow RecordBatch containing the data
+ batch: RecordBatch,
+ /// Offset of the first record in this batch
+ base_offset: i64,
+}
+
+impl ScanBatch {
+ pub fn new(bucket: TableBucket, batch: RecordBatch, base_offset: i64) -> Self {
+ Self {
+ bucket,
+ batch,
+ base_offset,
+ }
+ }
+
+ pub fn bucket(&self) -> &TableBucket {
+ &self.bucket
+ }
+
+ pub fn batch(&self) -> &RecordBatch {
+ &self.batch
+ }
+
+ pub fn into_batch(self) -> RecordBatch {
+ self.batch
+ }
+
+ pub fn base_offset(&self) -> i64 {
+ self.base_offset
+ }
+
+ pub fn num_records(&self) -> usize {
+ self.batch.num_rows()
+ }
+
+ /// Returns the offset of the last record in this batch.
+ pub fn last_offset(&self) -> i64 {
+ if self.batch.num_rows() == 0 {
+ self.base_offset - 1
+ } else {
+ self.base_offset + self.batch.num_rows() as i64 - 1
+ }
+ }
+}
+
impl IntoIterator for ScanRecords {
type Item = ScanRecord;
type IntoIter = std::vec::IntoIter<ScanRecord>;
@@ -243,4 +298,26 @@ mod tests {
assert_eq!(record.timestamp(), -1);
assert_eq!(record.change_type(), &ChangeType::Insert);
}
+
+ #[test]
+ fn scan_batch_last_offset() {
+ let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
+ let bucket = TableBucket::new(1, 0);
+
+ // Batch with 3 records starting at offset 100 -> last_offset = 102
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
+ )
+ .unwrap();
+ let scan_batch = ScanBatch::new(bucket.clone(), batch, 100);
+ assert_eq!(scan_batch.num_records(), 3);
+ assert_eq!(scan_batch.last_offset(), 102);
+
+ // Empty batch -> last_offset = base_offset - 1
+ let empty_batch = RecordBatch::new_empty(schema);
+ let empty_scan_batch = ScanBatch::new(bucket, empty_batch, 100);
+ assert_eq!(empty_scan_batch.num_records(), 0);
+ assert_eq!(empty_scan_batch.last_offset(), 99);
+ }
}
diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs
index ef73b56..046ec02 100644
--- a/crates/fluss/tests/integration/table.rs
+++ b/crates/fluss/tests/integration/table.rs
@@ -498,8 +498,10 @@ mod table_test {
let all_ids: Vec<i32> = batches
.iter()
.flat_map(|b| {
- (0..b.num_rows()).map(|i| {
- b.column(0)
+ let batch = b.batch();
+ (0..batch.num_rows()).map(move |i| {
+ batch
+ .column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
@@ -523,8 +525,10 @@ mod table_test {
let new_ids: Vec<i32> = more
.iter()
.flat_map(|b| {
- (0..b.num_rows()).map(|i| {
- b.column(0)
+ let batch = b.batch();
+ (0..batch.num_rows()).map(move |i| {
+ batch
+ .column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
@@ -544,8 +548,10 @@ mod table_test {
let trunc_ids: Vec<i32> = trunc_batches
.iter()
.flat_map(|b| {
- (0..b.num_rows()).map(|i| {
- b.column(0)
+ let batch = b.batch();
+ (0..batch.num_rows()).map(move |i| {
+ batch
+ .column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
@@ -568,6 +574,6 @@ mod table_test {
let proj_batches = proj.poll(Duration::from_secs(10)).await.unwrap();
// Projected batch should have 1 column (id), not 2 (id, name)
- assert_eq!(proj_batches[0].num_columns(), 1);
+ assert_eq!(proj_batches[0].batch().num_columns(), 1);
}
}