This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 49d0c93 feat: introduce KvRecordBatchBuilder (#156)
49d0c93 is described below
commit 49d0c9390f8c8247e9f29087a43e861ef4dad2b9
Author: Anton Borisov <[email protected]>
AuthorDate: Thu Jan 15 02:51:57 2026 +0000
feat: introduce KvRecordBatchBuilder (#156)
---
crates/fluss/src/record/kv/kv_record.rs | 343 ++++++++++++
crates/fluss/src/record/kv/kv_record_batch.rs | 394 ++++++++++++++
.../fluss/src/record/kv/kv_record_batch_builder.rs | 581 +++++++++++++++++++++
crates/fluss/src/record/kv/mod.rs | 35 ++
crates/fluss/src/record/mod.rs | 1 +
.../src/row/compacted/compacted_key_writer.rs | 6 +
.../src/row/compacted/compacted_row_reader.rs | 35 +-
.../src/row/compacted/compacted_row_writer.rs | 31 +-
crates/fluss/src/row/mod.rs | 7 +-
crates/fluss/src/util/mod.rs | 1 +
crates/fluss/src/util/varint.rs | 502 ++++++++++++++++++
11 files changed, 1893 insertions(+), 43 deletions(-)
diff --git a/crates/fluss/src/record/kv/kv_record.rs
b/crates/fluss/src/record/kv/kv_record.rs
new file mode 100644
index 0000000..8c30713
--- /dev/null
+++ b/crates/fluss/src/record/kv/kv_record.rs
@@ -0,0 +1,343 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Key-Value record implementation.
+//!
+//! This module provides the KvRecord struct which represents an immutable
key-value record.
+//! The record format is:
+//! - Length => Int32
+//! - KeyLength => Unsigned VarInt
+//! - Key => bytes
+//! - Row => BinaryRow (optional, if null then this is a deletion record)
+
+use bytes::{BufMut, Bytes, BytesMut};
+use std::io;
+
+use crate::util::varint::{
+ read_unsigned_varint_bytes, size_of_unsigned_varint,
write_unsigned_varint_buf,
+};
+
+/// Length field size in bytes
+pub const LENGTH_LENGTH: usize = 4;
+
+/// A key-value record.
+///
+/// The schema is:
+/// - Length => Int32
+/// - KeyLength => Unsigned VarInt
+/// - Key => bytes
+/// - Value => bytes (BinaryRow, written directly without length prefix)
+///
+/// When the value is None (deletion), no Value bytes are present.
+// Reference implementation:
+//
https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/record/KvRecord.java
+#[derive(Debug, Clone)]
+pub struct KvRecord {
+ key: Bytes,
+ value: Option<Bytes>,
+ size_in_bytes: usize,
+}
+
+impl KvRecord {
+ /// Create a new KvRecord with the given key and optional value.
+ pub fn new(key: Bytes, value: Option<Bytes>) -> Self {
+ let size_in_bytes = Self::size_of(&key, value.as_deref());
+ Self {
+ key,
+ value,
+ size_in_bytes,
+ }
+ }
+
+ /// Get the key bytes.
+ pub fn key(&self) -> &Bytes {
+ &self.key
+ }
+
+ /// Get the value bytes (None indicates a deletion).
+ pub fn value(&self) -> Option<&Bytes> {
+ self.value.as_ref()
+ }
+
+ /// Calculate the total size of the record when serialized (including
length prefix).
+ pub fn size_of(key: &[u8], value: Option<&[u8]>) -> usize {
+ Self::size_without_length(key, value) + LENGTH_LENGTH
+ }
+
+ /// Calculate the size without the length prefix.
+ fn size_without_length(key: &[u8], value: Option<&[u8]>) -> usize {
+ let key_len = key.len();
+ let key_len_size = size_of_unsigned_varint(key_len as u32);
+
+ match value {
+ Some(v) =>
key_len_size.saturating_add(key_len).saturating_add(v.len()),
+ None => {
+ // Deletion: no value bytes
+ key_len_size.saturating_add(key_len)
+ }
+ }
+ }
+
+ /// Write a KV record to a buffer.
+ ///
+ /// Returns the number of bytes written.
+ pub fn write_to_buf(buf: &mut BytesMut, key: &[u8], value: Option<&[u8]>)
-> io::Result<usize> {
+ let size_in_bytes = Self::size_without_length(key, value);
+
+ let size_i32 = i32::try_from(size_in_bytes).map_err(|_| {
+ io::Error::new(
+ io::ErrorKind::InvalidInput,
+ format!("Record size {} exceeds i32::MAX", size_in_bytes),
+ )
+ })?;
+ buf.put_i32_le(size_i32);
+ let key_len = key.len() as u32;
+ write_unsigned_varint_buf(key_len, buf);
+
+ buf.put_slice(key);
+
+ if let Some(v) = value {
+ buf.put_slice(v);
+ }
+ // For None (deletion), don't write any value bytes
+
+ Ok(size_in_bytes + LENGTH_LENGTH)
+ }
+
+ /// Read a KV record from bytes at the given position.
+ ///
+ /// Returns the KvRecord and the number of bytes consumed.
+ ///
+ /// TODO: Connect KvReadContext and return CompactedRow records.
+ pub fn read_from(bytes: &Bytes, position: usize) -> io::Result<(Self,
usize)> {
+ if bytes.len() < position.saturating_add(LENGTH_LENGTH) {
+ return Err(io::Error::new(
+ io::ErrorKind::UnexpectedEof,
+ "Not enough bytes to read record length",
+ ));
+ }
+
+ let size_in_bytes_i32 = i32::from_le_bytes([
+ bytes[position],
+ bytes[position + 1],
+ bytes[position + 2],
+ bytes[position + 3],
+ ]);
+
+ if size_in_bytes_i32 < 0 {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!("Invalid record length: {}", size_in_bytes_i32),
+ ));
+ }
+
+ let size_in_bytes = size_in_bytes_i32 as usize;
+
+ let total_size =
size_in_bytes.checked_add(LENGTH_LENGTH).ok_or_else(|| {
+ io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!(
+ "Record size overflow: {} + {}",
+ size_in_bytes, LENGTH_LENGTH
+ ),
+ )
+ })?;
+
+ let available = bytes.len().saturating_sub(position);
+ if available < total_size {
+ return Err(io::Error::new(
+ io::ErrorKind::UnexpectedEof,
+ format!(
+ "Not enough bytes to read record: expected {}, available
{}",
+ total_size, available
+ ),
+ ));
+ }
+
+ let mut current_offset = position + LENGTH_LENGTH;
+ let record_end = position + total_size;
+
+ // Read key length as unsigned varint (bounded by record end)
+ let (key_len, varint_size) =
+ read_unsigned_varint_bytes(&bytes[current_offset..record_end])?;
+ current_offset += varint_size;
+
+ // Read key bytes
+ let key_end = current_offset + key_len as usize;
+ if key_end > position + total_size {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ "Key length exceeds record size",
+ ));
+ }
+ let key = bytes.slice(current_offset..key_end);
+ current_offset = key_end;
+
+ // Read value bytes directly
+ let value = if current_offset < record_end {
+ // Value is present: all remaining bytes are the value
+ let value_bytes = bytes.slice(current_offset..record_end);
+ Some(value_bytes)
+ } else {
+ // No remaining bytes: this is a deletion record
+ None
+ };
+
+ Ok((
+ Self {
+ key,
+ value,
+ size_in_bytes: total_size,
+ },
+ total_size,
+ ))
+ }
+
+ /// Get the total size in bytes of this record.
+ pub fn get_size_in_bytes(&self) -> usize {
+ self.size_in_bytes
+ }
+}
+
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_kv_record_size_calculation() {
        let key = b"test_key";
        let value = b"test_value";

        // Size shared by every record: length prefix + key-length varint + key.
        let base = LENGTH_LENGTH + size_of_unsigned_varint(key.len() as u32) + key.len();

        // A present value adds exactly its raw byte length (no value varint).
        assert_eq!(KvRecord::size_of(key, Some(value)), base + value.len());

        // A deletion adds nothing beyond the base.
        assert_eq!(KvRecord::size_of(key, None), base);
    }

    #[test]
    fn test_kv_record_write_read_round_trip() {
        let key = b"my_key";
        let value = b"my_value_data";

        let mut out = BytesMut::new();
        let written = KvRecord::write_to_buf(&mut out, key, Some(value)).unwrap();

        let (rec, consumed) = KvRecord::read_from(&out.freeze(), 0).unwrap();
        assert_eq!(consumed, written);
        assert_eq!(rec.key().as_ref(), key);
        assert_eq!(rec.value().unwrap().as_ref(), value);
        assert_eq!(rec.get_size_in_bytes(), written);
    }

    #[test]
    fn test_kv_record_deletion() {
        let key = b"delete_me";

        // A deletion record is written with no value bytes.
        let mut out = BytesMut::new();
        let written = KvRecord::write_to_buf(&mut out, key, None).unwrap();

        let (rec, consumed) = KvRecord::read_from(&out.freeze(), 0).unwrap();
        assert_eq!(consumed, written);
        assert_eq!(rec.key().as_ref(), key);
        assert!(rec.value().is_none());
    }

    #[test]
    fn test_kv_record_with_large_key() {
        let key = vec![0u8; 1024];
        let value = vec![1u8; 4096];

        let mut out = BytesMut::new();
        let written = KvRecord::write_to_buf(&mut out, &key, Some(&value)).unwrap();

        let (rec, consumed) = KvRecord::read_from(&out.freeze(), 0).unwrap();
        assert_eq!(consumed, written);
        assert_eq!(rec.key().len(), key.len());
        assert_eq!(rec.value().unwrap().len(), value.len());
    }

    #[test]
    fn test_invalid_record_lengths() {
        // A negative length field is rejected as corrupt data.
        let mut raw = BytesMut::new();
        raw.put_i32_le(-1);
        raw.put_u8(1);
        raw.put_slice(b"key");
        let err = KvRecord::read_from(&raw.freeze(), 0).unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);

        // A length near i32::MAX overruns the tiny buffer.
        let mut raw = BytesMut::new();
        raw.put_i32_le(i32::MAX);
        raw.put_u8(1);
        assert!(KvRecord::read_from(&raw.freeze(), 0).is_err());

        // A huge but non-negative length runs off the end of the buffer.
        let mut raw = BytesMut::new();
        raw.put_i32_le(1_000_000);
        let err = KvRecord::read_from(&raw.freeze(), 0).unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);
    }

    #[test]
    fn test_multiple_records_in_buffer() {
        let cases: Vec<(&[u8], Option<&[u8]>)> = vec![
            (b"key1", Some(b"value1")),
            (b"key2", None),
            (b"key3", Some(b"value3")),
        ];

        let mut out = BytesMut::new();
        for (key, value) in &cases {
            KvRecord::write_to_buf(&mut out, key, *value).unwrap();
        }

        // Walk the buffer record-by-record and verify each round-trips.
        let frozen = out.freeze();
        let mut cursor = 0;
        for (expected_key, expected_value) in &cases {
            let (rec, consumed) = KvRecord::read_from(&frozen, cursor).unwrap();
            assert_eq!(rec.key().as_ref(), *expected_key);
            assert_eq!(rec.value().map(|v| v.as_ref()), *expected_value);
            cursor += consumed;
        }
        assert_eq!(cursor, frozen.len());
    }
}
diff --git a/crates/fluss/src/record/kv/kv_record_batch.rs
b/crates/fluss/src/record/kv/kv_record_batch.rs
new file mode 100644
index 0000000..fdd4ad7
--- /dev/null
+++ b/crates/fluss/src/record/kv/kv_record_batch.rs
@@ -0,0 +1,394 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! KV record batch implementation.
+//!
+//! The schema of a KvRecordBatch is:
+//! - Length => Int32
+//! - Magic => Int8
+//! - CRC => Uint32
+//! - SchemaId => Int16
+//! - Attributes => Int8
+//! - WriterId => Int64
+//! - BatchSequence => Int32
+//! - RecordCount => Int32
+//! - Records => [Record]
+//!
+//! The CRC covers data from the SchemaId to the end of the batch.
+
+use bytes::Bytes;
+use std::io;
+
+use crate::record::kv::KvRecord;
+
// ---- Header field widths (bytes) ----
pub const LENGTH_LENGTH: usize = 4;
pub const MAGIC_LENGTH: usize = 1;
pub const CRC_LENGTH: usize = 4;
pub const SCHEMA_ID_LENGTH: usize = 2;
pub const ATTRIBUTE_LENGTH: usize = 1;
pub const WRITE_CLIENT_ID_LENGTH: usize = 8;
pub const BATCH_SEQUENCE_LENGTH: usize = 4;
pub const RECORDS_COUNT_LENGTH: usize = 4;

