This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new f4808d8 feat: Introduce KVReadContext and read path wiring (#174)
f4808d8 is described below
commit f4808d81cc5e5788815f13cd49a012f9a3d71528
Author: Anton Borisov <[email protected]>
AuthorDate: Sat Jan 17 15:41:12 2026 +0000
feat: Introduce KVReadContext and read path wiring (#174)
---
crates/fluss/src/record/kv/kv_record.rs | 166 ++++++++++---------
crates/fluss/src/record/kv/kv_record_batch.rs | 115 ++++++++++---
.../fluss/src/record/kv/kv_record_batch_builder.rs | 154 +++++++++---------
.../fluss/src/record/kv/kv_record_read_context.rs | 179 +++++++++++++++++++++
crates/fluss/src/record/kv/mod.rs | 7 +
crates/fluss/src/record/kv/read_context.rs | 45 ++++++
crates/fluss/src/record/kv/test_util.rs | 50 ++++++
crates/fluss/src/row/mod.rs | 2 +
crates/fluss/src/row/row_decoder.rs | 137 ++++++++++++++++
9 files changed, 674 insertions(+), 181 deletions(-)
diff --git a/crates/fluss/src/record/kv/kv_record.rs
b/crates/fluss/src/record/kv/kv_record.rs
index ab8c2ac..a9c45d6 100644
--- a/crates/fluss/src/record/kv/kv_record.rs
+++ b/crates/fluss/src/record/kv/kv_record.rs
@@ -27,6 +27,8 @@
use bytes::{BufMut, Bytes, BytesMut};
use std::io;
+use crate::row::RowDecoder;
+use crate::row::compacted::CompactedRow;
use crate::util::varint::{
read_unsigned_varint_bytes, size_of_unsigned_varint,
write_unsigned_varint_buf,
};
@@ -34,7 +36,7 @@ use crate::util::varint::{
/// Length field size in bytes
pub const LENGTH_LENGTH: usize = 4;
-/// A key-value record.
+/// A key-value record containing raw key and value bytes.
///
/// The schema is:
/// - Length => Int32
@@ -43,34 +45,39 @@ pub const LENGTH_LENGTH: usize = 4;
/// - Value => bytes (BinaryRow, written directly without length prefix)
///
/// When the value is None (deletion), no Value bytes are present.
-// Reference implementation:
-//
https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/record/KvRecord.java
+///
+/// This struct stores only raw bytes. To decode the value into a typed row,
+/// use the `row()` method with a RowDecoder (typically obtained from the
iterator).
+///
+/// Reference implementation:
+///
https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/record/KvRecord.java
#[derive(Debug, Clone)]
pub struct KvRecord {
key: Bytes,
- value: Option<Bytes>,
+ value_bytes: Option<Bytes>,
size_in_bytes: usize,
}
impl KvRecord {
- /// Create a new KvRecord with the given key and optional value.
- pub fn new(key: Bytes, value: Option<Bytes>) -> Self {
- let size_in_bytes = Self::size_of(&key, value.as_deref());
- Self {
- key,
- value,
- size_in_bytes,
- }
- }
-
/// Get the key bytes.
pub fn key(&self) -> &Bytes {
&self.key
}
- /// Get the value bytes (None indicates a deletion).
- pub fn value(&self) -> Option<&Bytes> {
- self.value.as_ref()
+ /// Get the raw value bytes (for testing).
+ #[cfg(test)]
+ pub(crate) fn value_bytes(&self) -> Option<&Bytes> {
+ self.value_bytes.as_ref()
+ }
+
+ /// Decode the value bytes into a typed row using the provided decoder.
+ /// This creates a lightweight CompactedRow view over the raw bytes.
+ /// Actual field parsing is lazy (on first access).
+ pub fn row<'a>(&'a self, decoder: &dyn RowDecoder) ->
Option<CompactedRow<'a>> {
+ self.value_bytes.as_ref().map(|bytes| {
+ // Decode on-demand - CompactedRow<'a> lifetime tied to &'a self
+ decoder.decode(bytes.as_ref())
+ })
}
/// Calculate the total size of the record when serialized (including
length prefix).
@@ -121,8 +128,7 @@ impl KvRecord {
/// Read a KV record from bytes at the given position.
///
/// Returns the KvRecord and the number of bytes consumed.
- ///
- /// TODO: Connect KvReadContext and return CompactedRow records.
+ /// The record contains only raw bytes; use `row()` with a RowDecoder to
decode the value.
pub fn read_from(bytes: &Bytes, position: usize) -> io::Result<(Self,
usize)> {
if bytes.len() < position.saturating_add(LENGTH_LENGTH) {
return Err(io::Error::new(
@@ -183,11 +189,10 @@ impl KvRecord {
let key = bytes.slice(current_offset..key_end);
current_offset = key_end;
- // Read value bytes directly
- let value = if current_offset < record_end {
+ // Read value bytes directly (don't decode yet - will decode on-demand)
+ let value_bytes = if current_offset < record_end {
// Value is present: all remaining bytes are the value
- let value_bytes = bytes.slice(current_offset..record_end);
- Some(value_bytes)
+ Some(bytes.slice(current_offset..record_end))
} else {
// No remaining bytes: this is a deletion record
None
@@ -196,7 +201,7 @@ impl KvRecord {
Ok((
Self {
key,
- value,
+ value_bytes,
size_in_bytes: total_size,
},
total_size,
@@ -207,6 +212,11 @@ impl KvRecord {
pub fn get_size_in_bytes(&self) -> usize {
self.size_in_bytes
}
+
+ /// Check if this is a deletion record (no value).
+ pub fn is_deletion(&self) -> bool {
+ self.value_bytes.is_none()
+ }
}
#[cfg(test)]
@@ -214,30 +224,25 @@ mod tests {
use super::*;
#[test]
- fn test_kv_record_size_calculation() {
+ fn test_kv_record_basic_operations() {
let key = b"test_key";
let value = b"test_value";
- // With value (no value length varint)
+ // Test size calculation with value
let size_with_value = KvRecord::size_of(key, Some(value));
assert_eq!(
size_with_value,
LENGTH_LENGTH + size_of_unsigned_varint(key.len() as u32) +
key.len() + value.len()
);
- // Without value
+ // Test size calculation without value (deletion)
let size_without_value = KvRecord::size_of(key, None);
assert_eq!(
size_without_value,
LENGTH_LENGTH + size_of_unsigned_varint(key.len() as u32) +
key.len()
);
- }
-
- #[test]
- fn test_kv_record_write_read_round_trip() {
- let key = b"my_key";
- let value = b"my_value_data";
+ // Test write/read round trip with value
let mut buf = BytesMut::new();
let written = KvRecord::write_to_buf(&mut buf, key,
Some(value)).unwrap();
@@ -246,40 +251,70 @@ mod tests {
assert_eq!(written, read_size);
assert_eq!(record.key().as_ref(), key);
- assert_eq!(record.value().unwrap().as_ref(), value);
+ assert_eq!(record.value_bytes().unwrap().as_ref(), value);
assert_eq!(record.get_size_in_bytes(), written);
- }
-
- #[test]
- fn test_kv_record_deletion() {
- let key = b"delete_me";
+ assert!(!record.is_deletion());
- // Write deletion record (no value)
+ // Test deletion record (no value)
+ let delete_key = b"delete_me";
let mut buf = BytesMut::new();
- let written = KvRecord::write_to_buf(&mut buf, key, None).unwrap();
+ let written = KvRecord::write_to_buf(&mut buf, delete_key,
None).unwrap();
let bytes = buf.freeze();
let (record, read_size) = KvRecord::read_from(&bytes, 0).unwrap();
assert_eq!(written, read_size);
- assert_eq!(record.key().as_ref(), key);
- assert!(record.value().is_none());
+ assert_eq!(record.key().as_ref(), delete_key);
+ assert!(record.is_deletion());
+ assert!(record.value_bytes().is_none());
}
#[test]
- fn test_kv_record_with_large_key() {
- let key = vec![0u8; 1024];
- let value = vec![1u8; 4096];
+ fn test_kv_record_multiple_records() {
+ // Test multiple regular-sized records in buffer
+ let records = vec![
+ (b"key1".as_slice(), Some(b"value1".as_slice())),
+ (b"key2".as_slice(), None), // Deletion
+ (b"key3".as_slice(), Some(b"value3".as_slice())),
+ ];
let mut buf = BytesMut::new();
- let written = KvRecord::write_to_buf(&mut buf, &key,
Some(&value)).unwrap();
+ for (key, value) in &records {
+ KvRecord::write_to_buf(&mut buf, key, *value).unwrap();
+ }
+
+ let bytes = buf.freeze();
+ let mut offset = 0;
+ for (expected_key, expected_value) in &records {
+ let (record, size) = KvRecord::read_from(&bytes, offset).unwrap();
+ assert_eq!(record.key().as_ref(), *expected_key);
+ match expected_value {
+ Some(v) => {
+ assert_eq!(record.value_bytes().unwrap().as_ref(), *v);
+ assert!(!record.is_deletion());
+ }
+ None => {
+ assert!(record.is_deletion());
+ assert!(record.value_bytes().is_none());
+ }
+ }
+ offset += size;
+ }
+ assert_eq!(offset, bytes.len());
+
+ // Test large keys and values
+ let large_key = vec![0u8; 1024];
+ let large_value = vec![1u8; 4096];
+
+ let mut buf = BytesMut::new();
+ let written = KvRecord::write_to_buf(&mut buf, &large_key,
Some(&large_value)).unwrap();
let bytes = buf.freeze();
let (record, read_size) = KvRecord::read_from(&bytes, 0).unwrap();
assert_eq!(written, read_size);
- assert_eq!(record.key().len(), key.len());
- assert_eq!(record.value().unwrap().len(), value.len());
+ assert_eq!(record.key().len(), large_key.len());
+ assert_eq!(record.value_bytes().unwrap().len(), large_value.len());
}
#[test]
@@ -291,7 +326,9 @@ mod tests {
let bytes = buf.freeze();
let result = KvRecord::read_from(&bytes, 0);
assert!(result.is_err());
- assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidData);
+ if let Err(e) = result {
+ assert_eq!(e.kind(), io::ErrorKind::InvalidData);
+ }
// Test overflow length
let mut buf = BytesMut::new();
@@ -307,33 +344,8 @@ mod tests {
let bytes = buf.freeze();
let result = KvRecord::read_from(&bytes, 0);
assert!(result.is_err());
- assert_eq!(result.unwrap_err().kind(), io::ErrorKind::UnexpectedEof);
- }
-
- #[test]
- fn test_multiple_records_in_buffer() {
- let records = vec![
- (b"key1".as_slice(), Some(b"value1".as_slice())),
- (b"key2".as_slice(), None),
- (b"key3".as_slice(), Some(b"value3".as_slice())),
- ];
-
- let mut buf = BytesMut::new();
- for (key, value) in &records {
- KvRecord::write_to_buf(&mut buf, key, *value).unwrap();
+ if let Err(e) = result {
+ assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof);
}
-
- let bytes = buf.freeze();
- let mut offset = 0;
- for (expected_key, expected_value) in &records {
- let (record, size) = KvRecord::read_from(&bytes, offset).unwrap();
- assert_eq!(record.key().as_ref(), *expected_key);
- match expected_value {
- Some(v) => assert_eq!(record.value().unwrap().as_ref(), *v),
- None => assert!(record.value().is_none()),
- }
- offset += size;
- }
- assert_eq!(offset, bytes.len());
}
}
diff --git a/crates/fluss/src/record/kv/kv_record_batch.rs
b/crates/fluss/src/record/kv/kv_record_batch.rs
index eb3c09a..32f712f 100644
--- a/crates/fluss/src/record/kv/kv_record_batch.rs
+++ b/crates/fluss/src/record/kv/kv_record_batch.rs
@@ -32,8 +32,11 @@
use bytes::Bytes;
use std::io;
+use std::sync::Arc;
-use crate::record::kv::KvRecord;
+use crate::error::Result;
+use crate::record::kv::{KvRecord, ReadContext};
+use crate::row::RowDecoder;
// Field lengths in bytes
pub const LENGTH_LENGTH: usize = 4;
@@ -253,38 +256,87 @@ impl KvRecordBatch {
]))
}
- /// Create an iterator over the records in this batch.
- /// This validates the batch checksum before returning the iterator.
+ /// Create an iterable collection of records in this batch.
+ ///
+ /// This validates the batch checksum before returning the records.
/// For trusted data paths, use `records_unchecked()` to skip validation.
- pub fn records(&self) -> io::Result<KvRecordIterator> {
+ ///
+ /// Mirrors: KvRecordBatch.records(ReadContext)
+ pub fn records(&self, read_context: &dyn ReadContext) -> Result<KvRecords>
{
if !self.is_valid() {
- return Err(io::Error::new(
- io::ErrorKind::InvalidData,
- "Invalid batch checksum",
- ));
+ return Err(crate::error::Error::IoUnexpectedError {
+ message: "Invalid batch checksum".to_string(),
+ source: io::Error::new(io::ErrorKind::InvalidData, "Invalid
batch checksum"),
+ });
}
- self.records_unchecked()
+ self.records_unchecked(read_context)
}
- /// Create an iterator over the records in this batch without validating
the checksum
- pub fn records_unchecked(&self) -> io::Result<KvRecordIterator> {
+ /// Create an iterable collection of records in this batch without
validating the checksum.
+ pub fn records_unchecked(&self, read_context: &dyn ReadContext) ->
Result<KvRecords> {
let size = self.size_in_bytes()?;
let count = self.record_count()?;
+ let schema_id = self.schema_id()?;
+
if count < 0 {
- return Err(io::Error::new(
- io::ErrorKind::InvalidData,
- format!("Invalid record count: {count}"),
- ));
+ return Err(crate::error::Error::IoUnexpectedError {
+ message: format!("Invalid record count: {count}"),
+ source: io::Error::new(io::ErrorKind::InvalidData, "Invalid
record count"),
+ });
}
- Ok(KvRecordIterator {
- data: self.data.clone(),
- position: self.position + RECORDS_OFFSET,
- end: self.position + size,
- remaining_count: count,
+
+ // Get row decoder for this schema from context (cached)
+ let row_decoder = read_context.get_row_decoder(schema_id)?;
+
+ Ok(KvRecords {
+ iter: KvRecordIterator {
+ data: self.data.clone(),
+ position: self.position + RECORDS_OFFSET,
+ end: self.position + size,
+ remaining_count: count,
+ },
+ row_decoder,
})
}
}
+/// Iterable collection of KV records with associated decoder.
+///
+/// This wrapper provides both iteration capability and access to the row
decoder
+/// needed to decode record values into typed rows.
+pub struct KvRecords {
+ iter: KvRecordIterator,
+ row_decoder: Arc<dyn RowDecoder>,
+}
+
+impl KvRecords {
+ /// Get a reference to the row decoder for decoding record values.
+ ///
+ /// Returns a reference tied to the lifetime of `&self`.
+ /// Use this when iterating by reference.
+ pub fn decoder(&self) -> &dyn RowDecoder {
+ &*self.row_decoder
+ }
+
+ /// Get an owned Arc to the row decoder.
+ ///
+ /// Returns a cloned Arc that can outlive the KvRecords,
+ /// allowing you to grab it before consuming the iterator.
+ Useful if you must keep the decoder beyond the iterable's
+ lifetime (collect-then-decode style).
+ pub fn decoder_arc(&self) -> Arc<dyn RowDecoder> {
+ Arc::clone(&self.row_decoder)
+ }
+}
+
+impl IntoIterator for KvRecords {
+ type Item = io::Result<KvRecord>;
+ type IntoIter = KvRecordIterator;
+
+ fn into_iter(self) -> Self::IntoIter {
+ self.iter
+ }
+}
+
/// Iterator over records in a KV record batch.
pub struct KvRecordIterator {
data: Bytes,
@@ -319,7 +371,9 @@ impl Iterator for KvRecordIterator {
mod tests {
use super::*;
use crate::metadata::{DataTypes, KvFormat, RowType};
+ use crate::record::kv::test_util::TestReadContext;
use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, KvRecordBatchBuilder};
+ use crate::row::InternalRow;
use crate::row::binary::BinaryWriter;
use crate::row::compacted::CompactedRow;
use bytes::{BufMut, BytesMut};
@@ -380,15 +434,24 @@ mod tests {
assert_eq!(batch.batch_sequence().unwrap(), 5);
assert_eq!(batch.record_count().unwrap(), 2);
- let records: Vec<_> = batch.records().unwrap().collect();
- assert_eq!(records.len(), 2);
+ // Create ReadContext for reading
+ let read_context =
TestReadContext::compacted(vec![DataTypes::bytes()]);
- let record1 = records[0].as_ref().unwrap();
+ // Iterate and verify records using typed API
+ let records = batch.records(&read_context).unwrap();
+ let decoder = records.decoder_arc(); // Get Arc before consuming
+
+ let mut iter = records.into_iter();
+ let record1 = iter.next().unwrap().unwrap();
assert_eq!(record1.key().as_ref(), key1);
- assert_eq!(record1.value().unwrap().as_ref(), value1_writer.buffer());
+ assert!(!record1.is_deletion());
+ let row1 = record1.row(&*decoder).unwrap();
+ assert_eq!(row1.get_bytes(0), &[1, 2, 3, 4, 5]);
- let record2 = records[1].as_ref().unwrap();
+ let record2 = iter.next().unwrap().unwrap();
assert_eq!(record2.key().as_ref(), key2);
- assert!(record2.value().is_none());
+ assert!(record2.is_deletion());
+
+ assert!(iter.next().is_none());
}
}
diff --git a/crates/fluss/src/record/kv/kv_record_batch_builder.rs
b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
index c36a861..636104d 100644
--- a/crates/fluss/src/record/kv/kv_record_batch_builder.rs
+++ b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
@@ -330,22 +330,20 @@ mod tests {
}
#[test]
- fn test_builder_basic_workflow() {
+ fn test_builder_basic_operations() {
+ // Test basic workflow: initial state, writer state, append, close,
build
let schema_id = 42;
let write_limit = 4096;
let mut builder = KvRecordBatchBuilder::new(schema_id, write_limit,
KvFormat::COMPACTED);
- // Test initial state
assert!(!builder.is_closed());
assert_eq!(builder.writer_id(), NO_WRITER_ID);
assert_eq!(builder.batch_sequence(), NO_BATCH_SEQUENCE);
- // Test writer state
builder.set_writer_state(100, 5);
assert_eq!(builder.writer_id(), 100);
assert_eq!(builder.batch_sequence(), 5);
- // Test appending records
let key1 = b"key1";
let value1 = create_test_row(b"value1");
assert!(builder.has_room_for_row(key1, Some(&value1)));
@@ -355,7 +353,6 @@ mod tests {
assert!(builder.has_room_for_row::<CompactedRow>(key2, None));
builder.append_row::<CompactedRow>(key2, None).unwrap();
- // Test close and build
builder.close().unwrap();
assert!(builder.is_closed());
@@ -365,11 +362,8 @@ mod tests {
// Building again should return cached result
let bytes2 = builder.build().unwrap();
assert_eq!(bytes.len(), bytes2.len());
- }
- #[test]
- fn test_builder_lifecycle() {
- // Test abort behavior
+ // Test lifecycle: abort behavior
let mut builder = KvRecordBatchBuilder::new(1, 4096,
KvFormat::COMPACTED);
let value = create_test_row(b"value");
builder.append_row(b"key", Some(&value)).unwrap();
@@ -378,13 +372,30 @@ mod tests {
assert!(builder.build().is_err());
assert!(builder.close().is_err());
- // Test close behavior
+ // Test lifecycle: close behavior
let mut builder = KvRecordBatchBuilder::new(1, 4096,
KvFormat::COMPACTED);
let value = create_test_row(b"value");
builder.append_row(b"key", Some(&value)).unwrap();
builder.close().unwrap();
- assert!(builder.append_row::<CompactedRow>(b"key2", None).is_err());
// Can't append after close
- assert!(builder.build().is_ok()); // But can still build
+ assert!(builder.append_row::<CompactedRow>(b"key2", None).is_err());
+ assert!(builder.build().is_ok());
+
+ // Test KvFormat validation
+ let mut row_writer = CompactedRowWriter::new(1);
+ row_writer.write_int(42);
+ let row_type = RowType::with_data_types(vec![DataTypes::int()]);
+ let row = &CompactedRow::from_bytes(&row_type, row_writer.buffer());
+
+ // INDEXED format should reject append_row
+ let mut indexed_builder = KvRecordBatchBuilder::new(1, 4096,
KvFormat::INDEXED);
+ let result = indexed_builder.append_row(b"key", Some(row));
+ assert!(result.is_err());
+ assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidInput);
+
+ // COMPACTED format should accept append_row
+ let mut compacted_builder = KvRecordBatchBuilder::new(1, 4096,
KvFormat::COMPACTED);
+ let result = compacted_builder.append_row(b"key", Some(row));
+ assert!(result.is_ok());
}
#[test]
@@ -430,7 +441,10 @@ mod tests {
}
#[test]
- fn test_cache_invalidation_on_append() {
+ fn test_builder_cache_invalidation() {
+ use crate::record::kv::KvRecordBatch;
+
+ // Test cache invalidation on append
let mut builder = KvRecordBatchBuilder::new(1, 4096,
KvFormat::COMPACTED);
builder.set_writer_state(100, 5);
@@ -446,18 +460,13 @@ mod tests {
let len2 = bytes2.len();
// Verify the second build includes both records
- assert!(len2 > len1, "Second build should be larger");
-
- use crate::record::kv::KvRecordBatch;
+ assert!(len2 > len1);
let batch = KvRecordBatch::new(bytes2, 0);
assert!(batch.is_valid());
- assert_eq!(batch.record_count().unwrap(), 2, "Should have 2 records");
- }
+ assert_eq!(batch.record_count().unwrap(), 2);
- #[test]
- fn test_cache_invalidation_on_set_writer_state() {
+ // Test cache invalidation on writer state change
let mut builder = KvRecordBatchBuilder::new(1, 4096,
KvFormat::COMPACTED);
-
builder.set_writer_state(100, 5);
let value = create_test_row(b"value");
builder.append_row(b"key", Some(&value)).unwrap();
@@ -467,24 +476,19 @@ mod tests {
builder.set_writer_state(200, 10);
let bytes2 = builder.build().unwrap();
- assert_ne!(
- bytes1, bytes2,
- "Bytes should differ after writer state change"
- );
+ assert_ne!(bytes1, bytes2);
- use crate::record::kv::KvRecordBatch;
let batch1 = KvRecordBatch::new(bytes1, 0);
let batch2 = KvRecordBatch::new(bytes2, 0);
assert_eq!(batch1.writer_id().unwrap(), 100);
assert_eq!(batch1.batch_sequence().unwrap(), 5);
-
assert_eq!(batch2.writer_id().unwrap(), 200);
assert_eq!(batch2.batch_sequence().unwrap(), 10);
}
#[test]
- fn test_builder_with_compacted_row_writer() {
+ fn test_builder_with_compacted_row_writer() -> crate::error::Result<()> {
use crate::record::kv::KvRecordBatch;
use crate::row::InternalRow;
use crate::row::compacted::CompactedRow;
@@ -502,7 +506,7 @@ mod tests {
let key1 = b"key1";
assert!(builder.has_room_for_row(key1, Some(row1)));
- builder.append_row(key1, Some(row1)).unwrap();
+ builder.append_row(key1, Some(row1))?;
// Create and append second record
let mut row_writer2 = CompactedRowWriter::new(2);
@@ -512,63 +516,57 @@ mod tests {
let row2 = &CompactedRow::from_bytes(&row_type, row_writer2.buffer());
let key2 = b"key2";
- builder.append_row(key2, Some(row2)).unwrap();
+ builder.append_row(key2, Some(row2))?;
// Append a deletion record
let key3 = b"key3";
- builder.append_row::<CompactedRow>(key3, None).unwrap();
+ builder.append_row::<CompactedRow>(key3, None)?;
// Build and verify
- builder.close().unwrap();
- let bytes = builder.build().unwrap();
+ builder.close()?;
+ let bytes = builder.build()?;
let batch = KvRecordBatch::new(bytes, 0);
assert!(batch.is_valid());
- assert_eq!(batch.record_count().unwrap(), 3);
- assert_eq!(batch.writer_id().unwrap(), 100);
- assert_eq!(batch.batch_sequence().unwrap(), 5);
-
- // Read back and verify records
- let records: Vec<_> = batch.records().unwrap().collect();
- assert_eq!(records.len(), 3);
-
- // Verify first record
- let record1 = records[0].as_ref().unwrap();
- assert_eq!(record1.key().as_ref(), key1);
- let row1 = CompactedRow::from_bytes(&row_type,
record1.value().unwrap());
- assert_eq!(row1.get_int(0), 42);
- assert_eq!(row1.get_string(1), "hello");
-
- // Verify second record
- let record2 = records[1].as_ref().unwrap();
- assert_eq!(record2.key().as_ref(), key2);
- let row2 = CompactedRow::from_bytes(&row_type,
record2.value().unwrap());
- assert_eq!(row2.get_int(0), 100);
- assert_eq!(row2.get_string(1), "world");
-
- // Verify deletion record
- let record3 = records[2].as_ref().unwrap();
- assert_eq!(record3.key().as_ref(), key3);
- assert!(record3.value().is_none());
- }
-
- #[test]
- fn test_kv_format_validation() {
- let mut row_writer = CompactedRowWriter::new(1);
- row_writer.write_int(42);
-
- let row_type = RowType::with_data_types([DataTypes::int()].to_vec());
- let row = &CompactedRow::from_bytes(&row_type, row_writer.buffer());
-
- // INDEXED format should reject append_row
- let mut indexed_builder = KvRecordBatchBuilder::new(1, 4096,
KvFormat::INDEXED);
- let result = indexed_builder.append_row(b"key", Some(row));
- assert!(result.is_err());
- assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidInput);
+ assert_eq!(batch.record_count()?, 3);
+ assert_eq!(batch.writer_id()?, 100);
+ assert_eq!(batch.batch_sequence()?, 5);
+
+ // Create ReadContext for reading typed rows
+ let types = vec![DataTypes::int(), DataTypes::string()];
+ let read_context =
crate::record::kv::test_util::TestReadContext::compacted(types);
+
+ // Read back and verify records using idiomatic for-loop
+ let records = batch.records(&read_context)?;
+ let decoder = records.decoder_arc();
+ let mut record_count = 0;
+
+ for rec in records {
+ let rec = rec?;
+ record_count += 1;
+
+ match record_count {
+ 1 => {
+ assert_eq!(rec.key().as_ref(), key1);
+ let row = rec.row(&*decoder).unwrap();
+ assert_eq!(row.get_int(0), 42);
+ assert_eq!(row.get_string(1), "hello");
+ }
+ 2 => {
+ assert_eq!(rec.key().as_ref(), key2);
+ let row = rec.row(&*decoder).unwrap();
+ assert_eq!(row.get_int(0), 100);
+ assert_eq!(row.get_string(1), "world");
+ }
+ 3 => {
+ assert_eq!(rec.key().as_ref(), key3);
+ assert!(rec.is_deletion());
+ }
+ _ => panic!("Unexpected record count"),
+ }
+ }
- // COMPACTED format should accept append_row
- let mut compacted_builder = KvRecordBatchBuilder::new(1, 4096,
KvFormat::COMPACTED);
- let result = compacted_builder.append_row(b"key", Some(row));
- assert!(result.is_ok());
+ assert_eq!(record_count, 3);
+ Ok(())
}
}
diff --git a/crates/fluss/src/record/kv/kv_record_read_context.rs
b/crates/fluss/src/record/kv/kv_record_read_context.rs
new file mode 100644
index 0000000..2049c32
--- /dev/null
+++ b/crates/fluss/src/record/kv/kv_record_read_context.rs
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Default implementation of ReadContext with decoder caching.
+
+use super::ReadContext;
+use crate::error::{Error, Result};
+use crate::metadata::{KvFormat, Schema};
+use crate::row::{RowDecoder, RowDecoderFactory};
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+/// Trait for fetching schemas by ID.
+///
+/// This trait abstracts schema retrieval, allowing different implementations
+/// (e.g., from metadata store, cache, or test mocks).
+pub trait SchemaGetter: Send + Sync {
+ /// Get the schema for the given schema ID.
+ ///
+ /// # Arguments
+ /// * `schema_id` - The schema ID to fetch
+ ///
+ /// # Returns
+ /// An Arc-wrapped Schema for the specified ID, or an error if the schema
+ /// cannot be fetched (missing ID, network error, etc.)
+ fn get_schema(&self, schema_id: i16) -> Result<Arc<Schema>>;
+}
+
+/// Default implementation of ReadContext with decoder caching.
+///
+/// This implementation caches RowDecoders by schema ID for performance,
+/// avoiding repeated schema lookups and decoder creation.
+///
+/// Reference: org.apache.fluss.record.KvRecordReadContext
+pub struct KvRecordReadContext {
+ kv_format: KvFormat,
+ schema_getter: Arc<dyn SchemaGetter>,
+ row_decoder_cache: Mutex<HashMap<i16, Arc<dyn RowDecoder>>>,
+}
+
+impl KvRecordReadContext {
+ /// Create a new KvRecordReadContext.
+ ///
+ /// # Arguments
+ /// * `kv_format` - The KV format (COMPACTED or INDEXED)
+ /// * `schema_getter` - The schema getter for fetching schemas by ID
+ ///
+ /// # Returns
+ /// A new KvRecordReadContext instance
+ pub fn new(kv_format: KvFormat, schema_getter: Arc<dyn SchemaGetter>) ->
Self {
+ Self {
+ kv_format,
+ schema_getter,
+ row_decoder_cache: Mutex::new(HashMap::new()),
+ }
+ }
+}
+
+impl ReadContext for KvRecordReadContext {
+ fn get_row_decoder(&self, schema_id: i16) -> Result<Arc<dyn RowDecoder>> {
+ // First check: fast path
+ {
+ let cache = self
+ .row_decoder_cache
+ .lock()
+ .unwrap_or_else(|poisoned| poisoned.into_inner());
+ if let Some(decoder) = cache.get(&schema_id) {
+ return Ok(Arc::clone(decoder));
+ }
+ } // Release lock before expensive operations
+
+ // Build decoder outside the lock to avoid blocking other threads
+ let schema = self.schema_getter.get_schema(schema_id)?;
+ let row_type = match schema.row_type() {
+ crate::metadata::DataType::Row(row_type) => row_type.clone(),
+ other => {
+ return Err(Error::IoUnexpectedError {
+ message: format!(
+ "Schema {} has invalid row type: expected Row, got
{:?}",
+ schema_id, other
+ ),
+ source: std::io::Error::new(
+ std::io::ErrorKind::InvalidData,
+ "Invalid row type",
+ ),
+ });
+ }
+ };
+
+ // Create decoder outside lock
+ let decoder = RowDecoderFactory::create(self.kv_format.clone(),
row_type)?;
+
+ // Second check: insert only if another thread didn't beat us to it
+ {
+ let mut cache = self
+ .row_decoder_cache
+ .lock()
+ .unwrap_or_else(|poisoned| poisoned.into_inner());
+ // Check again - another thread might have inserted while we were
building
+ if let Some(existing) = cache.get(&schema_id) {
+ return Ok(Arc::clone(existing));
+ }
+ cache.insert(schema_id, Arc::clone(&decoder));
+ }
+
+ Ok(decoder)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::metadata::{DataTypes, Schema};
+
+ struct MockSchemaGetter {
+ schema: Arc<Schema>,
+ }
+
+ impl MockSchemaGetter {
+ fn new(data_types: Vec<crate::metadata::DataType>) -> Self {
+ let mut builder = Schema::builder();
+ for (i, dt) in data_types.iter().enumerate() {
+ builder = builder.column(&format!("field{}", i), dt.clone());
+ }
+ let schema = builder.build().expect("Failed to build schema");
+
+ Self {
+ schema: Arc::new(schema),
+ }
+ }
+ }
+
+ impl SchemaGetter for MockSchemaGetter {
+ fn get_schema(&self, _schema_id: i16) -> Result<Arc<Schema>> {
+ Ok(Arc::clone(&self.schema))
+ }
+ }
+
+ #[test]
+ fn test_kv_record_read_context() {
+ // Test decoder caching for same schema ID
+ let schema_getter = Arc::new(MockSchemaGetter::new(vec![
+ DataTypes::int(),
+ DataTypes::string(),
+ ]));
+ let read_context = KvRecordReadContext::new(KvFormat::COMPACTED,
schema_getter);
+
+ // Get decoder twice - should return the same instance (cached)
+ let decoder1 = read_context.get_row_decoder(42).unwrap();
+ let decoder2 = read_context.get_row_decoder(42).unwrap();
+
+ // Verify same instance (Arc pointer equality)
+ assert!(Arc::ptr_eq(&decoder1, &decoder2));
+
+ // Test different schema IDs get different decoders
+ let schema_getter =
Arc::new(MockSchemaGetter::new(vec![DataTypes::int()]));
+ let read_context = KvRecordReadContext::new(KvFormat::COMPACTED,
schema_getter);
+
+ let decoder1 = read_context.get_row_decoder(10).unwrap();
+ let decoder2 = read_context.get_row_decoder(20).unwrap();
+
+ // Should be different instances
+ assert!(!Arc::ptr_eq(&decoder1, &decoder2));
+ }
+}
diff --git a/crates/fluss/src/record/kv/mod.rs
b/crates/fluss/src/record/kv/mod.rs
index ecb762d..857c5e5 100644
--- a/crates/fluss/src/record/kv/mod.rs
+++ b/crates/fluss/src/record/kv/mod.rs
@@ -20,10 +20,17 @@
mod kv_record;
mod kv_record_batch;
mod kv_record_batch_builder;
+mod kv_record_read_context;
+mod read_context;
+
+#[cfg(test)]
+mod test_util;
pub use kv_record::{KvRecord, LENGTH_LENGTH as KV_RECORD_LENGTH_LENGTH};
pub use kv_record_batch::*;
pub use kv_record_batch_builder::*;
+pub use kv_record_read_context::{KvRecordReadContext, SchemaGetter};
+pub use read_context::ReadContext;
/// Current KV magic value
pub const CURRENT_KV_MAGIC_VALUE: u8 = 0;
diff --git a/crates/fluss/src/record/kv/read_context.rs
b/crates/fluss/src/record/kv/read_context.rs
new file mode 100644
index 0000000..6350261
--- /dev/null
+++ b/crates/fluss/src/record/kv/read_context.rs
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Read context for KV record batches.
+//!
+//! Provides schema and decoder information needed for typed record reading.
+
+use crate::error::Result;
+use crate::row::RowDecoder;
+use std::sync::Arc;
+
/// Context for reading KV records with type information.
///
/// The ReadContext provides access to RowDecoders based on schema IDs,
/// enabling typed deserialization of KV record values.
///
/// Implementations are shared across threads (`Send + Sync`), so any
/// decoder caching inside an implementation must be internally synchronized.
///
/// Reference: org.apache.fluss.record.KvRecordBatch.ReadContext
pub trait ReadContext: Send + Sync {
    /// Get the row decoder for the given schema ID.
    ///
    /// The decoder is typically cached, so repeated calls with the same
    /// schema ID should return the same decoder instance.
    ///
    /// # Arguments
    /// * `schema_id` - The schema ID for which to get the decoder
    ///
    /// # Returns
    /// An Arc-wrapped RowDecoder for the specified schema, or an error if
    /// the schema is invalid or cannot be retrieved
    fn get_row_decoder(&self, schema_id: i16) -> Result<Arc<dyn RowDecoder>>;
}
diff --git a/crates/fluss/src/record/kv/test_util.rs
b/crates/fluss/src/record/kv/test_util.rs
new file mode 100644
index 0000000..50ab911
--- /dev/null
+++ b/crates/fluss/src/record/kv/test_util.rs
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Test utilities for KV record reading.
+
+use super::ReadContext;
+use crate::error::Result;
+use crate::metadata::{DataType, KvFormat, RowType};
+use crate::row::{RowDecoder, RowDecoderFactory};
+use std::sync::Arc;
+
/// Simple test-only ReadContext that creates decoders directly from data types.
///
/// This bypasses the production Schema/SchemaGetter machinery for simpler tests.
pub(crate) struct TestReadContext {
    // Row format handed to the decoder factory (COMPACTED via `compacted()`).
    kv_format: KvFormat,
    // Column types describing the rows to decode; cloned into a RowType
    // on every `get_row_decoder` call.
    data_types: Vec<DataType>,
}
+
+impl TestReadContext {
+ /// Create a test context for COMPACTED format (most common case).
+ pub(crate) fn compacted(data_types: Vec<DataType>) -> Self {
+ Self {
+ kv_format: KvFormat::COMPACTED,
+ data_types,
+ }
+ }
+}
+
+impl ReadContext for TestReadContext {
+ fn get_row_decoder(&self, _schema_id: i16) -> Result<Arc<dyn RowDecoder>> {
+ // Directly create decoder from data types - no Schema needed!
+ let row_type = RowType::with_data_types(self.data_types.clone());
+ RowDecoderFactory::create(self.kv_format.clone(), row_type)
+ }
+}
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index 3477f1d..536409e 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -23,11 +23,13 @@ pub mod binary;
pub mod compacted;
pub mod encode;
mod field_getter;
+mod row_decoder;
pub use column::*;
pub use compacted::CompactedRow;
pub use datum::*;
pub use encode::KeyEncoder;
+pub use row_decoder::{CompactedRowDecoder, RowDecoder, RowDecoderFactory};
pub trait BinaryRow: InternalRow {
/// Returns the binary representation of this row as a byte slice.
diff --git a/crates/fluss/src/row/row_decoder.rs
b/crates/fluss/src/row/row_decoder.rs
new file mode 100644
index 0000000..9f9b421
--- /dev/null
+++ b/crates/fluss/src/row/row_decoder.rs
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Row decoder for deserializing binary row formats.
+//!
+//! Mirrors the Java org.apache.fluss.row.decode package.
+
+use crate::error::{Error, Result};
+use crate::metadata::{KvFormat, RowType};
+use crate::row::compacted::{CompactedRow, CompactedRowDeserializer};
+use std::sync::Arc;
+
/// Decoder for creating BinaryRow from bytes.
///
/// This trait provides an abstraction for decoding different row formats
/// (COMPACTED, INDEXED, etc.) from binary data.
///
/// NOTE(review): the return type is currently pinned to [`CompactedRow`],
/// so an INDEXED implementation would require widening this signature.
///
/// Reference: org.apache.fluss.row.decode.RowDecoder
pub trait RowDecoder: Send + Sync {
    /// Decode bytes into a CompactedRow.
    ///
    /// The lifetime 'a ties the returned row to the input data, ensuring
    /// the data remains valid as long as the row is used.
    fn decode<'a>(&self, data: &'a [u8]) -> CompactedRow<'a>;
}
+
/// Decoder for CompactedRow format.
///
/// Uses the existing CompactedRow infrastructure for decoding.
/// This is a thin wrapper that implements the RowDecoder trait.
///
/// Reference: org.apache.fluss.row.decode.CompactedRowDecoder
pub struct CompactedRowDecoder {
    // Number of fields per row, derived from the row type at construction.
    field_count: usize,
    // Shared deserializer; Arc lets decode() hand out cheap clones per call.
    deserializer: Arc<CompactedRowDeserializer<'static>>,
}
+
+impl CompactedRowDecoder {
+ /// Create a new CompactedRowDecoder with the given row type.
+ pub fn new(row_type: RowType) -> Self {
+ let field_count = row_type.fields().len();
+ let deserializer =
Arc::new(CompactedRowDeserializer::new_from_owned(row_type));
+
+ Self {
+ field_count,
+ deserializer,
+ }
+ }
+}
+
+impl RowDecoder for CompactedRowDecoder {
+ fn decode<'a>(&self, data: &'a [u8]) -> CompactedRow<'a> {
+ // Use existing CompactedRow::deserialize() infrastructure
+ CompactedRow::deserialize(Arc::clone(&self.deserializer),
self.field_count, data)
+ }
+}
+
/// Factory for creating RowDecoders based on KvFormat.
///
/// Stateless unit struct; `create` is an associated function.
///
/// Reference: org.apache.fluss.row.decode.RowDecoder.create()
pub struct RowDecoderFactory;
+
+impl RowDecoderFactory {
+ /// Create a RowDecoder for the given format and row type.
+ pub fn create(kv_format: KvFormat, row_type: RowType) -> Result<Arc<dyn
RowDecoder>> {
+ match kv_format {
+ KvFormat::COMPACTED =>
Ok(Arc::new(CompactedRowDecoder::new(row_type))),
+ KvFormat::INDEXED => Err(Error::UnsupportedOperation {
+ message: "INDEXED format is not yet supported".to_string(),
+ }),
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metadata::DataTypes;
    use crate::row::InternalRow;
    use crate::row::binary::BinaryWriter;
    use crate::row::compacted::CompactedRowWriter;

    /// Round-trips an (int, string) row through CompactedRowDecoder.
    #[test]
    fn test_compacted_row_decoder() {
        // Encode one row with the writer.
        let mut writer = CompactedRowWriter::new(2);
        writer.write_int(42);
        writer.write_string("hello");
        let bytes = writer.to_bytes();

        // Decode it back through a decoder built from the matching RowType.
        let decoder = CompactedRowDecoder::new(RowType::with_data_types(vec![
            DataTypes::int(),
            DataTypes::string(),
        ]));
        let decoded = decoder.decode(&bytes);

        assert_eq!(decoded.get_field_count(), 2);
        assert_eq!(decoded.get_int(0), 42);
        assert_eq!(decoded.get_string(1), "hello");
    }

    /// The factory must hand back a working decoder for COMPACTED format.
    #[test]
    fn test_row_decoder_factory() {
        let types = vec![DataTypes::int(), DataTypes::string()];
        let decoder =
            RowDecoderFactory::create(KvFormat::COMPACTED, RowType::with_data_types(types))
                .unwrap();

        // Encode a row, then verify the factory-made decoder reads it back.
        let mut writer = CompactedRowWriter::new(2);
        writer.write_int(100);
        writer.write_string("world");
        let bytes = writer.to_bytes();

        let decoded = decoder.decode(&bytes);
        assert_eq!(decoded.get_int(0), 100);
        assert_eq!(decoded.get_string(1), "world");
    }
}