Copilot commented on code in PR #156:
URL: https://github.com/apache/fluss-rust/pull/156#discussion_r2684502192


##########
crates/fluss/src/record/kv/kv_record_batch_builder.rs:
##########
@@ -0,0 +1,659 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! KV record batch builder implementation.
+//!
+//! This module provides the KvRecordBatchBuilder for building batches of KV 
records.
+
+use bytes::{Bytes, BytesMut};
+use std::io;
+
+use crate::metadata::KvFormat;
+use crate::record::kv::kv_record::KvRecord;
+use crate::record::kv::kv_record_batch::{
+    ATTRIBUTES_OFFSET, BATCH_SEQUENCE_OFFSET, CRC_OFFSET, LENGTH_LENGTH, 
LENGTH_OFFSET,
+    MAGIC_OFFSET, RECORD_BATCH_HEADER_SIZE, RECORDS_COUNT_OFFSET, 
SCHEMA_ID_OFFSET,
+    WRITER_ID_OFFSET,
+};
+use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, NO_BATCH_SEQUENCE, 
NO_WRITER_ID};
+use crate::row::compacted::CompactedRowWriter;
+
+/// Builder for KvRecordBatch.
+///
+/// This builder accumulates KV records and produces a serialized batch with 
proper
+/// header information and checksums.
+pub struct KvRecordBatchBuilder {
+    schema_id: i32,
+    magic: u8,
+    write_limit: usize,
+    buffer: BytesMut,
+    writer_id: i64,
+    batch_sequence: i32,
+    current_record_number: i32,
+    size_in_bytes: usize,
+    is_closed: bool,
+    kv_format: KvFormat,
+    aborted: bool,
+    built_buffer: Option<Bytes>,
+}

Review Comment:
   The `KvRecordBatchBuilder` does not implement `Drop` to properly handle 
resources or warn about unfinished batches. If a builder is created, records 
are added, but `build()` is never called before the builder is dropped, those 
records are silently lost. Consider adding a `Drop` implementation that either 
logs a warning (if not closed/aborted) or automatically builds the batch to 
prevent accidental data loss.