// ---- Header field offsets, each derived from the previous field's end ----
pub const LENGTH_OFFSET: usize = 0;
pub const MAGIC_OFFSET: usize = LENGTH_OFFSET + LENGTH_LENGTH;
pub const CRC_OFFSET: usize = MAGIC_OFFSET + MAGIC_LENGTH;
pub const SCHEMA_ID_OFFSET: usize = CRC_OFFSET + CRC_LENGTH;
pub const ATTRIBUTES_OFFSET: usize = SCHEMA_ID_OFFSET + SCHEMA_ID_LENGTH;
pub const WRITE_CLIENT_ID_OFFSET: usize = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
pub const BATCH_SEQUENCE_OFFSET: usize = WRITE_CLIENT_ID_OFFSET + WRITE_CLIENT_ID_LENGTH;
pub const RECORDS_COUNT_OFFSET: usize = BATCH_SEQUENCE_OFFSET + BATCH_SEQUENCE_LENGTH;
pub const RECORDS_OFFSET: usize = RECORDS_COUNT_OFFSET + RECORDS_COUNT_LENGTH;

/// Total header size: records begin immediately after the header.
pub const RECORD_BATCH_HEADER_SIZE: usize = RECORDS_OFFSET;

/// Overhead of the batch (the leading length field, not counted by the
/// length field's own value).
pub const KV_OVERHEAD: usize = LENGTH_OFFSET + LENGTH_LENGTH;
+
+/// A KV record batch.
+///
+/// This struct provides read access to a serialized KV record batch.
+// Reference implementation:
+//
https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/record/KvRecordBatch.java
+pub struct KvRecordBatch {
+ data: Bytes,
+ position: usize,
+}
+
+impl KvRecordBatch {
+ /// Create a new KvRecordBatch pointing to the given data at the specified
position.
+ pub fn new(data: Bytes, position: usize) -> Self {
+ Self { data, position }
+ }
+
+ /// Get the size in bytes of this batch.
+ pub fn size_in_bytes(&self) -> io::Result<usize> {
+ if self.data.len() < self.position.saturating_add(LENGTH_LENGTH) {
+ return Err(io::Error::new(
+ io::ErrorKind::UnexpectedEof,
+ "Not enough bytes to read batch length",
+ ));
+ }
+ let length_i32 = i32::from_le_bytes([
+ self.data[self.position],
+ self.data[self.position + 1],
+ self.data[self.position + 2],
+ self.data[self.position + 3],
+ ]);
+
+ if length_i32 < 0 {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!("Invalid batch length: {}", length_i32),
+ ));
+ }
+
+ let length = length_i32 as usize;
+
+ Ok(length.saturating_add(KV_OVERHEAD))
+ }
+
+ /// Check if this batch is valid by verifying the checksum.
+ pub fn is_valid(&self) -> bool {
+ if !matches!(self.size_in_bytes(), Ok(s) if s >=
RECORD_BATCH_HEADER_SIZE) {
+ return false;
+ }
+
+ match (self.checksum(), self.compute_checksum()) {
+ (Ok(stored), Ok(computed)) => stored == computed,
+ _ => false,
+ }
+ }
+
+ /// Get the magic byte.
+ pub fn magic(&self) -> io::Result<u8> {
+ if self.data.len() <
self.position.saturating_add(MAGIC_OFFSET).saturating_add(1) {
+ return Err(io::Error::new(
+ io::ErrorKind::UnexpectedEof,
+ "Not enough bytes to read magic byte",
+ ));
+ }
+ Ok(self.data[self.position + MAGIC_OFFSET])
+ }
+
+ /// Get the checksum.
+ pub fn checksum(&self) -> io::Result<u32> {
+ if self.data.len() <
self.position.saturating_add(CRC_OFFSET).saturating_add(4) {
+ return Err(io::Error::new(
+ io::ErrorKind::UnexpectedEof,
+ "Not enough bytes to read checksum",
+ ));
+ }
+ Ok(u32::from_le_bytes([
+ self.data[self.position + CRC_OFFSET],
+ self.data[self.position + CRC_OFFSET + 1],
+ self.data[self.position + CRC_OFFSET + 2],
+ self.data[self.position + CRC_OFFSET + 3],
+ ]))
+ }
+
+ /// Compute the checksum of this batch.
+ pub fn compute_checksum(&self) -> io::Result<u32> {
+ let size = self.size_in_bytes()?;
+ if size < RECORD_BATCH_HEADER_SIZE {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!(
+ "Batch size {} is less than header size {}",
+ size, RECORD_BATCH_HEADER_SIZE
+ ),
+ ));
+ }
+
+ let start = self.position.saturating_add(SCHEMA_ID_OFFSET);
+ let end = self.position.saturating_add(size);
+
+ if end > self.data.len() || start >= end {
+ return Err(io::Error::new(
+ io::ErrorKind::UnexpectedEof,
+ "Not enough bytes to compute checksum",
+ ));
+ }
+
+ Ok(crc32c::crc32c(&self.data[start..end]))
+ }
+
+ /// Get the schema ID.
+ pub fn schema_id(&self) -> io::Result<i16> {
+ if self.data.len()
+ < self
+ .position
+ .saturating_add(SCHEMA_ID_OFFSET)
+ .saturating_add(2)
+ {
+ return Err(io::Error::new(
+ io::ErrorKind::UnexpectedEof,
+ "Not enough bytes to read schema ID",
+ ));
+ }
+ Ok(i16::from_le_bytes([
+ self.data[self.position + SCHEMA_ID_OFFSET],
+ self.data[self.position + SCHEMA_ID_OFFSET + 1],
+ ]))
+ }
+
+ /// Get the writer ID.
+ pub fn writer_id(&self) -> io::Result<i64> {
+ if self.data.len()
+ < self
+ .position
+ .saturating_add(WRITE_CLIENT_ID_OFFSET)
+ .saturating_add(8)
+ {
+ return Err(io::Error::new(
+ io::ErrorKind::UnexpectedEof,
+ "Not enough bytes to read writer ID",
+ ));
+ }
+ Ok(i64::from_le_bytes([
+ self.data[self.position + WRITE_CLIENT_ID_OFFSET],
+ self.data[self.position + WRITE_CLIENT_ID_OFFSET + 1],
+ self.data[self.position + WRITE_CLIENT_ID_OFFSET + 2],
+ self.data[self.position + WRITE_CLIENT_ID_OFFSET + 3],
+ self.data[self.position + WRITE_CLIENT_ID_OFFSET + 4],
+ self.data[self.position + WRITE_CLIENT_ID_OFFSET + 5],
+ self.data[self.position + WRITE_CLIENT_ID_OFFSET + 6],
+ self.data[self.position + WRITE_CLIENT_ID_OFFSET + 7],
+ ]))
+ }
+
+ /// Get the batch sequence.
+ pub fn batch_sequence(&self) -> io::Result<i32> {
+ if self.data.len()
+ < self
+ .position
+ .saturating_add(BATCH_SEQUENCE_OFFSET)
+ .saturating_add(4)
+ {
+ return Err(io::Error::new(
+ io::ErrorKind::UnexpectedEof,
+ "Not enough bytes to read batch sequence",
+ ));
+ }
+ Ok(i32::from_le_bytes([
+ self.data[self.position + BATCH_SEQUENCE_OFFSET],
+ self.data[self.position + BATCH_SEQUENCE_OFFSET + 1],
+ self.data[self.position + BATCH_SEQUENCE_OFFSET + 2],
+ self.data[self.position + BATCH_SEQUENCE_OFFSET + 3],
+ ]))
+ }
+
+ /// Get the number of records in this batch.
+ pub fn record_count(&self) -> io::Result<i32> {
+ if self.data.len()
+ < self
+ .position
+ .saturating_add(RECORDS_COUNT_OFFSET)
+ .saturating_add(4)
+ {
+ return Err(io::Error::new(
+ io::ErrorKind::UnexpectedEof,
+ "Not enough bytes to read record count",
+ ));
+ }
+ Ok(i32::from_le_bytes([
+ self.data[self.position + RECORDS_COUNT_OFFSET],
+ self.data[self.position + RECORDS_COUNT_OFFSET + 1],
+ self.data[self.position + RECORDS_COUNT_OFFSET + 2],
+ self.data[self.position + RECORDS_COUNT_OFFSET + 3],
+ ]))
+ }
+
+ /// Create an iterator over the records in this batch.
+ /// This validates the batch checksum before returning the iterator.
+ /// For trusted data paths, use `records_unchecked()` to skip validation.
+ pub fn records(&self) -> io::Result<KvRecordIterator> {
+ if !self.is_valid() {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ "Invalid batch checksum",
+ ));
+ }
+ self.records_unchecked()
+ }
+
+ /// Create an iterator over the records in this batch without validating
the checksum
+ pub fn records_unchecked(&self) -> io::Result<KvRecordIterator> {
+ let size = self.size_in_bytes()?;
+ let count = self.record_count()?;
+ if count < 0 {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!("Invalid record count: {}", count),
+ ));
+ }
+ Ok(KvRecordIterator {
+ data: self.data.clone(),
+ position: self.position + RECORDS_OFFSET,
+ end: self.position + size,
+ remaining_count: count,
+ })
+ }
+}
+
+/// Iterator over records in a KV record batch.
+pub struct KvRecordIterator {
+ data: Bytes,
+ position: usize,
+ end: usize,
+ remaining_count: i32,
+}
+
+impl Iterator for KvRecordIterator {
+ type Item = io::Result<KvRecord>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ if self.remaining_count <= 0 || self.position >= self.end {
+ return None;
+ }
+
+ match KvRecord::read_from(&self.data, self.position) {
+ Ok((record, size)) => {
+ self.position += size;
+ self.remaining_count -= 1;
+ Some(Ok(record))
+ }
+ Err(e) => {
+ self.remaining_count = 0; // Stop iteration on error
+ Some(Err(e))
+ }
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metadata::KvFormat;
    use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, KvRecordBatchBuilder};
    use bytes::{BufMut, BytesMut};

    #[test]
    fn test_invalid_batch_lengths() {
        // A negative length field makes the batch unreadable.
        let mut raw = BytesMut::new();
        raw.put_i32_le(-1);
        raw.put_u8(1);
        raw.put_slice(b"key");
        let batch = KvRecordBatch::new(raw.freeze(), 0);
        assert!(batch.size_in_bytes().is_err());
        assert!(!batch.is_valid());

        // A length of i32::MAX claims far more data than exists.
        let mut raw = BytesMut::new();
        raw.put_i32_le(i32::MAX);
        let batch = KvRecordBatch::new(raw.freeze(), 0);
        assert!(!batch.is_valid());

        // A modest length that still exceeds the tiny buffer is invalid too.
        let mut raw = BytesMut::new();
        raw.put_i32_le(100);
        let batch = KvRecordBatch::new(raw.freeze(), 0);
        assert!(!batch.is_valid());
    }

    #[test]
    fn test_kv_record_batch_build_and_read() {
        use crate::row::compacted::CompactedRowWriter;

        let schema_id = 42;
        let mut builder = KvRecordBatchBuilder::new(schema_id, 4096, KvFormat::COMPACTED);
        builder.set_writer_state(100, 5);

        // One upsert record...
        let first_key = b"key1";
        let mut first_value = CompactedRowWriter::new(1);
        first_value.write_bytes(&[1, 2, 3, 4, 5]);
        builder.append_row(first_key, Some(&first_value)).unwrap();

        // ...and one deletion record.
        let second_key = b"key2";
        builder
            .append_row::<CompactedRowWriter>(second_key, None)
            .unwrap();

        let serialized = builder.build().unwrap();
        let batch = KvRecordBatch::new(serialized, 0);

        // Header fields survive the round trip.
        assert!(batch.is_valid());
        assert_eq!(batch.magic().unwrap(), CURRENT_KV_MAGIC_VALUE);
        assert_eq!(batch.schema_id().unwrap(), schema_id as i16);
        assert_eq!(batch.writer_id().unwrap(), 100);
        assert_eq!(batch.batch_sequence().unwrap(), 5);
        assert_eq!(batch.record_count().unwrap(), 2);

        // Both records come back intact and in order.
        let records: Vec<_> = batch.records().unwrap().collect();
        assert_eq!(records.len(), 2);

        let first = records[0].as_ref().unwrap();
        assert_eq!(first.key().as_ref(), first_key);
        assert_eq!(first.value().unwrap().as_ref(), first_value.buffer());

        let second = records[1].as_ref().unwrap();
        assert_eq!(second.key().as_ref(), second_key);
        assert!(second.value().is_none());
    }
}
diff --git a/crates/fluss/src/record/kv/kv_record_batch_builder.rs
b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
new file mode 100644
index 0000000..773c778
--- /dev/null
+++ b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
@@ -0,0 +1,581 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! KV record batch builder implementation.
+//!
+//! This module provides the KvRecordBatchBuilder for building batches of KV
records.
+
+use bytes::{Bytes, BytesMut};
+use std::io;
+
+use crate::metadata::KvFormat;
+use crate::record::kv::kv_record::KvRecord;
+use crate::record::kv::kv_record_batch::{
+ ATTRIBUTES_OFFSET, BATCH_SEQUENCE_OFFSET, CRC_OFFSET, LENGTH_LENGTH,
LENGTH_OFFSET,
+ MAGIC_OFFSET, RECORD_BATCH_HEADER_SIZE, RECORDS_COUNT_OFFSET,
SCHEMA_ID_OFFSET,
+ WRITE_CLIENT_ID_OFFSET,
+};
+use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, NO_BATCH_SEQUENCE,
NO_WRITER_ID};
+use crate::row::BinaryRow;
+
+/// Builder for KvRecordBatch.
+///
+/// This builder accumulates KV records and produces a serialized batch with
proper
+/// header information and checksums.
+// Reference implementation:
+//
https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/record/KvRecordBatchBuilder.java
+pub struct KvRecordBatchBuilder {
+ schema_id: i32,
+ magic: u8,
+ write_limit: usize,
+ buffer: BytesMut,
+ writer_id: i64,
+ batch_sequence: i32,
+ current_record_number: i32,
+ size_in_bytes: usize,
+ is_closed: bool,
+ kv_format: KvFormat,
+ aborted: bool,
+ built_buffer: Option<Bytes>,
+}
+
+impl KvRecordBatchBuilder {
+ /// Create a new KvRecordBatchBuilder.
+ ///
+ /// # Arguments
+ /// * `schema_id` - The schema ID for records in this batch (must fit in
i16)
+ /// * `write_limit` - Maximum bytes that can be appended
+ /// * `kv_format` - The KV format (Compacted, Indexed, or Aligned)
+ pub fn new(schema_id: i32, write_limit: usize, kv_format: KvFormat) ->
Self {
+ assert!(
+ schema_id <= i16::MAX as i32,
+ "schema_id shouldn't be greater than the max value of i16: {}",
+ i16::MAX
+ );
+
+ let mut buffer =
BytesMut::with_capacity(write_limit.max(RECORD_BATCH_HEADER_SIZE));
+
+ // Reserve space for header (we'll write it at the end)
+ buffer.resize(RECORD_BATCH_HEADER_SIZE, 0);
+
+ Self {
+ schema_id,
+ magic: CURRENT_KV_MAGIC_VALUE,
+ write_limit,
+ buffer,
+ writer_id: NO_WRITER_ID,
+ batch_sequence: NO_BATCH_SEQUENCE,
+ current_record_number: 0,
+ size_in_bytes: RECORD_BATCH_HEADER_SIZE,
+ is_closed: false,
+ kv_format,
+ aborted: false,
+ built_buffer: None,
+ }
+ }
+
+ /// Check if there is room for a new record containing the given key and
row.
+ /// If no records have been appended, this always returns true.
+ pub fn has_room_for_row<R: BinaryRow>(&self, key: &[u8], row: Option<&R>)
-> bool {
+ let value = row.map(|r| r.as_bytes());
+ self.size_in_bytes + KvRecord::size_of(key, value) <= self.write_limit
+ }
+
+ /// Append a KV record with a row value to the batch.
+ ///
+ /// Returns an error if:
+ /// - The builder has been aborted
+ /// - The builder is closed
+ /// - Adding this record would exceed the write limit
+ /// - The maximum number of records is exceeded
+ /// - The KV format is not COMPACTED
+ pub fn append_row<R: BinaryRow>(&mut self, key: &[u8], row: Option<&R>) ->
io::Result<()> {
+ if self.kv_format != KvFormat::COMPACTED {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "append_row can only be used with KvFormat::COMPACTED",
+ ));
+ }
+
+ if self.aborted {
+ return Err(io::Error::other(
+ "Tried to append a record, but KvRecordBatchBuilder has
already been aborted",
+ ));
+ }
+
+ if self.is_closed {
+ return Err(io::Error::other(
+ "Tried to append a record, but KvRecordBatchBuilder is closed
for record appends",
+ ));
+ }
+
+ // Check record count limit before mutation
+ if self.current_record_number == i32::MAX {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ format!(
+ "Maximum number of records per batch exceeded, max
records: {}",
+ i32::MAX
+ ),
+ ));
+ }
+
+ let value = row.map(|r| r.as_bytes());
+ let record_size = KvRecord::size_of(key, value);
+ if self.size_in_bytes + record_size > self.write_limit {
+ return Err(io::Error::new(
+ io::ErrorKind::WriteZero,
+ format!(
+ "Adding record would exceed write limit: {} + {} > {}",
+ self.size_in_bytes, record_size, self.write_limit
+ ),
+ ));
+ }
+
+ let record_byte_size = KvRecord::write_to_buf(&mut self.buffer, key,
value)?;
+ debug_assert_eq!(record_byte_size, record_size, "Record size
mismatch");
+
+ self.current_record_number += 1;
+ self.size_in_bytes += record_byte_size;
+
+ // Invalidate cached buffer since we modified the batch
+ self.built_buffer = None;
+
+ Ok(())
+ }
+
+ /// Set the writer state (writer ID and batch base sequence).
+ ///
+ /// This invalidates any cached buffer, ensuring the batch header will be
rebuilt
+ /// on the next call to [`build`](Self::build).
+ pub fn set_writer_state(&mut self, writer_id: i64, batch_base_sequence:
i32) {
+ self.writer_id = writer_id;
+ self.batch_sequence = batch_base_sequence;
+ // Invalidate cached buffer since header fields changed
+ self.built_buffer = None;
+ }
+
+ /// Build the batch and return the serialized bytes.
+ ///
+ /// This can be called multiple times as the batch is cached after the
first build.
+ ///
+ /// # Caching and Mutations
+ ///
+ /// The builder caches the result after the first successful build.
However, the cache
+ /// is invalidated (and the batch rebuilt) if any of the following occur
after building:
+ /// - Calling [`append_row`](Self::append_row) to add records
+ /// - Calling [`set_writer_state`](Self::set_writer_state) to modify
writer metadata
+ ///
+ /// This allows the builder to be reused with different writer states or
to continue
+ /// appending records after an initial build, but callers should be aware
that the
+ /// built bytes may change if mutations occur between builds.
+ ///
+ /// Note: [`close`](Self::close) prevents further appends but does not
prevent writer state modifications.
+ pub fn build(&mut self) -> io::Result<Bytes> {
+ if self.aborted {
+ return Err(io::Error::other(
+ "Attempting to build an aborted record batch",
+ ));
+ }
+
+ if let Some(ref cached) = self.built_buffer {
+ return Ok(cached.clone());
+ }
+
+ self.write_batch_header()?;
+ let bytes = self.buffer.clone().freeze();
+ self.built_buffer = Some(bytes);
+ Ok(self.built_buffer.as_ref().unwrap().clone())
+ }
+
+ /// Get the writer ID.
+ pub fn writer_id(&self) -> i64 {
+ self.writer_id
+ }
+
+ /// Get the batch sequence.
+ pub fn batch_sequence(&self) -> i32 {
+ self.batch_sequence
+ }
+
+ /// Check if the builder is closed.
+ pub fn is_closed(&self) -> bool {
+ self.is_closed
+ }
+
+ /// Abort the builder.
+ /// After aborting, no more records can be appended and the batch cannot
be built.
+ pub fn abort(&mut self) {
+ self.aborted = true;
+ }
+
+ /// Close the builder.
+ /// After closing, no more records can be appended, but the batch can
still be built.
+ pub fn close(&mut self) -> io::Result<()> {
+ if self.aborted {
+ return Err(io::Error::other(
+ "Cannot close KvRecordBatchBuilder as it has already been
aborted",
+ ));
+ }
+ self.is_closed = true;
+ Ok(())
+ }
+
+ /// Get the current size in bytes of the batch.
+ pub fn get_size_in_bytes(&self) -> usize {
+ self.size_in_bytes
+ }
+
+ // ----------------------- Internal methods -------------------------------
+
    /// Write the batch header.
    ///
    /// Serializes every header field (little-endian) into the first
    /// `RECORD_BATCH_HEADER_SIZE` bytes of the buffer, then computes the
    /// CRC-32C checksum over `[SCHEMA_ID_OFFSET, size_in_bytes)` and patches
    /// it into the CRC slot.
    ///
    /// # Errors
    /// Returns `InvalidInput` if the batch size (excluding the length field)
    /// does not fit in an `i32`.
    fn write_batch_header(&mut self) -> io::Result<()> {
        // The stored length excludes the length field itself.
        let size_without_length = self.size_in_bytes - LENGTH_LENGTH;
        let total_size = i32::try_from(size_without_length).map_err(|_| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("Batch size {} exceeds i32::MAX", size_without_length),
            )
        })?;

        // Compute attributes before borrowing buffer mutably
        let attributes = self.compute_attributes();

        // Write to the beginning of the buffer
        let header = &mut self.buffer[0..RECORD_BATCH_HEADER_SIZE];

        // Write length
        header[LENGTH_OFFSET..LENGTH_OFFSET + LENGTH_LENGTH]
            .copy_from_slice(&total_size.to_le_bytes());

        // Write magic
        header[MAGIC_OFFSET] = self.magic;

        // Write empty CRC first (will update later, after all CRC-covered
        // fields are in place)
        header[CRC_OFFSET..CRC_OFFSET + 4].copy_from_slice(&0u32.to_le_bytes());

        // Write schema ID
        header[SCHEMA_ID_OFFSET..SCHEMA_ID_OFFSET + 2]
            .copy_from_slice(&(self.schema_id as i16).to_le_bytes());

        // Write attributes
        header[ATTRIBUTES_OFFSET] = attributes;

        // Write writer ID
        header[WRITE_CLIENT_ID_OFFSET..WRITE_CLIENT_ID_OFFSET + 8]
            .copy_from_slice(&self.writer_id.to_le_bytes());

        // Write batch sequence
        header[BATCH_SEQUENCE_OFFSET..BATCH_SEQUENCE_OFFSET + 4]
            .copy_from_slice(&self.batch_sequence.to_le_bytes());

        // Write record count
        header[RECORDS_COUNT_OFFSET..RECORDS_COUNT_OFFSET + 4]
            .copy_from_slice(&self.current_record_number.to_le_bytes());

        // Compute and update CRC (covers schema ID through end of records)
        let crc = crc32c::crc32c(&self.buffer[SCHEMA_ID_OFFSET..self.size_in_bytes]);
        self.buffer[CRC_OFFSET..CRC_OFFSET + 4].copy_from_slice(&crc.to_le_bytes());

        Ok(())
    }
