This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 0b53d37 chore: support estimated_size_in_bytes for write batch (#219)
0b53d37 is described below
commit 0b53d371f761d64e8d8be393bdecb5eceef45e5d
Author: yuxia Luo <[email protected]>
AuthorDate: Thu Jan 29 09:35:04 2026 +0800
chore: support estimated_size_in_bytes for write batch (#219)
---
crates/fluss/src/client/write/accumulator.rs | 4 +-
crates/fluss/src/client/write/batch.rs | 172 ++++++++++++++++++++++++++-
crates/fluss/src/cluster/cluster.rs | 3 +-
crates/fluss/src/record/arrow.rs | 26 ++++
4 files changed, 198 insertions(+), 7 deletions(-)
diff --git a/crates/fluss/src/client/write/accumulator.rs b/crates/fluss/src/client/write/accumulator.rs
index 624e7c4..a5b9832 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -311,7 +311,7 @@ impl RecordAccumulator {
node: &ServerNode,
max_size: i32,
) -> Result<Vec<ReadyWriteBatch>> {
- let mut size = 0;
+ let mut size: usize = 0;
let buckets = self.get_all_buckets_in_current_node(node, cluster);
let mut ready = Vec::new();
@@ -354,7 +354,7 @@ impl RecordAccumulator {
if !batch_lock.is_empty() {
let first_batch = batch_lock.front().unwrap();
- if size + first_batch.estimated_size_in_bytes() > max_size as i64
+ if size + first_batch.estimated_size_in_bytes() > max_size as usize
&& !ready.is_empty()
{
// there is a rare case that a single batch size is larger than the request size
diff --git a/crates/fluss/src/client/write/batch.rs b/crates/fluss/src/client/write/batch.rs
index 78381c6..41561d4 100644
--- a/crates/fluss/src/client/write/batch.rs
+++ b/crates/fluss/src/client/write/batch.rs
@@ -132,9 +132,11 @@ impl WriteBatch {
}
}
- pub fn estimated_size_in_bytes(&self) -> i64 {
- 0
- // todo: calculate estimated_size_in_bytes
+ pub fn estimated_size_in_bytes(&self) -> usize {
+ match self {
+ WriteBatch::ArrowLog(batch) => batch.estimated_size_in_bytes(),
+ WriteBatch::Kv(batch) => batch.estimated_size_in_bytes(),
+ }
}
pub fn is_closed(&self) -> bool {
@@ -245,6 +247,18 @@ impl ArrowLogWriteBatch {
pub fn close(&mut self) {
self.arrow_builder.close()
}
+
+ /// Get an estimate of the number of bytes written to the underlying buffer.
+ /// The returned value is exactly correct if the batch has been built.
+ pub fn estimated_size_in_bytes(&self) -> usize {
+ if let Some(ref bytes) = self.built_records {
+ // Return actual size if already built
+ bytes.len()
+ } else {
+ // Delegate to arrow builder for estimated size
+ self.arrow_builder.estimated_size_in_bytes()
+ }
+ }
}
pub struct KvWriteBatch {
@@ -340,11 +354,18 @@ impl KvWriteBatch {
pub fn target_columns(&self) -> Option<&Arc<Vec<usize>>> {
self.target_columns.as_ref()
}
+
+ /// Get an estimate of the number of bytes written to the underlying buffer.
+ /// This returns the current size including header and all appended records.
+ pub fn estimated_size_in_bytes(&self) -> usize {
+ self.kv_batch_builder.get_size_in_bytes()
+ }
}
#[cfg(test)]
mod tests {
use super::*;
+ use crate::client::{RowBytes, WriteFormat};
use crate::metadata::TablePath;
#[test]
@@ -363,4 +384,149 @@ mod tests {
batch.re_enqueued();
assert_eq!(batch.attempts(), 1);
}
+
+ #[test]
+ fn test_arrow_log_write_batch_estimated_size() {
+ use crate::client::WriteRecord;
+ use crate::compression::{
+ ArrowCompressionInfo, ArrowCompressionType, DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
+ };
+ use crate::metadata::{DataField, DataTypes, RowType};
+ use crate::row::GenericRow;
+ use arrow::array::{Int32Array, RecordBatch, StringArray};
+ use std::sync::Arc;
+
+ let row_type = RowType::new(vec![
+ DataField::new("id".to_string(), DataTypes::int(), None),
+ DataField::new("name".to_string(), DataTypes::string(), None),
+ ]);
+ let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+
+ // Test 1: RowAppendRecordBatchBuilder (to_append_record_batch=false)
+ {
+ let mut batch = ArrowLogWriteBatch::new(
+ 1,
+ table_path.clone(),
+ 1,
+ ArrowCompressionInfo {
+ compression_type: ArrowCompressionType::None,
+ compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
+ },
+ &row_type,
+ 0,
+ 0,
+ false,
+ )
+ .unwrap();
+
+ // Append rows
+ for _ in 0..200 {
+ let mut row = GenericRow::new(2);
+ row.set_field(0, 1_i32);
+ row.set_field(1, "hello");
+ let record = WriteRecord::for_append(Arc::new(table_path.clone()), 1, row);
+ batch.try_append(&record).unwrap();
+ }
+
+ let estimated_size = batch.estimated_size_in_bytes();
+ assert!(estimated_size > 0);
+
+ let built_data = batch.build().unwrap();
+ let actual_size = built_data.len();
+
+ let diff = actual_size - estimated_size;
+ let threshold = actual_size / 10; // 10% tolerance
+ assert!(
+ diff <= threshold,
"RowAppend: estimated_size {estimated_size} and actual_size {actual_size} differ by more than 10%"
+ );
+ }
+
+ // Test 2: PrebuiltRecordBatchBuilder (to_append_record_batch=true)
+ {
+ let mut batch = ArrowLogWriteBatch::new(
+ 1,
+ table_path.clone(),
+ 1,
+ ArrowCompressionInfo {
+ compression_type: ArrowCompressionType::None,
+ compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
+ },
+ &row_type,
+ 0,
+ 0,
+ true,
+ )
+ .unwrap();
+
+ // Create a pre-built RecordBatch
+ let schema = crate::record::to_arrow_schema(&row_type).unwrap();
+ let ids: Vec<i32> = (0..200).collect();
+ let names: Vec<&str> = (0..200).map(|_| "hello").collect();
+ let record_batch = RecordBatch::try_new(
+ schema,
+ vec![
+ Arc::new(Int32Array::from(ids)),
+ Arc::new(StringArray::from(names)),
+ ],
+ )
+ .unwrap();
+
+ let record =
+ WriteRecord::for_append_record_batch(Arc::new(table_path.clone()), 1, record_batch);
+ batch.try_append(&record).unwrap();
+
+ let estimated_size = batch.estimated_size_in_bytes();
+ assert!(estimated_size > 0);
+
+ let built_data = batch.build().unwrap();
+ let actual_size = built_data.len();
+
+ let diff = actual_size - estimated_size;
+ let threshold = actual_size / 10; // 10% tolerance
+ assert!(
+ diff <= threshold,
"Prebuilt: estimated_size {estimated_size} and actual_size {actual_size} differ by more than 10%"
+ );
+ }
+ }
+
+ #[test]
+ fn test_kv_write_batch_estimated_size() {
+ use crate::metadata::KvFormat;
+
+ let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+
+ let mut batch = KvWriteBatch::new(
+ 1,
+ table_path.clone(),
+ 1,
+ KvWriteBatch::DEFAULT_WRITE_LIMIT,
+ KvFormat::COMPACTED,
+ 0,
+ None,
+ 0,
+ );
+
+ for _ in 0..200 {
+ let record = WriteRecord::for_upsert(
+ Arc::new(table_path.clone()),
+ 1,
+ Bytes::from(vec![1_u8, 2_u8, 3_u8]),
+ None,
+ WriteFormat::CompactedKv,
+ None,
+ Some(RowBytes::Owned(Bytes::from(vec![1_u8, 2_u8, 3_u8]))),
+ );
+ batch.try_append(&record).unwrap();
+ }
+
+ let estimated_size = batch.estimated_size_in_bytes();
+ let actual_size = batch.build().unwrap().len();
+
+ assert_eq!(
+ actual_size, estimated_size,
+ "estimated size {estimated_size} is not equal to actual size"
+ );
+ }
}
diff --git a/crates/fluss/src/cluster/cluster.rs b/crates/fluss/src/cluster/cluster.rs
index 0b14fe6..d6fe0ae 100644
--- a/crates/fluss/src/cluster/cluster.rs
+++ b/crates/fluss/src/cluster/cluster.rs
@@ -191,8 +191,7 @@ impl Cluster {
&serde_json::from_slice(table_metadata.table_json.as_slice()).map_err(|e| {
Error::JsonSerdeError {
message: format!(
- "Error deserializing table_json into TableDescriptor for table_id {} and table_path {}: {}",
- table_id, table_path, e
+ "Error deserializing table_json into TableDescriptor for table_id {table_id} and table_path {table_path}: {e}"
)
}
})?,
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 63df6de..6340dc8 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -175,6 +175,9 @@ pub trait ArrowRecordBatchInnerBuilder: Send + Sync {
fn records_count(&self) -> i32;
fn is_full(&self) -> bool;
+
+ /// Get an estimate of the size in bytes of the arrow data.
+ fn estimated_size_in_bytes(&self) -> usize;
}
#[derive(Default)]
@@ -214,6 +217,13 @@ impl ArrowRecordBatchInnerBuilder for PrebuiltRecordBatchBuilder {
// full if has one record batch
self.arrow_record_batch.is_some()
}
+
+ fn estimated_size_in_bytes(&self) -> usize {
+ self.arrow_record_batch
+ .as_ref()
+ .map(|batch| batch.get_array_memory_size())
+ .unwrap_or(0)
+ }
}
pub struct RowAppendRecordBatchBuilder {
@@ -361,6 +371,16 @@ impl ArrowRecordBatchInnerBuilder for RowAppendRecordBatchBuilder {
fn is_full(&self) -> bool {
self.records_count() >= DEFAULT_MAX_RECORD
}
+
+ fn estimated_size_in_bytes(&self) -> usize {
+ // Returns the uncompressed Arrow array memory size (same as Java's arrowWriter.estimatedSizeInBytes()).
+ // Note: This is the size before compression. After build(), the actual size may be smaller
+ // if compression is enabled.
+ self.arrow_column_builders
+ .iter()
+ .map(|builder| builder.finish_cloned().get_array_memory_size())
+ .sum()
+ }
}
impl MemoryLogRecordsArrowBuilder {
@@ -481,6 +501,12 @@ impl MemoryLogRecordsArrowBuilder {
cursor.write_i32::<LittleEndian>(record_count)?;
Ok(())
}
+
+ /// Get an estimate of the number of bytes written to the underlying buffer.
+ /// This includes the batch header size plus the estimated arrow data size.
+ pub fn estimated_size_in_bytes(&self) -> usize {
+ RECORD_BATCH_HEADER_SIZE + self.arrow_record_batch_builder.estimated_size_in_bytes()
+ }
}
pub trait ToArrow {