This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 6962c73 feat: log scanner support poll record batch directly
6962c73 is described below
commit 6962c73337abce351c62f9f1d7f8d23f50fe4286
Author: Anton Borisov <[email protected]>
AuthorDate: Sat Jan 10 02:00:12 2026 +0000
feat: log scanner support poll record batch directly
---
crates/fluss/src/client/table/log_fetch_buffer.rs | 54 ++++-
crates/fluss/src/client/table/mod.rs | 2 +-
crates/fluss/src/client/table/scanner.rs | 236 ++++++++++++++++++++--
crates/fluss/src/record/arrow.rs | 22 ++
crates/fluss/tests/integration/table.rs | 133 ++++++++++++
5 files changed, 430 insertions(+), 17 deletions(-)
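
For orientation: this commit adds a batch-mode scanner alongside the existing row-mode one. A minimal usage sketch of the new API, distilled from the integration test in this commit (connection and table setup, imports, and error handling are elided; the two-column table is illustrative):

    // Batch mode: poll Arrow RecordBatches instead of per-record ScanRecords.
    let scanner = table.new_scan().create_record_batch_log_scanner()?;
    scanner.subscribe(0, 0).await?; // bucket 0, starting at offset 0
    let batches: Vec<RecordBatch> = scanner.poll(Duration::from_secs(10)).await?;
    for batch in &batches {
        println!("got {} rows in one batch", batch.num_rows());
    }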
diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs
index cee104e..e9bac53 100644
--- a/crates/fluss/src/client/table/log_fetch_buffer.rs
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -15,12 +15,14 @@
// specific language governing permissions and limitations
// under the License.
+use arrow::array::RecordBatch;
+use parking_lot::Mutex;
+
use crate::error::Result;
use crate::metadata::TableBucket;
use crate::record::{
LogRecordBatch, LogRecordIterator, LogRecordsBatches, ReadContext,
ScanRecord,
};
-use parking_lot::Mutex;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
@@ -31,6 +33,7 @@ use tokio::sync::Notify;
pub trait CompletedFetch: Send + Sync {
fn table_bucket(&self) -> &TableBucket;
fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>>;
+ fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<RecordBatch>>;
fn is_consumed(&self) -> bool;
fn drain(&mut self);
fn size_in_bytes(&self) -> usize;
@@ -318,6 +321,38 @@ impl DefaultCompletedFetch {
}
}
}
+
+ /// Get the next batch directly without row iteration
+ fn next_fetched_batch(&mut self) -> Result<Option<RecordBatch>> {
+ loop {
+ let Some(log_batch) = self.log_record_batch.next() else {
+ self.drain();
+ return Ok(None);
+ };
+
+ let mut record_batch = log_batch.record_batch(&self.read_context)?;
+
+ // Skip empty batches
+ if record_batch.num_rows() == 0 {
+ continue;
+ }
+
+ // Truncate batch
+ let base_offset = log_batch.base_log_offset();
+ if self.next_fetch_offset > base_offset {
+ let skip_count = (self.next_fetch_offset - base_offset) as usize;
+ if skip_count >= record_batch.num_rows() {
+ continue;
+ }
+ // Slice the batch to skip the first skip_count rows
+ record_batch = record_batch.slice(skip_count, record_batch.num_rows() - skip_count);
+ }
+
+ self.next_fetch_offset = log_batch.next_log_offset();
+ self.records_read += record_batch.num_rows();
+ return Ok(Some(record_batch));
+ }
+ }
}
impl CompletedFetch for DefaultCompletedFetch {
@@ -346,6 +381,23 @@ impl CompletedFetch for DefaultCompletedFetch {
Ok(scan_records)
}
+ fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<RecordBatch>> {
+ if self.consumed {
+ return Ok(Vec::new());
+ }
+
+ let mut batches = Vec::with_capacity(max_batches.min(16));
+
+ for _ in 0..max_batches {
+ match self.next_fetched_batch()? {
+ Some(batch) => batches.push(batch),
+ None => break,
+ }
+ }
+
+ Ok(batches)
+ }
+
fn is_consumed(&self) -> bool {
self.consumed
}
diff --git a/crates/fluss/src/client/table/mod.rs b/crates/fluss/src/client/table/mod.rs
index e2cf9e6..26341d7 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -32,7 +32,7 @@ mod scanner;
mod writer;
pub use append::{AppendWriter, TableAppend};
-pub use scanner::{LogScanner, TableScan};
+pub use scanner::{LogScanner, RecordBatchLogScanner, TableScan};
#[allow(dead_code)]
pub struct FlussTable<'a> {
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index 4255bb6..7d22324 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -15,6 +15,16 @@
// specific language governing permissions and limitations
// under the License.
+use arrow::array::RecordBatch;
+use arrow_schema::SchemaRef;
+use log::{debug, error, warn};
+use parking_lot::{Mutex, RwLock};
+use std::collections::{HashMap, HashSet};
+use std::slice::from_ref;
+use std::sync::Arc;
+use std::time::Duration;
+use tempfile::TempDir;
+
use crate::client::connection::FlussConnection;
use crate::client::credentials::CredentialsCache;
use crate::client::metadata::Metadata;
@@ -30,14 +40,6 @@ use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTabl
use crate::record::{LogRecordsBatches, ReadContext, ScanRecord, ScanRecords, to_arrow_schema};
use crate::rpc::{RpcClient, message};
use crate::util::FairBucketStatusMap;
-use arrow_schema::SchemaRef;
-use log::{debug, error, warn};
-use parking_lot::{Mutex, RwLock};
-use std::collections::{HashMap, HashSet};
-use std::slice::from_ref;
-use std::sync::Arc;
-use std::time::Duration;
-use tempfile::TempDir;
const LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024;
#[allow(dead_code)]
@@ -216,16 +218,48 @@ impl<'a> TableScan<'a> {
}
pub fn create_log_scanner(self) -> Result<LogScanner> {
- LogScanner::new(
+ let inner = LogScannerInner::new(
&self.table_info,
self.metadata.clone(),
self.conn.get_connections(),
self.projected_fields,
- )
+ )?;
+ Ok(LogScanner {
+ inner: Arc::new(inner),
+ })
+ }
+
+ pub fn create_record_batch_log_scanner(self) -> Result<RecordBatchLogScanner> {
+ let inner = LogScannerInner::new(
+ &self.table_info,
+ self.metadata.clone(),
+ self.conn.get_connections(),
+ self.projected_fields,
+ )?;
+ Ok(RecordBatchLogScanner {
+ inner: Arc::new(inner),
+ })
}
}
+/// Scanner for reading log records one at a time with per-record metadata.
+///
+/// Use this scanner when you need access to individual record offsets and timestamps.
+/// For batch-level access, use [`RecordBatchLogScanner`] instead.
pub struct LogScanner {
+ inner: Arc<LogScannerInner>,
+}
+
+/// Scanner for reading log data as Arrow RecordBatches.
+///
+/// More efficient than [`LogScanner`] for batch-level analytics where per-record
+/// metadata (offsets, timestamps) is not needed.
+pub struct RecordBatchLogScanner {
+ inner: Arc<LogScannerInner>,
+}
+
+/// Private shared implementation for both scanner types
+struct LogScannerInner {
table_path: TablePath,
table_id: i64,
metadata: Arc<Metadata>,
@@ -233,8 +267,8 @@ pub struct LogScanner {
log_fetcher: LogFetcher,
}
-impl LogScanner {
- pub fn new(
+impl LogScannerInner {
+ fn new(
table_info: &TableInfo,
metadata: Arc<Metadata>,
connections: Arc<RpcClient>,
@@ -256,7 +290,7 @@ impl LogScanner {
})
}
- pub async fn poll(&self, timeout: Duration) -> Result<ScanRecords> {
+ async fn poll_records(&self, timeout: Duration) -> Result<ScanRecords> {
let start = std::time::Instant::now();
let deadline = start + timeout;
@@ -295,7 +329,7 @@ impl LogScanner {
}
}
- pub async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
+ async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
let table_bucket = TableBucket::new(self.table_id, bucket);
self.metadata
.check_and_update_table_metadata(from_ref(&self.table_path))
@@ -305,7 +339,7 @@ impl LogScanner {
Ok(())
}
- pub async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
+ async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
self.metadata
.check_and_update_table_metadata(from_ref(&self.table_path))
.await?;
@@ -339,6 +373,76 @@ impl LogScanner {
// Collect completed fetches from buffer
self.log_fetcher.collect_fetches()
}
+
+ async fn poll_batches(&self, timeout: Duration) -> Result<Vec<RecordBatch>> {
+ let start = std::time::Instant::now();
+ let deadline = start + timeout;
+
+ loop {
+ let batches = self.poll_for_batches().await?;
+
+ if !batches.is_empty() {
+ self.log_fetcher.send_fetches().await?;
+ return Ok(batches);
+ }
+
+ let now = std::time::Instant::now();
+ if now >= deadline {
+ return Ok(Vec::new());
+ }
+
+ let remaining = deadline - now;
+ let has_data = self
+ .log_fetcher
+ .log_fetch_buffer
+ .await_not_empty(remaining)
+ .await;
+
+ if !has_data {
+ return Ok(Vec::new());
+ }
+ }
+ }
+
+ async fn poll_for_batches(&self) -> Result<Vec<RecordBatch>> {
+ let result = self.log_fetcher.collect_batches()?;
+ if !result.is_empty() {
+ return Ok(result);
+ }
+
+ self.log_fetcher.send_fetches().await?;
+ self.log_fetcher.collect_batches()
+ }
+}
+
+// Implementation for LogScanner (records mode)
+impl LogScanner {
+ pub async fn poll(&self, timeout: Duration) -> Result<ScanRecords> {
+ self.inner.poll_records(timeout).await
+ }
+
+ pub async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
+ self.inner.subscribe(bucket, offset).await
+ }
+
+ pub async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
+ self.inner.subscribe_batch(bucket_offsets).await
+ }
+}
+
+// Implementation for RecordBatchLogScanner (batches mode)
+impl RecordBatchLogScanner {
+ pub async fn poll(&self, timeout: Duration) -> Result<Vec<RecordBatch>> {
+ self.inner.poll_batches(timeout).await
+ }
+
+ pub async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
+ self.inner.subscribe(bucket, offset).await
+ }
+
+ pub async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
+ self.inner.subscribe_batch(bucket_offsets).await
+ }
}
struct LogFetcher {
@@ -801,6 +905,108 @@ impl LogFetcher {
}
}
+ /// Collect completed fetches as RecordBatches
+ fn collect_batches(&self) -> Result<Vec<RecordBatch>> {
+ // Limit memory usage with both batch count and byte size constraints.
+ // Max 100 batches per poll, but also check total bytes (soft cap ~64MB).
+ const MAX_BATCHES: usize = 100;
+ const MAX_BYTES: usize = 64 * 1024 * 1024; // 64MB soft cap
+ let mut result: Vec<RecordBatch> = Vec::new();
+ let mut batches_remaining = MAX_BATCHES;
+ let mut bytes_consumed: usize = 0;
+
+ while batches_remaining > 0 && bytes_consumed < MAX_BYTES {
+ let next_in_line = self.log_fetch_buffer.next_in_line_fetch();
+
+ match next_in_line {
+ Some(mut next_fetch) if !next_fetch.is_consumed() => {
+ let batches = self.fetch_batches_from_fetch(&mut next_fetch, batches_remaining)?;
+ let batch_count = batches.len();
+
+ if !batches.is_empty() {
+ // Track bytes consumed (soft cap - may exceed by one fetch)
+ let batch_bytes: usize = batches.iter().map(|b| b.get_array_memory_size()).sum();
+ bytes_consumed += batch_bytes;
+
+ result.extend(batches);
+ batches_remaining = batches_remaining.saturating_sub(batch_count);
+ }
+
+ if !next_fetch.is_consumed() {
+ self.log_fetch_buffer
+ .set_next_in_line_fetch(Some(next_fetch));
+ }
+ }
+ _ => {
+ if let Some(completed_fetch) = self.log_fetch_buffer.poll() {
+ if !completed_fetch.is_initialized() {
+ let size_in_bytes = completed_fetch.size_in_bytes();
+ match self.initialize_fetch(completed_fetch) {
+ Ok(initialized) => {
+ self.log_fetch_buffer.set_next_in_line_fetch(initialized);
+ continue;
+ }
+ Err(e) => {
+ if result.is_empty() && size_in_bytes == 0 {
+ continue;
+ }
+ return Err(e);
+ }
+ }
+ } else {
+ self.log_fetch_buffer
+ .set_next_in_line_fetch(Some(completed_fetch));
+ }
+ } else {
+ break;
+ }
+ }
+ }
+ }
+
+ Ok(result)
+ }
+
+ fn fetch_batches_from_fetch(
+ &self,
+ next_in_line_fetch: &mut Box<dyn CompletedFetch>,
+ max_batches: usize,
+ ) -> Result<Vec<RecordBatch>> {
+ let table_bucket = next_in_line_fetch.table_bucket().clone();
+ let current_offset = self.log_scanner_status.get_bucket_offset(&table_bucket);
+
+ if current_offset.is_none() {
+ warn!(
+ "Ignoring fetched batches for {table_bucket:?} since the
bucket has been unsubscribed"
+ );
+ next_in_line_fetch.drain();
+ return Ok(Vec::new());
+ }
+
+ let current_offset = current_offset.unwrap();
+ let fetch_offset = next_in_line_fetch.next_fetch_offset();
+
+ if fetch_offset == current_offset {
+ let batches = next_in_line_fetch.fetch_batches(max_batches)?;
+ let next_fetch_offset = next_in_line_fetch.next_fetch_offset();
+
+ if next_fetch_offset > current_offset {
+ self.log_scanner_status
+ .update_offset(&table_bucket, next_fetch_offset);
+ }
+
+ Ok(batches)
+ } else {
+ warn!(
+ "Ignoring fetched batches for {table_bucket:?} at offset
{fetch_offset} since the current offset is {current_offset}"
+ );
+ next_in_line_fetch.drain();
+ Ok(Vec::new())
+ }
+ }
+
async fn prepare_fetch_log_requests(&self) -> HashMap<i32, FetchLogRequest> {
let mut fetch_log_req_for_buckets = HashMap::new();
let mut table_id = None;
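
Note that the 64 MB limit in collect_batches above is a soft cap: bytes are checked between fetches, so the final fetch may overshoot it. The accounting pattern, reduced to its core (next_batch() is a hypothetical stand-in for draining the fetch buffer):

    const MAX_BATCHES: usize = 100;
    const MAX_BYTES: usize = 64 * 1024 * 1024;

    let mut out: Vec<RecordBatch> = Vec::new();
    let mut bytes = 0usize;
    while out.len() < MAX_BATCHES && bytes < MAX_BYTES {
        let Some(batch) = next_batch() else { break };
        // Arrow's estimate of the buffers backing this batch.
        bytes += batch.get_array_memory_size();
        out.push(batch);
    }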
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 5a5115e..89fb7b9 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -546,6 +546,28 @@ impl LogRecordBatch {
};
Ok(log_record_iterator)
}
+
+ /// Returns the record batch directly without creating an iterator.
+ /// This is more efficient when you need the entire batch rather than
+ /// iterating row-by-row.
+ pub fn record_batch(&self, read_context: &ReadContext) -> Result<RecordBatch> {
+ if self.record_count() == 0 {
+ // Return empty batch with correct schema
+ return Ok(RecordBatch::new_empty(read_context.target_schema.clone()));
+ }
+
+ let data = self.data.get(RECORDS_OFFSET..).ok_or_else(|| {
+ crate::error::Error::UnexpectedError {
+ message: format!(
+ "Corrupt log record batch: data length {} is less than
RECORDS_OFFSET {}",
+ self.data.len(),
+ RECORDS_OFFSET
+ ),
+ source: None,
+ }
+ })?;
+ read_context.record_batch(data)
+ }
}
/// Parse an Arrow IPC message from a byte slice.
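
Once a batch is in hand, columns are read through Arrow's downcast idiom, as the tests below do. A sketch for the id/name schema used in the test (downcast_ref returns None on a type mismatch, so unwrap is test-only style):

    use arrow::array::{Int32Array, StringArray};

    let ids = batch.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
    let names = batch.column(1).as_any().downcast_ref::<StringArray>().unwrap();
    for i in 0..batch.num_rows() {
        println!("{} -> {}", ids.value(i), names.value(i));
    }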
diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs
index 0ac34c7..4cba469 100644
--- a/crates/fluss/tests/integration/table.rs
+++ b/crates/fluss/tests/integration/table.rs
@@ -469,4 +469,137 @@ mod table_test {
records.sort_by_key(|r| r.offset());
records
}
+
+ #[tokio::test]
+ async fn test_poll_batches() {
+ let cluster = get_fluss_cluster();
+ let connection = cluster.get_fluss_connection().await;
+ let admin = connection.get_admin().await.expect("Failed to get admin");
+
+ let table_path = TablePath::new("fluss".to_string(), "test_poll_batches".to_string());
+ let schema = Schema::builder()
+ .column("id", DataTypes::int())
+ .column("name", DataTypes::string())
+ .build()
+ .unwrap();
+
+ create_table(
+ &admin,
+ &table_path,
+ &TableDescriptor::builder().schema(schema).build().unwrap(),
+ )
+ .await;
+ tokio::time::sleep(Duration::from_secs(1)).await;
+
+ let table = connection.get_table(&table_path).await.unwrap();
+ let scanner = table.new_scan().create_record_batch_log_scanner().unwrap();
+ scanner.subscribe(0, 0).await.unwrap();
+
+ // Test 1: Empty table should return empty result
+ assert!(
+ scanner
+ .poll(Duration::from_millis(500))
+ .await
+ .unwrap()
+ .is_empty()
+ );
+
+ let writer = table.new_append().unwrap().create_writer();
+ writer
+ .append_arrow_batch(
+ record_batch!(("id", Int32, [1, 2]), ("name", Utf8, ["a",
"b"])).unwrap(),
+ )
+ .await
+ .unwrap();
+ writer
+ .append_arrow_batch(
+ record_batch!(("id", Int32, [3, 4]), ("name", Utf8, ["c",
"d"])).unwrap(),
+ )
+ .await
+ .unwrap();
+ writer
+ .append_arrow_batch(
+ record_batch!(("id", Int32, [5, 6]), ("name", Utf8, ["e",
"f"])).unwrap(),
+ )
+ .await
+ .unwrap();
+ writer.flush().await.unwrap();
+
+ use arrow::array::Int32Array;
+ let batches = scanner.poll(Duration::from_secs(10)).await.unwrap();
+ let mut all_ids: Vec<i32> = batches
+ .iter()
+ .flat_map(|b| {
+ (0..b.num_rows()).map(|i| {
+ b.column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap()
+ .value(i)
+ })
+ })
+ .collect();
+
+ // Test 2: Order should be preserved across multiple batches
+ assert_eq!(all_ids, vec![1, 2, 3, 4, 5, 6]);
+
+ writer
+ .append_arrow_batch(
+ record_batch!(("id", Int32, [7, 8]), ("name", Utf8, ["g",
"h"])).unwrap(),
+ )
+ .await
+ .unwrap();
+ writer.flush().await.unwrap();
+
+ let more = scanner.poll(Duration::from_secs(10)).await.unwrap();
+ let new_ids: Vec<i32> = more
+ .iter()
+ .flat_map(|b| {
+ (0..b.num_rows()).map(|i| {
+ b.column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap()
+ .value(i)
+ })
+ })
+ .collect();
+
+ // Test 3: Subsequent polls should not return duplicate data (offset continuation)
+ assert_eq!(new_ids, vec![7, 8]);
+
+ // Test 4: Subscribing from mid-offset should truncate batch (Arrow batch slicing)
+ // Server returns all records from start of batch, but client truncates to subscription offset
+ let trunc_scanner = table.new_scan().create_record_batch_log_scanner().unwrap();
+ trunc_scanner.subscribe(0, 3).await.unwrap();
+ let trunc_batches = trunc_scanner.poll(Duration::from_secs(10)).await.unwrap();
+ let trunc_ids: Vec<i32> = trunc_batches
+ .iter()
+ .flat_map(|b| {
+ (0..b.num_rows()).map(|i| {
+ b.column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap()
+ .value(i)
+ })
+ })
+ .collect();
+
+ // Subscribing from offset 3 should return [4,5,6,7,8], not [1,2,3,4,5,6,7,8]
+ assert_eq!(trunc_ids, vec![4, 5, 6, 7, 8]);
+
+ // Test 5: Projection should only return requested columns
+ let proj = table
+ .new_scan()
+ .project_by_name(&["id"])
+ .unwrap()
+ .create_record_batch_log_scanner()
+ .unwrap();
+ proj.subscribe(0, 0).await.unwrap();
+ let proj_batches = proj.poll(Duration::from_secs(10)).await.unwrap();
+
+ // Projected batch should have 1 column (id), not 2 (id, name)
+ assert_eq!(proj_batches[0].num_columns(), 1);
+ }
}
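
For completeness, both scanner types also expose subscribe_batch for seeding several buckets at once. A short sketch (the bucket ids and start offsets are illustrative):

    use std::collections::HashMap;

    let offsets: HashMap<i32, i64> = HashMap::from([(0, 0), (1, 42), (2, 0)]);
    scanner.subscribe_batch(&offsets).await?;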