+
    /// Compute the attributes byte for the batch header.
    // Currently no attributes are defined, so this is always 0.
    fn compute_attributes(&self) -> u8 {
        0
    }
+}
+
+impl Drop for KvRecordBatchBuilder {
+ fn drop(&mut self) {
+ // Warn if the builder has records but was never built or was aborted
+ if self.current_record_number > 0 && !self.aborted &&
self.built_buffer.is_none() {
+ eprintln!(
+ "Warning: KvRecordBatchBuilder dropped with {} record(s) that
were never built. \
+ Call build() to serialize the batch before dropping.",
+ self.current_record_number
+ );
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::row::compacted::CompactedRowWriter;

    // Helper function to create a CompactedRowWriter with a single bytes field for testing
    fn create_test_row(data: &[u8]) -> CompactedRowWriter {
        let mut writer = CompactedRowWriter::new(1);
        writer.write_bytes(data);
        writer
    }

    // Happy path: accessors, upsert + delete appends, close, build, and
    // build-result caching.
    #[test]
    fn test_builder_basic_workflow() {
        let schema_id = 42;
        let write_limit = 4096;
        let mut builder = KvRecordBatchBuilder::new(schema_id, write_limit, KvFormat::COMPACTED);

        // Test initial state
        assert!(!builder.is_closed());
        assert_eq!(builder.writer_id(), NO_WRITER_ID);
        assert_eq!(builder.batch_sequence(), NO_BATCH_SEQUENCE);

        // Test writer state
        builder.set_writer_state(100, 5);
        assert_eq!(builder.writer_id(), 100);
        assert_eq!(builder.batch_sequence(), 5);

        // Test appending records
        let key1 = b"key1";
        let value1 = create_test_row(b"value1");
        assert!(builder.has_room_for_row(key1, Some(&value1)));
        builder.append_row(key1, Some(&value1)).unwrap();

        let key2 = b"key2";
        assert!(builder.has_room_for_row::<CompactedRowWriter>(key2, None));
        builder
            .append_row::<CompactedRowWriter>(key2, None)
            .unwrap();

        // Test close and build
        builder.close().unwrap();
        assert!(builder.is_closed());

        let bytes = builder.build().unwrap();
        assert!(bytes.len() > RECORD_BATCH_HEADER_SIZE);

        // Building again should return cached result
        let bytes2 = builder.build().unwrap();
        assert_eq!(bytes.len(), bytes2.len());
    }

    #[test]
    fn test_builder_lifecycle() {
        // Test abort behavior: after abort, append/build/close must all fail
        let mut builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED);
        let value = create_test_row(b"value");
        builder.append_row(b"key", Some(&value)).unwrap();
        builder.abort();
        assert!(
            builder
                .append_row::<CompactedRowWriter>(b"key2", None)
                .is_err()
        );
        assert!(builder.build().is_err());
        assert!(builder.close().is_err());

        // Test close behavior
        let mut builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED);
        let value = create_test_row(b"value");
        builder.append_row(b"key", Some(&value)).unwrap();
        builder.close().unwrap();
        assert!(
            builder
                .append_row::<CompactedRowWriter>(b"key2", None)
                .is_err()
        ); // Can't append after close
        assert!(builder.build().is_ok()); // But can still build
    }

    #[test]
    fn test_write_limit_enforcement() {
        let write_limit = 100; // Very small limit
        let mut builder = KvRecordBatchBuilder::new(1, write_limit, KvFormat::COMPACTED);

        // Test has_room_for_row helper
        let large_key = vec![0u8; 1000];
        let large_value = vec![1u8; 1000];
        let large_row = create_test_row(&large_value);
        assert!(!builder.has_room_for_row(&large_key, Some(&large_row)));
        let small_value = create_test_row(b"value");
        assert!(builder.has_room_for_row(b"key", Some(&small_value)));

        // Test append enforcement - add small record first
        builder.append_row(b"key", Some(&small_value)).unwrap();

        // Try to add large record that exceeds limit (reuse large_row from above)
        let result = builder.append_row(b"key2", Some(&large_row));
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().kind(), io::ErrorKind::WriteZero);
    }

    #[test]
    fn test_append_checks_record_count_limit() {
        let mut builder = KvRecordBatchBuilder::new(1, 100000, KvFormat::COMPACTED);
        // Force the record counter near i32::MAX to exercise the overflow guard.
        builder.current_record_number = i32::MAX - 1;

        let value1 = create_test_row(b"value1");
        builder.append_row(b"key1", Some(&value1)).unwrap();

        let value2 = create_test_row(b"value2");
        let result = builder.append_row(b"key2", Some(&value2));
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidInput);
    }

    #[test]
    #[should_panic(expected = "schema_id shouldn't be greater than")]
    fn test_builder_invalid_schema_id() {
        KvRecordBatchBuilder::new(i16::MAX as i32 + 1, 4096, KvFormat::COMPACTED);
    }

    #[test]
    fn test_cache_invalidation_on_append() {
        let mut builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED);
        builder.set_writer_state(100, 5);

        let value1 = create_test_row(b"value1");
        builder.append_row(b"key1", Some(&value1)).unwrap();
        let bytes1 = builder.build().unwrap();
        let len1 = bytes1.len();

        // Append another record - this should invalidate the cache
        let value2 = create_test_row(b"value2");
        builder.append_row(b"key2", Some(&value2)).unwrap();
        let bytes2 = builder.build().unwrap();
        let len2 = bytes2.len();

        // Verify the second build includes both records
        assert!(len2 > len1, "Second build should be larger");

        use crate::record::kv::KvRecordBatch;
        let batch = KvRecordBatch::new(bytes2, 0);
        assert!(batch.is_valid());
        assert_eq!(batch.record_count().unwrap(), 2, "Should have 2 records");
    }

    #[test]
    fn test_cache_invalidation_on_set_writer_state() {
        let mut builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED);

        builder.set_writer_state(100, 5);
        let value = create_test_row(b"value");
        builder.append_row(b"key", Some(&value)).unwrap();
        let bytes1 = builder.build().unwrap();

        // Change writer state - this should invalidate the cache
        builder.set_writer_state(200, 10);
        let bytes2 = builder.build().unwrap();

        assert_ne!(
            bytes1, bytes2,
            "Bytes should differ after writer state change"
        );

        use crate::record::kv::KvRecordBatch;
        let batch1 = KvRecordBatch::new(bytes1, 0);
        let batch2 = KvRecordBatch::new(bytes2, 0);

        assert_eq!(batch1.writer_id().unwrap(), 100);
        assert_eq!(batch1.batch_sequence().unwrap(), 5);

        assert_eq!(batch2.writer_id().unwrap(), 200);
        assert_eq!(batch2.batch_sequence().unwrap(), 10);
    }

    // End-to-end round trip: build a batch with typed rows plus a deletion,
    // then read every record back and verify field values.
    #[test]
    fn test_builder_with_compacted_row_writer() {
        use crate::metadata::{DataType, IntType, StringType};
        use crate::record::kv::KvRecordBatch;
        use crate::row::InternalRow;
        use crate::row::compacted::CompactedRow;

        let mut builder = KvRecordBatchBuilder::new(1, 100000, KvFormat::COMPACTED);
        builder.set_writer_state(100, 5);

        let types = vec![
            DataType::Int(IntType::new()),
            DataType::String(StringType::new()),
        ];

        // Create and append first record with CompactedRowWriter
        let mut row_writer1 = CompactedRowWriter::new(2);
        row_writer1.write_int(42);
        row_writer1.write_string("hello");

        let key1 = b"key1";
        assert!(builder.has_room_for_row(key1, Some(&row_writer1)));
        builder.append_row(key1, Some(&row_writer1)).unwrap();

        // Create and append second record
        let mut row_writer2 = CompactedRowWriter::new(2);
        row_writer2.write_int(100);
        row_writer2.write_string("world");

        let key2 = b"key2";
        builder.append_row(key2, Some(&row_writer2)).unwrap();

        // Append a deletion record
        let key3 = b"key3";
        builder
            .append_row::<CompactedRowWriter>(key3, None)
            .unwrap();

        // Build and verify
        builder.close().unwrap();
        let bytes = builder.build().unwrap();

        let batch = KvRecordBatch::new(bytes, 0);
        assert!(batch.is_valid());
        assert_eq!(batch.record_count().unwrap(), 3);
        assert_eq!(batch.writer_id().unwrap(), 100);
        assert_eq!(batch.batch_sequence().unwrap(), 5);

        // Read back and verify records
        let records: Vec<_> = batch.records().unwrap().collect();
        assert_eq!(records.len(), 3);

        // Verify first record
        let record1 = records[0].as_ref().unwrap();
        assert_eq!(record1.key().as_ref(), key1);
        let row1 = CompactedRow::from_bytes(&types, record1.value().unwrap());
        assert_eq!(row1.get_int(0), 42);
        assert_eq!(row1.get_string(1), "hello");

        // Verify second record
        let record2 = records[1].as_ref().unwrap();
        assert_eq!(record2.key().as_ref(), key2);
        let row2 = CompactedRow::from_bytes(&types, record2.value().unwrap());
        assert_eq!(row2.get_int(0), 100);
        assert_eq!(row2.get_string(1), "world");

        // Verify deletion record
        let record3 = records[2].as_ref().unwrap();
        assert_eq!(record3.key().as_ref(), key3);
        assert!(record3.value().is_none());
    }

    #[test]
    fn test_kv_format_validation() {
        let mut row_writer = CompactedRowWriter::new(1);
        row_writer.write_int(42);

        // INDEXED format should reject append_row
        let mut indexed_builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::INDEXED);
        let result = indexed_builder.append_row(b"key", Some(&row_writer));
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidInput);

        // COMPACTED format should accept append_row
        let mut compacted_builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED);
        let result = compacted_builder.append_row(b"key", Some(&row_writer));
        assert!(result.is_ok());
    }
}
diff --git a/crates/fluss/src/record/kv/mod.rs
b/crates/fluss/src/record/kv/mod.rs
new file mode 100644
index 0000000..ecb762d
--- /dev/null
+++ b/crates/fluss/src/record/kv/mod.rs
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
//! Key-Value record and batch implementations.

