This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new f4808d8  feat: Introduce KVReadContext and read path wiring (#174)
f4808d8 is described below

commit f4808d81cc5e5788815f13cd49a012f9a3d71528
Author: Anton Borisov <[email protected]>
AuthorDate: Sat Jan 17 15:41:12 2026 +0000

    feat: Introduce KVReadContext and read path wiring (#174)
---
 crates/fluss/src/record/kv/kv_record.rs            | 166 ++++++++++---------
 crates/fluss/src/record/kv/kv_record_batch.rs      | 115 ++++++++++---
 .../fluss/src/record/kv/kv_record_batch_builder.rs | 154 +++++++++---------
 .../fluss/src/record/kv/kv_record_read_context.rs  | 179 +++++++++++++++++++++
 crates/fluss/src/record/kv/mod.rs                  |   7 +
 crates/fluss/src/record/kv/read_context.rs         |  45 ++++++
 crates/fluss/src/record/kv/test_util.rs            |  50 ++++++
 crates/fluss/src/row/mod.rs                        |   2 +
 crates/fluss/src/row/row_decoder.rs                | 137 ++++++++++++++++
 9 files changed, 674 insertions(+), 181 deletions(-)
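
For readers skimming the patch, the new read path composes roughly as follows. This is a minimal sketch, not code from this commit: the `fluss::` import paths are assumed, and the error conversions behind `?` follow how the tests in the diff below use them.

    use std::sync::Arc;
    use fluss::error::Result;
    use fluss::metadata::KvFormat;
    use fluss::record::kv::{KvRecordBatch, KvRecordReadContext, SchemaGetter};

    fn read_batch(batch: &KvRecordBatch, schemas: Arc<dyn SchemaGetter>) -> Result<()> {
        // Build a context once; it caches one RowDecoder per schema id.
        let ctx = KvRecordReadContext::new(KvFormat::COMPACTED, schemas);
        // records() validates the batch checksum; records_unchecked() skips it.
        let records = batch.records(&ctx)?;
        // Grab the decoder before into_iter() consumes the wrapper.
        let decoder = records.decoder_arc();
        for rec in records {
            let rec = rec?;
            if rec.is_deletion() {
                continue; // tombstone: key only, no value bytes
            }
            // Lazy CompactedRow view over the raw value bytes.
            let _row = rec.row(&*decoder);
        }
        Ok(())
    }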

diff --git a/crates/fluss/src/record/kv/kv_record.rs b/crates/fluss/src/record/kv/kv_record.rs
index ab8c2ac..a9c45d6 100644
--- a/crates/fluss/src/record/kv/kv_record.rs
+++ b/crates/fluss/src/record/kv/kv_record.rs
@@ -27,6 +27,8 @@
 use bytes::{BufMut, Bytes, BytesMut};
 use std::io;
 
+use crate::row::RowDecoder;
+use crate::row::compacted::CompactedRow;
 use crate::util::varint::{
     read_unsigned_varint_bytes, size_of_unsigned_varint, write_unsigned_varint_buf,
 };
@@ -34,7 +36,7 @@ use crate::util::varint::{
 /// Length field size in bytes
 pub const LENGTH_LENGTH: usize = 4;
 
-/// A key-value record.
+/// A key-value record containing raw key and value bytes.
 ///
 /// The schema is:
 /// - Length => Int32
@@ -43,34 +45,39 @@ pub const LENGTH_LENGTH: usize = 4;
 /// - Value => bytes (BinaryRow, written directly without length prefix)
 ///
 /// When the value is None (deletion), no Value bytes are present.
-// Reference implementation:
-// https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/record/KvRecord.java
+///
+/// This struct stores only raw bytes. To decode the value into a typed row,
+/// use the `row()` method with a RowDecoder (typically obtained from the iterator).
+///
+/// Reference implementation:
+/// https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/record/KvRecord.java
 #[derive(Debug, Clone)]
 pub struct KvRecord {
     key: Bytes,
-    value: Option<Bytes>,
+    value_bytes: Option<Bytes>,
     size_in_bytes: usize,
 }
 
 impl KvRecord {
-    /// Create a new KvRecord with the given key and optional value.
-    pub fn new(key: Bytes, value: Option<Bytes>) -> Self {
-        let size_in_bytes = Self::size_of(&key, value.as_deref());
-        Self {
-            key,
-            value,
-            size_in_bytes,
-        }
-    }
-
     /// Get the key bytes.
     pub fn key(&self) -> &Bytes {
         &self.key
     }
 
-    /// Get the value bytes (None indicates a deletion).
-    pub fn value(&self) -> Option<&Bytes> {
-        self.value.as_ref()
+    /// Get the raw value bytes (for testing).
+    #[cfg(test)]
+    pub(crate) fn value_bytes(&self) -> Option<&Bytes> {
+        self.value_bytes.as_ref()
+    }
+
+    /// Decode the value bytes into a typed row using the provided decoder.
+    /// This creates a lightweight CompactedRow view over the raw bytes.
+    /// Actual field parsing is lazy (on first access).
+    pub fn row<'a>(&'a self, decoder: &dyn RowDecoder) -> Option<CompactedRow<'a>> {
+        self.value_bytes.as_ref().map(|bytes| {
+            // Decode on-demand - CompactedRow<'a> lifetime tied to &'a self
+            decoder.decode(bytes.as_ref())
+        })
     }
 
     /// Calculate the total size of the record when serialized (including length prefix).
@@ -121,8 +128,7 @@ impl KvRecord {
     /// Read a KV record from bytes at the given position.
     ///
     /// Returns the KvRecord and the number of bytes consumed.
-    ///
-    /// TODO: Connect KvReadContext and return CompactedRow records.
+    /// The record contains only raw bytes; use `row()` with a RowDecoder to decode the value.
     pub fn read_from(bytes: &Bytes, position: usize) -> io::Result<(Self, usize)> {
         if bytes.len() < position.saturating_add(LENGTH_LENGTH) {
             return Err(io::Error::new(
@@ -183,11 +189,10 @@ impl KvRecord {
         let key = bytes.slice(current_offset..key_end);
         current_offset = key_end;
 
-        // Read value bytes directly
-        let value = if current_offset < record_end {
+        // Read value bytes directly (don't decode yet - will decode on-demand)
+        let value_bytes = if current_offset < record_end {
             // Value is present: all remaining bytes are the value
-            let value_bytes = bytes.slice(current_offset..record_end);
-            Some(value_bytes)
+            Some(bytes.slice(current_offset..record_end))
         } else {
             // No remaining bytes: this is a deletion record
             None
@@ -196,7 +201,7 @@ impl KvRecord {
         Ok((
             Self {
                 key,
-                value,
+                value_bytes,
                 size_in_bytes: total_size,
             },
             total_size,
@@ -207,6 +212,11 @@ impl KvRecord {
     pub fn get_size_in_bytes(&self) -> usize {
         self.size_in_bytes
     }
+
+    /// Check if this is a deletion record (no value).
+    pub fn is_deletion(&self) -> bool {
+        self.value_bytes.is_none()
+    }
 }
 
 #[cfg(test)]
@@ -214,30 +224,25 @@ mod tests {
     use super::*;
 
     #[test]
-    fn test_kv_record_size_calculation() {
+    fn test_kv_record_basic_operations() {
         let key = b"test_key";
         let value = b"test_value";
 
-        // With value (no value length varint)
+        // Test size calculation with value
         let size_with_value = KvRecord::size_of(key, Some(value));
         assert_eq!(
             size_with_value,
             LENGTH_LENGTH + size_of_unsigned_varint(key.len() as u32) + key.len() + value.len()
         );
 
-        // Without value
+        // Test size calculation without value (deletion)
         let size_without_value = KvRecord::size_of(key, None);
         assert_eq!(
             size_without_value,
             LENGTH_LENGTH + size_of_unsigned_varint(key.len() as u32) + key.len()
         );
-    }
-
-    #[test]
-    fn test_kv_record_write_read_round_trip() {
-        let key = b"my_key";
-        let value = b"my_value_data";
 
+        // Test write/read round trip with value
         let mut buf = BytesMut::new();
         let written = KvRecord::write_to_buf(&mut buf, key, Some(value)).unwrap();
 
@@ -246,40 +251,70 @@ mod tests {
 
         assert_eq!(written, read_size);
         assert_eq!(record.key().as_ref(), key);
-        assert_eq!(record.value().unwrap().as_ref(), value);
+        assert_eq!(record.value_bytes().unwrap().as_ref(), value);
         assert_eq!(record.get_size_in_bytes(), written);
-    }
-
-    #[test]
-    fn test_kv_record_deletion() {
-        let key = b"delete_me";
+        assert!(!record.is_deletion());
 
-        // Write deletion record (no value)
+        // Test deletion record (no value)
+        let delete_key = b"delete_me";
         let mut buf = BytesMut::new();
-        let written = KvRecord::write_to_buf(&mut buf, key, None).unwrap();
+        let written = KvRecord::write_to_buf(&mut buf, delete_key, None).unwrap();
 
         let bytes = buf.freeze();
         let (record, read_size) = KvRecord::read_from(&bytes, 0).unwrap();
 
         assert_eq!(written, read_size);
-        assert_eq!(record.key().as_ref(), key);
-        assert!(record.value().is_none());
+        assert_eq!(record.key().as_ref(), delete_key);
+        assert!(record.is_deletion());
+        assert!(record.value_bytes().is_none());
     }
 
     #[test]
-    fn test_kv_record_with_large_key() {
-        let key = vec![0u8; 1024];
-        let value = vec![1u8; 4096];
+    fn test_kv_record_multiple_records() {
+        // Test multiple regular-sized records in buffer
+        let records = vec![
+            (b"key1".as_slice(), Some(b"value1".as_slice())),
+            (b"key2".as_slice(), None), // Deletion
+            (b"key3".as_slice(), Some(b"value3".as_slice())),
+        ];
 
         let mut buf = BytesMut::new();
-        let written = KvRecord::write_to_buf(&mut buf, &key, Some(&value)).unwrap();
+        for (key, value) in &records {
+            KvRecord::write_to_buf(&mut buf, key, *value).unwrap();
+        }
+
+        let bytes = buf.freeze();
+        let mut offset = 0;
+        for (expected_key, expected_value) in &records {
+            let (record, size) = KvRecord::read_from(&bytes, offset).unwrap();
+            assert_eq!(record.key().as_ref(), *expected_key);
+            match expected_value {
+                Some(v) => {
+                    assert_eq!(record.value_bytes().unwrap().as_ref(), *v);
+                    assert!(!record.is_deletion());
+                }
+                None => {
+                    assert!(record.is_deletion());
+                    assert!(record.value_bytes().is_none());
+                }
+            }
+            offset += size;
+        }
+        assert_eq!(offset, bytes.len());
+
+        // Test large keys and values
+        let large_key = vec![0u8; 1024];
+        let large_value = vec![1u8; 4096];
+
+        let mut buf = BytesMut::new();
+        let written = KvRecord::write_to_buf(&mut buf, &large_key, Some(&large_value)).unwrap();
 
         let bytes = buf.freeze();
         let (record, read_size) = KvRecord::read_from(&bytes, 0).unwrap();
 
         assert_eq!(written, read_size);
-        assert_eq!(record.key().len(), key.len());
-        assert_eq!(record.value().unwrap().len(), value.len());
+        assert_eq!(record.key().len(), large_key.len());
+        assert_eq!(record.value_bytes().unwrap().len(), large_value.len());
     }
 
     #[test]
@@ -291,7 +326,9 @@ mod tests {
         let bytes = buf.freeze();
         let result = KvRecord::read_from(&bytes, 0);
         assert!(result.is_err());
-        assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidData);
+        if let Err(e) = result {
+            assert_eq!(e.kind(), io::ErrorKind::InvalidData);
+        }
 
         // Test overflow length
         let mut buf = BytesMut::new();
@@ -307,33 +344,8 @@ mod tests {
         let bytes = buf.freeze();
         let result = KvRecord::read_from(&bytes, 0);
         assert!(result.is_err());
-        assert_eq!(result.unwrap_err().kind(), io::ErrorKind::UnexpectedEof);
-    }
-
-    #[test]
-    fn test_multiple_records_in_buffer() {
-        let records = vec![
-            (b"key1".as_slice(), Some(b"value1".as_slice())),
-            (b"key2".as_slice(), None),
-            (b"key3".as_slice(), Some(b"value3".as_slice())),
-        ];
-
-        let mut buf = BytesMut::new();
-        for (key, value) in &records {
-            KvRecord::write_to_buf(&mut buf, key, *value).unwrap();
+        if let Err(e) = result {
+            assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof);
         }
-
-        let bytes = buf.freeze();
-        let mut offset = 0;
-        for (expected_key, expected_value) in &records {
-            let (record, size) = KvRecord::read_from(&bytes, offset).unwrap();
-            assert_eq!(record.key().as_ref(), *expected_key);
-            match expected_value {
-                Some(v) => assert_eq!(record.value().unwrap().as_ref(), *v),
-                None => assert!(record.value().is_none()),
-            }
-            offset += size;
-        }
-        assert_eq!(offset, bytes.len());
     }
 }
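
To make the KvRecord layout above concrete, here is an illustrative hand-encoder for a single record. It is not the crate's API: what exactly the Int32 length field counts and its byte order are assumptions here, not confirmed by this diff.

    /// Hypothetical encoder for the documented layout:
    /// Length (Int32) | KeyLength (varint) | Key bytes | Value bytes (optional).
    fn encode_kv_record(key: &[u8], value: Option<&[u8]>) -> Vec<u8> {
        // Unsigned LEB128 varint, as used for KeyLength.
        fn put_uvarint(buf: &mut Vec<u8>, mut v: u32) {
            while v >= 0x80 {
                buf.push((v as u8 & 0x7f) | 0x80);
                v >>= 7;
            }
            buf.push(v as u8);
        }

        let mut body = Vec::new();
        put_uvarint(&mut body, key.len() as u32); // KeyLength => varint
        body.extend_from_slice(key);              // Key bytes
        if let Some(v) = value {
            body.extend_from_slice(v);            // Value bytes, no length prefix
        }                                         // None => deletion: no value bytes

        // Assumed: Length counts the bytes after the length field, big-endian.
        let mut out = Vec::with_capacity(4 + body.len());
        out.extend_from_slice(&(body.len() as i32).to_be_bytes());
        out.extend_from_slice(&body);
        out
    }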
diff --git a/crates/fluss/src/record/kv/kv_record_batch.rs b/crates/fluss/src/record/kv/kv_record_batch.rs
index eb3c09a..32f712f 100644
--- a/crates/fluss/src/record/kv/kv_record_batch.rs
+++ b/crates/fluss/src/record/kv/kv_record_batch.rs
@@ -32,8 +32,11 @@
 
 use bytes::Bytes;
 use std::io;
+use std::sync::Arc;
 
-use crate::record::kv::KvRecord;
+use crate::error::Result;
+use crate::record::kv::{KvRecord, ReadContext};
+use crate::row::RowDecoder;
 
 // Field lengths in bytes
 pub const LENGTH_LENGTH: usize = 4;
@@ -253,38 +256,87 @@ impl KvRecordBatch {
         ]))
     }
 
-    /// Create an iterator over the records in this batch.
-    /// This validates the batch checksum before returning the iterator.
+    /// Create an iterable collection of records in this batch.
+    ///
+    /// This validates the batch checksum before returning the records.
     /// For trusted data paths, use `records_unchecked()` to skip validation.
-    pub fn records(&self) -> io::Result<KvRecordIterator> {
+    ///
+    /// Mirrors: KvRecordBatch.records(ReadContext)
+    pub fn records(&self, read_context: &dyn ReadContext) -> Result<KvRecords> {
         if !self.is_valid() {
-            return Err(io::Error::new(
-                io::ErrorKind::InvalidData,
-                "Invalid batch checksum",
-            ));
+            return Err(crate::error::Error::IoUnexpectedError {
+                message: "Invalid batch checksum".to_string(),
+                source: io::Error::new(io::ErrorKind::InvalidData, "Invalid batch checksum"),
+            });
         }
-        self.records_unchecked()
+        self.records_unchecked(read_context)
     }
 
-    /// Create an iterator over the records in this batch without validating the checksum
-    pub fn records_unchecked(&self) -> io::Result<KvRecordIterator> {
+    /// Create an iterable collection of records in this batch without validating the checksum.
+    pub fn records_unchecked(&self, read_context: &dyn ReadContext) -> Result<KvRecords> {
         let size = self.size_in_bytes()?;
         let count = self.record_count()?;
+        let schema_id = self.schema_id()?;
+
         if count < 0 {
-            return Err(io::Error::new(
-                io::ErrorKind::InvalidData,
-                format!("Invalid record count: {count}"),
-            ));
+            return Err(crate::error::Error::IoUnexpectedError {
+                message: format!("Invalid record count: {count}"),
+                source: io::Error::new(io::ErrorKind::InvalidData, "Invalid record count"),
+            });
         }
-        Ok(KvRecordIterator {
-            data: self.data.clone(),
-            position: self.position + RECORDS_OFFSET,
-            end: self.position + size,
-            remaining_count: count,
+
+        // Get row decoder for this schema from context (cached)
+        let row_decoder = read_context.get_row_decoder(schema_id)?;
+
+        Ok(KvRecords {
+            iter: KvRecordIterator {
+                data: self.data.clone(),
+                position: self.position + RECORDS_OFFSET,
+                end: self.position + size,
+                remaining_count: count,
+            },
+            row_decoder,
         })
     }
 }
 
+/// Iterable collection of KV records with associated decoder.
+///
+/// This wrapper provides both iteration capability and access to the row decoder
+/// needed to decode record values into typed rows.
+pub struct KvRecords {
+    iter: KvRecordIterator,
+    row_decoder: Arc<dyn RowDecoder>,
+}
+
+impl KvRecords {
+    /// Get a reference to the row decoder for decoding record values.
+    ///
+    /// Returns a reference tied to the lifetime of `&self`.
+    /// Use this when iterating by reference.
+    pub fn decoder(&self) -> &dyn RowDecoder {
+        &*self.row_decoder
+    }
+
+    /// Get an owned Arc to the row decoder.
+    ///
+    /// Returns a cloned Arc that can outlive the KvRecords,
+    /// allowing you to grab it before consuming the iterator.
+    /// Useful if you must keep the decoder beyond the iterable's lifetime (collect-then-decode style).
+    pub fn decoder_arc(&self) -> Arc<dyn RowDecoder> {
+        Arc::clone(&self.row_decoder)
+    }
+}
+
+impl IntoIterator for KvRecords {
+    type Item = io::Result<KvRecord>;
+    type IntoIter = KvRecordIterator;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.iter
+    }
+}
+
 /// Iterator over records in a KV record batch.
 pub struct KvRecordIterator {
     data: Bytes,
@@ -319,7 +371,9 @@ impl Iterator for KvRecordIterator {
 mod tests {
     use super::*;
     use crate::metadata::{DataTypes, KvFormat, RowType};
+    use crate::record::kv::test_util::TestReadContext;
     use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, KvRecordBatchBuilder};
+    use crate::row::InternalRow;
     use crate::row::binary::BinaryWriter;
     use crate::row::compacted::CompactedRow;
     use bytes::{BufMut, BytesMut};
@@ -380,15 +434,24 @@ mod tests {
         assert_eq!(batch.batch_sequence().unwrap(), 5);
         assert_eq!(batch.record_count().unwrap(), 2);
 
-        let records: Vec<_> = batch.records().unwrap().collect();
-        assert_eq!(records.len(), 2);
+        // Create ReadContext for reading
+        let read_context = TestReadContext::compacted(vec![DataTypes::bytes()]);
 
-        let record1 = records[0].as_ref().unwrap();
+        // Iterate and verify records using typed API
+        let records = batch.records(&read_context).unwrap();
+        let decoder = records.decoder_arc(); // Get Arc before consuming
+
+        let mut iter = records.into_iter();
+        let record1 = iter.next().unwrap().unwrap();
         assert_eq!(record1.key().as_ref(), key1);
-        assert_eq!(record1.value().unwrap().as_ref(), value1_writer.buffer());
+        assert!(!record1.is_deletion());
+        let row1 = record1.row(&*decoder).unwrap();
+        assert_eq!(row1.get_bytes(0), &[1, 2, 3, 4, 5]);
 
-        let record2 = records[1].as_ref().unwrap();
+        let record2 = iter.next().unwrap().unwrap();
         assert_eq!(record2.key().as_ref(), key2);
-        assert!(record2.value().is_none());
+        assert!(record2.is_deletion());
+
+        assert!(iter.next().is_none());
     }
 }
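
The decoder()/decoder_arc() split exists because IntoIterator consumes the KvRecords wrapper. A short sketch of the owning pattern (the `fluss::` path is assumed):

    use fluss::record::kv::KvRecords;

    fn consume(records: KvRecords) -> std::io::Result<()> {
        // decoder() would borrow from `records`; decoder_arc() clones the Arc
        // so the decoder survives the move into the for-loop below.
        let decoder = records.decoder_arc();
        for rec in records {
            let rec = rec?;
            if let Some(row) = rec.row(&*decoder) {
                let _ = row; // lazy CompactedRow view over the value bytes
            }
        }
        Ok(())
    }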
diff --git a/crates/fluss/src/record/kv/kv_record_batch_builder.rs b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
index c36a861..636104d 100644
--- a/crates/fluss/src/record/kv/kv_record_batch_builder.rs
+++ b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
@@ -330,22 +330,20 @@ mod tests {
     }
 
     #[test]
-    fn test_builder_basic_workflow() {
+    fn test_builder_basic_operations() {
+        // Test basic workflow: initial state, writer state, append, close, build
         let schema_id = 42;
         let write_limit = 4096;
         let mut builder = KvRecordBatchBuilder::new(schema_id, write_limit, KvFormat::COMPACTED);
 
-        // Test initial state
         assert!(!builder.is_closed());
         assert_eq!(builder.writer_id(), NO_WRITER_ID);
         assert_eq!(builder.batch_sequence(), NO_BATCH_SEQUENCE);
 
-        // Test writer state
         builder.set_writer_state(100, 5);
         assert_eq!(builder.writer_id(), 100);
         assert_eq!(builder.batch_sequence(), 5);
 
-        // Test appending records
         let key1 = b"key1";
         let value1 = create_test_row(b"value1");
         assert!(builder.has_room_for_row(key1, Some(&value1)));
@@ -355,7 +353,6 @@ mod tests {
         assert!(builder.has_room_for_row::<CompactedRow>(key2, None));
         builder.append_row::<CompactedRow>(key2, None).unwrap();
 
-        // Test close and build
         builder.close().unwrap();
         assert!(builder.is_closed());
 
@@ -365,11 +362,8 @@ mod tests {
         // Building again should return cached result
         let bytes2 = builder.build().unwrap();
         assert_eq!(bytes.len(), bytes2.len());
-    }
 
-    #[test]
-    fn test_builder_lifecycle() {
-        // Test abort behavior
+        // Test lifecycle: abort behavior
+        let mut builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED);
         let value = create_test_row(b"value");
         builder.append_row(b"key", Some(&value)).unwrap();
@@ -378,13 +372,30 @@ mod tests {
         assert!(builder.build().is_err());
         assert!(builder.close().is_err());
 
-        // Test close behavior
+        // Test lifecycle: close behavior
+        let mut builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED);
         let value = create_test_row(b"value");
         builder.append_row(b"key", Some(&value)).unwrap();
         builder.close().unwrap();
-        assert!(builder.append_row::<CompactedRow>(b"key2", None).is_err()); // Can't append after close
-        assert!(builder.build().is_ok()); // But can still build
+        assert!(builder.append_row::<CompactedRow>(b"key2", None).is_err());
+        assert!(builder.build().is_ok());
+
+        // Test KvFormat validation
+        let mut row_writer = CompactedRowWriter::new(1);
+        row_writer.write_int(42);
+        let row_type = RowType::with_data_types(vec![DataTypes::int()]);
+        let row = &CompactedRow::from_bytes(&row_type, row_writer.buffer());
+
+        // INDEXED format should reject append_row
+        let mut indexed_builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::INDEXED);
+        let result = indexed_builder.append_row(b"key", Some(row));
+        assert!(result.is_err());
+        assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidInput);
+
+        // COMPACTED format should accept append_row
+        let mut compacted_builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED);
+        let result = compacted_builder.append_row(b"key", Some(row));
+        assert!(result.is_ok());
     }
 
     #[test]
@@ -430,7 +441,10 @@ mod tests {
     }
 
     #[test]
-    fn test_cache_invalidation_on_append() {
+    fn test_builder_cache_invalidation() {
+        use crate::record::kv::KvRecordBatch;
+
+        // Test cache invalidation on append
         let mut builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED);
         builder.set_writer_state(100, 5);
 
@@ -446,18 +460,13 @@ mod tests {
         let len2 = bytes2.len();
 
         // Verify the second build includes both records
-        assert!(len2 > len1, "Second build should be larger");
-
-        use crate::record::kv::KvRecordBatch;
+        assert!(len2 > len1);
         let batch = KvRecordBatch::new(bytes2, 0);
         assert!(batch.is_valid());
-        assert_eq!(batch.record_count().unwrap(), 2, "Should have 2 records");
-    }
+        assert_eq!(batch.record_count().unwrap(), 2);
 
-    #[test]
-    fn test_cache_invalidation_on_set_writer_state() {
+        // Test cache invalidation on writer state change
+        let mut builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED);
-
         builder.set_writer_state(100, 5);
         let value = create_test_row(b"value");
         builder.append_row(b"key", Some(&value)).unwrap();
@@ -467,24 +476,19 @@ mod tests {
         builder.set_writer_state(200, 10);
         let bytes2 = builder.build().unwrap();
 
-        assert_ne!(
-            bytes1, bytes2,
-            "Bytes should differ after writer state change"
-        );
+        assert_ne!(bytes1, bytes2);
 
-        use crate::record::kv::KvRecordBatch;
         let batch1 = KvRecordBatch::new(bytes1, 0);
         let batch2 = KvRecordBatch::new(bytes2, 0);
 
         assert_eq!(batch1.writer_id().unwrap(), 100);
         assert_eq!(batch1.batch_sequence().unwrap(), 5);
-
         assert_eq!(batch2.writer_id().unwrap(), 200);
         assert_eq!(batch2.batch_sequence().unwrap(), 10);
     }
 
     #[test]
-    fn test_builder_with_compacted_row_writer() {
+    fn test_builder_with_compacted_row_writer() -> crate::error::Result<()> {
         use crate::record::kv::KvRecordBatch;
         use crate::row::InternalRow;
         use crate::row::compacted::CompactedRow;
@@ -502,7 +506,7 @@ mod tests {
 
         let key1 = b"key1";
         assert!(builder.has_room_for_row(key1, Some(row1)));
-        builder.append_row(key1, Some(row1)).unwrap();
+        builder.append_row(key1, Some(row1))?;
 
         // Create and append second record
         let mut row_writer2 = CompactedRowWriter::new(2);
@@ -512,63 +516,57 @@ mod tests {
         let row2 = &CompactedRow::from_bytes(&row_type, row_writer2.buffer());
 
         let key2 = b"key2";
-        builder.append_row(key2, Some(row2)).unwrap();
+        builder.append_row(key2, Some(row2))?;
 
         // Append a deletion record
         let key3 = b"key3";
-        builder.append_row::<CompactedRow>(key3, None).unwrap();
+        builder.append_row::<CompactedRow>(key3, None)?;
 
         // Build and verify
-        builder.close().unwrap();
-        let bytes = builder.build().unwrap();
+        builder.close()?;
+        let bytes = builder.build()?;
 
         let batch = KvRecordBatch::new(bytes, 0);
         assert!(batch.is_valid());
-        assert_eq!(batch.record_count().unwrap(), 3);
-        assert_eq!(batch.writer_id().unwrap(), 100);
-        assert_eq!(batch.batch_sequence().unwrap(), 5);
-
-        // Read back and verify records
-        let records: Vec<_> = batch.records().unwrap().collect();
-        assert_eq!(records.len(), 3);
-
-        // Verify first record
-        let record1 = records[0].as_ref().unwrap();
-        assert_eq!(record1.key().as_ref(), key1);
-        let row1 = CompactedRow::from_bytes(&row_type, record1.value().unwrap());
-        assert_eq!(row1.get_int(0), 42);
-        assert_eq!(row1.get_string(1), "hello");
-
-        // Verify second record
-        let record2 = records[1].as_ref().unwrap();
-        assert_eq!(record2.key().as_ref(), key2);
-        let row2 = CompactedRow::from_bytes(&row_type, record2.value().unwrap());
-        assert_eq!(row2.get_int(0), 100);
-        assert_eq!(row2.get_string(1), "world");
-
-        // Verify deletion record
-        let record3 = records[2].as_ref().unwrap();
-        assert_eq!(record3.key().as_ref(), key3);
-        assert!(record3.value().is_none());
-    }
-
-    #[test]
-    fn test_kv_format_validation() {
-        let mut row_writer = CompactedRowWriter::new(1);
-        row_writer.write_int(42);
-
-        let row_type = RowType::with_data_types([DataTypes::int()].to_vec());
-        let row = &CompactedRow::from_bytes(&row_type, row_writer.buffer());
-
-        // INDEXED format should reject append_row
-        let mut indexed_builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::INDEXED);
-        let result = indexed_builder.append_row(b"key", Some(row));
-        assert!(result.is_err());
-        assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidInput);
+        assert_eq!(batch.record_count()?, 3);
+        assert_eq!(batch.writer_id()?, 100);
+        assert_eq!(batch.batch_sequence()?, 5);
+
+        // Create ReadContext for reading typed rows
+        let types = vec![DataTypes::int(), DataTypes::string()];
+        let read_context = crate::record::kv::test_util::TestReadContext::compacted(types);
+
+        // Read back and verify records using idiomatic for-loop
+        let records = batch.records(&read_context)?;
+        let decoder = records.decoder_arc();
+        let mut record_count = 0;
+
+        for rec in records {
+            let rec = rec?;
+            record_count += 1;
+
+            match record_count {
+                1 => {
+                    assert_eq!(rec.key().as_ref(), key1);
+                    let row = rec.row(&*decoder).unwrap();
+                    assert_eq!(row.get_int(0), 42);
+                    assert_eq!(row.get_string(1), "hello");
+                }
+                2 => {
+                    assert_eq!(rec.key().as_ref(), key2);
+                    let row = rec.row(&*decoder).unwrap();
+                    assert_eq!(row.get_int(0), 100);
+                    assert_eq!(row.get_string(1), "world");
+                }
+                3 => {
+                    assert_eq!(rec.key().as_ref(), key3);
+                    assert!(rec.is_deletion());
+                }
+                _ => panic!("Unexpected record count"),
+            }
+        }
 
-        // COMPACTED format should accept append_row
-        let mut compacted_builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED);
-        let result = compacted_builder.append_row(b"key", Some(row));
-        assert!(result.is_ok());
+        assert_eq!(record_count, 3);
+        Ok(())
     }
 }
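
The builder's happy path, condensed from the tests above into one sketch. The `fluss::` paths, the `bytes::Bytes` return type of build(), and the io::Result signatures are assumptions inferred from how the tests use these calls.

    use fluss::metadata::KvFormat;
    use fluss::record::kv::KvRecordBatchBuilder;
    use fluss::row::compacted::CompactedRow;

    fn build_batch(key: &[u8], row: &CompactedRow) -> std::io::Result<bytes::Bytes> {
        // COMPACTED only: append_row on an INDEXED builder returns InvalidInput.
        let mut builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED);
        builder.set_writer_state(100, 0);                   // writer id, batch sequence
        if builder.has_room_for_row(key, Some(row)) {
            builder.append_row(key, Some(row))?;            // upsert
        }
        builder.append_row::<CompactedRow>(b"gone", None)?; // deletion (tombstone)
        builder.close()?;                                   // no appends after this
        builder.build()                                     // serialized batch (cached)
    }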
diff --git a/crates/fluss/src/record/kv/kv_record_read_context.rs b/crates/fluss/src/record/kv/kv_record_read_context.rs
new file mode 100644
index 0000000..2049c32
--- /dev/null
+++ b/crates/fluss/src/record/kv/kv_record_read_context.rs
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Default implementation of ReadContext with decoder caching.
+
+use super::ReadContext;
+use crate::error::{Error, Result};
+use crate::metadata::{KvFormat, Schema};
+use crate::row::{RowDecoder, RowDecoderFactory};
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+/// Trait for fetching schemas by ID.
+///
+/// This trait abstracts schema retrieval, allowing different implementations
+/// (e.g., from metadata store, cache, or test mocks).
+pub trait SchemaGetter: Send + Sync {
+    /// Get the schema for the given schema ID.
+    ///
+    /// # Arguments
+    /// * `schema_id` - The schema ID to fetch
+    ///
+    /// # Returns
+    /// An Arc-wrapped Schema for the specified ID, or an error if the schema
+    /// cannot be fetched (missing ID, network error, etc.)
+    fn get_schema(&self, schema_id: i16) -> Result<Arc<Schema>>;
+}
+
+/// Default implementation of ReadContext with decoder caching.
+///
+/// This implementation caches RowDecoders by schema ID for performance,
+/// avoiding repeated schema lookups and decoder creation.
+///
+/// Reference: org.apache.fluss.record.KvRecordReadContext
+pub struct KvRecordReadContext {
+    kv_format: KvFormat,
+    schema_getter: Arc<dyn SchemaGetter>,
+    row_decoder_cache: Mutex<HashMap<i16, Arc<dyn RowDecoder>>>,
+}
+
+impl KvRecordReadContext {
+    /// Create a new KvRecordReadContext.
+    ///
+    /// # Arguments
+    /// * `kv_format` - The KV format (COMPACTED or INDEXED)
+    /// * `schema_getter` - The schema getter for fetching schemas by ID
+    ///
+    /// # Returns
+    /// A new KvRecordReadContext instance
+    pub fn new(kv_format: KvFormat, schema_getter: Arc<dyn SchemaGetter>) -> Self {
+        Self {
+            kv_format,
+            schema_getter,
+            row_decoder_cache: Mutex::new(HashMap::new()),
+        }
+    }
+}
+
+impl ReadContext for KvRecordReadContext {
+    fn get_row_decoder(&self, schema_id: i16) -> Result<Arc<dyn RowDecoder>> {
+        // First check: fast path
+        {
+            let cache = self
+                .row_decoder_cache
+                .lock()
+                .unwrap_or_else(|poisoned| poisoned.into_inner());
+            if let Some(decoder) = cache.get(&schema_id) {
+                return Ok(Arc::clone(decoder));
+            }
+        } // Release lock before expensive operations
+
+        // Build decoder outside the lock to avoid blocking other threads
+        let schema = self.schema_getter.get_schema(schema_id)?;
+        let row_type = match schema.row_type() {
+            crate::metadata::DataType::Row(row_type) => row_type.clone(),
+            other => {
+                return Err(Error::IoUnexpectedError {
+                    message: format!(
+                        "Schema {} has invalid row type: expected Row, got 
{:?}",
+                        schema_id, other
+                    ),
+                    source: std::io::Error::new(
+                        std::io::ErrorKind::InvalidData,
+                        "Invalid row type",
+                    ),
+                });
+            }
+        };
+
+        // Create decoder outside lock
+        let decoder = RowDecoderFactory::create(self.kv_format.clone(), row_type)?;
+
+        // Second check: insert only if another thread didn't beat us to it
+        {
+            let mut cache = self
+                .row_decoder_cache
+                .lock()
+                .unwrap_or_else(|poisoned| poisoned.into_inner());
+            // Check again - another thread might have inserted while we were building
+            if let Some(existing) = cache.get(&schema_id) {
+                return Ok(Arc::clone(existing));
+            }
+            cache.insert(schema_id, Arc::clone(&decoder));
+        }
+
+        Ok(decoder)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::metadata::{DataTypes, Schema};
+
+    struct MockSchemaGetter {
+        schema: Arc<Schema>,
+    }
+
+    impl MockSchemaGetter {
+        fn new(data_types: Vec<crate::metadata::DataType>) -> Self {
+            let mut builder = Schema::builder();
+            for (i, dt) in data_types.iter().enumerate() {
+                builder = builder.column(&format!("field{}", i), dt.clone());
+            }
+            let schema = builder.build().expect("Failed to build schema");
+
+            Self {
+                schema: Arc::new(schema),
+            }
+        }
+    }
+
+    impl SchemaGetter for MockSchemaGetter {
+        fn get_schema(&self, _schema_id: i16) -> Result<Arc<Schema>> {
+            Ok(Arc::clone(&self.schema))
+        }
+    }
+
+    #[test]
+    fn test_kv_record_read_context() {
+        // Test decoder caching for same schema ID
+        let schema_getter = Arc::new(MockSchemaGetter::new(vec![
+            DataTypes::int(),
+            DataTypes::string(),
+        ]));
+        let read_context = KvRecordReadContext::new(KvFormat::COMPACTED, schema_getter);
+
+        // Get decoder twice - should return the same instance (cached)
+        let decoder1 = read_context.get_row_decoder(42).unwrap();
+        let decoder2 = read_context.get_row_decoder(42).unwrap();
+
+        // Verify same instance (Arc pointer equality)
+        assert!(Arc::ptr_eq(&decoder1, &decoder2));
+
+        // Test different schema IDs get different decoders
+        let schema_getter = Arc::new(MockSchemaGetter::new(vec![DataTypes::int()]));
+        let read_context = KvRecordReadContext::new(KvFormat::COMPACTED, schema_getter);
+
+        let decoder1 = read_context.get_row_decoder(10).unwrap();
+        let decoder2 = read_context.get_row_decoder(20).unwrap();
+
+        // Should be different instances
+        assert!(!Arc::ptr_eq(&decoder1, &decoder2));
+    }
+}
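
Wiring the context into production code means supplying a SchemaGetter. A minimal sketch, mirroring the mock above (the struct and function names are illustrative, and the `fluss::` paths are assumed):

    use std::sync::Arc;
    use fluss::error::Result;
    use fluss::metadata::{KvFormat, Schema};
    use fluss::record::kv::{KvRecordReadContext, SchemaGetter};

    // Serves one pre-built schema; a real getter would consult the metadata store.
    struct FixedSchemaGetter {
        schema: Arc<Schema>,
    }

    impl SchemaGetter for FixedSchemaGetter {
        fn get_schema(&self, _schema_id: i16) -> Result<Arc<Schema>> {
            Ok(Arc::clone(&self.schema))
        }
    }

    fn make_context(schema: Arc<Schema>) -> KvRecordReadContext {
        // Decoders are built lazily and cached per schema id (double-checked lock).
        KvRecordReadContext::new(KvFormat::COMPACTED, Arc::new(FixedSchemaGetter { schema }))
    }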
diff --git a/crates/fluss/src/record/kv/mod.rs b/crates/fluss/src/record/kv/mod.rs
index ecb762d..857c5e5 100644
--- a/crates/fluss/src/record/kv/mod.rs
+++ b/crates/fluss/src/record/kv/mod.rs
@@ -20,10 +20,17 @@
 mod kv_record;
 mod kv_record_batch;
 mod kv_record_batch_builder;
+mod kv_record_read_context;
+mod read_context;
+
+#[cfg(test)]
+mod test_util;
 
 pub use kv_record::{KvRecord, LENGTH_LENGTH as KV_RECORD_LENGTH_LENGTH};
 pub use kv_record_batch::*;
 pub use kv_record_batch_builder::*;
+pub use kv_record_read_context::{KvRecordReadContext, SchemaGetter};
+pub use read_context::ReadContext;
 
 /// Current KV magic value
 pub const CURRENT_KV_MAGIC_VALUE: u8 = 0;
diff --git a/crates/fluss/src/record/kv/read_context.rs b/crates/fluss/src/record/kv/read_context.rs
new file mode 100644
index 0000000..6350261
--- /dev/null
+++ b/crates/fluss/src/record/kv/read_context.rs
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Read context for KV record batches.
+//!
+//! Provides schema and decoder information needed for typed record reading.
+
+use crate::error::Result;
+use crate::row::RowDecoder;
+use std::sync::Arc;
+
+/// Context for reading KV records with type information.
+///
+/// The ReadContext provides access to RowDecoders based on schema IDs,
+/// enabling typed deserialization of KV record values.
+///
+/// Reference: org.apache.fluss.record.KvRecordBatch.ReadContext
+pub trait ReadContext: Send + Sync {
+    /// Get the row decoder for the given schema ID.
+    ///
+    /// The decoder is typically cached, so repeated calls with the same
+    /// schema ID should return the same decoder instance.
+    ///
+    /// # Arguments
+    /// * `schema_id` - The schema ID for which to get the decoder
+    ///
+    /// # Returns
+    /// An Arc-wrapped RowDecoder for the specified schema, or an error if
+    /// the schema is invalid or cannot be retrieved
+    fn get_row_decoder(&self, schema_id: i16) -> Result<Arc<dyn RowDecoder>>;
+}
diff --git a/crates/fluss/src/record/kv/test_util.rs b/crates/fluss/src/record/kv/test_util.rs
new file mode 100644
index 0000000..50ab911
--- /dev/null
+++ b/crates/fluss/src/record/kv/test_util.rs
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Test utilities for KV record reading.
+
+use super::ReadContext;
+use crate::error::Result;
+use crate::metadata::{DataType, KvFormat, RowType};
+use crate::row::{RowDecoder, RowDecoderFactory};
+use std::sync::Arc;
+
+/// Simple test-only ReadContext that creates decoders directly from data types.
+///
+/// This bypasses the production Schema/SchemaGetter machinery for simpler tests.
+pub(crate) struct TestReadContext {
+    kv_format: KvFormat,
+    data_types: Vec<DataType>,
+}
+
+impl TestReadContext {
+    /// Create a test context for COMPACTED format (most common case).
+    pub(crate) fn compacted(data_types: Vec<DataType>) -> Self {
+        Self {
+            kv_format: KvFormat::COMPACTED,
+            data_types,
+        }
+    }
+}
+
+impl ReadContext for TestReadContext {
+    fn get_row_decoder(&self, _schema_id: i16) -> Result<Arc<dyn RowDecoder>> {
+        // Directly create decoder from data types - no Schema needed!
+        let row_type = RowType::with_data_types(self.data_types.clone());
+        RowDecoderFactory::create(self.kv_format.clone(), row_type)
+    }
+}
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index 3477f1d..536409e 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -23,11 +23,13 @@ pub mod binary;
 pub mod compacted;
 pub mod encode;
 mod field_getter;
+mod row_decoder;
 
 pub use column::*;
 pub use compacted::CompactedRow;
 pub use datum::*;
 pub use encode::KeyEncoder;
+pub use row_decoder::{CompactedRowDecoder, RowDecoder, RowDecoderFactory};
 
 pub trait BinaryRow: InternalRow {
     /// Returns the binary representation of this row as a byte slice.
diff --git a/crates/fluss/src/row/row_decoder.rs b/crates/fluss/src/row/row_decoder.rs
new file mode 100644
index 0000000..9f9b421
--- /dev/null
+++ b/crates/fluss/src/row/row_decoder.rs
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Row decoder for deserializing binary row formats.
+//!
+//! Mirrors the Java org.apache.fluss.row.decode package.
+
+use crate::error::{Error, Result};
+use crate::metadata::{KvFormat, RowType};
+use crate::row::compacted::{CompactedRow, CompactedRowDeserializer};
+use std::sync::Arc;
+
+/// Decoder for creating BinaryRow from bytes.
+///
+/// This trait provides an abstraction for decoding different row formats
+/// (COMPACTED, INDEXED, etc.) from binary data.
+///
+/// Reference: org.apache.fluss.row.decode.RowDecoder
+pub trait RowDecoder: Send + Sync {
+    /// Decode bytes into a CompactedRow.
+    ///
+    /// The lifetime 'a ties the returned row to the input data, ensuring
+    /// the data remains valid as long as the row is used.
+    fn decode<'a>(&self, data: &'a [u8]) -> CompactedRow<'a>;
+}
+
+/// Decoder for CompactedRow format.
+///
+/// Uses the existing CompactedRow infrastructure for decoding.
+/// This is a thin wrapper that implements the RowDecoder trait.
+///
+/// Reference: org.apache.fluss.row.decode.CompactedRowDecoder
+pub struct CompactedRowDecoder {
+    field_count: usize,
+    deserializer: Arc<CompactedRowDeserializer<'static>>,
+}
+
+impl CompactedRowDecoder {
+    /// Create a new CompactedRowDecoder with the given row type.
+    pub fn new(row_type: RowType) -> Self {
+        let field_count = row_type.fields().len();
+        let deserializer = Arc::new(CompactedRowDeserializer::new_from_owned(row_type));
+
+        Self {
+            field_count,
+            deserializer,
+        }
+    }
+}
+
+impl RowDecoder for CompactedRowDecoder {
+    fn decode<'a>(&self, data: &'a [u8]) -> CompactedRow<'a> {
+        // Use existing CompactedRow::deserialize() infrastructure
+        CompactedRow::deserialize(Arc::clone(&self.deserializer), self.field_count, data)
+    }
+}
+
+/// Factory for creating RowDecoders based on KvFormat.
+///
+/// Reference: org.apache.fluss.row.decode.RowDecoder.create()
+pub struct RowDecoderFactory;
+
+impl RowDecoderFactory {
+    /// Create a RowDecoder for the given format and row type.
+    pub fn create(kv_format: KvFormat, row_type: RowType) -> Result<Arc<dyn RowDecoder>> {
+        match kv_format {
+            KvFormat::COMPACTED => Ok(Arc::new(CompactedRowDecoder::new(row_type))),
+            KvFormat::INDEXED => Err(Error::UnsupportedOperation {
+                message: "INDEXED format is not yet supported".to_string(),
+            }),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::metadata::DataTypes;
+    use crate::row::InternalRow;
+    use crate::row::binary::BinaryWriter;
+    use crate::row::compacted::CompactedRowWriter;
+
+    #[test]
+    fn test_compacted_row_decoder() {
+        // Write a CompactedRow
+        let mut writer = CompactedRowWriter::new(2);
+        writer.write_int(42);
+        writer.write_string("hello");
+
+        let data = writer.to_bytes();
+
+        // Create decoder with RowType
+        let row_type = RowType::with_data_types(vec![DataTypes::int(), DataTypes::string()]);
+        let decoder = CompactedRowDecoder::new(row_type);
+
+        // Decode
+        let row = decoder.decode(&data);
+
+        // Verify
+        assert_eq!(row.get_field_count(), 2);
+        assert_eq!(row.get_int(0), 42);
+        assert_eq!(row.get_string(1), "hello");
+    }
+
+    #[test]
+    fn test_row_decoder_factory() {
+        let row_type = RowType::with_data_types(vec![DataTypes::int(), DataTypes::string()]);
+        let decoder = RowDecoderFactory::create(KvFormat::COMPACTED, row_type).unwrap();
+
+        // Write a row
+        let mut writer = CompactedRowWriter::new(2);
+        writer.write_int(100);
+        writer.write_string("world");
+        let data = writer.to_bytes();
+
+        // Decode
+        let row = decoder.decode(&data);
+
+        // Verify
+        assert_eq!(row.get_int(0), 100);
+        assert_eq!(row.get_string(1), "world");
+    }
+}