##########
crates/fluss/src/util/varint.rs:
##########
@@ -0,0 +1,446 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Variable-length integer encoding utilities.
+//!
+//! This module provides utilities for encoding integers in variable-length 
format,
+//! which can save space when encoding small integers. The encoding uses 7 
bits per byte
+//! with the most significant bit as a continuation flag.
+
+use bytes::BufMut;
+use std::io::{self, Read, Write};
+
+/// Write an unsigned integer in variable-length format.
+///
+/// The encoding uses 7 bits per byte with the MSB set to 1 if more bytes 
follow.
+/// This matches the encoding used in Google Protocol Buffers.
+pub fn write_unsigned_varint<W: Write>(value: u32, writer: &mut W) -> 
io::Result<usize> {
+    let mut v = value;
+    let mut bytes_written = 0;
+
+    while (v & !0x7F) != 0 {
+        writer.write_all(&[((v as u8) & 0x7F) | 0x80])?;
+        bytes_written += 1;
+        v >>= 7;
+    }
+    writer.write_all(&[v as u8])?;
+    bytes_written += 1;
+
+    Ok(bytes_written)
+}
+
+/// Write an unsigned integer in variable-length format to a buffer.
+pub fn write_unsigned_varint_buf(value: u32, buf: &mut impl BufMut) {
+    let mut v = value;
+
+    while (v & !0x7F) != 0 {
+        buf.put_u8(((v as u8) & 0x7F) | 0x80);
+        v >>= 7;
+    }
+    buf.put_u8(v as u8);
+}
+
+/// Read an unsigned integer stored in variable-length format.
+#[allow(dead_code)]
+pub fn read_unsigned_varint<R: Read>(reader: &mut R) -> io::Result<u32> {
+    let mut tmp = [0u8; 1];
+    reader.read_exact(&mut tmp)?;
+    let mut byte = tmp[0] as i8;
+
+    if byte >= 0 {
+        return Ok(byte as u32);
+    }
+
+    let mut result = (byte & 127) as u32;
+
+    reader.read_exact(&mut tmp)?;
+    byte = tmp[0] as i8;
+    if byte >= 0 {
+        result |= (byte as u32) << 7;
+    } else {
+        result |= ((byte & 127) as u32) << 7;
+
+        reader.read_exact(&mut tmp)?;
+        byte = tmp[0] as i8;
+        if byte >= 0 {
+            result |= (byte as u32) << 14;
+        } else {
+            result |= ((byte & 127) as u32) << 14;
+
+            reader.read_exact(&mut tmp)?;
+            byte = tmp[0] as i8;
+            if byte >= 0 {
+                result |= (byte as u32) << 21;
+            } else {
+                result |= ((byte & 127) as u32) << 21;
+
+                reader.read_exact(&mut tmp)?;
+                byte = tmp[0] as i8;
+                result |= (byte as u32) << 28;
+
+                if byte < 0 {
+                    return Err(io::Error::new(
+                        io::ErrorKind::InvalidData,
+                        format!(
+                            "VarInt is too long, the most significant bit in 
the 5th byte is set, converted value: {:#x}",
+                            result
+                        ),
+                    ));
+                }
+            }
+        }
+    }
+
+    Ok(result)
+}
+
+/// Read an unsigned integer from a byte slice in variable-length format.
+pub fn read_unsigned_varint_bytes(bytes: &[u8]) -> io::Result<(u32, usize)> {
+    if bytes.is_empty() {
+        return Err(io::Error::new(
+            io::ErrorKind::UnexpectedEof,
+            "Cannot read varint from empty buffer",
+        ));
+    }
+
+    let mut byte = bytes[0] as i8;
+    let mut index = 1;
+
+    if byte >= 0 {
+        return Ok((byte as u32, index));
+    }
+
+    let mut result = (byte & 127) as u32;
+
+    if index >= bytes.len() {
+        return Err(io::Error::new(
+            io::ErrorKind::UnexpectedEof,
+            "Incomplete varint",
+        ));
+    }
+    byte = bytes[index] as i8;
+    index += 1;
+    if byte >= 0 {
+        result |= (byte as u32) << 7;
+    } else {
+        result |= ((byte & 127) as u32) << 7;
+
+        if index >= bytes.len() {
+            return Err(io::Error::new(
+                io::ErrorKind::UnexpectedEof,
+                "Incomplete varint",
+            ));
+        }
+        byte = bytes[index] as i8;
+        index += 1;
+        if byte >= 0 {
+            result |= (byte as u32) << 14;
+        } else {
+            result |= ((byte & 127) as u32) << 14;
+
+            if index >= bytes.len() {
+                return Err(io::Error::new(
+                    io::ErrorKind::UnexpectedEof,
+                    "Incomplete varint",
+                ));
+            }
+            byte = bytes[index] as i8;
+            index += 1;
+            if byte >= 0 {
+                result |= (byte as u32) << 21;
+            } else {
+                result |= ((byte & 127) as u32) << 21;
+
+                if index >= bytes.len() {
+                    return Err(io::Error::new(
+                        io::ErrorKind::UnexpectedEof,
+                        "Incomplete varint",
+                    ));
+                }
+                byte = bytes[index] as i8;
+                index += 1;
+                result |= (byte as u32) << 28;
+
+                if byte < 0 {
+                    return Err(io::Error::new(
+                        io::ErrorKind::InvalidData,
+                        format!(
+                            "VarInt is too long, the most significant bit in 
the 5th byte is set, converted value: {:#x}",
+                            result
+                        ),
+                    ));
+                }
+            }
+        }
+    }
+
+    Ok((result, index))
+}
+
+/// Calculate the number of bytes needed to encode an unsigned integer in 
variable-length format.
+pub fn size_of_unsigned_varint(value: u32) -> usize {
+    let leading_zeros = value.leading_zeros();
+    let leading_zeros_below_38_divided_by_7 = ((38 - leading_zeros) * 
0b10010010010010011) >> 19;
+    (leading_zeros_below_38_divided_by_7 + (leading_zeros >> 5)) as usize
+}

Review Comment:
   The `size_of_unsigned_varint` function uses a complex bit manipulation 
formula for calculating the varint size. While this is likely an optimization, 
the magic number `0b10010010010010011` (147,363 in decimal) and the calculation 
`((38 - leading_zeros) * 0b10010010010010011) >> 19` are not documented or 
explained. This makes the code difficult to understand and maintain. Consider 
adding a comment explaining the mathematical basis for this formula or 
providing a reference to where it comes from.



##########
crates/fluss/src/record/kv/kv_record.rs:
##########
@@ -0,0 +1,467 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Key-Value record implementation.
+//!
+//! This module provides the KvRecord struct which represents an immutable 
key-value record.
+//! The record format is:
+//! - Length => Int32
+//! - KeyLength => Unsigned VarInt
+//! - Key => bytes
+//! - Row => BinaryRow (optional, if null then this is a deletion record)
+
+use bytes::{BufMut, Bytes, BytesMut};
+use std::io::{self, Write};
+
+use crate::util::varint::{
+    read_unsigned_varint_bytes, size_of_unsigned_varint, write_unsigned_varint,
+    write_unsigned_varint_buf,
+};
+
+/// Length field size in bytes
+pub const LENGTH_LENGTH: usize = 4;
+
+/// A key-value record.
+///
+/// The schema is:
+/// - Length => Int32
+/// - KeyLength => Unsigned VarInt
+/// - Key => bytes
+/// - ValueLength => Unsigned VarInt (only present if value exists, even for 
empty values)
+/// - Value => bytes (if ValueLength > 0)
+///
+/// When the value is None (deletion), no ValueLength or Value bytes are 
present.
+/// When the value is Some(&[]) (empty value), ValueLength is present with 
value 0.
+#[derive(Debug, Clone)]
+pub struct KvRecord {
+    key: Bytes,
+    value: Option<Bytes>,
+}
+
+impl KvRecord {
+    /// Create a new KvRecord with the given key and optional value.
+    pub fn new(key: Bytes, value: Option<Bytes>) -> Self {
+        Self { key, value }
+    }
+
+    /// Get the key bytes.
+    pub fn key(&self) -> &Bytes {
+        &self.key
+    }
+
+    /// Get the value bytes (None indicates a deletion).
+    pub fn value(&self) -> Option<&Bytes> {
+        self.value.as_ref()
+    }
+
+    /// Calculate the total size of the record when serialized (including 
length prefix).
+    pub fn size_of(key: &[u8], value: Option<&[u8]>) -> usize {
+        Self::size_without_length(key, value) + LENGTH_LENGTH
+    }
+
+    /// Calculate the size without the length prefix.
+    fn size_without_length(key: &[u8], value: Option<&[u8]>) -> usize {
+        let key_len = key.len();
+        let key_len_size = size_of_unsigned_varint(key_len as u32);
+
+        match value {
+            Some(v) => {
+                let value_len_size = size_of_unsigned_varint(v.len() as u32);
+                key_len_size + key_len + value_len_size + v.len()

Review Comment:
   The `size_without_length` method performs unchecked addition on line 84 
(`key_len_size + key_len + value_len_size + v.len()`). If the key or value are 
extremely large (close to `usize::MAX`), this could silently overflow on 32-bit 
platforms or with pathological inputs. While unlikely in practice, consider 
using checked arithmetic or documenting the assumption that keys and values are 
reasonably sized to prevent potential issues.



##########
crates/fluss/src/record/kv/kv_record_batch_builder.rs:
##########
@@ -0,0 +1,659 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! KV record batch builder implementation.
+//!
+//! This module provides the KvRecordBatchBuilder for building batches of KV 
records.
+
+use bytes::{Bytes, BytesMut};
+use std::io;
+
+use crate::metadata::KvFormat;
+use crate::record::kv::kv_record::KvRecord;
+use crate::record::kv::kv_record_batch::{
+    ATTRIBUTES_OFFSET, BATCH_SEQUENCE_OFFSET, CRC_OFFSET, LENGTH_LENGTH, 
LENGTH_OFFSET,
+    MAGIC_OFFSET, RECORD_BATCH_HEADER_SIZE, RECORDS_COUNT_OFFSET, 
SCHEMA_ID_OFFSET,
+    WRITER_ID_OFFSET,
+};
+use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, NO_BATCH_SEQUENCE, 
NO_WRITER_ID};
+use crate::row::compacted::CompactedRowWriter;
+
+/// Builder for KvRecordBatch.
+///
+/// This builder accumulates KV records and produces a serialized batch with 
proper
+/// header information and checksums.
+pub struct KvRecordBatchBuilder {
+    schema_id: i32,
+    magic: u8,
+    write_limit: usize,
+    buffer: BytesMut,
+    writer_id: i64,
+    batch_sequence: i32,
+    current_record_number: i32,
+    size_in_bytes: usize,
+    is_closed: bool,
+    kv_format: KvFormat,
+    aborted: bool,
+    built_buffer: Option<Bytes>,
+}
+
+impl KvRecordBatchBuilder {
+    /// Create a new KvRecordBatchBuilder.
+    ///
+    /// # Arguments
+    /// * `schema_id` - The schema ID for records in this batch (must fit in 
i16)
+    /// * `write_limit` - Maximum bytes that can be appended
+    /// * `kv_format` - The KV format (Compacted, Indexed, or Aligned)
+    pub fn new(schema_id: i32, write_limit: usize, kv_format: KvFormat) -> 
Self {
+        assert!(
+            schema_id <= i16::MAX as i32,
+            "schema_id shouldn't be greater than the max value of i16: {}",
+            i16::MAX
+        );
+
+        let mut buffer = 
BytesMut::with_capacity(write_limit.max(RECORD_BATCH_HEADER_SIZE));
+
+        // Reserve space for header (we'll write it at the end)
+        buffer.resize(RECORD_BATCH_HEADER_SIZE, 0);
+
+        Self {
+            schema_id,
+            magic: CURRENT_KV_MAGIC_VALUE,
+            write_limit,
+            buffer,
+            writer_id: NO_WRITER_ID,
+            batch_sequence: NO_BATCH_SEQUENCE,
+            current_record_number: 0,
+            size_in_bytes: RECORD_BATCH_HEADER_SIZE,
+            is_closed: false,
+            kv_format,
+            aborted: false,
+            built_buffer: None,
+        }
+    }
+
+    /// Check if there is room for a new record containing the given key and 
value.
+    /// If no records have been appended, this always returns true.
+    pub fn has_room_for(&self, key: &[u8], value: Option<&[u8]>) -> bool {
+        self.size_in_bytes + KvRecord::size_of(key, value) <= self.write_limit
+    }
+
+    /// Check if there is room for a new record containing the given key and 
CompactedRow.
+    /// If no records have been appended, this always returns true.
+    pub fn has_room_for_row(&self, key: &[u8], row_writer: 
Option<&CompactedRowWriter>) -> bool {
+        let value = row_writer.map(|w: &CompactedRowWriter| w.buffer());
+        self.has_room_for(key, value)
+    }
+
+    /// Append a KV record with a CompactedRow value to the batch.
+    ///
+    /// This is the recommended API for KvFormat::COMPACTED batches.
+    ///
+    /// # Arguments
+    /// * `key` - The key bytes
+    /// * `row_writer` - The CompactedRowWriter containing the serialized row 
data.
+    ///   Pass None for deletion records.
+    ///
+    /// # Errors
+    /// Returns an error if:
+    /// - The builder has been aborted
+    /// - The builder is closed
+    /// - Adding this record would exceed the write limit
+    /// - The maximum number of records is exceeded
+    /// - The KV format is not COMPACTED
+    pub fn append_row(
+        &mut self,
+        key: &[u8],
+        row_writer: Option<&CompactedRowWriter>,
+    ) -> io::Result<()> {
+        if self.kv_format != KvFormat::COMPACTED {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidInput,
+                "append_row can only be used with KvFormat::COMPACTED",
+            ));
+        }
+
+        let value = row_writer.map(|w: &CompactedRowWriter| w.buffer());
+        self.append(key, value)
+    }
+
+    /// Append a KV record to the batch with raw bytes.
+    ///
+    /// # Arguments
+    /// * `key` - The key bytes
+    /// * `value` - Optional value bytes. If None, this represents a deletion.
+    ///
+    /// # Errors
+    /// Returns an error if:
+    /// - The builder has been aborted
+    /// - The builder is closed
+    /// - Adding this record would exceed the write limit
+    /// - The maximum number of records is exceeded
+    /// - The value format doesn't match the configured KV format
+    pub fn append(&mut self, key: &[u8], value: Option<&[u8]>) -> 
io::Result<()> {
+        if self.aborted {
+            return Err(io::Error::other(
+                "Tried to append a record, but KvRecordBatchBuilder has 
already been aborted",
+            ));
+        }
+
+        if self.is_closed {
+            return Err(io::Error::other(
+                "Tried to append a record, but KvRecordBatchBuilder is closed 
for record appends",
+            ));
+        }
+
+        // Check record count limit before mutation
+        if self.current_record_number == i32::MAX {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidInput,
+                format!(
+                    "Maximum number of records per batch exceeded, max 
records: {}",
+                    i32::MAX
+                ),
+            ));
+        }
+
+        let record_size = KvRecord::size_of(key, value);
+        if self.size_in_bytes + record_size > self.write_limit {
+            return Err(io::Error::new(
+                io::ErrorKind::WriteZero,
+                format!(
+                    "Adding record would exceed write limit: {} + {} > {}",
+                    self.size_in_bytes, record_size, self.write_limit
+                ),
+            ));
+        }
+
+        self.validate_row_format(value)?;
+
+        let record_byte_size = KvRecord::write_to_buf(&mut self.buffer, key, 
value)?;
+        debug_assert_eq!(record_byte_size, record_size, "Record size 
mismatch");
+
+        self.current_record_number += 1;
+        self.size_in_bytes += record_byte_size;
+
+        // Invalidate cached buffer since we modified the batch
+        self.built_buffer = None;
+
+        Ok(())
+    }
+
+    /// Set the writer state (writer ID and batch base sequence).
+    pub fn set_writer_state(&mut self, writer_id: i64, batch_base_sequence: 
i32) {
+        self.writer_id = writer_id;
+        self.batch_sequence = batch_base_sequence;
+        // Invalidate cached buffer since header fields changed
+        self.built_buffer = None;
+    }
+
+    /// Reset the writer state.
+    /// This triggers a rebuild of the batch header on next build.
+    pub fn reset_writer_state(&mut self, writer_id: i64, batch_sequence: i32) {
+        self.built_buffer = None;
+        self.writer_id = writer_id;
+        self.batch_sequence = batch_sequence;
+    }
+
+    /// Build the batch and return the serialized bytes.
+    /// This can be called multiple times as the batch is cached after the 
first build.
+    pub fn build(&mut self) -> io::Result<Bytes> {
+        if self.aborted {
+            return Err(io::Error::other(
+                "Attempting to build an aborted record batch",
+            ));
+        }
+
+        if let Some(ref cached) = self.built_buffer {
+            return Ok(cached.clone());
+        }
+
+        self.write_batch_header()?;
+        let bytes = self.buffer.clone().freeze();
+        self.built_buffer = Some(bytes.clone());
+        Ok(bytes)

Review Comment:
   The `build()` method performs `self.buffer.clone().freeze()` on line 226, 
which clones the entire buffer, and then immediately clones the resulting 
`Bytes` again on line 227 to store in the cache. This is inefficient - you 
could avoid the second clone by doing: `let bytes = 
self.buffer.clone().freeze(); self.built_buffer = Some(bytes.clone()); 
Ok(bytes)` or even better, avoid cloning on the return path when caching for 
the first time: `let bytes = self.buffer.clone().freeze(); self.built_buffer = 
Some(bytes); Ok(self.built_buffer.as_ref().unwrap().clone())`.
   ```suggestion
           self.built_buffer = Some(bytes);
           Ok(self.built_buffer.as_ref().unwrap().clone())
   ```



##########
crates/fluss/src/record/kv/kv_record.rs:
##########
@@ -0,0 +1,467 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Key-Value record implementation.
+//!
+//! This module provides the KvRecord struct which represents an immutable 
key-value record.
+//! The record format is:
+//! - Length => Int32
+//! - KeyLength => Unsigned VarInt
+//! - Key => bytes
+//! - Row => BinaryRow (optional, if null then this is a deletion record)
+
+use bytes::{BufMut, Bytes, BytesMut};
+use std::io::{self, Write};
+
+use crate::util::varint::{
+    read_unsigned_varint_bytes, size_of_unsigned_varint, write_unsigned_varint,
+    write_unsigned_varint_buf,
+};
+
+/// Length field size in bytes
+pub const LENGTH_LENGTH: usize = 4;
+
+/// A key-value record.
+///
+/// The schema is:
+/// - Length => Int32
+/// - KeyLength => Unsigned VarInt
+/// - Key => bytes
+/// - ValueLength => Unsigned VarInt (only present if value exists, even for 
empty values)
+/// - Value => bytes (if ValueLength > 0)
+///
+/// When the value is None (deletion), no ValueLength or Value bytes are 
present.
+/// When the value is Some(&[]) (empty value), ValueLength is present with 
value 0.
+#[derive(Debug, Clone)]
+pub struct KvRecord {
+    key: Bytes,
+    value: Option<Bytes>,
+}
+
+impl KvRecord {
+    /// Create a new KvRecord with the given key and optional value.
+    pub fn new(key: Bytes, value: Option<Bytes>) -> Self {
+        Self { key, value }
+    }
+
+    /// Get the key bytes.
+    pub fn key(&self) -> &Bytes {
+        &self.key
+    }
+
+    /// Get the value bytes (None indicates a deletion).
+    pub fn value(&self) -> Option<&Bytes> {
+        self.value.as_ref()
+    }
+
+    /// Calculate the total size of the record when serialized (including 
length prefix).
+    pub fn size_of(key: &[u8], value: Option<&[u8]>) -> usize {
+        Self::size_without_length(key, value) + LENGTH_LENGTH
+    }
+
+    /// Calculate the size without the length prefix.
+    fn size_without_length(key: &[u8], value: Option<&[u8]>) -> usize {
+        let key_len = key.len();
+        let key_len_size = size_of_unsigned_varint(key_len as u32);
+
+        match value {
+            Some(v) => {
+                let value_len_size = size_of_unsigned_varint(v.len() as u32);
+                key_len_size + key_len + value_len_size + v.len()
+            }
+            None => {
+                // Deletion: no value length varint or value bytes
+                key_len_size + key_len
+            }
+        }
+    }
+
+    /// Write a KV record to a writer.
+    ///
+    /// Returns the number of bytes written.
+    pub fn write_to<W: Write>(
+        writer: &mut W,
+        key: &[u8],
+        value: Option<&[u8]>,
+    ) -> io::Result<usize> {
+        let size_in_bytes = Self::size_without_length(key, value);
+
+        let size_i32 = i32::try_from(size_in_bytes).map_err(|_| {
+            io::Error::new(
+                io::ErrorKind::InvalidInput,
+                format!("Record size {} exceeds i32::MAX", size_in_bytes),
+            )
+        })?;
+        writer.write_all(&size_i32.to_be_bytes())?;
+
+        let key_len = key.len() as u32;
+        write_unsigned_varint(key_len, writer)?;
+
+        writer.write_all(key)?;
+
+        if let Some(v) = value {
+            let value_len = v.len() as u32;
+            write_unsigned_varint(value_len, writer)?;
+            writer.write_all(v)?;
+        }
+        // For None (deletion), don't write value length or value bytes
+
+        Ok(size_in_bytes + LENGTH_LENGTH)
+    }
+
+    /// Write a KV record to a buffer.
+    ///
+    /// Returns the number of bytes written.
+    pub fn write_to_buf(buf: &mut BytesMut, key: &[u8], value: Option<&[u8]>) 
-> io::Result<usize> {
+        let size_in_bytes = Self::size_without_length(key, value);
+
+        let size_i32 = i32::try_from(size_in_bytes).map_err(|_| {
+            io::Error::new(
+                io::ErrorKind::InvalidInput,
+                format!("Record size {} exceeds i32::MAX", size_in_bytes),
+            )
+        })?;
+        buf.put_i32(size_i32);
+
+        // Write key length as unsigned varint
+        let key_len = key.len() as u32;
+        write_unsigned_varint_buf(key_len, buf);
+
+        buf.put_slice(key);
+
+        // Write the value length and bytes if present (even for empty values)
+        if let Some(v) = value {
+            let value_len = v.len() as u32;
+            write_unsigned_varint_buf(value_len, buf);
+            buf.put_slice(v);
+        }
+        // For None (deletion), don't write value length or value bytes
+
+        Ok(size_in_bytes + LENGTH_LENGTH)
+    }
+
+    /// Read a KV record from bytes at the given position.
+    ///
+    /// Returns the KvRecord and the number of bytes consumed.
+    pub fn read_from(bytes: &Bytes, position: usize) -> io::Result<(Self, 
usize)> {
+        if bytes.len() < position.saturating_add(LENGTH_LENGTH) {
+            return Err(io::Error::new(
+                io::ErrorKind::UnexpectedEof,
+                "Not enough bytes to read record length",
+            ));
+        }
+
+        let size_in_bytes_i32 = i32::from_be_bytes([
+            bytes[position],
+            bytes[position + 1],
+            bytes[position + 2],
+            bytes[position + 3],
+        ]);
+
+        if size_in_bytes_i32 < 0 {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidData,
+                format!("Invalid record length: {}", size_in_bytes_i32),
+            ));
+        }
+
+        let size_in_bytes = size_in_bytes_i32 as usize;
+
+        let total_size = 
size_in_bytes.checked_add(LENGTH_LENGTH).ok_or_else(|| {
+            io::Error::new(
+                io::ErrorKind::InvalidData,
+                format!(
+                    "Record size overflow: {} + {}",
+                    size_in_bytes, LENGTH_LENGTH
+                ),
+            )
+        })?;
+
+        // Check against available bytes with checked arithmetic
+        let available = bytes.len().saturating_sub(position);
+        if available < total_size {
+            return Err(io::Error::new(
+                io::ErrorKind::UnexpectedEof,
+                format!(
+                    "Not enough bytes to read record: expected {}, available 
{}",
+                    total_size,
+                    bytes.len() - position
+                ),
+            ));
+        }
+
+        // Start reading from after the length field
+        let mut current_offset = position + LENGTH_LENGTH;
+        let record_end = position + total_size;
+
+        // Read key length as unsigned varint (bounded by record end)
+        let (key_len, varint_size) =
+            read_unsigned_varint_bytes(&bytes[current_offset..record_end])?;
+        current_offset += varint_size;
+
+        // Read key bytes
+        let key_end = current_offset + key_len as usize;
+        if key_end > position + total_size {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidData,
+                "Key length exceeds record size",
+            ));
+        }
+        let key = bytes.slice(current_offset..key_end);
+        current_offset = key_end;
+
+        let remaining_bytes = (position + total_size) - current_offset;
+
+        let value = if remaining_bytes > 0 {
+            // Value is present: read value length varint
+            let (value_len, varint_size) =
+                
read_unsigned_varint_bytes(&bytes[current_offset..record_end])?;
+            current_offset += varint_size;
+
+            // Read value bytes
+            let value_end = current_offset + value_len as usize;
+            if value_end > record_end {
+                return Err(io::Error::new(
+                    io::ErrorKind::InvalidData,
+                    "Value length exceeds record size",
+                ));
+            }
+
+            // Use slice for both empty and non-empty values (zero-copy)
+            let value_bytes = bytes.slice(current_offset..value_end);
+            current_offset = value_end;
+
+            // Verify no trailing bytes remain
+            if current_offset != record_end {
+                return Err(io::Error::new(
+                    io::ErrorKind::InvalidData,
+                    "Record has trailing bytes after value",
+                ));
+            }
+
+            Some(value_bytes)
+        } else {
+            // No remaining bytes: this is a deletion

Review Comment:
   In the `read_from` method, after reading the value, there's a check for 
trailing bytes (line 249-254). However, for deletion records (when 
remaining_bytes == 0), there's no corresponding validation. If the record 
length is incorrect and there should be value data but `remaining_bytes` 
evaluates to 0 due to arithmetic errors or corruption, the record would be 
silently treated as a deletion. Consider adding validation to ensure that for 
deletion records, `current_offset == record_end` to catch potential data 
corruption issues.
   ```suggestion
               // No remaining bytes: this is a deletion
               // Validate that we've exactly reached the end of the record.
               if current_offset != record_end {
                   return Err(io::Error::new(
                       io::ErrorKind::InvalidData,
                       "Deletion record has inconsistent length",
                   ));
               }
   ```



##########
crates/fluss/src/record/kv/kv_record.rs:
##########
@@ -0,0 +1,467 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Key-Value record implementation.
+//!
+//! This module provides the KvRecord struct which represents an immutable 
key-value record.
+//! The record format is:
+//! - Length => Int32
+//! - KeyLength => Unsigned VarInt
+//! - Key => bytes
+//! - Row => BinaryRow (optional, if null then this is a deletion record)
+
+use bytes::{BufMut, Bytes, BytesMut};
+use std::io::{self, Write};
+
+use crate::util::varint::{
+    read_unsigned_varint_bytes, size_of_unsigned_varint, write_unsigned_varint,
+    write_unsigned_varint_buf,
+};
+
+/// Length field size in bytes
+pub const LENGTH_LENGTH: usize = 4;
+
+/// A key-value record.
+///
+/// The schema is:
+/// - Length => Int32
+/// - KeyLength => Unsigned VarInt
+/// - Key => bytes
+/// - ValueLength => Unsigned VarInt (only present if value exists, even for 
empty values)
+/// - Value => bytes (if ValueLength > 0)
+///
+/// When the value is None (deletion), no ValueLength or Value bytes are 
present.
+/// When the value is Some(&[]) (empty value), ValueLength is present with 
value 0.
+#[derive(Debug, Clone)]
+pub struct KvRecord {
+    key: Bytes,
+    value: Option<Bytes>,
+}
+
+impl KvRecord {
+    /// Create a new KvRecord with the given key and optional value.
+    pub fn new(key: Bytes, value: Option<Bytes>) -> Self {
+        Self { key, value }
+    }
+
+    /// Get the key bytes.
+    pub fn key(&self) -> &Bytes {
+        &self.key
+    }
+
+    /// Get the value bytes (None indicates a deletion).
+    pub fn value(&self) -> Option<&Bytes> {
+        self.value.as_ref()
+    }
+
+    /// Calculate the total size of the record when serialized (including 
length prefix).
+    pub fn size_of(key: &[u8], value: Option<&[u8]>) -> usize {
+        Self::size_without_length(key, value) + LENGTH_LENGTH
+    }
+
+    /// Calculate the size without the length prefix.
+    fn size_without_length(key: &[u8], value: Option<&[u8]>) -> usize {
+        let key_len = key.len();
+        let key_len_size = size_of_unsigned_varint(key_len as u32);
+
+        match value {
+            Some(v) => {
+                let value_len_size = size_of_unsigned_varint(v.len() as u32);
+                key_len_size + key_len + value_len_size + v.len()
+            }
+            None => {
+                // Deletion: no value length varint or value bytes
+                key_len_size + key_len
+            }
+        }
+    }
+
+    /// Write a KV record to a writer.
+    ///
+    /// Returns the number of bytes written.
+    pub fn write_to<W: Write>(
+        writer: &mut W,
+        key: &[u8],
+        value: Option<&[u8]>,
+    ) -> io::Result<usize> {
+        let size_in_bytes = Self::size_without_length(key, value);
+
+        let size_i32 = i32::try_from(size_in_bytes).map_err(|_| {
+            io::Error::new(
+                io::ErrorKind::InvalidInput,
+                format!("Record size {} exceeds i32::MAX", size_in_bytes),
+            )
+        })?;
+        writer.write_all(&size_i32.to_be_bytes())?;
+
+        let key_len = key.len() as u32;
+        write_unsigned_varint(key_len, writer)?;
+
+        writer.write_all(key)?;
+
+        if let Some(v) = value {
+            let value_len = v.len() as u32;
+            write_unsigned_varint(value_len, writer)?;
+            writer.write_all(v)?;
+        }
+        // For None (deletion), don't write value length or value bytes
+
+        Ok(size_in_bytes + LENGTH_LENGTH)
+    }
+
+    /// Write a KV record to a buffer.
+    ///
+    /// Returns the number of bytes written.
+    pub fn write_to_buf(buf: &mut BytesMut, key: &[u8], value: Option<&[u8]>) 
-> io::Result<usize> {
+        let size_in_bytes = Self::size_without_length(key, value);
+
+        let size_i32 = i32::try_from(size_in_bytes).map_err(|_| {
+            io::Error::new(
+                io::ErrorKind::InvalidInput,
+                format!("Record size {} exceeds i32::MAX", size_in_bytes),
+            )
+        })?;
+        buf.put_i32(size_i32);
+
+        // Write key length as unsigned varint
+        let key_len = key.len() as u32;
+        write_unsigned_varint_buf(key_len, buf);
+
+        buf.put_slice(key);
+
+        // Write the value length and bytes if present (even for empty values)
+        if let Some(v) = value {
+            let value_len = v.len() as u32;
+            write_unsigned_varint_buf(value_len, buf);
+            buf.put_slice(v);
+        }
+        // For None (deletion), don't write value length or value bytes
+
+        Ok(size_in_bytes + LENGTH_LENGTH)
+    }
+
+    /// Read a KV record from bytes at the given position.
+    ///
+    /// Returns the KvRecord and the number of bytes consumed.
+    pub fn read_from(bytes: &Bytes, position: usize) -> io::Result<(Self, 
usize)> {
+        if bytes.len() < position.saturating_add(LENGTH_LENGTH) {
+            return Err(io::Error::new(
+                io::ErrorKind::UnexpectedEof,
+                "Not enough bytes to read record length",
+            ));
+        }
+
+        let size_in_bytes_i32 = i32::from_be_bytes([
+            bytes[position],
+            bytes[position + 1],
+            bytes[position + 2],
+            bytes[position + 3],
+        ]);
+
+        if size_in_bytes_i32 < 0 {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidData,
+                format!("Invalid record length: {}", size_in_bytes_i32),
+            ));
+        }
+
+        let size_in_bytes = size_in_bytes_i32 as usize;
+
+        let total_size = 
size_in_bytes.checked_add(LENGTH_LENGTH).ok_or_else(|| {
+            io::Error::new(
+                io::ErrorKind::InvalidData,
+                format!(
+                    "Record size overflow: {} + {}",
+                    size_in_bytes, LENGTH_LENGTH
+                ),
+            )
+        })?;
+
+        // Check against available bytes with checked arithmetic
+        let available = bytes.len().saturating_sub(position);
+        if available < total_size {
+            return Err(io::Error::new(
+                io::ErrorKind::UnexpectedEof,
+                format!(
+                    "Not enough bytes to read record: expected {}, available 
{}",
+                    total_size,
+                    bytes.len() - position

Review Comment:
   On line 202, the error message uses `bytes.len() - position` which could 
underflow if `position > bytes.len()`. Although this is protected by the check 
on line 196 using `saturating_sub`, the error message calculation is not 
protected and could panic in debug mode or wrap in release mode. Use 
`available` instead: `format!("Not enough bytes to read record: expected {}, 
available {}", total_size, available)`.
   ```suggestion
                       available
   ```



##########
crates/fluss/src/util/varint.rs:
##########
@@ -0,0 +1,446 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Variable-length integer encoding utilities.
+//!
+//! This module provides utilities for encoding integers in variable-length 
format,
+//! which can save space when encoding small integers. The encoding uses 7 
bits per byte
+//! with the most significant bit as a continuation flag.
+
+use bytes::BufMut;
+use std::io::{self, Read, Write};
+
+/// Write an unsigned integer in variable-length format.
+///
+/// The encoding uses 7 bits per byte with the MSB set to 1 if more bytes 
follow.
+/// This matches the encoding used in Google Protocol Buffers.
+pub fn write_unsigned_varint<W: Write>(value: u32, writer: &mut W) -> 
io::Result<usize> {
+    let mut v = value;
+    let mut bytes_written = 0;
+
+    while (v & !0x7F) != 0 {
+        writer.write_all(&[((v as u8) & 0x7F) | 0x80])?;
+        bytes_written += 1;
+        v >>= 7;
+    }
+    writer.write_all(&[v as u8])?;
+    bytes_written += 1;
+
+    Ok(bytes_written)
+}
+
+/// Write an unsigned integer in variable-length format to a buffer.
+pub fn write_unsigned_varint_buf(value: u32, buf: &mut impl BufMut) {
+    let mut v = value;
+
+    while (v & !0x7F) != 0 {
+        buf.put_u8(((v as u8) & 0x7F) | 0x80);
+        v >>= 7;
+    }
+    buf.put_u8(v as u8);
+}
+
+/// Read an unsigned integer stored in variable-length format.
+#[allow(dead_code)]
+pub fn read_unsigned_varint<R: Read>(reader: &mut R) -> io::Result<u32> {
+    let mut tmp = [0u8; 1];
+    reader.read_exact(&mut tmp)?;
+    let mut byte = tmp[0] as i8;
+
+    if byte >= 0 {
+        return Ok(byte as u32);
+    }
+
+    let mut result = (byte & 127) as u32;
+
+    reader.read_exact(&mut tmp)?;
+    byte = tmp[0] as i8;
+    if byte >= 0 {
+        result |= (byte as u32) << 7;
+    } else {
+        result |= ((byte & 127) as u32) << 7;
+
+        reader.read_exact(&mut tmp)?;
+        byte = tmp[0] as i8;
+        if byte >= 0 {
+            result |= (byte as u32) << 14;
+        } else {
+            result |= ((byte & 127) as u32) << 14;
+
+            reader.read_exact(&mut tmp)?;
+            byte = tmp[0] as i8;
+            if byte >= 0 {
+                result |= (byte as u32) << 21;
+            } else {
+                result |= ((byte & 127) as u32) << 21;
+
+                reader.read_exact(&mut tmp)?;
+                byte = tmp[0] as i8;
+                result |= (byte as u32) << 28;
+
+                if byte < 0 {
+                    return Err(io::Error::new(
+                        io::ErrorKind::InvalidData,
+                        format!(
+                            "VarInt is too long, the most significant bit in 
the 5th byte is set, converted value: {:#x}",
+                            result
+                        ),

Review Comment:
   The error message "VarInt is too long, the most significant bit in the 5th 
byte is set" includes the partially decoded value in the message. This could be 
misleading because when the MSB of the 5th byte is set, it indicates that the 
varint encoding is invalid for a u32 (which can only use up to 5 bytes with the 
5th byte having MSB clear). The "converted value" shown is 
incomplete/corrupted. Consider clarifying the error message to indicate this is 
an encoding error rather than implying the value shown is meaningful.
   ```suggestion
                           "Invalid u32 varint encoding: too many bytes (most 
significant bit in the 5th byte is set)",
   ```



##########
crates/fluss/src/record/kv/kv_record_batch_builder.rs:
##########
@@ -0,0 +1,659 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! KV record batch builder implementation.
+//!
+//! This module provides the KvRecordBatchBuilder for building batches of KV 
records.
+
+use bytes::{Bytes, BytesMut};
+use std::io;
+
+use crate::metadata::KvFormat;
+use crate::record::kv::kv_record::KvRecord;
+use crate::record::kv::kv_record_batch::{
+    ATTRIBUTES_OFFSET, BATCH_SEQUENCE_OFFSET, CRC_OFFSET, LENGTH_LENGTH, 
LENGTH_OFFSET,
+    MAGIC_OFFSET, RECORD_BATCH_HEADER_SIZE, RECORDS_COUNT_OFFSET, 
SCHEMA_ID_OFFSET,
+    WRITER_ID_OFFSET,
+};
+use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, NO_BATCH_SEQUENCE, 
NO_WRITER_ID};
+use crate::row::compacted::CompactedRowWriter;
+
+/// Builder for KvRecordBatch.
+///
+/// This builder accumulates KV records and produces a serialized batch with 
proper
+/// header information and checksums.
+pub struct KvRecordBatchBuilder {
+    schema_id: i32,
+    magic: u8,
+    write_limit: usize,
+    buffer: BytesMut,
+    writer_id: i64,
+    batch_sequence: i32,
+    current_record_number: i32,
+    size_in_bytes: usize,
+    is_closed: bool,
+    kv_format: KvFormat,
+    aborted: bool,
+    built_buffer: Option<Bytes>,
+}
+
+impl KvRecordBatchBuilder {
+    /// Create a new KvRecordBatchBuilder.
+    ///
+    /// # Arguments
+    /// * `schema_id` - The schema ID for records in this batch (must fit in 
i16)
+    /// * `write_limit` - Maximum bytes that can be appended
+    /// * `kv_format` - The KV format (Compacted, Indexed, or Aligned)
+    pub fn new(schema_id: i32, write_limit: usize, kv_format: KvFormat) -> 
Self {
+        assert!(
+            schema_id <= i16::MAX as i32,
+            "schema_id shouldn't be greater than the max value of i16: {}",
+            i16::MAX
+        );
+
+        let mut buffer = 
BytesMut::with_capacity(write_limit.max(RECORD_BATCH_HEADER_SIZE));
+
+        // Reserve space for header (we'll write it at the end)
+        buffer.resize(RECORD_BATCH_HEADER_SIZE, 0);
+
+        Self {
+            schema_id,
+            magic: CURRENT_KV_MAGIC_VALUE,
+            write_limit,
+            buffer,
+            writer_id: NO_WRITER_ID,
+            batch_sequence: NO_BATCH_SEQUENCE,
+            current_record_number: 0,
+            size_in_bytes: RECORD_BATCH_HEADER_SIZE,
+            is_closed: false,
+            kv_format,
+            aborted: false,
+            built_buffer: None,
+        }
+    }
+
+    /// Check if there is room for a new record containing the given key and 
value.
+    /// If no records have been appended, this always returns true.
+    pub fn has_room_for(&self, key: &[u8], value: Option<&[u8]>) -> bool {
+        self.size_in_bytes + KvRecord::size_of(key, value) <= self.write_limit
+    }
+
+    /// Check if there is room for a new record containing the given key and 
CompactedRow.
+    /// If no records have been appended, this always returns true.
+    pub fn has_room_for_row(&self, key: &[u8], row_writer: 
Option<&CompactedRowWriter>) -> bool {
+        let value = row_writer.map(|w: &CompactedRowWriter| w.buffer());
+        self.has_room_for(key, value)
+    }
+
+    /// Append a KV record with a CompactedRow value to the batch.
+    ///
+    /// This is the recommended API for KvFormat::COMPACTED batches.
+    ///
+    /// # Arguments
+    /// * `key` - The key bytes
+    /// * `row_writer` - The CompactedRowWriter containing the serialized row 
data.
+    ///   Pass None for deletion records.
+    ///
+    /// # Errors
+    /// Returns an error if:
+    /// - The builder has been aborted
+    /// - The builder is closed
+    /// - Adding this record would exceed the write limit
+    /// - The maximum number of records is exceeded
+    /// - The KV format is not COMPACTED
+    pub fn append_row(
+        &mut self,
+        key: &[u8],
+        row_writer: Option<&CompactedRowWriter>,
+    ) -> io::Result<()> {
+        if self.kv_format != KvFormat::COMPACTED {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidInput,
+                "append_row can only be used with KvFormat::COMPACTED",
+            ));
+        }
+
+        let value = row_writer.map(|w: &CompactedRowWriter| w.buffer());
+        self.append(key, value)
+    }
+
+    /// Append a KV record to the batch with raw bytes.
+    ///
+    /// # Arguments
+    /// * `key` - The key bytes
+    /// * `value` - Optional value bytes. If None, this represents a deletion.
+    ///
+    /// # Errors
+    /// Returns an error if:
+    /// - The builder has been aborted
+    /// - The builder is closed
+    /// - Adding this record would exceed the write limit
+    /// - The maximum number of records is exceeded
+    /// - The value format doesn't match the configured KV format
+    pub fn append(&mut self, key: &[u8], value: Option<&[u8]>) -> 
io::Result<()> {
+        if self.aborted {
+            return Err(io::Error::other(
+                "Tried to append a record, but KvRecordBatchBuilder has 
already been aborted",
+            ));
+        }
+
+        if self.is_closed {
+            return Err(io::Error::other(
+                "Tried to append a record, but KvRecordBatchBuilder is closed 
for record appends",
+            ));
+        }
+
+        // Check record count limit before mutation
+        if self.current_record_number == i32::MAX {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidInput,
+                format!(
+                    "Maximum number of records per batch exceeded, max 
records: {}",
+                    i32::MAX
+                ),
+            ));
+        }
+
+        let record_size = KvRecord::size_of(key, value);
+        if self.size_in_bytes + record_size > self.write_limit {
+            return Err(io::Error::new(
+                io::ErrorKind::WriteZero,
+                format!(
+                    "Adding record would exceed write limit: {} + {} > {}",
+                    self.size_in_bytes, record_size, self.write_limit
+                ),
+            ));
+        }
+
+        self.validate_row_format(value)?;
+
+        let record_byte_size = KvRecord::write_to_buf(&mut self.buffer, key, 
value)?;
+        debug_assert_eq!(record_byte_size, record_size, "Record size 
mismatch");
+
+        self.current_record_number += 1;
+        self.size_in_bytes += record_byte_size;
+
+        // Invalidate cached buffer since we modified the batch
+        self.built_buffer = None;
+
+        Ok(())
+    }
+
+    /// Set the writer state (writer ID and batch base sequence).
+    pub fn set_writer_state(&mut self, writer_id: i64, batch_base_sequence: 
i32) {
+        self.writer_id = writer_id;
+        self.batch_sequence = batch_base_sequence;
+        // Invalidate cached buffer since header fields changed
+        self.built_buffer = None;
+    }
+
+    /// Reset the writer state.
+    /// This triggers a rebuild of the batch header on next build.
+    pub fn reset_writer_state(&mut self, writer_id: i64, batch_sequence: i32) {
+        self.built_buffer = None;
+        self.writer_id = writer_id;
+        self.batch_sequence = batch_sequence;

Review Comment:
   The `reset_writer_state` method has the same implementation as 
`set_writer_state`, making it redundant. Both methods invalidate the cache and 
update writer_id and batch_sequence in the same order. Consider removing 
`reset_writer_state` or clearly documenting the semantic difference between 
"set" and "reset" operations. If they are truly equivalent, having both methods 
can confuse API consumers.
   ```suggestion
       /// This is a convenience alias for [`set_writer_state`] that ensures
       /// the writer state is updated and the batch header will be rebuilt
       /// on the next call to [`build`].
       pub fn reset_writer_state(&mut self, writer_id: i64, batch_sequence: 
i32) {
           self.set_writer_state(writer_id, batch_sequence);
   ```



##########
crates/fluss/src/record/kv/kv_record_batch_builder.rs:
##########
@@ -0,0 +1,659 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! KV record batch builder implementation.
+//!
+//! This module provides the KvRecordBatchBuilder for building batches of KV 
records.
+
+use bytes::{Bytes, BytesMut};
+use std::io;
+
+use crate::metadata::KvFormat;
+use crate::record::kv::kv_record::KvRecord;
+use crate::record::kv::kv_record_batch::{
+    ATTRIBUTES_OFFSET, BATCH_SEQUENCE_OFFSET, CRC_OFFSET, LENGTH_LENGTH, 
LENGTH_OFFSET,
+    MAGIC_OFFSET, RECORD_BATCH_HEADER_SIZE, RECORDS_COUNT_OFFSET, 
SCHEMA_ID_OFFSET,
+    WRITER_ID_OFFSET,
+};
+use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, NO_BATCH_SEQUENCE, 
NO_WRITER_ID};
+use crate::row::compacted::CompactedRowWriter;
+
+/// Builder for KvRecordBatch.
+///
+/// This builder accumulates KV records and produces a serialized batch with 
proper
+/// header information and checksums.
+pub struct KvRecordBatchBuilder {
+    schema_id: i32,
+    magic: u8,
+    write_limit: usize,
+    buffer: BytesMut,
+    writer_id: i64,
+    batch_sequence: i32,
+    current_record_number: i32,
+    size_in_bytes: usize,
+    is_closed: bool,
+    kv_format: KvFormat,
+    aborted: bool,
+    built_buffer: Option<Bytes>,
+}
+
+impl KvRecordBatchBuilder {
+    /// Create a new KvRecordBatchBuilder.
+    ///
+    /// # Arguments
+    /// * `schema_id` - The schema ID for records in this batch (must fit in 
i16)
+    /// * `write_limit` - Maximum bytes that can be appended
+    /// * `kv_format` - The KV format (Compacted, Indexed, or Aligned)
+    pub fn new(schema_id: i32, write_limit: usize, kv_format: KvFormat) -> 
Self {
+        assert!(
+            schema_id <= i16::MAX as i32,
+            "schema_id shouldn't be greater than the max value of i16: {}",
+            i16::MAX
+        );
+
+        let mut buffer = 
BytesMut::with_capacity(write_limit.max(RECORD_BATCH_HEADER_SIZE));
+
+        // Reserve space for header (we'll write it at the end)
+        buffer.resize(RECORD_BATCH_HEADER_SIZE, 0);
+
+        Self {
+            schema_id,
+            magic: CURRENT_KV_MAGIC_VALUE,
+            write_limit,
+            buffer,
+            writer_id: NO_WRITER_ID,
+            batch_sequence: NO_BATCH_SEQUENCE,
+            current_record_number: 0,
+            size_in_bytes: RECORD_BATCH_HEADER_SIZE,
+            is_closed: false,
+            kv_format,
+            aborted: false,
+            built_buffer: None,
+        }
+    }
+
+    /// Check if there is room for a new record containing the given key and 
value.
+    /// If no records have been appended, this always returns true.
+    pub fn has_room_for(&self, key: &[u8], value: Option<&[u8]>) -> bool {
+        self.size_in_bytes + KvRecord::size_of(key, value) <= self.write_limit
+    }
+
+    /// Check if there is room for a new record containing the given key and 
CompactedRow.
+    /// If no records have been appended, this always returns true.
+    pub fn has_room_for_row(&self, key: &[u8], row_writer: 
Option<&CompactedRowWriter>) -> bool {
+        let value = row_writer.map(|w: &CompactedRowWriter| w.buffer());
+        self.has_room_for(key, value)
+    }
+
+    /// Append a KV record with a CompactedRow value to the batch.
+    ///
+    /// This is the recommended API for KvFormat::COMPACTED batches.
+    ///
+    /// # Arguments
+    /// * `key` - The key bytes
+    /// * `row_writer` - The CompactedRowWriter containing the serialized row 
data.
+    ///   Pass None for deletion records.
+    ///
+    /// # Errors
+    /// Returns an error if:
+    /// - The builder has been aborted
+    /// - The builder is closed
+    /// - Adding this record would exceed the write limit
+    /// - The maximum number of records is exceeded
+    /// - The KV format is not COMPACTED
+    pub fn append_row(
+        &mut self,
+        key: &[u8],
+        row_writer: Option<&CompactedRowWriter>,
+    ) -> io::Result<()> {
+        if self.kv_format != KvFormat::COMPACTED {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidInput,
+                "append_row can only be used with KvFormat::COMPACTED",
+            ));
+        }
+
+        let value = row_writer.map(|w: &CompactedRowWriter| w.buffer());
+        self.append(key, value)
+    }
+
+    /// Append a KV record to the batch with raw bytes.
+    ///
+    /// # Arguments
+    /// * `key` - The key bytes
+    /// * `value` - Optional value bytes. If None, this represents a deletion.
+    ///
+    /// # Errors
+    /// Returns an error if:
+    /// - The builder has been aborted
+    /// - The builder is closed
+    /// - Adding this record would exceed the write limit
+    /// - The maximum number of records is exceeded
+    /// - The value format doesn't match the configured KV format
+    pub fn append(&mut self, key: &[u8], value: Option<&[u8]>) -> 
io::Result<()> {
+        if self.aborted {
+            return Err(io::Error::other(
+                "Tried to append a record, but KvRecordBatchBuilder has 
already been aborted",
+            ));
+        }
+
+        if self.is_closed {
+            return Err(io::Error::other(
+                "Tried to append a record, but KvRecordBatchBuilder is closed 
for record appends",
+            ));
+        }
+
+        // Check record count limit before mutation
+        if self.current_record_number == i32::MAX {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidInput,
+                format!(
+                    "Maximum number of records per batch exceeded, max 
records: {}",
+                    i32::MAX
+                ),
+            ));
+        }
+
+        let record_size = KvRecord::size_of(key, value);
+        if self.size_in_bytes + record_size > self.write_limit {
+            return Err(io::Error::new(
+                io::ErrorKind::WriteZero,
+                format!(
+                    "Adding record would exceed write limit: {} + {} > {}",
+                    self.size_in_bytes, record_size, self.write_limit
+                ),
+            ));
+        }
+
+        self.validate_row_format(value)?;
+
+        let record_byte_size = KvRecord::write_to_buf(&mut self.buffer, key, 
value)?;
+        debug_assert_eq!(record_byte_size, record_size, "Record size 
mismatch");
+
+        self.current_record_number += 1;
+        self.size_in_bytes += record_byte_size;
+
+        // Invalidate cached buffer since we modified the batch
+        self.built_buffer = None;
+
+        Ok(())
+    }
+
+    /// Set the writer state (writer ID and batch base sequence).
+    pub fn set_writer_state(&mut self, writer_id: i64, batch_base_sequence: 
i32) {
+        self.writer_id = writer_id;
+        self.batch_sequence = batch_base_sequence;
+        // Invalidate cached buffer since header fields changed
+        self.built_buffer = None;
+    }
+
+    /// Reset the writer state.
+    /// This triggers a rebuild of the batch header on next build.
+    pub fn reset_writer_state(&mut self, writer_id: i64, batch_sequence: i32) {
+        self.built_buffer = None;
+        self.writer_id = writer_id;
+        self.batch_sequence = batch_sequence;
+    }
+
+    /// Build the batch and return the serialized bytes.
+    /// This can be called multiple times as the batch is cached after the 
first build.
+    pub fn build(&mut self) -> io::Result<Bytes> {
+        if self.aborted {
+            return Err(io::Error::other(
+                "Attempting to build an aborted record batch",
+            ));
+        }
+
+        if let Some(ref cached) = self.built_buffer {
+            return Ok(cached.clone());
+        }
+
+        self.write_batch_header()?;
+        let bytes = self.buffer.clone().freeze();
+        self.built_buffer = Some(bytes.clone());
+        Ok(bytes)
+    }

Review Comment:
   The builder allows building the batch multiple times via caching, but 
there's no mechanism to prevent mutation after the first build. While `close()` 
prevents appending new records, it doesn't prevent calling `set_writer_state()` 
or `reset_writer_state()` after building, which invalidates the cache and 
allows rebuilding with different metadata. This could lead to inconsistencies 
if callers expect the built bytes to remain stable. Consider either: (1) 
preventing all mutations after the first successful build, or (2) documenting 
this behavior clearly.



##########
crates/fluss/src/util/varint.rs:
##########
@@ -0,0 +1,446 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Variable-length integer encoding utilities.
+//!
+//! This module provides utilities for encoding integers in variable-length 
format,
+//! which can save space when encoding small integers. The encoding uses 7 
bits per byte
+//! with the most significant bit as a continuation flag.
+
+use bytes::BufMut;
+use std::io::{self, Read, Write};
+
+/// Write an unsigned integer in variable-length format.
+///
+/// The encoding uses 7 bits per byte with the MSB set to 1 if more bytes 
follow.
+/// This matches the encoding used in Google Protocol Buffers.
+pub fn write_unsigned_varint<W: Write>(value: u32, writer: &mut W) -> 
io::Result<usize> {
+    let mut v = value;
+    let mut bytes_written = 0;
+
+    while (v & !0x7F) != 0 {
+        writer.write_all(&[((v as u8) & 0x7F) | 0x80])?;
+        bytes_written += 1;
+        v >>= 7;
+    }
+    writer.write_all(&[v as u8])?;
+    bytes_written += 1;
+
+    Ok(bytes_written)
+}
+
+/// Write an unsigned integer in variable-length format to a buffer.
+pub fn write_unsigned_varint_buf(value: u32, buf: &mut impl BufMut) {
+    let mut v = value;
+
+    while (v & !0x7F) != 0 {
+        buf.put_u8(((v as u8) & 0x7F) | 0x80);
+        v >>= 7;
+    }
+    buf.put_u8(v as u8);
+}
+
+/// Read an unsigned integer stored in variable-length format.
+#[allow(dead_code)]
+pub fn read_unsigned_varint<R: Read>(reader: &mut R) -> io::Result<u32> {
+    let mut tmp = [0u8; 1];
+    reader.read_exact(&mut tmp)?;
+    let mut byte = tmp[0] as i8;
+
+    if byte >= 0 {
+        return Ok(byte as u32);
+    }
+
+    let mut result = (byte & 127) as u32;
+
+    reader.read_exact(&mut tmp)?;
+    byte = tmp[0] as i8;
+    if byte >= 0 {
+        result |= (byte as u32) << 7;
+    } else {
+        result |= ((byte & 127) as u32) << 7;
+
+        reader.read_exact(&mut tmp)?;
+        byte = tmp[0] as i8;
+        if byte >= 0 {
+            result |= (byte as u32) << 14;
+        } else {
+            result |= ((byte & 127) as u32) << 14;
+
+            reader.read_exact(&mut tmp)?;
+            byte = tmp[0] as i8;
+            if byte >= 0 {
+                result |= (byte as u32) << 21;
+            } else {
+                result |= ((byte & 127) as u32) << 21;
+
+                reader.read_exact(&mut tmp)?;
+                byte = tmp[0] as i8;
+                result |= (byte as u32) << 28;
+
+                if byte < 0 {
+                    return Err(io::Error::new(
+                        io::ErrorKind::InvalidData,
+                        format!(
+                            "VarInt is too long, the most significant bit in 
the 5th byte is set, converted value: {:#x}",
+                            result
+                        ),
+                    ));
+                }
+            }
+        }
+    }
+
+    Ok(result)
+}
+
+/// Read an unsigned integer from a byte slice in variable-length format.
+pub fn read_unsigned_varint_bytes(bytes: &[u8]) -> io::Result<(u32, usize)> {
+    if bytes.is_empty() {
+        return Err(io::Error::new(
+            io::ErrorKind::UnexpectedEof,
+            "Cannot read varint from empty buffer",
+        ));
+    }
+
+    let mut byte = bytes[0] as i8;
+    let mut index = 1;
+
+    if byte >= 0 {
+        return Ok((byte as u32, index));
+    }
+
+    let mut result = (byte & 127) as u32;
+
+    if index >= bytes.len() {
+        return Err(io::Error::new(
+            io::ErrorKind::UnexpectedEof,
+            "Incomplete varint",
+        ));
+    }
+    byte = bytes[index] as i8;
+    index += 1;
+    if byte >= 0 {
+        result |= (byte as u32) << 7;
+    } else {
+        result |= ((byte & 127) as u32) << 7;
+
+        if index >= bytes.len() {
+            return Err(io::Error::new(
+                io::ErrorKind::UnexpectedEof,
+                "Incomplete varint",
+            ));
+        }
+        byte = bytes[index] as i8;
+        index += 1;
+        if byte >= 0 {
+            result |= (byte as u32) << 14;
+        } else {
+            result |= ((byte & 127) as u32) << 14;
+
+            if index >= bytes.len() {
+                return Err(io::Error::new(
+                    io::ErrorKind::UnexpectedEof,
+                    "Incomplete varint",
+                ));
+            }
+            byte = bytes[index] as i8;
+            index += 1;
+            if byte >= 0 {
+                result |= (byte as u32) << 21;
+            } else {
+                result |= ((byte & 127) as u32) << 21;
+
+                if index >= bytes.len() {
+                    return Err(io::Error::new(
+                        io::ErrorKind::UnexpectedEof,
+                        "Incomplete varint",
+                    ));
+                }
+                byte = bytes[index] as i8;
+                index += 1;
+                result |= (byte as u32) << 28;
+
+                if byte < 0 {
+                    return Err(io::Error::new(
+                        io::ErrorKind::InvalidData,
+                        format!(
+                            "VarInt is too long, the most significant bit in 
the 5th byte is set, converted value: {:#x}",
+                            result
+                        ),
+                    ));
+                }
+            }
+        }
+    }
+
+    Ok((result, index))
+}
+
+/// Calculate the number of bytes needed to encode an unsigned integer in 
variable-length format.
+pub fn size_of_unsigned_varint(value: u32) -> usize {
+    let leading_zeros = value.leading_zeros();
+    let leading_zeros_below_38_divided_by_7 = ((38 - leading_zeros) * 
0b10010010010010011) >> 19;
+    (leading_zeros_below_38_divided_by_7 + (leading_zeros >> 5)) as usize
+}
+
+/// Write an unsigned 64-bit integer in variable-length format to a buffer.
+#[allow(dead_code)]
+pub fn write_unsigned_varint_u64_buf(value: u64, buf: &mut impl BufMut) {
+    let mut v = value;
+    while (v & !0x7F) != 0 {
+        buf.put_u8(((v as u8) & 0x7F) | 0x80);
+        v >>= 7;
+    }
+    buf.put_u8(v as u8);
+}
+
+/// Write directly to a mutable byte slice, returning the number of bytes 
written.
+/// Used by CompactedRowWriter which manages its own position.
+pub fn write_unsigned_varint_to_slice(value: u32, slice: &mut [u8]) -> usize {
+    let mut v = value;
+    let mut written = 0;
+
+    while (v & !0x7F) != 0 {
+        slice[written] = ((v as u8) & 0x7F) | 0x80;
+        written += 1;
+        v >>= 7;
+    }
+    slice[written] = v as u8;
+    written + 1
+}
+
+/// Write unsigned 64-bit varint directly to a mutable byte slice.
+pub fn write_unsigned_varint_u64_to_slice(value: u64, slice: &mut [u8]) -> 
usize {
+    let mut v = value;
+    let mut written = 0;
+
+    while (v & !0x7F) != 0 {
+        slice[written] = ((v as u8) & 0x7F) | 0x80;
+        written += 1;
+        v >>= 7;
+    }
+    slice[written] = v as u8;
+    written + 1
+}

Review Comment:
   The `write_unsigned_varint_to_slice` and 
`write_unsigned_varint_u64_to_slice` functions write directly to a slice 
without checking bounds. If the slice is too small to hold the encoded varint, 
this will panic with an index out of bounds error. The caller must ensure the 
slice is large enough (up to 5 bytes for u32, 10 bytes for u64). Consider 
either: (1) adding bounds checking and returning a Result, (2) documenting the 
precondition clearly in the function documentation, or (3) accepting a slice of 
the exact required size as a fixed-size array.



##########
crates/fluss/src/record/kv/kv_record_batch_builder.rs:
##########
@@ -0,0 +1,659 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! KV record batch builder implementation.
+//!
+//! This module provides the KvRecordBatchBuilder for building batches of KV 
records.
+
+use bytes::{Bytes, BytesMut};
+use std::io;
+
+use crate::metadata::KvFormat;
+use crate::record::kv::kv_record::KvRecord;
+use crate::record::kv::kv_record_batch::{
+    ATTRIBUTES_OFFSET, BATCH_SEQUENCE_OFFSET, CRC_OFFSET, LENGTH_LENGTH, 
LENGTH_OFFSET,
+    MAGIC_OFFSET, RECORD_BATCH_HEADER_SIZE, RECORDS_COUNT_OFFSET, 
SCHEMA_ID_OFFSET,
+    WRITER_ID_OFFSET,
+};
+use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, NO_BATCH_SEQUENCE, 
NO_WRITER_ID};
+use crate::row::compacted::CompactedRowWriter;
+
+/// Builder for KvRecordBatch.
+///
+/// This builder accumulates KV records and produces a serialized batch with 
proper
+/// header information and checksums.
+pub struct KvRecordBatchBuilder {
+    schema_id: i32,
+    magic: u8,
+    write_limit: usize,
+    buffer: BytesMut,
+    writer_id: i64,
+    batch_sequence: i32,
+    current_record_number: i32,
+    size_in_bytes: usize,
+    is_closed: bool,
+    kv_format: KvFormat,
+    aborted: bool,
+    built_buffer: Option<Bytes>,
+}
+
+impl KvRecordBatchBuilder {
+    /// Create a new KvRecordBatchBuilder.
+    ///
+    /// # Arguments
+    /// * `schema_id` - The schema ID for records in this batch (must fit in 
i16)
+    /// * `write_limit` - Maximum bytes that can be appended
+    /// * `kv_format` - The KV format (Compacted, Indexed, or Aligned)
+    pub fn new(schema_id: i32, write_limit: usize, kv_format: KvFormat) -> 
Self {
+        assert!(
+            schema_id <= i16::MAX as i32,
+            "schema_id shouldn't be greater than the max value of i16: {}",
+            i16::MAX
+        );
+
+        let mut buffer = 
BytesMut::with_capacity(write_limit.max(RECORD_BATCH_HEADER_SIZE));
+
+        // Reserve space for header (we'll write it at the end)
+        buffer.resize(RECORD_BATCH_HEADER_SIZE, 0);
+
+        Self {
+            schema_id,
+            magic: CURRENT_KV_MAGIC_VALUE,
+            write_limit,
+            buffer,
+            writer_id: NO_WRITER_ID,
+            batch_sequence: NO_BATCH_SEQUENCE,
+            current_record_number: 0,
+            size_in_bytes: RECORD_BATCH_HEADER_SIZE,
+            is_closed: false,
+            kv_format,
+            aborted: false,
+            built_buffer: None,
+        }
+    }
+
+    /// Check if there is room for a new record containing the given key and 
value.
+    /// If no records have been appended, this always returns true.
+    pub fn has_room_for(&self, key: &[u8], value: Option<&[u8]>) -> bool {
+        self.size_in_bytes + KvRecord::size_of(key, value) <= self.write_limit
+    }
+
+    /// Check if there is room for a new record containing the given key and 
CompactedRow.
+    /// If no records have been appended, this always returns true.
+    pub fn has_room_for_row(&self, key: &[u8], row_writer: 
Option<&CompactedRowWriter>) -> bool {
+        let value = row_writer.map(|w: &CompactedRowWriter| w.buffer());
+        self.has_room_for(key, value)
+    }
+
+    /// Append a KV record with a CompactedRow value to the batch.
+    ///
+    /// This is the recommended API for KvFormat::COMPACTED batches.
+    ///
+    /// # Arguments
+    /// * `key` - The key bytes
+    /// * `row_writer` - The CompactedRowWriter containing the serialized row 
data.
+    ///   Pass None for deletion records.
+    ///
+    /// # Errors
+    /// Returns an error if:
+    /// - The builder has been aborted
+    /// - The builder is closed
+    /// - Adding this record would exceed the write limit
+    /// - The maximum number of records is exceeded
+    /// - The KV format is not COMPACTED
+    pub fn append_row(
+        &mut self,
+        key: &[u8],
+        row_writer: Option<&CompactedRowWriter>,
+    ) -> io::Result<()> {
+        if self.kv_format != KvFormat::COMPACTED {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidInput,
+                "append_row can only be used with KvFormat::COMPACTED",
+            ));
+        }
+
+        let value = row_writer.map(|w: &CompactedRowWriter| w.buffer());
+        self.append(key, value)
+    }
+
+    /// Append a KV record to the batch with raw bytes.
+    ///
+    /// # Arguments
+    /// * `key` - The key bytes
+    /// * `value` - Optional value bytes. If None, this represents a deletion.
+    ///
+    /// # Errors
+    /// Returns an error if:
+    /// - The builder has been aborted
+    /// - The builder is closed
+    /// - Adding this record would exceed the write limit
+    /// - The maximum number of records is exceeded
+    /// - The value format doesn't match the configured KV format
+    pub fn append(&mut self, key: &[u8], value: Option<&[u8]>) -> 
io::Result<()> {
+        if self.aborted {
+            return Err(io::Error::other(
+                "Tried to append a record, but KvRecordBatchBuilder has 
already been aborted",
+            ));
+        }
+
+        if self.is_closed {
+            return Err(io::Error::other(
+                "Tried to append a record, but KvRecordBatchBuilder is closed 
for record appends",
+            ));
+        }
+
+        // Check record count limit before mutation
+        if self.current_record_number == i32::MAX {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidInput,
+                format!(
+                    "Maximum number of records per batch exceeded, max 
records: {}",
+                    i32::MAX
+                ),
+            ));
+        }
+
+        let record_size = KvRecord::size_of(key, value);
+        if self.size_in_bytes + record_size > self.write_limit {
+            return Err(io::Error::new(
+                io::ErrorKind::WriteZero,
+                format!(
+                    "Adding record would exceed write limit: {} + {} > {}",
+                    self.size_in_bytes, record_size, self.write_limit
+                ),
+            ));
+        }
+
+        self.validate_row_format(value)?;
+
+        let record_byte_size = KvRecord::write_to_buf(&mut self.buffer, key, 
value)?;
+        debug_assert_eq!(record_byte_size, record_size, "Record size 
mismatch");
+
+        self.current_record_number += 1;
+        self.size_in_bytes += record_byte_size;
+
+        // Invalidate cached buffer since we modified the batch
+        self.built_buffer = None;
+
+        Ok(())
+    }
+
+    /// Set the writer state (writer ID and batch base sequence).
+    pub fn set_writer_state(&mut self, writer_id: i64, batch_base_sequence: 
i32) {
+        self.writer_id = writer_id;
+        self.batch_sequence = batch_base_sequence;
+        // Invalidate cached buffer since header fields changed
+        self.built_buffer = None;
+    }
+
+    /// Reset the writer state.
+    /// This triggers a rebuild of the batch header on next build.
+    pub fn reset_writer_state(&mut self, writer_id: i64, batch_sequence: i32) {
+        self.built_buffer = None;
+        self.writer_id = writer_id;
+        self.batch_sequence = batch_sequence;
+    }
+
+    /// Build the batch and return the serialized bytes.
+    /// This can be called multiple times as the batch is cached after the 
first build.
+    pub fn build(&mut self) -> io::Result<Bytes> {
+        if self.aborted {
+            return Err(io::Error::other(
+                "Attempting to build an aborted record batch",
+            ));
+        }
+
+        if let Some(ref cached) = self.built_buffer {
+            return Ok(cached.clone());
+        }
+
+        self.write_batch_header()?;
+        let bytes = self.buffer.clone().freeze();
+        self.built_buffer = Some(bytes.clone());
+        Ok(bytes)
+    }
+
+    /// Get the writer ID.
+    pub fn writer_id(&self) -> i64 {
+        self.writer_id
+    }
+
+    /// Get the batch sequence.
+    pub fn batch_sequence(&self) -> i32 {
+        self.batch_sequence
+    }
+
+    /// Check if the builder is closed.
+    pub fn is_closed(&self) -> bool {
+        self.is_closed
+    }
+
+    /// Abort the builder.
+    /// After aborting, no more records can be appended and the batch cannot 
be built.
+    pub fn abort(&mut self) {
+        self.aborted = true;
+    }
+
+    /// Close the builder.
+    /// After closing, no more records can be appended, but the batch can 
still be built.
+    pub fn close(&mut self) -> io::Result<()> {
+        if self.aborted {
+            return Err(io::Error::other(
+                "Cannot close KvRecordBatchBuilder as it has already been 
aborted",
+            ));
+        }
+        self.is_closed = true;
+        Ok(())
+    }
+
+    /// Get the current size in bytes of the batch.
+    pub fn get_size_in_bytes(&self) -> usize {
+        self.size_in_bytes
+    }
+
+    // ----------------------- Internal methods -------------------------------
+
+    /// Write the batch header.
+    fn write_batch_header(&mut self) -> io::Result<()> {
+        let size_without_length = self.size_in_bytes - LENGTH_LENGTH;
+        let total_size = i32::try_from(size_without_length).map_err(|_| {
+            io::Error::new(
+                io::ErrorKind::InvalidInput,
+                format!("Batch size {} exceeds i32::MAX", size_without_length),
+            )
+        })?;
+
+        // Compute attributes before borrowing buffer mutably
+        let attributes = self.compute_attributes();
+
+        // Write to the beginning of the buffer
+        let header = &mut self.buffer[0..RECORD_BATCH_HEADER_SIZE];
+
+        // Write length
+        header[LENGTH_OFFSET..LENGTH_OFFSET + LENGTH_LENGTH]
+            .copy_from_slice(&total_size.to_be_bytes());
+
+        // Write magic
+        header[MAGIC_OFFSET] = self.magic;
+
+        // Write empty CRC first (will update later)
+        header[CRC_OFFSET..CRC_OFFSET + 
4].copy_from_slice(&0u32.to_be_bytes());
+
+        // Write schema ID
+        header[SCHEMA_ID_OFFSET..SCHEMA_ID_OFFSET + 2]
+            .copy_from_slice(&(self.schema_id as i16).to_be_bytes());
+
+        // Write attributes
+        header[ATTRIBUTES_OFFSET] = attributes;
+
+        // Write writer ID
+        header[WRITER_ID_OFFSET..WRITER_ID_OFFSET + 8]
+            .copy_from_slice(&self.writer_id.to_be_bytes());
+
+        // Write batch sequence
+        header[BATCH_SEQUENCE_OFFSET..BATCH_SEQUENCE_OFFSET + 4]
+            .copy_from_slice(&self.batch_sequence.to_be_bytes());
+
+        // Write record count
+        header[RECORDS_COUNT_OFFSET..RECORDS_COUNT_OFFSET + 4]
+            .copy_from_slice(&self.current_record_number.to_be_bytes());
+
+        // Compute and update CRC
+        let crc = 
crc32c::crc32c(&self.buffer[SCHEMA_ID_OFFSET..self.size_in_bytes]);
+        self.buffer[CRC_OFFSET..CRC_OFFSET + 
4].copy_from_slice(&crc.to_be_bytes());
+
+        Ok(())
+    }
+
+    /// Compute the attributes byte.
+    fn compute_attributes(&self) -> u8 {
+        // Currently no attributes are used
+        0
+    }
+
+    /// Validate the row format according to the KV format.
+    fn validate_row_format(&self, value: Option<&[u8]>) -> io::Result<()> {
+        match self.kv_format {
+            KvFormat::COMPACTED => {
+                if let Some(bytes) = value {
+                    // CompactedRow must have at least a header (null bitmap).
+                    // The minimum size is at least 1 byte for any row (even 1 
field has 1 byte header).
+                    // We can't validate the exact header size without knowing 
field count (arity),
+                    // but we can at least check it's not empty.
+                    if bytes.is_empty() {
+                        return Err(io::Error::new(
+                            io::ErrorKind::InvalidData,
+                            "CompactedRow value cannot be empty (must have at 
least a header)",
+                        ));
+                    }
+                    // Further validation could be done if we stored 
schema/field count,
+                    // but for now this basic check prevents obvious errors.

Review Comment:
   The validation logic checks if `bytes.is_empty()` for CompactedRow values, 
but this may be too permissive. A valid CompactedRow must have at least a 
header (null bitmap) which depends on the field count (arity). For example, 
with 1 field, the header is 1 byte; with 8 fields, it's 1 byte; with 9 fields, 
it's 2 bytes. Without knowing the field count, you cannot properly validate 
whether the value is a structurally valid CompactedRow. Consider documenting 
this limitation or storing the schema/field count to enable proper validation.
   ```suggestion
       ///
       /// Note:
       /// - For [`KvFormat::COMPACTED`], this method performs **only a shallow 
check** that the
       ///   value is non-empty if present. A structurally valid `CompactedRow` 
must include at
       ///   least a null-bitmap header whose size depends on the field count 
(arity), but the
       ///   builder does not have access to the schema/field count and 
therefore cannot fully
       ///   validate the layout here.
       /// - For [`KvFormat::INDEXED`], values are currently rejected because 
the format is not
       ///   yet implemented.
       ///
       /// Callers must not rely on this method to guarantee that `COMPACTED` 
values are
       /// structurally valid; it only detects obviously invalid empty values.
       fn validate_row_format(&self, value: Option<&[u8]>) -> io::Result<()> {
           match self.kv_format {
               KvFormat::COMPACTED => {
                   if let Some(bytes) = value {
                       // A CompactedRow must have at least a header (null 
bitmap).
                       // The minimum size is at least 1 byte for any row (even 
1 field has a 1-byte header).
                       //
                       // IMPORTANT LIMITATION:
                       // We do not know the field count (arity) here, so we 
cannot compute or verify
                       // the exact expected header size or the full row 
layout. As a result, this
                       // check only ensures that the value is not empty; it 
does NOT guarantee that
                       // the bytes represent a structurally valid CompactedRow.
                       if bytes.is_empty() {
                           return Err(io::Error::new(
                               io::ErrorKind::InvalidData,
                               "CompactedRow value cannot be empty (must 
include at least a header)",
                           ));
                       }
                       // TODO: When schema/field count is available to this 
builder, extend this
                       // validation to check the expected header size and row 
layout.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to