This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 0b53d37  chore: support estimated_size_in_bytes for write batch (#219)
0b53d37 is described below

commit 0b53d371f761d64e8d8be393bdecb5eceef45e5d
Author: yuxia Luo <[email protected]>
AuthorDate: Thu Jan 29 09:35:04 2026 +0800

    chore: support estimated_size_in_bytes for write batch (#219)
---
 crates/fluss/src/client/write/accumulator.rs |   4 +-
 crates/fluss/src/client/write/batch.rs       | 172 ++++++++++++++++++++++++++-
 crates/fluss/src/cluster/cluster.rs          |   3 +-
 crates/fluss/src/record/arrow.rs             |  26 ++++
 4 files changed, 198 insertions(+), 7 deletions(-)

diff --git a/crates/fluss/src/client/write/accumulator.rs b/crates/fluss/src/client/write/accumulator.rs
index 624e7c4..a5b9832 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -311,7 +311,7 @@ impl RecordAccumulator {
         node: &ServerNode,
         max_size: i32,
     ) -> Result<Vec<ReadyWriteBatch>> {
-        let mut size = 0;
+        let mut size: usize = 0;
         let buckets = self.get_all_buckets_in_current_node(node, cluster);
         let mut ready = Vec::new();
 
@@ -354,7 +354,7 @@ impl RecordAccumulator {
                     if !batch_lock.is_empty() {
                         let first_batch = batch_lock.front().unwrap();
 
-                        if size + first_batch.estimated_size_in_bytes() > max_size as i64
+                        if size + first_batch.estimated_size_in_bytes() > max_size as usize
                             && !ready.is_empty()
                         {
                            // there is a rare case that a single batch size is larger than the request size
diff --git a/crates/fluss/src/client/write/batch.rs b/crates/fluss/src/client/write/batch.rs
index 78381c6..41561d4 100644
--- a/crates/fluss/src/client/write/batch.rs
+++ b/crates/fluss/src/client/write/batch.rs
@@ -132,9 +132,11 @@ impl WriteBatch {
         }
     }
 
-    pub fn estimated_size_in_bytes(&self) -> i64 {
-        0
-        // todo: calculate estimated_size_in_bytes
+    pub fn estimated_size_in_bytes(&self) -> usize {
+        match self {
+            WriteBatch::ArrowLog(batch) => batch.estimated_size_in_bytes(),
+            WriteBatch::Kv(batch) => batch.estimated_size_in_bytes(),
+        }
     }
 
     pub fn is_closed(&self) -> bool {
@@ -245,6 +247,18 @@ impl ArrowLogWriteBatch {
     pub fn close(&mut self) {
         self.arrow_builder.close()
     }
+
+    /// Get an estimate of the number of bytes written to the underlying buffer.
+    /// The returned value is exactly correct if the batch has been built.
+    pub fn estimated_size_in_bytes(&self) -> usize {
+        if let Some(ref bytes) = self.built_records {
+            // Return actual size if already built
+            bytes.len()
+        } else {
+            // Delegate to arrow builder for estimated size
+            self.arrow_builder.estimated_size_in_bytes()
+        }
+    }
 }
 
 pub struct KvWriteBatch {
@@ -340,11 +354,18 @@ impl KvWriteBatch {
     pub fn target_columns(&self) -> Option<&Arc<Vec<usize>>> {
         self.target_columns.as_ref()
     }
+
+    /// Get an estimate of the number of bytes written to the underlying buffer.
+    /// This returns the current size including header and all appended records.
+    pub fn estimated_size_in_bytes(&self) -> usize {
+        self.kv_batch_builder.get_size_in_bytes()
+    }
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::client::{RowBytes, WriteFormat};
     use crate::metadata::TablePath;
 
     #[test]
@@ -363,4 +384,149 @@ mod tests {
         batch.re_enqueued();
         assert_eq!(batch.attempts(), 1);
     }
+
+    #[test]
+    fn test_arrow_log_write_batch_estimated_size() {
+        use crate::client::WriteRecord;
+        use crate::compression::{
+            ArrowCompressionInfo, ArrowCompressionType, DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
+        };
+        use crate::metadata::{DataField, DataTypes, RowType};
+        use crate::row::GenericRow;
+        use arrow::array::{Int32Array, RecordBatch, StringArray};
+        use std::sync::Arc;
+
+        let row_type = RowType::new(vec![
+            DataField::new("id".to_string(), DataTypes::int(), None),
+            DataField::new("name".to_string(), DataTypes::string(), None),
+        ]);
+        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+
+        // Test 1: RowAppendRecordBatchBuilder (to_append_record_batch=false)
+        {
+            let mut batch = ArrowLogWriteBatch::new(
+                1,
+                table_path.clone(),
+                1,
+                ArrowCompressionInfo {
+                    compression_type: ArrowCompressionType::None,
+                    compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
+                },
+                &row_type,
+                0,
+                0,
+                false,
+            )
+            .unwrap();
+
+            // Append rows
+            for _ in 0..200 {
+                let mut row = GenericRow::new(2);
+                row.set_field(0, 1_i32);
+                row.set_field(1, "hello");
+                let record = WriteRecord::for_append(Arc::new(table_path.clone()), 1, row);
+                batch.try_append(&record).unwrap();
+            }
+
+            let estimated_size = batch.estimated_size_in_bytes();
+            assert!(estimated_size > 0);
+
+            let built_data = batch.build().unwrap();
+            let actual_size = built_data.len();
+
+            let diff = actual_size - estimated_size;
+            let threshold = actual_size / 10; // 10% tolerance
+            assert!(
+                diff <= threshold,
+                "RowAppend: estimated_size {estimated_size} and actual_size {actual_size} differ by more than 10%"
+            );
+        }
+
+        // Test 2: PrebuiltRecordBatchBuilder (to_append_record_batch=true)
+        {
+            let mut batch = ArrowLogWriteBatch::new(
+                1,
+                table_path.clone(),
+                1,
+                ArrowCompressionInfo {
+                    compression_type: ArrowCompressionType::None,
+                    compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
+                },
+                &row_type,
+                0,
+                0,
+                true,
+            )
+            .unwrap();
+
+            // Create a pre-built RecordBatch
+            let schema = crate::record::to_arrow_schema(&row_type).unwrap();
+            let ids: Vec<i32> = (0..200).collect();
+            let names: Vec<&str> = (0..200).map(|_| "hello").collect();
+            let record_batch = RecordBatch::try_new(
+                schema,
+                vec![
+                    Arc::new(Int32Array::from(ids)),
+                    Arc::new(StringArray::from(names)),
+                ],
+            )
+            .unwrap();
+
+            let record =
+                WriteRecord::for_append_record_batch(Arc::new(table_path.clone()), 1, record_batch);
+            batch.try_append(&record).unwrap();
+
+            let estimated_size = batch.estimated_size_in_bytes();
+            assert!(estimated_size > 0);
+
+            let built_data = batch.build().unwrap();
+            let actual_size = built_data.len();
+
+            let diff = actual_size - estimated_size;
+            let threshold = actual_size / 10; // 10% tolerance
+            assert!(
+                diff <= threshold,
+                "Prebuilt: estimated_size {estimated_size} and actual_size {actual_size} differ by more than 10%"
+            );
+        }
+    }
+
+    #[test]
+    fn test_kv_write_batch_estimated_size() {
+        use crate::metadata::KvFormat;
+
+        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+
+        let mut batch = KvWriteBatch::new(
+            1,
+            table_path.clone(),
+            1,
+            KvWriteBatch::DEFAULT_WRITE_LIMIT,
+            KvFormat::COMPACTED,
+            0,
+            None,
+            0,
+        );
+
+        for _ in 0..200 {
+            let record = WriteRecord::for_upsert(
+                Arc::new(table_path.clone()),
+                1,
+                Bytes::from(vec![1_u8, 2_u8, 3_u8]),
+                None,
+                WriteFormat::CompactedKv,
+                None,
+                Some(RowBytes::Owned(Bytes::from(vec![1_u8, 2_u8, 3_u8]))),
+            );
+            batch.try_append(&record).unwrap();
+        }
+
+        let estimated_size = batch.estimated_size_in_bytes();
+        let actual_size = batch.build().unwrap().len();
+
+        assert_eq!(
+            actual_size, estimated_size,
+            "estimated size {estimated_size} is not equal to actual size"
+        );
+    }
 }
diff --git a/crates/fluss/src/cluster/cluster.rs b/crates/fluss/src/cluster/cluster.rs
index 0b14fe6..d6fe0ae 100644
--- a/crates/fluss/src/cluster/cluster.rs
+++ b/crates/fluss/src/cluster/cluster.rs
@@ -191,8 +191,7 @@ impl Cluster {
                 
&serde_json::from_slice(table_metadata.table_json.as_slice()).map_err(|e| {
                     Error::JsonSerdeError {
                         message: format!(
-                            "Error deserializing table_json into TableDescriptor for table_id {} and table_path {}: {}",
-                            table_id, table_path, e
+                            "Error deserializing table_json into TableDescriptor for table_id {table_id} and table_path {table_path}: {e}"
                         )
                     }
                 })?,
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 63df6de..6340dc8 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -175,6 +175,9 @@ pub trait ArrowRecordBatchInnerBuilder: Send + Sync {
     fn records_count(&self) -> i32;
 
     fn is_full(&self) -> bool;
+
+    /// Get an estimate of the size in bytes of the arrow data.
+    fn estimated_size_in_bytes(&self) -> usize;
 }
 
 #[derive(Default)]
@@ -214,6 +217,13 @@ impl ArrowRecordBatchInnerBuilder for PrebuiltRecordBatchBuilder {
         // full if has one record batch
         self.arrow_record_batch.is_some()
     }
+
+    fn estimated_size_in_bytes(&self) -> usize {
+        self.arrow_record_batch
+            .as_ref()
+            .map(|batch| batch.get_array_memory_size())
+            .unwrap_or(0)
+    }
 }
 
 pub struct RowAppendRecordBatchBuilder {
@@ -361,6 +371,16 @@ impl ArrowRecordBatchInnerBuilder for RowAppendRecordBatchBuilder {
     fn is_full(&self) -> bool {
         self.records_count() >= DEFAULT_MAX_RECORD
     }
+
+    fn estimated_size_in_bytes(&self) -> usize {
+        // Returns the uncompressed Arrow array memory size (same as Java's arrowWriter.estimatedSizeInBytes()).
+        // Note: This is the size before compression. After build(), the actual size may be smaller
+        // if compression is enabled.
+        self.arrow_column_builders
+            .iter()
+            .map(|builder| builder.finish_cloned().get_array_memory_size())
+            .sum()
+    }
 }
 
 impl MemoryLogRecordsArrowBuilder {
@@ -481,6 +501,12 @@ impl MemoryLogRecordsArrowBuilder {
         cursor.write_i32::<LittleEndian>(record_count)?;
         Ok(())
     }
+
+    /// Get an estimate of the number of bytes written to the underlying buffer.
+    /// This includes the batch header size plus the estimated arrow data size.
+    pub fn estimated_size_in_bytes(&self) -> usize {
+        RECORD_BATCH_HEADER_SIZE + self.arrow_record_batch_builder.estimated_size_in_bytes()
+    }
 }
 
 pub trait ToArrow {

Reply via email to