Copilot commented on code in PR #174:
URL: https://github.com/apache/fluss-rust/pull/174#discussion_r2700987984


##########
crates/fluss/src/record/kv/kv_record_batch.rs:
##########
@@ -253,38 +255,87 @@ impl KvRecordBatch {
         ]))
     }
 
-    /// Create an iterator over the records in this batch.
-    /// This validates the batch checksum before returning the iterator.
+    /// Create an iterable collection of records in this batch.
+    ///
+    /// This validates the batch checksum before returning the records.
     /// For trusted data paths, use `records_unchecked()` to skip validation.
-    pub fn records(&self) -> io::Result<KvRecordIterator> {
+    ///
+    /// Mirrors: KvRecordBatch.records(ReadContext)
+    pub fn records(&self, read_context: &dyn ReadContext) -> 
io::Result<KvRecords> {
         if !self.is_valid() {
             return Err(io::Error::new(
                 io::ErrorKind::InvalidData,
                 "Invalid batch checksum",
             ));
         }
-        self.records_unchecked()
+        self.records_unchecked(read_context)
     }
 
-    /// Create an iterator over the records in this batch without validating 
the checksum
-    pub fn records_unchecked(&self) -> io::Result<KvRecordIterator> {
+    /// Create an iterable collection of records in this batch without 
validating the checksum.
+    pub fn records_unchecked(&self, read_context: &dyn ReadContext) -> 
io::Result<KvRecords> {
         let size = self.size_in_bytes()?;
         let count = self.record_count()?;
+        let schema_id = self.schema_id()?;
+
         if count < 0 {
             return Err(io::Error::new(
                 io::ErrorKind::InvalidData,
                 format!("Invalid record count: {count}"),
             ));
         }
-        Ok(KvRecordIterator {
-            data: self.data.clone(),
-            position: self.position + RECORDS_OFFSET,
-            end: self.position + size,
-            remaining_count: count,
+
+        // Get row decoder for this schema from context (cached)
+        let row_decoder = read_context.get_row_decoder(schema_id)?;
+
+        Ok(KvRecords {
+            iter: KvRecordIterator {
+                data: self.data.clone(),
+                position: self.position + RECORDS_OFFSET,
+                end: self.position + size,
+                remaining_count: count,
+            },
+            row_decoder,
         })
     }
 }
 
+/// Iterable collection of KV records with associated decoder.
+///
+/// This wrapper provides both iteration capability and access to the row 
decoder
+/// needed to decode record values into typed rows.
+pub struct KvRecords {
+    iter: KvRecordIterator,
+    row_decoder: Arc<dyn RowDecoder>,
+}
+
+impl KvRecords {
+    /// Get a reference to the row decoder for decoding record values.
+    ///
+    /// Returns a reference tied to the lifetime of `&self`.
+    /// Use this when iterating by reference.
+    pub fn decoder(&self) -> &dyn RowDecoder {
+        &*self.row_decoder
+    }
+
+    /// Get an owned Arc to the row decoder.
+    ///
+    /// Returns a cloned Arc that can outlive the KvRecords,
+    /// allowing you to grab it before consuming the iterator.
+    /// Useful if you must keep the decoder beyond the iterable’s 
lifetime(collect then decode style)

Review Comment:
   Missing space after "lifetime". The comment should read "...beyond the 
iterable's lifetime (collect..." with a space before the opening parenthesis.
   ```suggestion
       /// Useful if you must keep the decoder beyond the iterable’s lifetime 
(collect then decode style)
   ```



##########
crates/fluss/src/record/kv/mod.rs:
##########
@@ -33,3 +37,43 @@ pub const NO_WRITER_ID: i64 = -1;
 
 /// No batch sequence constant
 pub const NO_BATCH_SEQUENCE: i32 = -1;
+
+/// Test utilities for KV record reading.
+#[cfg(test)]
+pub mod test_utils {
+    use super::*;
+    use crate::metadata::{DataType, KvFormat};
+    use crate::row::{RowDecoder, RowDecoderFactory};
+    use std::io;
+    use std::sync::Arc;
+
+    /// Simple test-only ReadContext that creates decoders directly from data 
types.
+    ///
+    /// This bypasses the production Schema/SchemaGetter machinery for simpler 
tests.
+    pub struct TestReadContext {
+        kv_format: KvFormat,
+        data_types: Vec<DataType>,
+    }
+
+    impl TestReadContext {
+        /// Create a new test context with the given format and data types.
+        pub fn new(kv_format: KvFormat, data_types: Vec<DataType>) -> Self {
+            Self {
+                kv_format,
+                data_types,
+            }
+        }
+
+        /// Create a test context for COMPACTED format (most common case).
+        pub fn compacted(data_types: Vec<DataType>) -> Self {
+            Self::new(KvFormat::COMPACTED, data_types)
+        }
+    }
+
+    impl ReadContext for TestReadContext {
+        fn get_row_decoder(&self, _schema_id: i16) -> io::Result<Arc<dyn 
RowDecoder>> {
+            // Directly create decoder from data types - no Schema needed!
+            RowDecoderFactory::create(self.kv_format.clone(), 
self.data_types.clone())
+        }

Review Comment:
   The TestReadContext implementation builds a new decoder (cloning kv_format 
and data_types) on every call to get_row_decoder, so no decoder is ever cached 
or reused. In production usage via KvRecordReadContext, decoder caching is 
important for performance. Consider caching the decoder in TestReadContext as 
well, similar to the production implementation, or document that this is 
intentionally uncached for test simplicity.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to