This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 49d0c93  feat: introduce KvRecordBatchBuilder (#156)
49d0c93 is described below

commit 49d0c9390f8c8247e9f29087a43e861ef4dad2b9
Author: Anton Borisov <[email protected]>
AuthorDate: Thu Jan 15 02:51:57 2026 +0000

    feat: introduce KvRecordBatchBuilder (#156)
---
 crates/fluss/src/record/kv/kv_record.rs            | 343 ++++++++++++
 crates/fluss/src/record/kv/kv_record_batch.rs      | 394 ++++++++++++++
 .../fluss/src/record/kv/kv_record_batch_builder.rs | 581 +++++++++++++++++++++
 crates/fluss/src/record/kv/mod.rs                  |  35 ++
 crates/fluss/src/record/mod.rs                     |   1 +
 .../src/row/compacted/compacted_key_writer.rs      |   6 +
 .../src/row/compacted/compacted_row_reader.rs      |  35 +-
 .../src/row/compacted/compacted_row_writer.rs      |  31 +-
 crates/fluss/src/row/mod.rs                        |   7 +-
 crates/fluss/src/util/mod.rs                       |   1 +
 crates/fluss/src/util/varint.rs                    | 502 ++++++++++++++++++
 11 files changed, 1893 insertions(+), 43 deletions(-)

diff --git a/crates/fluss/src/record/kv/kv_record.rs b/crates/fluss/src/record/kv/kv_record.rs
new file mode 100644
index 0000000..8c30713
--- /dev/null
+++ b/crates/fluss/src/record/kv/kv_record.rs
@@ -0,0 +1,343 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Key-Value record implementation.
+//!
+//! This module provides the KvRecord struct which represents an immutable key-value record.
+//! The record format is:
+//! - Length => Int32
+//! - KeyLength => Unsigned VarInt
+//! - Key => bytes
+//! - Row => BinaryRow (optional, if null then this is a deletion record)
+
+use bytes::{BufMut, Bytes, BytesMut};
+use std::io;
+
+use crate::util::varint::{
+    read_unsigned_varint_bytes, size_of_unsigned_varint, 
write_unsigned_varint_buf,
+};
+
+/// Size in bytes of the fixed-width `Length` field (an Int32) that prefixes every record.
+pub const LENGTH_LENGTH: usize = 4;
+
+/// A key-value record.
+///
+/// The schema is:
+/// - Length => Int32
+/// - KeyLength => Unsigned VarInt
+/// - Key => bytes
+/// - Value => bytes (BinaryRow, written directly without length prefix)
+///
+/// When the value is None (deletion), no Value bytes are present.
+// Reference implementation:
+// https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/record/KvRecord.java
+#[derive(Debug, Clone)]
+pub struct KvRecord {
+    /// Raw key bytes.
+    key: Bytes,
+    /// Raw value bytes; `None` marks a deletion record.
+    value: Option<Bytes>,
+    /// Serialized size of the record in bytes, including the 4-byte length prefix.
+    size_in_bytes: usize,
+}
+
+impl KvRecord {
+    /// Build a record from `key` and an optional `value`.
+    ///
+    /// A `None` value marks the record as a deletion. The serialized size
+    /// (length prefix included) is computed once here and stored with the record.
+    pub fn new(key: Bytes, value: Option<Bytes>) -> Self {
+        KvRecord {
+            // Evaluated first, while `key` and `value` are still only borrowed.
+            size_in_bytes: Self::size_of(&key, value.as_deref()),
+            key,
+            value,
+        }
+    }
+
+    /// Borrow the key bytes of this record.
+    pub fn key(&self) -> &Bytes {
+        &self.key
+    }
+
+    /// Borrow the value bytes of this record.
+    ///
+    /// Returns `None` when the record is a deletion (it carries no value).
+    pub fn value(&self) -> Option<&Bytes> {
+        self.value.as_ref()
+    }
+
+    /// Calculate the total size of the record when serialized (including 
length prefix).
+    pub fn size_of(key: &[u8], value: Option<&[u8]>) -> usize {
+        Self::size_without_length(key, value) + LENGTH_LENGTH
+    }
+
+    /// Payload size of a record: key-length varint + key bytes + value bytes.
+    ///
+    /// The `Length` prefix is not counted. A `None` value (deletion) contributes
+    /// zero bytes. All additions saturate at `usize::MAX`.
+    fn size_without_length(key: &[u8], value: Option<&[u8]>) -> usize {
+        let varint_len = size_of_unsigned_varint(key.len() as u32);
+        let value_len = value.map_or(0, <[u8]>::len);
+
+        varint_len
+            .saturating_add(key.len())
+            .saturating_add(value_len)
+    }
+
+    /// Serialize one KV record and append it to `buf`.
+    ///
+    /// Layout written: little-endian Int32 payload length, unsigned-varint key
+    /// length, key bytes, then the value bytes (nothing at all for a deletion).
+    /// An empty `Some(&[])` value also adds no bytes, so on the wire it looks the
+    /// same as a deletion.
+    ///
+    /// Returns the number of bytes appended, length prefix included.
+    ///
+    /// # Errors
+    ///
+    /// Returns `InvalidInput` (before anything is written) when the payload size
+    /// does not fit in an `i32`.
+    pub fn write_to_buf(buf: &mut BytesMut, key: &[u8], value: Option<&[u8]>) -> io::Result<usize> {
+        let payload_len = Self::size_without_length(key, value);
+
+        let length_field = match i32::try_from(payload_len) {
+            Ok(n) => n,
+            Err(_) => {
+                return Err(io::Error::new(
+                    io::ErrorKind::InvalidInput,
+                    format!("Record size {} exceeds i32::MAX", payload_len),
+                ));
+            }
+        };
+
+        buf.put_i32_le(length_field);
+        write_unsigned_varint_buf(key.len() as u32, buf);
+        buf.put_slice(key);
+
+        // A deletion (`None`) contributes no value bytes.
+        if let Some(value_bytes) = value {
+            buf.put_slice(value_bytes);
+        }
+
+        Ok(payload_len + LENGTH_LENGTH)
+    }
+
+    /// Decode one KV record from `bytes`, starting at byte offset `position`.
+    ///
+    /// Returns the record together with the total number of bytes it occupies
+    /// (the 4-byte length prefix included), so the caller can advance to the
+    /// next record. Key and value are slices of `bytes`. If no bytes remain
+    /// after the key, the record is a deletion and its value is `None`.
+    ///
+    /// # Errors
+    ///
+    /// - `UnexpectedEof` if the buffer is too short for the length prefix, or
+    ///   for the record size that the prefix declares.
+    /// - `InvalidData` if the declared length is negative, or the key length
+    ///   runs past the end of the record.
+    /// - Any error produced while decoding the key-length varint.
+    ///
+    /// TODO: Connect KvReadContext and return CompactedRow records.
+    pub fn read_from(bytes: &Bytes, position: usize) -> io::Result<(Self, usize)> {
+        let prefix_end = position.saturating_add(LENGTH_LENGTH);
+        if bytes.len() < prefix_end {
+            return Err(io::Error::new(
+                io::ErrorKind::UnexpectedEof,
+                "Not enough bytes to read record length",
+            ));
+        }
+
+        let mut prefix = [0u8; LENGTH_LENGTH];
+        prefix.copy_from_slice(&bytes[position..prefix_end]);
+        let size_in_bytes_i32 = i32::from_le_bytes(prefix);
+
+        // A negative Int32 cannot be converted, which is exactly the invalid case.
+        let size_in_bytes = usize::try_from(size_in_bytes_i32).map_err(|_| {
+            io::Error::new(
+                io::ErrorKind::InvalidData,
+                format!("Invalid record length: {}", size_in_bytes_i32),
+            )
+        })?;
+
+        let total_size = size_in_bytes.checked_add(LENGTH_LENGTH).ok_or_else(|| {
+            io::Error::new(
+                io::ErrorKind::InvalidData,
+                format!(
+                    "Record size overflow: {} + {}",
+                    size_in_bytes, LENGTH_LENGTH
+                ),
+            )
+        })?;
+
+        let available = bytes.len().saturating_sub(position);
+        if available < total_size {
+            return Err(io::Error::new(
+                io::ErrorKind::UnexpectedEof,
+                format!(
+                    "Not enough bytes to read record: expected {}, available {}",
+                    total_size, available
+                ),
+            ));
+        }
+
+        // Cannot overflow: `total_size <= bytes.len() - position` was checked above.
+        let record_end = position + total_size;
+
+        // Read key length as unsigned varint (bounded by record end)
+        let (key_len, varint_size) = read_unsigned_varint_bytes(&bytes[prefix_end..record_end])?;
+        let key_start = prefix_end + varint_size;
+
+        // The key length comes from untrusted input: use checked arithmetic so a
+        // huge value is reported as InvalidData rather than overflowing `usize`
+        // (possible on 32-bit targets) and panicking in `slice` below.
+        let key_end = usize::try_from(key_len)
+            .ok()
+            .and_then(|len| key_start.checked_add(len))
+            .filter(|&end| end <= record_end)
+            .ok_or_else(|| {
+                io::Error::new(
+                    io::ErrorKind::InvalidData,
+                    "Key length exceeds record size",
+                )
+            })?;
+        let key = bytes.slice(key_start..key_end);
+
+        // Whatever follows the key is the value; nothing left means deletion.
+        let value = (key_end < record_end).then(|| bytes.slice(key_end..record_end));
+
+        Ok((
+            Self {
+                key,
+                value,
+                size_in_bytes: total_size,
+            },
+            total_size,
+        ))
+    }
+
+    /// Serialized size of this record in bytes, as stored when the record was
+    /// created or decoded (the length prefix is included).
+    pub fn get_size_in_bytes(&self) -> usize {
+        self.size_in_bytes
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    // `size_of` must equal length prefix + key-length varint + key (+ value when present).
+    #[test]
+    fn test_kv_record_size_calculation() {
+        let key = b"test_key";
+        let value = b"test_value";
+
+        // With value (no value length varint)
+        let size_with_value = KvRecord::size_of(key, Some(value));
+        assert_eq!(
+            size_with_value,
+            LENGTH_LENGTH + size_of_unsigned_varint(key.len() as u32) + key.len() + value.len()
+        );
+
+        // Without value
+        let size_without_value = KvRecord::size_of(key, None);
+        assert_eq!(
+            size_without_value,
+            LENGTH_LENGTH + size_of_unsigned_varint(key.len() as u32) + key.len()
+        );
+    }
+
+    // A record with a value survives a write -> read round trip unchanged.
+    #[test]
+    fn test_kv_record_write_read_round_trip() {
+        let key = b"my_key";
+        let value = b"my_value_data";
+
+        let mut buf = BytesMut::new();
+        let written = KvRecord::write_to_buf(&mut buf, key, Some(value)).unwrap();
+
+        let bytes = buf.freeze();
+        let (record, read_size) = KvRecord::read_from(&bytes, 0).unwrap();
+
+        assert_eq!(written, read_size);
+        assert_eq!(record.key().as_ref(), key);
+        assert_eq!(record.value().unwrap().as_ref(), value);
+        assert_eq!(record.get_size_in_bytes(), written);
+    }
+
+    // A record written without a value reads back as a deletion (`value() == None`).
+    #[test]
+    fn test_kv_record_deletion() {
+        let key = b"delete_me";
+
+        // Write deletion record (no value)
+        let mut buf = BytesMut::new();
+        let written = KvRecord::write_to_buf(&mut buf, key, None).unwrap();
+
+        let bytes = buf.freeze();
+        let (record, read_size) = KvRecord::read_from(&bytes, 0).unwrap();
+
+        assert_eq!(written, read_size);
+        assert_eq!(record.key().as_ref(), key);
+        assert!(record.value().is_none());
+    }
+
+    // A 1024-byte key needs a multi-byte key-length varint; lengths must still round-trip.
+    #[test]
+    fn test_kv_record_with_large_key() {
+        let key = vec![0u8; 1024];
+        let value = vec![1u8; 4096];
+
+        let mut buf = BytesMut::new();
+        let written = KvRecord::write_to_buf(&mut buf, &key, Some(&value)).unwrap();
+
+        let bytes = buf.freeze();
+        let (record, read_size) = KvRecord::read_from(&bytes, 0).unwrap();
+
+        assert_eq!(written, read_size);
+        assert_eq!(record.key().len(), key.len());
+        assert_eq!(record.value().unwrap().len(), value.len());
+    }
+
+    // Malformed length prefixes must be reported as errors, never panic.
+    #[test]
+    fn test_invalid_record_lengths() {
+        let mut buf = BytesMut::new();
+        buf.put_i32_le(-1); // Negative length
+        buf.put_u8(1); // Some dummy data
+        buf.put_slice(b"key");
+        let bytes = buf.freeze();
+        let result = KvRecord::read_from(&bytes, 0);
+        assert!(result.is_err());
+        assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidData);
+
+        // Test overflow length
+        let mut buf = BytesMut::new();
+        buf.put_i32_le(i32::MAX); // Very large length
+        buf.put_u8(1); // Some dummy data
+        let bytes = buf.freeze();
+        let result = KvRecord::read_from(&bytes, 0);
+        assert!(result.is_err());
+
+        // Test impossibly large but non-negative length
+        let mut buf = BytesMut::new();
+        buf.put_i32_le(1_000_000);
+        let bytes = buf.freeze();
+        let result = KvRecord::read_from(&bytes, 0);
+        assert!(result.is_err());
+        assert_eq!(result.unwrap_err().kind(), io::ErrorKind::UnexpectedEof);
+    }
+
+    // Records written back to back can be read sequentially by advancing the
+    // offset by each record's reported size, ending exactly at the buffer end.
+    #[test]
+    fn test_multiple_records_in_buffer() {
+        let records = vec![
+            (b"key1".as_slice(), Some(b"value1".as_slice())),
+            (b"key2".as_slice(), None),
+            (b"key3".as_slice(), Some(b"value3".as_slice())),
+        ];
+
+        let mut buf = BytesMut::new();
+        for (key, value) in &records {
+            KvRecord::write_to_buf(&mut buf, key, *value).unwrap();
+        }
+
+        let bytes = buf.freeze();
+        let mut offset = 0;
+        for (expected_key, expected_value) in &records {
+            let (record, size) = KvRecord::read_from(&bytes, offset).unwrap();
+            assert_eq!(record.key().as_ref(), *expected_key);
+            match expected_value {
+                Some(v) => assert_eq!(record.value().unwrap().as_ref(), *v),
+                None => assert!(record.value().is_none()),
+            }
+            offset += size;
+        }
+        assert_eq!(offset, bytes.len());
+    }
+}
diff --git a/crates/fluss/src/record/kv/kv_record_batch.rs b/crates/fluss/src/record/kv/kv_record_batch.rs
new file mode 100644
index 0000000..fdd4ad7
--- /dev/null
+++ b/crates/fluss/src/record/kv/kv_record_batch.rs
@@ -0,0 +1,394 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! KV record batch implementation.
+//!
+//! The schema of a KvRecordBatch is:
+//! - Length => Int32
+//! - Magic => Int8
+//! - CRC => Uint32
+//! - SchemaId => Int16
+//! - Attributes => Int8
+//! - WriterId => Int64
+//! - BatchSequence => Int32
+//! - RecordCount => Int32
+//! - Records => [Record]
+//!
+//! The CRC covers data from the SchemaId to the end of the batch.
+
+use bytes::Bytes;
+use std::io;
+
+use crate::record::kv::KvRecord;
+
+// Field lengths in bytes
+/// Width of the `Length` field (Int32).
+pub const LENGTH_LENGTH: usize = 4;
+/// Width of the `Magic` field (Int8).
+pub const MAGIC_LENGTH: usize = 1;
+/// Width of the `CRC` field (Uint32).
+pub const CRC_LENGTH: usize = 4;
+/// Width of the `SchemaId` field (Int16).
+pub const SCHEMA_ID_LENGTH: usize = 2;
+/// Width of the `Attributes` field (Int8).
+pub const ATTRIBUTE_LENGTH: usize = 1;
+/// Width of the `WriterId` field (Int64).
+pub const WRITE_CLIENT_ID_LENGTH: usize = 8;
+/// Width of the `BatchSequence` field (Int32).
+pub const BATCH_SEQUENCE_LENGTH: usize = 4;
+/// Width of the `RecordCount` field (Int32).
+pub const RECORDS_COUNT_LENGTH: usize = 4;
+
+// Field offsets
+// Each offset is relative to the start of the batch and is derived from the
+// previous field's offset plus its width, so the layout stays consistent.
+/// Offset of `Length` (byte 0).
+pub const LENGTH_OFFSET: usize = 0;
+/// Offset of `Magic` (byte 4).
+pub const MAGIC_OFFSET: usize = LENGTH_OFFSET + LENGTH_LENGTH;
+/// Offset of `CRC` (byte 5).
+pub const CRC_OFFSET: usize = MAGIC_OFFSET + MAGIC_LENGTH;
+/// Offset of `SchemaId` (byte 9); the checksummed region starts here.
+pub const SCHEMA_ID_OFFSET: usize = CRC_OFFSET + CRC_LENGTH;
+/// Offset of `Attributes` (byte 11).
+pub const ATTRIBUTES_OFFSET: usize = SCHEMA_ID_OFFSET + SCHEMA_ID_LENGTH;
+/// Offset of `WriterId` (byte 12).
+pub const WRITE_CLIENT_ID_OFFSET: usize = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
+/// Offset of `BatchSequence` (byte 20).
+pub const BATCH_SEQUENCE_OFFSET: usize = WRITE_CLIENT_ID_OFFSET + WRITE_CLIENT_ID_LENGTH;
+/// Offset of `RecordCount` (byte 24).
+pub const RECORDS_COUNT_OFFSET: usize = BATCH_SEQUENCE_OFFSET + BATCH_SEQUENCE_LENGTH;
+/// Offset of the first record (byte 28), immediately after the header.
+pub const RECORDS_OFFSET: usize = RECORDS_COUNT_OFFSET + RECORDS_COUNT_LENGTH;
+
+/// Total header size
+pub const RECORD_BATCH_HEADER_SIZE: usize = RECORDS_OFFSET;
+
+/// Overhead of the batch (length field)
+///
+/// This is added to the value stored in `Length` to obtain the full batch size.
+pub const KV_OVERHEAD: usize = LENGTH_OFFSET + LENGTH_LENGTH;
+
+/// A KV record batch.
+///
+/// This struct provides read access to a serialized KV record batch. Nothing is
+/// parsed up front: each accessor reads its field from `data` on demand,
+/// relative to `position`.
+// Reference implementation:
+// https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/record/KvRecordBatch.java
+#[derive(Debug, Clone)]
+pub struct KvRecordBatch {
+    /// Buffer holding the serialized batch (it may hold other bytes around it).
+    data: Bytes,
+    /// Byte offset within `data` at which this batch's `Length` field starts.
+    position: usize,
+}
+
+impl KvRecordBatch {
+    /// Create a new KvRecordBatch pointing to the given data at the specified position.
+    ///
+    /// No validation happens here; the accessors report truncated or malformed
+    /// data when they are called.
+    pub fn new(data: Bytes, position: usize) -> Self {
+        Self { data, position }
+    }
+
+    /// Copy the `N` bytes of a fixed-width header field that starts `offset`
+    /// bytes into this batch.
+    ///
+    /// `what` names the field in the error message. Returns `UnexpectedEof`
+    /// when the buffer ends before the field does (or the offsets overflow).
+    fn read_fixed<const N: usize>(&self, offset: usize, what: &str) -> io::Result<[u8; N]> {
+        self.position
+            .checked_add(offset)
+            .and_then(|start| Some(start..start.checked_add(N)?))
+            .and_then(|range| self.data.get(range))
+            .and_then(|field| <[u8; N]>::try_from(field).ok())
+            .ok_or_else(|| {
+                io::Error::new(
+                    io::ErrorKind::UnexpectedEof,
+                    format!("Not enough bytes to read {}", what),
+                )
+            })
+    }
+
+    /// Get the size in bytes of this batch.
+    ///
+    /// This is the value of the `Length` field plus the length field itself.
+    ///
+    /// # Errors
+    ///
+    /// `UnexpectedEof` if the length field cannot be read, `InvalidData` if it
+    /// is negative.
+    pub fn size_in_bytes(&self) -> io::Result<usize> {
+        let length_i32 =
+            i32::from_le_bytes(self.read_fixed::<LENGTH_LENGTH>(LENGTH_OFFSET, "batch length")?);
+
+        // A negative Int32 cannot be converted, which is exactly the invalid case.
+        let length = usize::try_from(length_i32).map_err(|_| {
+            io::Error::new(
+                io::ErrorKind::InvalidData,
+                format!("Invalid batch length: {}", length_i32),
+            )
+        })?;
+
+        Ok(length.saturating_add(KV_OVERHEAD))
+    }
+
+    /// Check if this batch is valid by verifying the checksum.
+    ///
+    /// Returns `false` when the batch is shorter than its header, when any
+    /// needed field cannot be read, or when the stored and computed CRCs differ.
+    pub fn is_valid(&self) -> bool {
+        if !matches!(self.size_in_bytes(), Ok(s) if s >= RECORD_BATCH_HEADER_SIZE) {
+            return false;
+        }
+
+        matches!(
+            (self.checksum(), self.compute_checksum()),
+            (Ok(stored), Ok(computed)) if stored == computed
+        )
+    }
+
+    /// Get the magic byte.
+    pub fn magic(&self) -> io::Result<u8> {
+        let [magic] = self.read_fixed::<MAGIC_LENGTH>(MAGIC_OFFSET, "magic byte")?;
+        Ok(magic)
+    }
+
+    /// Get the checksum stored in the batch header.
+    pub fn checksum(&self) -> io::Result<u32> {
+        self.read_fixed::<CRC_LENGTH>(CRC_OFFSET, "checksum")
+            .map(u32::from_le_bytes)
+    }
+
+    /// Compute the checksum of this batch.
+    ///
+    /// The CRC32C covers the bytes from the `SchemaId` field to the end of the
+    /// batch.
+    ///
+    /// # Errors
+    ///
+    /// `InvalidData` if the declared batch size is smaller than the header,
+    /// `UnexpectedEof` if the buffer does not contain the whole batch.
+    pub fn compute_checksum(&self) -> io::Result<u32> {
+        let size = self.size_in_bytes()?;
+        if size < RECORD_BATCH_HEADER_SIZE {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidData,
+                format!(
+                    "Batch size {} is less than header size {}",
+                    size, RECORD_BATCH_HEADER_SIZE
+                ),
+            ));
+        }
+
+        let start = self.position.saturating_add(SCHEMA_ID_OFFSET);
+        let end = self.position.saturating_add(size);
+
+        if end > self.data.len() || start >= end {
+            return Err(io::Error::new(
+                io::ErrorKind::UnexpectedEof,
+                "Not enough bytes to compute checksum",
+            ));
+        }
+
+        Ok(crc32c::crc32c(&self.data[start..end]))
+    }
+
+    /// Get the schema ID.
+    pub fn schema_id(&self) -> io::Result<i16> {
+        self.read_fixed::<SCHEMA_ID_LENGTH>(SCHEMA_ID_OFFSET, "schema ID")
+            .map(i16::from_le_bytes)
+    }
+
+    /// Get the writer ID.
+    pub fn writer_id(&self) -> io::Result<i64> {
+        self.read_fixed::<WRITE_CLIENT_ID_LENGTH>(WRITE_CLIENT_ID_OFFSET, "writer ID")
+            .map(i64::from_le_bytes)
+    }
+
+    /// Get the batch sequence.
+    pub fn batch_sequence(&self) -> io::Result<i32> {
+        self.read_fixed::<BATCH_SEQUENCE_LENGTH>(BATCH_SEQUENCE_OFFSET, "batch sequence")
+            .map(i32::from_le_bytes)
+    }
+
+    /// Get the number of records in this batch.
+    ///
+    /// The raw header value is returned as stored; it may be negative for
+    /// corrupt data.
+    pub fn record_count(&self) -> io::Result<i32> {
+        self.read_fixed::<RECORDS_COUNT_LENGTH>(RECORDS_COUNT_OFFSET, "record count")
+            .map(i32::from_le_bytes)
+    }
+
+    /// Create an iterator over the records in this batch.
+    /// This validates the batch checksum before returning the iterator.
+    /// For trusted data paths, use `records_unchecked()` to skip validation.
+    pub fn records(&self) -> io::Result<KvRecordIterator> {
+        if !self.is_valid() {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidData,
+                "Invalid batch checksum",
+            ));
+        }
+        self.records_unchecked()
+    }
+
+    /// Create an iterator over the records in this batch without validating the checksum
+    ///
+    /// # Errors
+    ///
+    /// `InvalidData` if the declared batch size is smaller than the header or
+    /// the record count is negative; `UnexpectedEof` if the header fields
+    /// cannot be read.
+    pub fn records_unchecked(&self) -> io::Result<KvRecordIterator> {
+        let size = self.size_in_bytes()?;
+        // A batch shorter than its own header cannot contain records; report it
+        // instead of handing back an iterator that silently yields nothing.
+        if size < RECORD_BATCH_HEADER_SIZE {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidData,
+                format!(
+                    "Batch size {} is less than header size {}",
+                    size, RECORD_BATCH_HEADER_SIZE
+                ),
+            ));
+        }
+
+        let count = self.record_count()?;
+        if count < 0 {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidData,
+                format!("Invalid record count: {}", count),
+            ));
+        }
+
+        // Saturating, like the other offset computations, so a hostile
+        // position/size pair cannot overflow.
+        Ok(KvRecordIterator {
+            data: self.data.clone(),
+            position: self.position.saturating_add(RECORDS_OFFSET),
+            end: self.position.saturating_add(size),
+            remaining_count: count,
+        })
+    }
+}
+
+/// Iterator over records in a KV record batch.
+///
+/// Yields `io::Result<KvRecord>`; after the first error no further items are produced.
+pub struct KvRecordIterator {
+    /// Buffer that contains the batch being iterated.
+    data: Bytes,
+    /// Byte offset in `data` of the next record to decode.
+    position: usize,
+    /// Byte offset in `data` one past the end of the batch.
+    end: usize,
+    /// Number of records still expected, taken from the batch header.
+    remaining_count: i32,
+}
+
+impl Iterator for KvRecordIterator {
+    type Item = io::Result<KvRecord>;
+
+    /// Decode the next record of the batch.
+    ///
+    /// Iteration ends when the expected record count is exhausted or the end of
+    /// the batch is reached. A decoding failure, or a record that would extend
+    /// past the end of the batch, is yielded once as `Err` and then iteration stops.
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.remaining_count <= 0 || self.position >= self.end {
+            return None;
+        }
+
+        // `read_from` is only bounded by the whole buffer, so additionally make
+        // sure the record stays inside this batch; otherwise corrupt lengths
+        // could make us hand out bytes that belong to whatever follows it.
+        let decoded =
+            KvRecord::read_from(&self.data, self.position).and_then(|(record, size)| {
+                match self.position.checked_add(size) {
+                    Some(next_position) if next_position <= self.end => {
+                        Ok((record, next_position))
+                    }
+                    _ => Err(io::Error::new(
+                        io::ErrorKind::InvalidData,
+                        "Record extends past the end of the batch",
+                    )),
+                }
+            });
+
+        match decoded {
+            Ok((record, next_position)) => {
+                self.position = next_position;
+                self.remaining_count -= 1;
+                Some(Ok(record))
+            }
+            Err(e) => {
+                self.remaining_count = 0; // Stop iteration on error
+                Some(Err(e))
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::metadata::KvFormat;
+    use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, KvRecordBatchBuilder};
+    use bytes::{BufMut, BytesMut};
+
+    // Batches whose length field is negative, huge, or larger than the buffer
+    // must be reported as invalid rather than cause a panic.
+    #[test]
+    fn test_invalid_batch_lengths() {
+        // Test negative length
+        let mut buf = BytesMut::new();
+        buf.put_i32_le(-1);
+        let bytes = buf.freeze();
+        let batch = KvRecordBatch::new(bytes, 0);
+        assert!(batch.size_in_bytes().is_err()); // Should error for invalid
+        assert!(!batch.is_valid());
+
+        // Test overflow length
+        let mut buf = BytesMut::new();
+        buf.put_i32_le(i32::MAX);
+        let bytes = buf.freeze();
+        let batch = KvRecordBatch::new(bytes, 0);
+        assert!(!batch.is_valid());
+
+        // Test too-short buffer
+        let mut buf = BytesMut::new();
+        buf.put_i32_le(100); // Claims 100 bytes but buffer is tiny
+        let bytes = buf.freeze();
+        let batch = KvRecordBatch::new(bytes, 0);
+        assert!(!batch.is_valid());
+    }
+
+    // End-to-end: build a batch with one upsert and one deletion, then check
+    // that every header field and both records read back as written.
+    #[test]
+    fn test_kv_record_batch_build_and_read() {
+        use crate::row::compacted::CompactedRowWriter;
+
+        let schema_id = 42;
+        let write_limit = 4096;
+
+        let mut builder = KvRecordBatchBuilder::new(schema_id, write_limit, KvFormat::COMPACTED);
+        builder.set_writer_state(100, 5);
+
+        let key1 = b"key1";
+        let mut value1_writer = CompactedRowWriter::new(1);
+        value1_writer.write_bytes(&[1, 2, 3, 4, 5]);
+        builder.append_row(key1, Some(&value1_writer)).unwrap();
+
+        // A `None` row appends a deletion record for `key2`.
+        let key2 = b"key2";
+        builder
+            .append_row::<CompactedRowWriter>(key2, None)
+            .unwrap();
+
+        let bytes = builder.build().unwrap();
+
+        let batch = KvRecordBatch::new(bytes.clone(), 0);
+        assert!(batch.is_valid());
+        assert_eq!(batch.magic().unwrap(), CURRENT_KV_MAGIC_VALUE);
+        assert_eq!(batch.schema_id().unwrap(), schema_id as i16);
+        assert_eq!(batch.writer_id().unwrap(), 100);
+        assert_eq!(batch.batch_sequence().unwrap(), 5);
+        assert_eq!(batch.record_count().unwrap(), 2);
+
+        let records: Vec<_> = batch.records().unwrap().collect();
+        assert_eq!(records.len(), 2);
+
+        let record1 = records[0].as_ref().unwrap();
+        assert_eq!(record1.key().as_ref(), key1);
+        assert_eq!(record1.value().unwrap().as_ref(), value1_writer.buffer());
+
+        let record2 = records[1].as_ref().unwrap();
+        assert_eq!(record2.key().as_ref(), key2);
+        assert!(record2.value().is_none());
+    }
+}
diff --git a/crates/fluss/src/record/kv/kv_record_batch_builder.rs b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
new file mode 100644
index 0000000..773c778
--- /dev/null
+++ b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
@@ -0,0 +1,581 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! KV record batch builder implementation.
+//!
+//! This module provides the KvRecordBatchBuilder for building batches of KV
+//! records.
+
+use bytes::{Bytes, BytesMut};
+use std::io;
+
+use crate::metadata::KvFormat;
+use crate::record::kv::kv_record::KvRecord;
+use crate::record::kv::kv_record_batch::{
+    ATTRIBUTES_OFFSET, BATCH_SEQUENCE_OFFSET, CRC_OFFSET, LENGTH_LENGTH, 
LENGTH_OFFSET,
+    MAGIC_OFFSET, RECORD_BATCH_HEADER_SIZE, RECORDS_COUNT_OFFSET, 
SCHEMA_ID_OFFSET,
+    WRITE_CLIENT_ID_OFFSET,
+};
+use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, NO_BATCH_SEQUENCE, 
NO_WRITER_ID};
+use crate::row::BinaryRow;
+
+/// Builder for KvRecordBatch.
+///
+/// This builder accumulates KV records and produces a serialized batch with
+/// proper header information and checksums.
+// Reference implementation:
+// https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/record/KvRecordBatchBuilder.java
+pub struct KvRecordBatchBuilder {
+    // Schema ID of the records; written to the header as an i16.
+    schema_id: i32,
+    // Magic byte written to the header; `new` sets it to CURRENT_KV_MAGIC_VALUE.
+    magic: u8,
+    // Upper bound for `size_in_bytes`; appends that would exceed it are rejected.
+    write_limit: usize,
+    // Serialized batch: space reserved for the header, followed by the records.
+    buffer: BytesMut,
+    // Writer ID for the header; NO_WRITER_ID until `set_writer_state` is called.
+    writer_id: i64,
+    // Batch sequence for the header; NO_BATCH_SEQUENCE until `set_writer_state`
+    // is called.
+    batch_sequence: i32,
+    // Number of records appended so far.
+    current_record_number: i32,
+    // Current batch size in bytes, header included.
+    size_in_bytes: usize,
+    // Set by `close`; once true, appends are rejected.
+    is_closed: bool,
+    // KV format of the rows; `append_row` only accepts COMPACTED.
+    kv_format: KvFormat,
+    // Set by `abort`; once true, append, build and close all fail.
+    aborted: bool,
+    // Result of the last `build`; reset to None whenever the batch changes.
+    built_buffer: Option<Bytes>,
+}
+
+impl KvRecordBatchBuilder {
+    /// Create a new KvRecordBatchBuilder.
+    ///
+    /// The buffer is allocated up front and the header region is zero-filled;
+    /// the real header values are written by [`build`](Self::build).
+    ///
+    /// # Arguments
+    /// * `schema_id` - The schema ID for records in this batch (must fit in i16)
+    /// * `write_limit` - Maximum size in bytes of the batch, header included
+    /// * `kv_format` - The KV format of the rows; [`append_row`](Self::append_row)
+    ///   only accepts `KvFormat::COMPACTED`
+    ///
+    /// # Panics
+    /// Panics if `schema_id` is outside the `i16` range, because the header
+    /// stores it as an `i16`.
+    pub fn new(schema_id: i32, write_limit: usize, kv_format: KvFormat) -> Self {
+        assert!(
+            schema_id <= i16::MAX as i32,
+            "schema_id shouldn't be greater than the max value of i16: {}",
+            i16::MAX
+        );
+        // Without this check a value below i16::MIN would be silently
+        // truncated when the header is written.
+        assert!(
+            schema_id >= i16::MIN as i32,
+            "schema_id shouldn't be less than the min value of i16: {}",
+            i16::MIN
+        );
+
+        let mut buffer = BytesMut::with_capacity(write_limit.max(RECORD_BATCH_HEADER_SIZE));
+
+        // Reserve space for header (we'll write it at the end)
+        buffer.resize(RECORD_BATCH_HEADER_SIZE, 0);
+
+        Self {
+            schema_id,
+            magic: CURRENT_KV_MAGIC_VALUE,
+            write_limit,
+            buffer,
+            writer_id: NO_WRITER_ID,
+            batch_sequence: NO_BATCH_SEQUENCE,
+            current_record_number: 0,
+            size_in_bytes: RECORD_BATCH_HEADER_SIZE,
+            is_closed: false,
+            kv_format,
+            aborted: false,
+            built_buffer: None,
+        }
+    }
+
+    /// Check if there is room for a new record containing the given key and row.
+    ///
+    /// Returns `true` when the current batch size plus the serialized size of
+    /// the record stays within `write_limit`. The first record gets no special
+    /// treatment: an empty builder still returns `false` for a record that does
+    /// not fit next to the batch header.
+    pub fn has_room_for_row<R: BinaryRow>(&self, key: &[u8], row: Option<&R>) -> bool {
+        let value = row.map(|r| r.as_bytes());
+        self.size_in_bytes + KvRecord::size_of(key, value) <= self.write_limit
+    }
+
+    /// Append a KV record to the batch; `row` is `None` for a record without a value.
+    ///
+    /// On success the record count and batch size are updated and any cached
+    /// build result is discarded.
+    ///
+    /// # Errors
+    /// Checked in this order:
+    /// - The KV format is not COMPACTED (`InvalidInput`)
+    /// - The builder has been aborted
+    /// - The builder is closed
+    /// - The maximum number of records is exceeded (`InvalidInput`)
+    /// - Adding this record would exceed the write limit (`WriteZero`)
+    pub fn append_row<R: BinaryRow>(&mut self, key: &[u8], row: Option<&R>) -> io::Result<()> {
+        if self.kv_format != KvFormat::COMPACTED {
+            let msg = "append_row can only be used with KvFormat::COMPACTED";
+            return Err(io::Error::new(io::ErrorKind::InvalidInput, msg));
+        }
+
+        if self.aborted {
+            let msg =
+                "Tried to append a record, but KvRecordBatchBuilder has already been aborted";
+            return Err(io::Error::other(msg));
+        }
+
+        if self.is_closed {
+            let msg =
+                "Tried to append a record, but KvRecordBatchBuilder is closed for record appends";
+            return Err(io::Error::other(msg));
+        }
+
+        // The counter is an i32, so refuse the append before it could overflow.
+        if self.current_record_number == i32::MAX {
+            let msg = format!(
+                "Maximum number of records per batch exceeded, max records: {}",
+                i32::MAX
+            );
+            return Err(io::Error::new(io::ErrorKind::InvalidInput, msg));
+        }
+
+        let payload = row.map(|r| r.as_bytes());
+        let expected_len = KvRecord::size_of(key, payload);
+        if self.size_in_bytes + expected_len > self.write_limit {
+            let msg = format!(
+                "Adding record would exceed write limit: {} + {} > {}",
+                self.size_in_bytes, expected_len, self.write_limit
+            );
+            return Err(io::Error::new(io::ErrorKind::WriteZero, msg));
+        }
+
+        let written = KvRecord::write_to_buf(&mut self.buffer, key, payload)?;
+        debug_assert_eq!(written, expected_len, "Record size mismatch");
+
+        self.size_in_bytes += written;
+        self.current_record_number += 1;
+
+        // The batch changed, so previously built bytes are stale.
+        self.built_buffer = None;
+
+        Ok(())
+    }
+
+    /// Set the writer state (writer ID and batch base sequence).
+    ///
+    /// Any cached build result is discarded, so the next call to
+    /// [`build`](Self::build) rewrites the header with the new values.
+    pub fn set_writer_state(&mut self, writer_id: i64, batch_base_sequence: i32) {
+        // Drop the cached bytes: they embed the previous header fields.
+        self.built_buffer = None;
+        self.batch_sequence = batch_base_sequence;
+        self.writer_id = writer_id;
+    }
+
+    /// Build the batch and return the serialized bytes.
+    ///
+    /// The result is cached, so repeated calls without intervening changes
+    /// return the same bytes without rewriting the header.
+    ///
+    /// # Caching and Mutations
+    ///
+    /// The cache is dropped (and the batch rebuilt on the next call) after:
+    /// - [`append_row`](Self::append_row) adds a record
+    /// - [`set_writer_state`](Self::set_writer_state) changes writer metadata
+    ///
+    /// The builder can therefore be reused with a different writer state, or
+    /// keep accepting records after a first build; callers should expect the
+    /// built bytes to differ between builds when such changes happen.
+    ///
+    /// Note: [`close`](Self::close) prevents further appends but does not
+    /// prevent writer state modifications.
+    ///
+    /// # Errors
+    /// Returns an error if the builder has been aborted, or if writing the
+    /// batch header fails.
+    pub fn build(&mut self) -> io::Result<Bytes> {
+        if self.aborted {
+            return Err(io::Error::other(
+                "Attempting to build an aborted record batch",
+            ));
+        }
+
+        if let Some(cached) = &self.built_buffer {
+            return Ok(cached.clone());
+        }
+
+        self.write_batch_header()?;
+        // Copy the buffer so the builder can keep appending afterwards.
+        let frozen = self.buffer.clone().freeze();
+        self.built_buffer = Some(frozen.clone());
+        Ok(frozen)
+    }
+
+    /// Get the writer ID.
+    ///
+    /// This is `NO_WRITER_ID` until [`set_writer_state`](Self::set_writer_state)
+    /// is called.
+    pub fn writer_id(&self) -> i64 {
+        self.writer_id
+    }
+
+    /// Get the batch sequence.
+    ///
+    /// This is `NO_BATCH_SEQUENCE` until
+    /// [`set_writer_state`](Self::set_writer_state) is called.
+    pub fn batch_sequence(&self) -> i32 {
+        self.batch_sequence
+    }
+
+    /// Check if the builder is closed, i.e. [`close`](Self::close) has succeeded.
+    pub fn is_closed(&self) -> bool {
+        self.is_closed
+    }
+
+    /// Abort the builder.
+    ///
+    /// After aborting, [`append_row`](Self::append_row), [`build`](Self::build)
+    /// and [`close`](Self::close) all return errors.
+    pub fn abort(&mut self) {
+        self.aborted = true;
+    }
+
+    /// Close the builder for appends.
+    ///
+    /// Once closed, [`append_row`](Self::append_row) fails, but the batch can
+    /// still be built.
+    ///
+    /// # Errors
+    /// Returns an error if the builder has already been aborted.
+    pub fn close(&mut self) -> io::Result<()> {
+        if self.aborted {
+            Err(io::Error::other(
+                "Cannot close KvRecordBatchBuilder as it has already been aborted",
+            ))
+        } else {
+            self.is_closed = true;
+            Ok(())
+        }
+    }
+
+    /// Get the current size in bytes of the batch.
+    ///
+    /// The header is included, so a builder without records reports
+    /// `RECORD_BATCH_HEADER_SIZE`.
+    pub fn get_size_in_bytes(&self) -> usize {
+        self.size_in_bytes
+    }
+
+    // ----------------------- Internal methods -------------------------------
+
+    /// Write the batch header into the space reserved at the start of the buffer.
+    ///
+    /// Fills in length, magic, schema ID, attributes, writer ID, batch sequence
+    /// and record count (multi-byte fields are little-endian), then stores the
+    /// CRC32C checksum.
+    ///
+    /// # Errors
+    /// Returns `InvalidInput` if the batch size without the length field does
+    /// not fit in an `i32`.
+    fn write_batch_header(&mut self) -> io::Result<()> {
+        // The length field does not count its own bytes.
+        let size_without_length = self.size_in_bytes - LENGTH_LENGTH;
+        let Ok(total_size) = i32::try_from(size_without_length) else {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidInput,
+                format!("Batch size {} exceeds i32::MAX", size_without_length),
+            ));
+        };
+
+        // Snapshot the field values before borrowing the buffer mutably.
+        let magic = self.magic;
+        let schema_id = self.schema_id as i16;
+        let attributes = self.compute_attributes();
+        let writer_id = self.writer_id;
+        let batch_sequence = self.batch_sequence;
+        let record_count = self.current_record_number;
+
+        let header = &mut self.buffer[..RECORD_BATCH_HEADER_SIZE];
+
+        header[LENGTH_OFFSET..][..LENGTH_LENGTH].copy_from_slice(&total_size.to_le_bytes());
+        header[MAGIC_OFFSET] = magic;
+        // Zero the CRC slot for now; the real value is stored below.
+        header[CRC_OFFSET..][..4].fill(0);
+        header[SCHEMA_ID_OFFSET..][..2].copy_from_slice(&schema_id.to_le_bytes());
+        header[ATTRIBUTES_OFFSET] = attributes;
+        header[WRITE_CLIENT_ID_OFFSET..][..8].copy_from_slice(&writer_id.to_le_bytes());
+        header[BATCH_SEQUENCE_OFFSET..][..4].copy_from_slice(&batch_sequence.to_le_bytes());
+        header[RECORDS_COUNT_OFFSET..][..4].copy_from_slice(&record_count.to_le_bytes());
+
+        // The checksum covers the bytes from the schema ID to the end of the batch.
+        let checksum = crc32c::crc32c(&self.buffer[SCHEMA_ID_OFFSET..self.size_in_bytes]);
+        self.buffer[CRC_OFFSET..][..4].copy_from_slice(&checksum.to_le_bytes());
+
+        Ok(())
+    }
+
+    /// Compute the attributes byte that is stored at `ATTRIBUTES_OFFSET` in
+    /// the batch header.
+    fn compute_attributes(&self) -> u8 {
+        // Currently no attributes are used
+        0
+    }
+}
+
+impl Drop for KvRecordBatchBuilder {
+    /// Prints a warning to stderr when the builder is dropped while holding
+    /// records that are not reflected in a built batch.
+    fn drop(&mut self) {
+        let pending = self.current_record_number;
+
+        // Stay silent for empty builders, aborted builders, and builders
+        // whose current contents have been built.
+        if pending <= 0 || self.aborted || self.built_buffer.is_some() {
+            return;
+        }
+
+        eprintln!(
+            "Warning: KvRecordBatchBuilder dropped with {} record(s) that were never built. \
+             Call build() to serialize the batch before dropping.",
+            pending
+        );
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::row::compacted::CompactedRowWriter;
+
+    // Helper function to create a CompactedRowWriter with a single bytes 
field for testing
+    fn create_test_row(data: &[u8]) -> CompactedRowWriter {
+        let mut writer = CompactedRowWriter::new(1);
+        writer.write_bytes(data);
+        writer
+    }
+
+    /// Walks through the normal lifecycle: initial state, writer state,
+    /// appends, close, build, and a second build served from the cache.
+    #[test]
+    fn test_builder_basic_workflow() {
+        let schema_id = 42;
+        let write_limit = 4096;
+        let mut builder = KvRecordBatchBuilder::new(schema_id, write_limit, KvFormat::COMPACTED);
+
+        // Test initial state
+        assert!(!builder.is_closed());
+        assert_eq!(builder.writer_id(), NO_WRITER_ID);
+        assert_eq!(builder.batch_sequence(), NO_BATCH_SEQUENCE);
+
+        // Test writer state
+        builder.set_writer_state(100, 5);
+        assert_eq!(builder.writer_id(), 100);
+        assert_eq!(builder.batch_sequence(), 5);
+
+        // Test appending records
+        let key1 = b"key1";
+        let value1 = create_test_row(b"value1");
+        assert!(builder.has_room_for_row(key1, Some(&value1)));
+        builder.append_row(key1, Some(&value1)).unwrap();
+
+        let key2 = b"key2";
+        assert!(builder.has_room_for_row::<CompactedRowWriter>(key2, None));
+        builder
+            .append_row::<CompactedRowWriter>(key2, None)
+            .unwrap();
+
+        // Test close and build
+        builder.close().unwrap();
+        assert!(builder.is_closed());
+
+        let bytes = builder.build().unwrap();
+        assert!(bytes.len() > RECORD_BATCH_HEADER_SIZE);
+
+        // Building again should return the cached result: compare the full
+        // contents, since equal lengths alone would not detect a difference.
+        let bytes2 = builder.build().unwrap();
+        assert_eq!(bytes, bytes2);
+    }
+
+    #[test]
+    fn test_builder_lifecycle() {
+        // Test abort behavior
+        let mut builder = KvRecordBatchBuilder::new(1, 4096, 
KvFormat::COMPACTED);
+        let value = create_test_row(b"value");
+        builder.append_row(b"key", Some(&value)).unwrap();
+        builder.abort();
+        assert!(
+            builder
+                .append_row::<CompactedRowWriter>(b"key2", None)
+                .is_err()
+        );
+        assert!(builder.build().is_err());
+        assert!(builder.close().is_err());
+
+        // Test close behavior
+        let mut builder = KvRecordBatchBuilder::new(1, 4096, 
KvFormat::COMPACTED);
+        let value = create_test_row(b"value");
+        builder.append_row(b"key", Some(&value)).unwrap();
+        builder.close().unwrap();
+        assert!(
+            builder
+                .append_row::<CompactedRowWriter>(b"key2", None)
+                .is_err()
+        ); // Can't append after close
+        assert!(builder.build().is_ok()); // But can still build
+    }
+
+    #[test]
+    fn test_write_limit_enforcement() {
+        let write_limit = 100; // Very small limit
+        let mut builder = KvRecordBatchBuilder::new(1, write_limit, 
KvFormat::COMPACTED);
+
+        // Test has_room_for_row helper
+        let large_key = vec![0u8; 1000];
+        let large_value = vec![1u8; 1000];
+        let large_row = create_test_row(&large_value);
+        assert!(!builder.has_room_for_row(&large_key, Some(&large_row)));
+        let small_value = create_test_row(b"value");
+        assert!(builder.has_room_for_row(b"key", Some(&small_value)));
+
+        // Test append enforcement - add small record first
+        builder.append_row(b"key", Some(&small_value)).unwrap();
+
+        // Try to add large record that exceeds limit (reuse large_row from 
above)
+        let result = builder.append_row(b"key2", Some(&large_row));
+        assert!(result.is_err());
+        assert_eq!(result.unwrap_err().kind(), io::ErrorKind::WriteZero);
+    }
+
+    #[test]
+    fn test_append_checks_record_count_limit() {
+        let mut builder = KvRecordBatchBuilder::new(1, 100000, 
KvFormat::COMPACTED);
+        builder.current_record_number = i32::MAX - 1;
+
+        let value1 = create_test_row(b"value1");
+        builder.append_row(b"key1", Some(&value1)).unwrap();
+
+        let value2 = create_test_row(b"value2");
+        let result = builder.append_row(b"key2", Some(&value2));
+        assert!(result.is_err());
+        assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidInput);
+    }
+
+    #[test]
+    #[should_panic(expected = "schema_id shouldn't be greater than")]
+    fn test_builder_invalid_schema_id() {
+        KvRecordBatchBuilder::new(i16::MAX as i32 + 1, 4096, 
KvFormat::COMPACTED);
+    }
+
+    #[test]
+    fn test_cache_invalidation_on_append() {
+        let mut builder = KvRecordBatchBuilder::new(1, 4096, 
KvFormat::COMPACTED);
+        builder.set_writer_state(100, 5);
+
+        let value1 = create_test_row(b"value1");
+        builder.append_row(b"key1", Some(&value1)).unwrap();
+        let bytes1 = builder.build().unwrap();
+        let len1 = bytes1.len();
+
+        // Append another record - this should invalidate the cache
+        let value2 = create_test_row(b"value2");
+        builder.append_row(b"key2", Some(&value2)).unwrap();
+        let bytes2 = builder.build().unwrap();
+        let len2 = bytes2.len();
+
+        // Verify the second build includes both records
+        assert!(len2 > len1, "Second build should be larger");
+
+        use crate::record::kv::KvRecordBatch;
+        let batch = KvRecordBatch::new(bytes2, 0);
+        assert!(batch.is_valid());
+        assert_eq!(batch.record_count().unwrap(), 2, "Should have 2 records");
+    }
+
+    #[test]
+    fn test_cache_invalidation_on_set_writer_state() {
+        let mut builder = KvRecordBatchBuilder::new(1, 4096, 
KvFormat::COMPACTED);
+
+        builder.set_writer_state(100, 5);
+        let value = create_test_row(b"value");
+        builder.append_row(b"key", Some(&value)).unwrap();
+        let bytes1 = builder.build().unwrap();
+
+        // Change writer state - this should invalidate the cache
+        builder.set_writer_state(200, 10);
+        let bytes2 = builder.build().unwrap();
+
+        assert_ne!(
+            bytes1, bytes2,
+            "Bytes should differ after writer state change"
+        );
+
+        use crate::record::kv::KvRecordBatch;
+        let batch1 = KvRecordBatch::new(bytes1, 0);
+        let batch2 = KvRecordBatch::new(bytes2, 0);
+
+        assert_eq!(batch1.writer_id().unwrap(), 100);
+        assert_eq!(batch1.batch_sequence().unwrap(), 5);
+
+        assert_eq!(batch2.writer_id().unwrap(), 200);
+        assert_eq!(batch2.batch_sequence().unwrap(), 10);
+    }
+
+    #[test]
+    fn test_builder_with_compacted_row_writer() {
+        use crate::metadata::{DataType, IntType, StringType};
+        use crate::record::kv::KvRecordBatch;
+        use crate::row::InternalRow;
+        use crate::row::compacted::CompactedRow;
+
+        let mut builder = KvRecordBatchBuilder::new(1, 100000, 
KvFormat::COMPACTED);
+        builder.set_writer_state(100, 5);
+
+        let types = vec![
+            DataType::Int(IntType::new()),
+            DataType::String(StringType::new()),
+        ];
+
+        // Create and append first record with CompactedRowWriter
+        let mut row_writer1 = CompactedRowWriter::new(2);
+        row_writer1.write_int(42);
+        row_writer1.write_string("hello");
+
+        let key1 = b"key1";
+        assert!(builder.has_room_for_row(key1, Some(&row_writer1)));
+        builder.append_row(key1, Some(&row_writer1)).unwrap();
+
+        // Create and append second record
+        let mut row_writer2 = CompactedRowWriter::new(2);
+        row_writer2.write_int(100);
+        row_writer2.write_string("world");
+
+        let key2 = b"key2";
+        builder.append_row(key2, Some(&row_writer2)).unwrap();
+
+        // Append a deletion record
+        let key3 = b"key3";
+        builder
+            .append_row::<CompactedRowWriter>(key3, None)
+            .unwrap();
+
+        // Build and verify
+        builder.close().unwrap();
+        let bytes = builder.build().unwrap();
+
+        let batch = KvRecordBatch::new(bytes, 0);
+        assert!(batch.is_valid());
+        assert_eq!(batch.record_count().unwrap(), 3);
+        assert_eq!(batch.writer_id().unwrap(), 100);
+        assert_eq!(batch.batch_sequence().unwrap(), 5);
+
+        // Read back and verify records
+        let records: Vec<_> = batch.records().unwrap().collect();
+        assert_eq!(records.len(), 3);
+
+        // Verify first record
+        let record1 = records[0].as_ref().unwrap();
+        assert_eq!(record1.key().as_ref(), key1);
+        let row1 = CompactedRow::from_bytes(&types, record1.value().unwrap());
+        assert_eq!(row1.get_int(0), 42);
+        assert_eq!(row1.get_string(1), "hello");
+
+        // Verify second record
+        let record2 = records[1].as_ref().unwrap();
+        assert_eq!(record2.key().as_ref(), key2);
+        let row2 = CompactedRow::from_bytes(&types, record2.value().unwrap());
+        assert_eq!(row2.get_int(0), 100);
+        assert_eq!(row2.get_string(1), "world");
+
+        // Verify deletion record
+        let record3 = records[2].as_ref().unwrap();
+        assert_eq!(record3.key().as_ref(), key3);
+        assert!(record3.value().is_none());
+    }
+
+    #[test]
+    fn test_kv_format_validation() {
+        let mut row_writer = CompactedRowWriter::new(1);
+        row_writer.write_int(42);
+
+        // INDEXED format should reject append_row
+        let mut indexed_builder = KvRecordBatchBuilder::new(1, 4096, 
KvFormat::INDEXED);
+        let result = indexed_builder.append_row(b"key", Some(&row_writer));
+        assert!(result.is_err());
+        assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidInput);
+
+        // COMPACTED format should accept append_row
+        let mut compacted_builder = KvRecordBatchBuilder::new(1, 4096, 
KvFormat::COMPACTED);
+        let result = compacted_builder.append_row(b"key", Some(&row_writer));
+        assert!(result.is_ok());
+    }
+}
diff --git a/crates/fluss/src/record/kv/mod.rs 
b/crates/fluss/src/record/kv/mod.rs
new file mode 100644
index 0000000..ecb762d
--- /dev/null
+++ b/crates/fluss/src/record/kv/mod.rs
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Key-Value record and batch implementations.
+
+mod kv_record;
+mod kv_record_batch;
+mod kv_record_batch_builder;
+
+pub use kv_record::{KvRecord, LENGTH_LENGTH as KV_RECORD_LENGTH_LENGTH};
+pub use kv_record_batch::*;
+pub use kv_record_batch_builder::*;
+
+/// Current KV magic value
+pub const CURRENT_KV_MAGIC_VALUE: u8 = 0;
+
+/// No writer ID constant
+pub const NO_WRITER_ID: i64 = -1;
+
+/// No batch sequence constant
+pub const NO_BATCH_SEQUENCE: i32 = -1;
diff --git a/crates/fluss/src/record/mod.rs b/crates/fluss/src/record/mod.rs
index 35928ea..c5a3f8e 100644
--- a/crates/fluss/src/record/mod.rs
+++ b/crates/fluss/src/record/mod.rs
@@ -22,6 +22,7 @@ use std::collections::HashMap;
 
 mod arrow;
 mod error;