mod kv_record;
mod kv_record_batch;
mod kv_record_batch_builder;

pub use kv_record::{KvRecord, LENGTH_LENGTH as KV_RECORD_LENGTH_LENGTH};
pub use kv_record_batch::*;
pub use kv_record_batch_builder::*;

/// Current KV magic value (batch format version marker)
pub const CURRENT_KV_MAGIC_VALUE: u8 = 0;

/// No writer ID constant: sentinel meaning the batch has no assigned writer
pub const NO_WRITER_ID: i64 = -1;

/// No batch sequence constant: sentinel meaning no sequence number assigned
pub const NO_BATCH_SEQUENCE: i32 = -1;
diff --git a/crates/fluss/src/record/mod.rs b/crates/fluss/src/record/mod.rs
index 35928ea..c5a3f8e 100644
--- a/crates/fluss/src/record/mod.rs
+++ b/crates/fluss/src/record/mod.rs
@@ -22,6 +22,7 @@ use std::collections::HashMap;
mod arrow;
mod error;
+pub mod kv;
pub use arrow::*;
diff --git a/crates/fluss/src/row/compacted/compacted_key_writer.rs
b/crates/fluss/src/row/compacted/compacted_key_writer.rs
index 84a6b22..1152b0c 100644
--- a/crates/fluss/src/row/compacted/compacted_key_writer.rs
+++ b/crates/fluss/src/row/compacted/compacted_key_writer.rs
@@ -30,6 +30,12 @@ pub struct CompactedKeyWriter {
delegate: CompactedRowWriter,
}
impl Default for CompactedKeyWriter {
    /// Equivalent to [`CompactedKeyWriter::new`].
    fn default() -> Self {
        Self::new()
    }
}
+
impl CompactedKeyWriter {
pub fn new() -> CompactedKeyWriter {
CompactedKeyWriter {
diff --git a/crates/fluss/src/row/compacted/compacted_row_reader.rs
b/crates/fluss/src/row/compacted/compacted_row_reader.rs
index c053d4e..5ec2608 100644
--- a/crates/fluss/src/row/compacted/compacted_row_reader.rs
+++ b/crates/fluss/src/row/compacted/compacted_row_reader.rs
@@ -19,6 +19,7 @@ use
crate::row::compacted::compacted_row::calculate_bit_set_width_in_bytes;
use crate::{
metadata::DataType,
row::{Datum, GenericRow,
compacted::compacted_row_writer::CompactedRowWriter},
+ util::varint::{read_unsigned_varint_at, read_unsigned_varint_u64_at},
};
use std::str::from_utf8;
@@ -150,36 +151,18 @@ impl<'a> CompactedRowReader<'a> {
(val, next_pos)
}
- pub fn read_int(&self, mut pos: usize) -> (i32, usize) {
- let mut result: u32 = 0;
- let mut shift = 0;
-
- for _ in 0..CompactedRowWriter::MAX_INT_SIZE {
- let (b, next_pos) = self.read_byte(pos);
- pos = next_pos;
- result |= ((b & 0x7F) as u32) << shift;
- if (b & 0x80) == 0 {
- return (result as i32, pos);
- }
- shift += 7;
+ pub fn read_int(&self, pos: usize) -> (i32, usize) {
+ match read_unsigned_varint_at(self.segment, pos,
CompactedRowWriter::MAX_INT_SIZE) {
+ Ok((value, next_pos)) => (value as i32, next_pos),
+ Err(_) => panic!("Invalid VarInt32 input stream."),
}
- panic!("Invalid VarInt32 input stream.");
}
- pub fn read_long(&self, mut pos: usize) -> (i64, usize) {
- let mut result: u64 = 0;
- let mut shift = 0;
-
- for _ in 0..CompactedRowWriter::MAX_LONG_SIZE {
- let (b, next_pos) = self.read_byte(pos);
- pos = next_pos;
- result |= ((b & 0x7F) as u64) << shift;
- if (b & 0x80) == 0 {
- return (result as i64, pos);
- }
- shift += 7;
+ pub fn read_long(&self, pos: usize) -> (i64, usize) {
+ match read_unsigned_varint_u64_at(self.segment, pos,
CompactedRowWriter::MAX_LONG_SIZE) {
+ Ok((value, next_pos)) => (value as i64, next_pos),
+ Err(_) => panic!("Invalid VarInt64 input stream."),
}
- panic!("Invalid VarInt64 input stream.");
}
pub fn read_float(&self, pos: usize) -> (f32, usize) {
diff --git a/crates/fluss/src/row/compacted/compacted_row_writer.rs
b/crates/fluss/src/row/compacted/compacted_row_writer.rs
index 4f535c6..63b32a3 100644
--- a/crates/fluss/src/row/compacted/compacted_row_writer.rs
+++ b/crates/fluss/src/row/compacted/compacted_row_writer.rs
@@ -18,7 +18,9 @@
use bytes::{Bytes, BytesMut};
use std::cmp;
+use crate::row::BinaryRow;
use crate::row::compacted::compacted_row::calculate_bit_set_width_in_bytes;
+use crate::util::varint::{write_unsigned_varint_to_slice,
write_unsigned_varint_u64_to_slice};
// Writer for CompactedRow
// Reference implementation:
@@ -125,25 +127,16 @@ impl CompactedRowWriter {
pub fn write_int(&mut self, value: i32) {
self.ensure_capacity(Self::MAX_INT_SIZE);
- let mut v = value as u32;
- while (v & !0x7F) != 0 {
- self.buffer[self.position] = ((v as u8) & 0x7F) | 0x80;
- self.position += 1;
- v >>= 7;
- }
- self.buffer[self.position] = v as u8;
- self.position += 1;
+ let bytes_written =
+ write_unsigned_varint_to_slice(value as u32, &mut
self.buffer[self.position..]);
+ self.position += bytes_written;
}
+
pub fn write_long(&mut self, value: i64) {
self.ensure_capacity(Self::MAX_LONG_SIZE);
- let mut v = value as u64;
- while (v & !0x7F) != 0 {
- self.buffer[self.position] = ((v as u8) & 0x7F) | 0x80;
- self.position += 1;
- v >>= 7;
- }
- self.buffer[self.position] = v as u8;
- self.position += 1;
+ let bytes_written =
+ write_unsigned_varint_u64_to_slice(value as u64, &mut
self.buffer[self.position..]);
+ self.position += bytes_written;
}
pub fn write_float(&mut self, value: f32) {
@@ -154,3 +147,9 @@ impl CompactedRowWriter {
self.write_raw(&value.to_ne_bytes());
}
}
+
impl BinaryRow for CompactedRowWriter {
    /// Exposes the writer's serialized bytes by delegating to `buffer()`.
    fn as_bytes(&self) -> &[u8] {
        self.buffer()
    }
}
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index c321ab9..144d64f 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -20,13 +20,18 @@ mod column;
mod datum;
mod binary;
-mod compacted;
+pub mod compacted;
mod encode;
mod field_getter;
pub use column::*;
pub use datum::*;
/// A row type that can expose its serialized binary form.
pub trait BinaryRow {
    /// Returns the binary representation of this row as a byte slice.
    fn as_bytes(&self) -> &[u8];
}
+
// TODO make functions return Result<?> for better error handling
pub trait InternalRow {
/// Returns the number of fields in this row
diff --git a/crates/fluss/src/util/mod.rs b/crates/fluss/src/util/mod.rs
index 5f67290..d191615 100644
--- a/crates/fluss/src/util/mod.rs
+++ b/crates/fluss/src/util/mod.rs
@@ -16,6 +16,7 @@
// under the License.
pub mod murmur_hash;
+pub mod varint;
use crate::metadata::TableBucket;
use linked_hash_map::LinkedHashMap;
diff --git a/crates/fluss/src/util/varint.rs b/crates/fluss/src/util/varint.rs
new file mode 100644
index 0000000..96fd1f5
--- /dev/null
+++ b/crates/fluss/src/util/varint.rs
@@ -0,0 +1,502 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Variable-length integer encoding utilities.
+//!
+//! This module provides utilities for encoding integers in variable-length
format,
+//! which can save space when encoding small integers. The encoding uses 7
bits per byte
+//! with the most significant bit as a continuation flag.
+
+use bytes::BufMut;
+use std::io::{self, Read, Write};
+
/// Write an unsigned integer in variable-length format.
///
/// Each byte carries 7 payload bits; the MSB is a continuation flag set while
/// more bytes follow. This matches the encoding used in Google Protocol
/// Buffers. Returns the number of bytes written.
#[allow(dead_code)]
pub fn write_unsigned_varint<W: Write>(value: u32, writer: &mut W) -> io::Result<usize> {
    let mut remaining = value;
    let mut count = 0usize;

    loop {
        count += 1;
        // Fewer than 7 bits left: emit the final byte (continuation bit clear).
        if remaining < 0x80 {
            writer.write_all(&[remaining as u8])?;
            return Ok(count);
        }
        writer.write_all(&[(remaining as u8 & 0x7F) | 0x80])?;
        remaining >>= 7;
    }
}
+
+/// Write an unsigned integer in variable-length format to a buffer.
+pub fn write_unsigned_varint_buf(value: u32, buf: &mut impl BufMut) {
+ let mut v = value;
+
+ while (v & !0x7F) != 0 {
+ buf.put_u8(((v as u8) & 0x7F) | 0x80);
+ v >>= 7;
+ }
+ buf.put_u8(v as u8);
+}
+
/// Read an unsigned integer stored in variable-length format.
///
/// A u32 varint occupies at most 5 bytes (5 * 7 = 35 >= 32 payload bits).
///
/// # Errors
/// - Propagates reader errors (including `UnexpectedEof` from `read_exact`).
/// - `InvalidData` if the continuation bit is still set in the 5th byte.
#[allow(dead_code)]
pub fn read_unsigned_varint<R: Read>(reader: &mut R) -> io::Result<u32> {
    let mut value: u32 = 0;
    let mut buf = [0u8; 1];

    for shift in [0u32, 7, 14, 21, 28] {
        reader.read_exact(&mut buf)?;
        let byte = buf[0];
        value |= ((byte & 0x7F) as u32) << shift;
        if byte & 0x80 == 0 {
            return Ok(value);
        }
    }

    Err(io::Error::new(
        io::ErrorKind::InvalidData,
        "Invalid u32 varint encoding: too many bytes (most significant bit in the 5th byte is set)",
    ))
}
+
/// Read an unsigned integer from a byte slice in variable-length format.
///
/// Returns the decoded value together with the number of bytes consumed.
///
/// # Errors
/// - `UnexpectedEof` if the slice is empty or ends mid-varint.
/// - `InvalidData` if the continuation bit is still set in the 5th byte.
pub fn read_unsigned_varint_bytes(bytes: &[u8]) -> io::Result<(u32, usize)> {
    if bytes.is_empty() {
        return Err(io::Error::new(
            io::ErrorKind::UnexpectedEof,
            "Cannot read varint from empty buffer",
        ));
    }

    let mut value: u32 = 0;
    let mut consumed = 0usize;
    let mut shift = 0;

    // A u32 varint occupies at most 5 bytes (5 * 7 = 35 >= 32 payload bits).
    while consumed < 5 {
        let byte = match bytes.get(consumed) {
            Some(&b) => b,
            None => {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "Incomplete varint",
                ));
            }
        };
        consumed += 1;
        value |= ((byte & 0x7F) as u32) << shift;
        if byte & 0x80 == 0 {
            return Ok((value, consumed));
        }
        shift += 7;
    }

    Err(io::Error::new(
        io::ErrorKind::InvalidData,
        "Invalid u32 varint encoding: too many bytes (most significant bit in the 5th byte is set)",
    ))
}
+
/// Calculate the number of bytes needed to encode a u32 in variable-length
/// format.
///
/// Varint encoding stores 7 payload bits per byte, so an n-bit value needs
/// `ceil(n / 7)` bytes, with a minimum of one byte for zero (handled by the
/// `| 1` below, which forces at least one significant bit).
pub fn size_of_unsigned_varint(value: u32) -> usize {
    let significant_bits = 32 - (value | 1).leading_zeros() as usize;
    (significant_bits + 6) / 7
}
+
/// Calculate the number of bytes needed to encode a u64 in variable-length
/// format.
///
/// Varint encoding stores 7 payload bits per byte, so an n-bit value needs
/// `ceil(n / 7)` bytes, with a minimum of one byte for zero (handled by the
/// `| 1` below, which forces at least one significant bit).
#[allow(dead_code)]
pub fn size_of_unsigned_varint_u64(value: u64) -> usize {
    let significant_bits = 64 - (value | 1).leading_zeros() as usize;
    (significant_bits + 6) / 7
}
+
+/// Write an unsigned 64-bit integer in variable-length format to a buffer.
+#[allow(dead_code)]
+pub fn write_unsigned_varint_u64_buf(value: u64, buf: &mut impl BufMut) {
+ let mut v = value;
+ while (v & !0x7F) != 0 {
+ buf.put_u8(((v as u8) & 0x7F) | 0x80);
+ v >>= 7;
+ }
+ buf.put_u8(v as u8);
+}
+
/// Write directly to a mutable byte slice, returning the number of bytes
/// written. Used by CompactedRowWriter which manages its own position.
///
/// # Panics
/// Panics if the slice is too small to hold the encoded varint.
/// The slice must have at least 5 bytes available (the maximum size for a u32
/// varint). Use [`size_of_unsigned_varint`] to calculate the required size
/// beforehand.
pub fn write_unsigned_varint_to_slice(value: u32, slice: &mut [u8]) -> usize {
    let mut remaining = value;
    let mut idx = 0;

    while remaining >= 0x80 {
        slice[idx] = (remaining as u8 & 0x7F) | 0x80;
        idx += 1;
        remaining >>= 7;
    }
    slice[idx] = remaining as u8;
    idx + 1
}
+
/// Write unsigned 64-bit varint directly to a mutable byte slice, returning
/// the number of bytes written.
///
/// # Panics
/// Panics if the slice is too small to hold the encoded varint.
/// The slice must have at least 10 bytes available (the maximum size for a
/// u64 varint).
pub fn write_unsigned_varint_u64_to_slice(value: u64, slice: &mut [u8]) -> usize {
    let mut remaining = value;
    let mut idx = 0;

    while remaining >= 0x80 {
        slice[idx] = (remaining as u8 & 0x7F) | 0x80;
        idx += 1;
        remaining >>= 7;
    }
    slice[idx] = remaining as u8;
    idx + 1
}
+
/// Read unsigned varint from a slice starting at the given position.
/// Returns `(value, next_position)`. Used by CompactedRowReader which
/// manages positions.
///
/// # Errors
/// - `UnexpectedEof` if the slice ends before the varint terminates.
/// - `InvalidData` if no terminating byte occurs within `max_bytes` bytes.
pub fn read_unsigned_varint_at(
    slice: &[u8],
    pos: usize,
    max_bytes: usize,
) -> io::Result<(u32, usize)> {
    let mut value: u32 = 0;
    let mut cursor = pos;
    let mut shift = 0;

    while cursor - pos < max_bytes {
        let byte = match slice.get(cursor) {
            Some(&b) => b,
            None => {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "Unexpected end of varint",
                ));
            }
        };
        cursor += 1;
        value |= ((byte & 0x7F) as u32) << shift;
        if byte & 0x80 == 0 {
            return Ok((value, cursor));
        }
        shift += 7;
    }

    Err(io::Error::new(
        io::ErrorKind::InvalidData,
        "Invalid VarInt32 input stream",
    ))
}
+
/// Read unsigned 64-bit varint from a slice starting at the given position.
/// Returns `(value, next_position)`.
///
/// # Errors
/// - `UnexpectedEof` if the slice ends before the varint terminates.
/// - `InvalidData` if no terminating byte occurs within `max_bytes` bytes.
pub fn read_unsigned_varint_u64_at(
    slice: &[u8],
    pos: usize,
    max_bytes: usize,
) -> io::Result<(u64, usize)> {
    let mut value: u64 = 0;
    let mut cursor = pos;
    let mut shift = 0;

    while cursor - pos < max_bytes {
        let byte = match slice.get(cursor) {
            Some(&b) => b,
            None => {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "Unexpected end of varint",
                ));
            }
        };
        cursor += 1;
        value |= ((byte & 0x7F) as u64) << shift;
        if byte & 0x80 == 0 {
            return Ok((value, cursor));
        }
        shift += 7;
    }

    Err(io::Error::new(
        io::ErrorKind::InvalidData,
        "Invalid VarInt64 input stream",
    ))
}
+
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    // Round-trips every writer/reader pair plus the size calculation across
    // boundary values around each 7-bit encoding threshold.
    #[test]
    fn test_unsigned_varint_round_trip() {
        let test_values = vec![
            0u32,
            1,
            127,
            128,
            255,
            256,
            16383,
            16384,
            2097151,
            2097152,
            268435455,
            268435456,
            u32::MAX,
        ];

        for value in test_values {
            // Test with Write trait
            let mut buffer = Vec::new();
            let written = write_unsigned_varint(value, &mut buffer).unwrap();

            let mut reader = Cursor::new(&buffer);
            let read_value = read_unsigned_varint(&mut reader).unwrap();

            assert_eq!(value, read_value, "Round trip failed for value {}", value);
            assert_eq!(
                written,
                buffer.len(),
                "Bytes written mismatch for value {}",
                value
            );

            // Test with BufMut
            let mut buf = bytes::BytesMut::new();
            write_unsigned_varint_buf(value, &mut buf);
            assert_eq!(buf.len(), written, "BufMut write length mismatch");

            // Test size calculation
            let calculated_size = size_of_unsigned_varint(value);
            assert_eq!(
                calculated_size,
                buffer.len(),
                "Size calculation failed for value {}",
                value
            );

            // Test reading from bytes
            let (read_value_bytes, bytes_read) = read_unsigned_varint_bytes(&buffer).unwrap();
            assert_eq!(
                value, read_value_bytes,
                "Bytes read failed for value {}",
                value
            );
            assert_eq!(
                bytes_read,
                buffer.len(),
                "Bytes read count mismatch for value {}",
                value
            );
        }
    }

    // Exact sizes at each 7-bit boundary for u32.
    #[test]
    fn test_size_of_unsigned_varint() {
        assert_eq!(size_of_unsigned_varint(0), 1);
        assert_eq!(size_of_unsigned_varint(127), 1);
        assert_eq!(size_of_unsigned_varint(128), 2);
        assert_eq!(size_of_unsigned_varint(16383), 2);
        assert_eq!(size_of_unsigned_varint(16384), 3);
        assert_eq!(size_of_unsigned_varint(2097151), 3);
        assert_eq!(size_of_unsigned_varint(2097152), 4);
        assert_eq!(size_of_unsigned_varint(268435455), 4);
        assert_eq!(size_of_unsigned_varint(268435456), 5);
        assert_eq!(size_of_unsigned_varint(u32::MAX), 5);
    }

    // Exact sizes at each 7-bit boundary for u64 (up to the full 10 bytes).
    #[test]
    fn test_size_of_unsigned_varint_u64() {
        assert_eq!(size_of_unsigned_varint_u64(0), 1);
        assert_eq!(size_of_unsigned_varint_u64(127), 1);
        assert_eq!(size_of_unsigned_varint_u64(128), 2);
        assert_eq!(size_of_unsigned_varint_u64(16383), 2);
        assert_eq!(size_of_unsigned_varint_u64(16384), 3);
        assert_eq!(size_of_unsigned_varint_u64(2097151), 3);
        assert_eq!(size_of_unsigned_varint_u64(2097152), 4);
        assert_eq!(size_of_unsigned_varint_u64(268435455), 4);
        assert_eq!(size_of_unsigned_varint_u64(268435456), 5);
        assert_eq!(size_of_unsigned_varint_u64(u32::MAX as u64), 5);
        assert_eq!(size_of_unsigned_varint_u64(34359738367), 5);
        assert_eq!(size_of_unsigned_varint_u64(34359738368), 6);
        assert_eq!(size_of_unsigned_varint_u64(4398046511103), 6);
        assert_eq!(size_of_unsigned_varint_u64(4398046511104), 7);
        assert_eq!(size_of_unsigned_varint_u64(562949953421311), 7);
        assert_eq!(size_of_unsigned_varint_u64(562949953421312), 8);
        assert_eq!(size_of_unsigned_varint_u64(72057594037927935), 8);
        assert_eq!(size_of_unsigned_varint_u64(72057594037927936), 9);
        assert_eq!(size_of_unsigned_varint_u64(9223372036854775807), 9);
        assert_eq!(size_of_unsigned_varint_u64(9223372036854775808), 10);
        assert_eq!(size_of_unsigned_varint_u64(u64::MAX), 10);
    }

    #[test]
    fn test_read_unsigned_varint_bytes_error_handling() {
        // Empty buffer
        assert!(read_unsigned_varint_bytes(&[]).is_err());

        // Incomplete varint (continuation bit set but no next byte)
        assert!(read_unsigned_varint_bytes(&[0x80]).is_err());
        assert!(read_unsigned_varint_bytes(&[0xFF, 0x80]).is_err());
    }

    #[test]
    fn test_write_read_to_slice() {
        // Test u32 varint to slice
        let test_values_u32 = vec![0u32, 127, 128, 16384, u32::MAX];

        for value in test_values_u32 {
            let mut buffer = vec![0u8; 10];
            let written = write_unsigned_varint_to_slice(value, &mut buffer);

            let (read_value, next_pos) = read_unsigned_varint_at(&buffer, 0, 5).unwrap();
            assert_eq!(value, read_value);
            assert_eq!(written, next_pos);
        }

        // Test u64 varint to slice
        let test_values_u64 = vec![0u64, 127, 128, 16384, u32::MAX as u64, u64::MAX];

        for value in test_values_u64 {
            let mut buffer = vec![0u8; 10];
            let written = write_unsigned_varint_u64_to_slice(value, &mut buffer);

            let (read_value, next_pos) = read_unsigned_varint_u64_at(&buffer, 0, 10).unwrap();
            assert_eq!(value, read_value);
            assert_eq!(written, next_pos);
        }
    }

    #[test]
    fn test_read_at_with_offset() {
        // Write multiple varints back-to-back and read at different positions
        let mut buffer = vec![0u8; 20];
        let mut pos = 0;

        pos += write_unsigned_varint_to_slice(127, &mut buffer[pos..]);
        pos += write_unsigned_varint_to_slice(16384, &mut buffer[pos..]);
        let end_pos = pos + write_unsigned_varint_to_slice(u32::MAX, &mut buffer[pos..]);

        // Read back, chaining each returned next_position into the next read
        let (val1, pos1) = read_unsigned_varint_at(&buffer, 0, 5).unwrap();
        assert_eq!(val1, 127);

        let (val2, pos2) = read_unsigned_varint_at(&buffer, pos1, 5).unwrap();
        assert_eq!(val2, 16384);

        let (val3, pos3) = read_unsigned_varint_at(&buffer, pos2, 5).unwrap();
        assert_eq!(val3, u32::MAX);
        assert_eq!(pos3, end_pos);
    }
}