+pub mod kv;
 
 pub use arrow::*;
 
diff --git a/crates/fluss/src/row/compacted/compacted_key_writer.rs 
b/crates/fluss/src/row/compacted/compacted_key_writer.rs
index 84a6b22..1152b0c 100644
--- a/crates/fluss/src/row/compacted/compacted_key_writer.rs
+++ b/crates/fluss/src/row/compacted/compacted_key_writer.rs
@@ -30,6 +30,12 @@ pub struct CompactedKeyWriter {
     delegate: CompactedRowWriter,
 }
 
+impl Default for CompactedKeyWriter {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 impl CompactedKeyWriter {
     pub fn new() -> CompactedKeyWriter {
         CompactedKeyWriter {
diff --git a/crates/fluss/src/row/compacted/compacted_row_reader.rs 
b/crates/fluss/src/row/compacted/compacted_row_reader.rs
index c053d4e..5ec2608 100644
--- a/crates/fluss/src/row/compacted/compacted_row_reader.rs
+++ b/crates/fluss/src/row/compacted/compacted_row_reader.rs
@@ -19,6 +19,7 @@ use 
crate::row::compacted::compacted_row::calculate_bit_set_width_in_bytes;
 use crate::{
     metadata::DataType,
     row::{Datum, GenericRow, 
compacted::compacted_row_writer::CompactedRowWriter},
+    util::varint::{read_unsigned_varint_at, read_unsigned_varint_u64_at},
 };
 use std::str::from_utf8;
 
@@ -150,36 +151,18 @@ impl<'a> CompactedRowReader<'a> {
         (val, next_pos)
     }
 
-    pub fn read_int(&self, mut pos: usize) -> (i32, usize) {
-        let mut result: u32 = 0;
-        let mut shift = 0;
-
-        for _ in 0..CompactedRowWriter::MAX_INT_SIZE {
-            let (b, next_pos) = self.read_byte(pos);
-            pos = next_pos;
-            result |= ((b & 0x7F) as u32) << shift;
-            if (b & 0x80) == 0 {
-                return (result as i32, pos);
-            }
-            shift += 7;
+    pub fn read_int(&self, pos: usize) -> (i32, usize) {
+        match read_unsigned_varint_at(self.segment, pos, 
CompactedRowWriter::MAX_INT_SIZE) {
+            Ok((value, next_pos)) => (value as i32, next_pos),
+            Err(_) => panic!("Invalid VarInt32 input stream."),
         }
-        panic!("Invalid VarInt32 input stream.");
     }
 
-    pub fn read_long(&self, mut pos: usize) -> (i64, usize) {
-        let mut result: u64 = 0;
-        let mut shift = 0;
-
-        for _ in 0..CompactedRowWriter::MAX_LONG_SIZE {
-            let (b, next_pos) = self.read_byte(pos);
-            pos = next_pos;
-            result |= ((b & 0x7F) as u64) << shift;
-            if (b & 0x80) == 0 {
-                return (result as i64, pos);
-            }
-            shift += 7;
+    pub fn read_long(&self, pos: usize) -> (i64, usize) {
+        match read_unsigned_varint_u64_at(self.segment, pos, 
CompactedRowWriter::MAX_LONG_SIZE) {
+            Ok((value, next_pos)) => (value as i64, next_pos),
+            Err(_) => panic!("Invalid VarInt64 input stream."),
         }
-        panic!("Invalid VarInt64 input stream.");
     }
 
     pub fn read_float(&self, pos: usize) -> (f32, usize) {
diff --git a/crates/fluss/src/row/compacted/compacted_row_writer.rs 
b/crates/fluss/src/row/compacted/compacted_row_writer.rs
index 4f535c6..63b32a3 100644
--- a/crates/fluss/src/row/compacted/compacted_row_writer.rs
+++ b/crates/fluss/src/row/compacted/compacted_row_writer.rs
@@ -18,7 +18,9 @@
 use bytes::{Bytes, BytesMut};
 use std::cmp;
 
+use crate::row::BinaryRow;
 use crate::row::compacted::compacted_row::calculate_bit_set_width_in_bytes;
+use crate::util::varint::{write_unsigned_varint_to_slice, 
write_unsigned_varint_u64_to_slice};
 
 // Writer for CompactedRow
 // Reference implementation:
@@ -125,25 +127,16 @@ impl CompactedRowWriter {
 
     pub fn write_int(&mut self, value: i32) {
         self.ensure_capacity(Self::MAX_INT_SIZE);
-        let mut v = value as u32;
-        while (v & !0x7F) != 0 {
-            self.buffer[self.position] = ((v as u8) & 0x7F) | 0x80;
-            self.position += 1;
-            v >>= 7;
-        }
-        self.buffer[self.position] = v as u8;
-        self.position += 1;
+        let bytes_written =
+            write_unsigned_varint_to_slice(value as u32, &mut 
self.buffer[self.position..]);
+        self.position += bytes_written;
     }
+
     pub fn write_long(&mut self, value: i64) {
         self.ensure_capacity(Self::MAX_LONG_SIZE);
-        let mut v = value as u64;
-        while (v & !0x7F) != 0 {
-            self.buffer[self.position] = ((v as u8) & 0x7F) | 0x80;
-            self.position += 1;
-            v >>= 7;
-        }
-        self.buffer[self.position] = v as u8;
-        self.position += 1;
+        let bytes_written =
+            write_unsigned_varint_u64_to_slice(value as u64, &mut 
self.buffer[self.position..]);
+        self.position += bytes_written;
     }
 
     pub fn write_float(&mut self, value: f32) {
@@ -154,3 +147,9 @@ impl CompactedRowWriter {
         self.write_raw(&value.to_ne_bytes());
     }
 }
+
+impl BinaryRow for CompactedRowWriter {
+    fn as_bytes(&self) -> &[u8] {
+        self.buffer()
+    }
+}
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index c321ab9..144d64f 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -20,13 +20,18 @@ mod column;
 mod datum;
 
 mod binary;
-mod compacted;
+pub mod compacted;
 mod encode;
 mod field_getter;
 
 pub use column::*;
 pub use datum::*;
 
+pub trait BinaryRow {
+    /// Returns the binary representation of this row as a byte slice.
+    fn as_bytes(&self) -> &[u8];
+}
+
 // TODO make functions return Result<?> for better error handling
 pub trait InternalRow {
     /// Returns the number of fields in this row
diff --git a/crates/fluss/src/util/mod.rs b/crates/fluss/src/util/mod.rs
index 5f67290..d191615 100644
--- a/crates/fluss/src/util/mod.rs
+++ b/crates/fluss/src/util/mod.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 pub mod murmur_hash;
+pub mod varint;
 
 use crate::metadata::TableBucket;
 use linked_hash_map::LinkedHashMap;
diff --git a/crates/fluss/src/util/varint.rs b/crates/fluss/src/util/varint.rs
new file mode 100644
index 0000000..96fd1f5
--- /dev/null
+++ b/crates/fluss/src/util/varint.rs
@@ -0,0 +1,502 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Variable-length integer encoding utilities.
+//!
+//! This module provides utilities for encoding integers in variable-length format,
+//! which can save space when encoding small integers. The encoding uses 7 bits per byte
+//! with the most significant bit as a continuation flag.
+
+use bytes::BufMut;
+use std::io::{self, Read, Write};
+
+/// Write an unsigned integer in variable-length format.
+///
+/// The encoding uses 7 bits per byte, least-significant group first, with the MSB
+/// set to 1 if more bytes follow. This matches the encoding used in Google Protocol
+/// Buffers.
+///
+/// The value is encoded into a small stack buffer and handed to `writer` with a
+/// single `write_all` call, so an unbuffered writer is not invoked once per byte.
+///
+/// # Returns
+/// The number of bytes written (1 to 5).
+///
+/// # Errors
+/// Returns any error reported by `writer`.
+#[allow(dead_code)]
+pub fn write_unsigned_varint<W: Write>(value: u32, writer: &mut W) -> io::Result<usize> {
+    // 32 bits split into 7-bit groups need at most ceil(32 / 7) = 5 bytes.
+    let mut encoded = [0u8; 5];
+    let mut len = 0;
+    let mut v = value;
+
+    while (v & !0x7F) != 0 {
+        encoded[len] = ((v as u8) & 0x7F) | 0x80;
+        len += 1;
+        v >>= 7;
+    }
+    // Final group: fewer than 8 bits remain, so the continuation bit stays clear.
+    encoded[len] = v as u8;
+    len += 1;
+
+    writer.write_all(&encoded[..len])?;
+    Ok(len)
+}
+
+/// Append `value` to `buf` as an unsigned varint.
+///
+/// Emits the low 7 bits of the remaining value per byte, least-significant group
+/// first, setting the continuation bit (`0x80`) on every byte except the last.
+pub fn write_unsigned_varint_buf(value: u32, buf: &mut impl BufMut) {
+    let mut remaining = value;
+
+    loop {
+        let low_bits = (remaining as u8) & 0x7F;
+        remaining >>= 7;
+        if remaining == 0 {
+            // Nothing left above this group: emit it without the continuation bit.
+            buf.put_u8(low_bits);
+            return;
+        }
+        buf.put_u8(low_bits | 0x80);
+    }
+}
+
+/// Read an unsigned integer stored in variable-length format.
+///
+/// Pulls one byte at a time from `reader`, at most five, OR-ing the low 7 bits of
+/// each byte into the result (least-significant group first) until a byte without
+/// the continuation bit is seen. Payload bits of the fifth byte that do not fit in
+/// a `u32` are discarded by the shift.
+///
+/// # Errors
+/// - Propagates the `read_exact` error if the reader runs out of bytes.
+/// - Returns `InvalidData` if the fifth byte still has its continuation bit set.
+#[allow(dead_code)]
+pub fn read_unsigned_varint<R: Read>(reader: &mut R) -> io::Result<u32> {
+    let mut decoded: u32 = 0;
+
+    // One shift amount per permitted byte: 5 groups of 7 bits cover a u32.
+    for shift in [0u32, 7, 14, 21, 28] {
+        let mut next = [0u8; 1];
+        reader.read_exact(&mut next)?;
+        let current = next[0];
+
+        decoded |= u32::from(current & 0x7F) << shift;
+        if current & 0x80 == 0 {
+            return Ok(decoded);
+        }
+    }
+
+    Err(io::Error::new(
+        io::ErrorKind::InvalidData,
+        "Invalid u32 varint encoding: too many bytes (most significant bit in the 5th byte is set)",
+    ))
+}
+
+/// Read an unsigned integer from a byte slice in variable-length format.
+///
+/// Decoding starts at `bytes[0]`. On success returns the decoded value together
+/// with the number of bytes consumed (1 to 5); any bytes after the varint are
+/// ignored. Payload bits of the fifth byte that do not fit in a `u32` are
+/// discarded by the shift.
+///
+/// # Errors
+/// - `UnexpectedEof` if `bytes` is empty, or ends while the continuation bit is
+///   still set.
+/// - `InvalidData` if the fifth byte has its continuation bit set.
+pub fn read_unsigned_varint_bytes(bytes: &[u8]) -> io::Result<(u32, usize)> {
+    if bytes.is_empty() {
+        return Err(io::Error::new(
+            io::ErrorKind::UnexpectedEof,
+            "Cannot read varint from empty buffer",
+        ));
+    }
+
+    let mut decoded: u32 = 0;
+
+    // A u32 varint spans at most 5 bytes; byte `index` contributes bits 7*index..
+    for index in 0..5 {
+        let current = match bytes.get(index) {
+            Some(&b) => b,
+            None => {
+                return Err(io::Error::new(
+                    io::ErrorKind::UnexpectedEof,
+                    "Incomplete varint",
+                ));
+            }
+        };
+
+        decoded |= u32::from(current & 0x7F) << (7 * index);
+        if current & 0x80 == 0 {
+            return Ok((decoded, index + 1));
+        }
+    }
+
+    Err(io::Error::new(
+        io::ErrorKind::InvalidData,
+        "Invalid u32 varint encoding: too many bytes (most significant bit in the 5th byte is set)",
+    ))
+}
+
+/// Number of bytes the unsigned varint encoding of `value` occupies (1 to 5).
+///
+/// Each encoded byte carries 7 payload bits, so the size is
+/// `ceil(significant_bits / 7)`, with a minimum of one byte for zero. The
+/// division is done branch-free:
+///
+/// size = ((38 - leading_zeros) * 74899) >> 19  +  (leading_zeros >> 5)
+///
+/// - `38 - leading_zeros` is `significant_bits + 6`, which makes the division
+///   round up.
+/// - Multiplying by `74899` (`2^19 / 7`, rounded up) and shifting right by 19
+///   divides by 7.
+/// - `leading_zeros >> 5` is 1 only when `value == 0`, giving the one-byte minimum.
+pub fn size_of_unsigned_varint(value: u32) -> usize {
+    const RECIPROCAL_OF_7: u32 = 74_899;
+    const RECIPROCAL_SHIFT: u32 = 19;
+
+    let lz = value.leading_zeros();
+    let rounded_up_groups = ((38 - lz) * RECIPROCAL_OF_7) >> RECIPROCAL_SHIFT;
+    let zero_adjustment = lz >> 5;
+
+    (rounded_up_groups + zero_adjustment) as usize
+}
+
+/// Number of bytes the unsigned varint encoding of a `u64` occupies (1 to 10).
+///
+/// Each encoded byte carries 7 payload bits, so the size is
+/// `ceil(significant_bits / 7)`, with a minimum of one byte for zero. The
+/// division is done branch-free:
+///
+/// size = ((70 - leading_zeros) * 74899) >> 19  +  (leading_zeros >> 6)
+///
+/// - `70 - leading_zeros` is `significant_bits + 6`, which makes the division
+///   round up.
+/// - Multiplying by `74899` (`2^19 / 7`, rounded up) and shifting right by 19
+///   divides by 7.
+/// - `leading_zeros >> 6` is 1 only when `value == 0`, giving the one-byte minimum.
+#[allow(dead_code)]
+pub fn size_of_unsigned_varint_u64(value: u64) -> usize {
+    const RECIPROCAL_OF_7: u32 = 74_899;
+    const RECIPROCAL_SHIFT: u32 = 19;
+
+    let lz = value.leading_zeros();
+    let rounded_up_groups = ((70 - lz) * RECIPROCAL_OF_7) >> RECIPROCAL_SHIFT;
+    let zero_adjustment = lz >> 6;
+
+    (rounded_up_groups + zero_adjustment) as usize
+}
+
+/// Append a 64-bit `value` to `buf` as an unsigned varint.
+///
+/// Emits the low 7 bits of the remaining value per byte, least-significant group
+/// first, setting the continuation bit (`0x80`) on every byte except the last.
+#[allow(dead_code)]
+pub fn write_unsigned_varint_u64_buf(value: u64, buf: &mut impl BufMut) {
+    let mut remaining = value;
+
+    loop {
+        let low_bits = (remaining as u8) & 0x7F;
+        remaining >>= 7;
+        if remaining == 0 {
+            // Nothing left above this group: emit it without the continuation bit.
+            buf.put_u8(low_bits);
+            return;
+        }
+        buf.put_u8(low_bits | 0x80);
+    }
+}
+
+/// Write directly to a mutable byte slice, returning the number of bytes 
written.
+/// Used by CompactedRowWriter which manages its own position.
+///
+/// # Panics
+/// Panics if the slice is too small to hold the encoded varint.
+/// The slice must have at least 5 bytes available (the maximum size for a u32 
varint).
+/// Use [`size_of_unsigned_varint`] to calculate the required size beforehand.
+pub fn write_unsigned_varint_to_slice(value: u32, slice: &mut [u8]) -> usize {
+    let mut v = value;
+    let mut written = 0;
+
+    while (v & !0x7F) != 0 {
+        slice[written] = ((v as u8) & 0x7F) | 0x80;
+        written += 1;
+        v >>= 7;
+    }
+    slice[written] = v as u8;
+    written + 1
+}
+
+/// Write unsigned 64-bit varint directly to a mutable byte slice.
+///
+/// # Panics
+/// Panics if the slice is too small to hold the encoded varint.
+/// The slice must have at least 10 bytes available (the maximum size for a 
u64 varint).
+pub fn write_unsigned_varint_u64_to_slice(value: u64, slice: &mut [u8]) -> 
usize {
+    let mut v = value;
+    let mut written = 0;
+
+    while (v & !0x7F) != 0 {
+        slice[written] = ((v as u8) & 0x7F) | 0x80;
+        written += 1;
+        v >>= 7;
+    }
+    slice[written] = v as u8;
+    written + 1
+}
+
+/// Read unsigned varint from a slice starting at given position.
+/// Returns (value, next_position).
+/// Used by CompactedRowReader which manages positions.
+///
+/// At most `max_bytes` bytes are consumed. A `u32` varint never needs more than
+/// 5 bytes, so a larger `max_bytes` does not extend the accepted encoding: a sixth
+/// continuation byte is rejected instead of shifting past the width of `u32`.
+/// Payload bits of the fifth byte that do not fit in a `u32` are discarded.
+///
+/// # Errors
+/// - `UnexpectedEof` if `slice` ends before the varint terminates.
+/// - `InvalidData` if no terminating byte is found within `max_bytes` bytes, or
+///   within the 5 bytes a `u32` varint can occupy.
+pub fn read_unsigned_varint_at(
+    slice: &[u8],
+    mut pos: usize,
+    max_bytes: usize,
+) -> io::Result<(u32, usize)> {
+    let mut result: u32 = 0;
+    let mut shift: u32 = 0;
+
+    for _ in 0..max_bytes {
+        let Some(&b) = slice.get(pos) else {
+            return Err(io::Error::new(
+                io::ErrorKind::UnexpectedEof,
+                "Unexpected end of varint",
+            ));
+        };
+        // Shifting a u32 by 32 or more is an arithmetic overflow (a panic when
+        // overflow checks are on, a wrapped shift otherwise), so treat an
+        // over-long encoding as malformed input.
+        if shift >= u32::BITS {
+            break;
+        }
+        pos += 1;
+        result |= u32::from(b & 0x7F) << shift;
+        if (b & 0x80) == 0 {
+            return Ok((result, pos));
+        }
+        shift += 7;
+    }
+
+    Err(io::Error::new(
+        io::ErrorKind::InvalidData,
+        "Invalid VarInt32 input stream",
+    ))
+}
+
+/// Read unsigned 64-bit varint from a slice starting at given position.
+/// Returns (value, next_position).
+///
+/// At most `max_bytes` bytes are consumed. A `u64` varint never needs more than
+/// 10 bytes, so a larger `max_bytes` does not extend the accepted encoding: an
+/// eleventh continuation byte is rejected instead of shifting past the width of
+/// `u64`. Payload bits of the tenth byte that do not fit in a `u64` are discarded.
+///
+/// # Errors
+/// - `UnexpectedEof` if `slice` ends before the varint terminates.
+/// - `InvalidData` if no terminating byte is found within `max_bytes` bytes, or
+///   within the 10 bytes a `u64` varint can occupy.
+pub fn read_unsigned_varint_u64_at(
+    slice: &[u8],
+    mut pos: usize,
+    max_bytes: usize,
+) -> io::Result<(u64, usize)> {
+    let mut result: u64 = 0;
+    let mut shift: u32 = 0;
+
+    for _ in 0..max_bytes {
+        let Some(&b) = slice.get(pos) else {
+            return Err(io::Error::new(
+                io::ErrorKind::UnexpectedEof,
+                "Unexpected end of varint",
+            ));
+        };
+        // Shifting a u64 by 64 or more is an arithmetic overflow (a panic when
+        // overflow checks are on, a wrapped shift otherwise), so treat an
+        // over-long encoding as malformed input.
+        if shift >= u64::BITS {
+            break;
+        }
+        pos += 1;
+        result |= u64::from(b & 0x7F) << shift;
+        if (b & 0x80) == 0 {
+            return Ok((result, pos));
+        }
+        shift += 7;
+    }
+
+    Err(io::Error::new(
+        io::ErrorKind::InvalidData,
+        "Invalid VarInt64 input stream",
+    ))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::io::Cursor;
+
+    /// Every encoder/decoder pair must agree on values around each length boundary.
+    #[test]
+    fn test_unsigned_varint_round_trip() {
+        let test_values = [
+            0u32,
+            1,
+            127,
+            128,
+            255,
+            256,
+            16383,
+            16384,
+            2097151,
+            2097152,
+            268435455,
+            268435456,
+            u32::MAX,
+        ];
+
+        for value in test_values {
+            // `Write`-based encoder paired with the `Read`-based decoder.
+            let mut encoded = Vec::new();
+            let written = write_unsigned_varint(value, &mut encoded).unwrap();
+            let decoded = read_unsigned_varint(&mut Cursor::new(&encoded)).unwrap();
+
+            assert_eq!(value, decoded, "Round trip failed for value {}", value);
+            assert_eq!(
+                written,
+                encoded.len(),
+                "Bytes written mismatch for value {}",
+                value
+            );
+
+            // The `BufMut` encoder must emit the same number of bytes.
+            let mut buf = bytes::BytesMut::new();
+            write_unsigned_varint_buf(value, &mut buf);
+            assert_eq!(buf.len(), written, "BufMut write length mismatch");
+
+            // The size prediction must match what was actually written.
+            assert_eq!(
+                size_of_unsigned_varint(value),
+                encoded.len(),
+                "Size calculation failed for value {}",
+                value
+            );
+
+            // The slice decoder must return the same value and consume everything.
+            let (decoded_from_slice, consumed) = read_unsigned_varint_bytes(&encoded).unwrap();
+            assert_eq!(
+                value, decoded_from_slice,
+                "Bytes read failed for value {}",
+                value
+            );
+            assert_eq!(
+                consumed,
+                encoded.len(),
+                "Bytes read count mismatch for value {}",
+                value
+            );
+        }
+    }
+
+    /// Checks the u32 size formula on both sides of every 7-bit boundary.
+    #[test]
+    fn test_size_of_unsigned_varint() {
+        let expectations: [(u32, usize); 10] = [
+            (0, 1),
+            (127, 1),
+            (128, 2),
+            (16383, 2),
+            (16384, 3),
+            (2097151, 3),
+            (2097152, 4),
+            (268435455, 4),
+            (268435456, 5),
+            (u32::MAX, 5),
+        ];
+
+        for (value, expected) in expectations {
+            assert_eq!(size_of_unsigned_varint(value), expected, "value {}", value);
+        }
+    }
+
+    /// Checks the u64 size formula on both sides of every 7-bit boundary.
+    #[test]
+    fn test_size_of_unsigned_varint_u64() {
+        let expectations: [(u64, usize); 21] = [
+            (0, 1),
+            (127, 1),
+            (128, 2),
+            (16383, 2),
+            (16384, 3),
+            (2097151, 3),
+            (2097152, 4),
+            (268435455, 4),
+            (268435456, 5),
+            (u32::MAX as u64, 5),
+            (34359738367, 5),
+            (34359738368, 6),
+            (4398046511103, 6),
+            (4398046511104, 7),
+            (562949953421311, 7),
+            (562949953421312, 8),
+            (72057594037927935, 8),
+            (72057594037927936, 9),
+            (9223372036854775807, 9),
+            (9223372036854775808, 10),
+            (u64::MAX, 10),
+        ];
+
+        for (value, expected) in expectations {
+            assert_eq!(
+                size_of_unsigned_varint_u64(value),
+                expected,
+                "value {}",
+                value
+            );
+        }
+    }
+
+    #[test]
+    fn test_read_unsigned_varint_bytes_error_handling() {
+        // Empty buffer
+        assert!(read_unsigned_varint_bytes(&[]).is_err());
+
+        // Incomplete varint (continuation bit set but no next byte)
+        let truncated_inputs: [&[u8]; 2] = [&[0x80], &[0xFF, 0x80]];
+        for input in truncated_inputs {
+            assert!(read_unsigned_varint_bytes(input).is_err());
+        }
+    }
+
+    #[test]
+    fn test_write_read_to_slice() {
+        // u32 varints written to, and read back from, a plain slice
+        for value in [0u32, 127, 128, 16384, u32::MAX] {
+            let mut buffer = [0u8; 10];
+            let written = write_unsigned_varint_to_slice(value, &mut buffer);
+
+            let (read_value, next_pos) = read_unsigned_varint_at(&buffer, 0, 5).unwrap();
+            assert_eq!(value, read_value);
+            assert_eq!(written, next_pos);
+        }
+
+        // u64 varints written to, and read back from, a plain slice
+        for value in [0u64, 127, 128, 16384, u32::MAX as u64, u64::MAX] {
+            let mut buffer = [0u8; 10];
+            let written = write_unsigned_varint_u64_to_slice(value, &mut buffer);
+
+            let (read_value, next_pos) = read_unsigned_varint_u64_at(&buffer, 0, 10).unwrap();
+            assert_eq!(value, read_value);
+            assert_eq!(written, next_pos);
+        }
+    }
+
+    #[test]
+    fn test_read_at_with_offset() {
+        // Pack several varints back to back, then walk them using returned positions
+        let values = [127u32, 16384, u32::MAX];
+        let mut buffer = [0u8; 20];
+
+        let mut end_pos = 0;
+        for value in values {
+            end_pos += write_unsigned_varint_to_slice(value, &mut buffer[end_pos..]);
+        }
+
+        let mut cursor = 0;
+        for expected in values {
+            let (decoded, next_pos) = read_unsigned_varint_at(&buffer, cursor, 5).unwrap();
+            assert_eq!(decoded, expected);
+            cursor = next_pos;
+        }
+        assert_eq!(cursor, end_pos);
+    }
+}

Reply via email to