This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new dab09b4  chore: implement zero copy in CompactedRowReader (#138)
dab09b4 is described below

commit dab09b4268ea700341f405ddef27274d8d69d1bd
Author: yuxia Luo <[email protected]>
AuthorDate: Sun Jan 11 21:36:46 2026 +0800

    chore: implement zero copy in CompactedRowReader (#138)
---
 crates/fluss/src/row/compacted/compacted_row.rs    | 137 +++++-------
 .../src/row/compacted/compacted_row_reader.rs      | 239 +++++++++++----------
 .../src/row/compacted/compacted_row_writer.rs      |   4 +-
 crates/fluss/src/row/datum.rs                      |   2 +-
 4 files changed, 178 insertions(+), 204 deletions(-)

diff --git a/crates/fluss/src/row/compacted/compacted_row.rs 
b/crates/fluss/src/row/compacted/compacted_row.rs
index fca41c6..481f9be 100644
--- a/crates/fluss/src/row/compacted/compacted_row.rs
+++ b/crates/fluss/src/row/compacted/compacted_row.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use bytes::Bytes;
+use std::sync::OnceLock;
 
 use crate::metadata::DataType;
 use crate::row::compacted::compacted_row_reader::{CompactedRowDeserializer, 
CompactedRowReader};
@@ -24,125 +24,95 @@ use crate::row::{GenericRow, InternalRow};
 // Reference implementation:
 // 
https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRow.java
 #[allow(dead_code)]
-pub struct CompactedRow {
+pub struct CompactedRow<'a> {
     arity: usize,
-    segment: Bytes,
-    offset: usize,
     size_in_bytes: usize,
-    decoded: bool,
-    decoded_row: GenericRow<'static>,
-    reader: CompactedRowReader,
-    deserializer: CompactedRowDeserializer,
+    decoded_row: OnceLock<GenericRow<'a>>,
+    deserializer: CompactedRowDeserializer<'a>,
+    reader: CompactedRowReader<'a>,
+    data_types: &'a [DataType],
 }
 
-#[allow(dead_code)]
-impl CompactedRow {
-    pub fn calculate_bit_set_width_in_bytes(arity: usize) -> usize {
-        arity.div_ceil(8)
-    }
-
-    pub fn new(types: Vec<DataType>) -> Self {
-        let arity = types.len();
-        Self {
-            arity,
-            segment: Bytes::new(),
-            offset: 0,
-            size_in_bytes: 0,
-            decoded: false,
-            decoded_row: GenericRow::new(),
-            reader: CompactedRowReader::new(arity),
-            deserializer: CompactedRowDeserializer::new(types),
-        }
-    }
+pub fn calculate_bit_set_width_in_bytes(arity: usize) -> usize {
+    arity.div_ceil(8)
+}
 
-    pub fn from_bytes(types: Vec<DataType>, data: Bytes) -> Self {
-        let arity = types.len();
+#[allow(dead_code)]
+impl<'a> CompactedRow<'a> {
+    pub fn from_bytes(data_types: &'a [DataType], data: &'a [u8]) -> Self {
+        let arity = data_types.len();
         let size = data.len();
         Self {
             arity,
-            segment: data,
-            offset: 0,
             size_in_bytes: size,
-            decoded: false,
-            decoded_row: GenericRow::new(),
-            reader: CompactedRowReader::new(arity),
-            deserializer: CompactedRowDeserializer::new(types),
+            decoded_row: OnceLock::new(),
+            deserializer: CompactedRowDeserializer::new(data_types),
+            reader: CompactedRowReader::new(arity, data, 0, size),
+            data_types,
         }
     }
 
-    pub fn point_to(&mut self, segment: Bytes, offset: usize, size_in_bytes: 
usize) {
-        self.segment = segment;
-        self.offset = offset;
-        self.size_in_bytes = size_in_bytes;
-        self.decoded = false;
-    }
-
-    pub fn get_segment(&self) -> &Bytes {
-        &self.segment
-    }
-
-    pub fn get_offset(&self) -> usize {
-        self.offset
-    }
-
     pub fn get_size_in_bytes(&self) -> usize {
         self.size_in_bytes
     }
 
-    pub fn get_field_count(&self) -> usize {
-        self.arity
+    fn decoded_row(&self) -> &GenericRow<'_> {
+        self.decoded_row
+            .get_or_init(|| self.deserializer.deserialize(&self.reader))
     }
+}
 
-    pub fn is_null_at(&self, pos: usize) -> bool {
-        let byte_index = pos >> 3;
-        let bit = pos & 7;
-        let idx = self.offset + byte_index;
-        (self.segment[idx] & (1u8 << bit)) != 0
+#[allow(dead_code)]
+impl<'a> InternalRow for CompactedRow<'a> {
+    fn get_field_count(&self) -> usize {
+        self.arity
     }
 
-    fn decoded_row(&mut self) -> &GenericRow<'static> {
-        if !self.decoded {
-            self.reader
-                .point_to(self.segment.clone(), self.offset, 
self.size_in_bytes);
-            self.decoded_row = self.deserializer.deserialize(&mut self.reader);
-            self.decoded = true;
-        }
-        &self.decoded_row
+    fn is_null_at(&self, pos: usize) -> bool {
+        self.data_types[pos].is_nullable() && self.reader.is_null_at(pos)
     }
 
-    pub fn get_boolean(&mut self, pos: usize) -> bool {
+    fn get_boolean(&self, pos: usize) -> bool {
         self.decoded_row().get_boolean(pos)
     }
 
-    pub fn get_byte(&mut self, pos: usize) -> i8 {
+    fn get_byte(&self, pos: usize) -> i8 {
         self.decoded_row().get_byte(pos)
     }
 
-    pub fn get_short(&mut self, pos: usize) -> i16 {
+    fn get_short(&self, pos: usize) -> i16 {
         self.decoded_row().get_short(pos)
     }
 
-    pub fn get_int(&mut self, pos: usize) -> i32 {
+    fn get_int(&self, pos: usize) -> i32 {
         self.decoded_row().get_int(pos)
     }
 
-    pub fn get_long(&mut self, pos: usize) -> i64 {
+    fn get_long(&self, pos: usize) -> i64 {
         self.decoded_row().get_long(pos)
     }
 
-    pub fn get_float(&mut self, pos: usize) -> f32 {
+    fn get_float(&self, pos: usize) -> f32 {
         self.decoded_row().get_float(pos)
     }
 
-    pub fn get_double(&mut self, pos: usize) -> f64 {
+    fn get_double(&self, pos: usize) -> f64 {
         self.decoded_row().get_double(pos)
     }
 
-    pub fn get_string(&mut self, pos: usize) -> &str {
+    fn get_char(&self, pos: usize, length: usize) -> &str {
+        self.decoded_row().get_char(pos, length)
+    }
+
+    fn get_string(&self, pos: usize) -> &str {
         self.decoded_row().get_string(pos)
     }
 
-    pub fn get_bytes(&mut self, pos: usize) -> &[u8] {
+    fn get_binary(&self, pos: usize, length: usize) -> &[u8] {
+        self.decoded_row().get_binary(pos, length)
+    }
+
+    fn get_bytes(&self, pos: usize) -> &[u8] {
         self.decoded_row().get_bytes(pos)
     }
 }
@@ -171,7 +141,6 @@ mod tests {
             DataType::Bytes(BytesType::new()),
         ];
 
-        let mut row = CompactedRow::new(types.clone());
         let mut writer = CompactedRowWriter::new(types.len());
 
         writer.write_boolean(true);
@@ -184,7 +153,8 @@ mod tests {
         writer.write_string("Hello World");
         writer.write_bytes(&[1, 2, 3, 4, 5]);
 
-        row.point_to(writer.to_bytes(), 0, writer.position());
+        let bytes = writer.to_bytes();
+        let mut row = CompactedRow::from_bytes(types.as_slice(), 
bytes.as_ref());
 
         assert_eq!(row.get_field_count(), 9);
         assert!(row.get_boolean(0));
@@ -204,14 +174,14 @@ mod tests {
             DataType::Double(DoubleType::new()),
         ];
 
-        let mut row = CompactedRow::new(types.clone());
         let mut writer = CompactedRowWriter::new(types.len());
 
         writer.write_int(100);
         writer.set_null_at(1);
         writer.write_double(2.71);
 
-        row.point_to(writer.to_bytes(), 0, writer.position());
+        let bytes = writer.to_bytes();
+        row = CompactedRow::from_bytes(types.as_slice(), bytes.as_ref());
 
         assert!(!row.is_null_at(0));
         assert!(row.is_null_at(1));
@@ -230,12 +200,13 @@ mod tests {
         ];
 
         let mut writer = CompactedRowWriter::new(types.len());
-        writer.write_int(42);
+        writer.write_int(-1);
         writer.write_string("test");
 
-        let mut row = CompactedRow::from_bytes(types, writer.to_bytes());
+        let bytes = writer.to_bytes();
+        let mut row = CompactedRow::from_bytes(types.as_slice(), 
bytes.as_ref());
 
-        assert_eq!(row.get_int(0), 42);
+        assert_eq!(row.get_int(0), -1);
         assert_eq!(row.get_string(1), "test");
 
         // Test large row
@@ -244,14 +215,14 @@ mod tests {
             .map(|_| DataType::Int(IntType::new()))
             .collect();
 
-        let mut row = CompactedRow::new(types.clone());
         let mut writer = CompactedRowWriter::new(num_fields);
 
         for i in 0..num_fields {
             writer.write_int((i * 10) as i32);
         }
 
-        row.point_to(writer.to_bytes(), 0, writer.position());
+        let bytes = writer.to_bytes();
+        row = CompactedRow::from_bytes(types.as_slice(), bytes.as_ref());
 
         for i in 0..num_fields {
             assert_eq!(row.get_int(i), (i * 10) as i32);
diff --git a/crates/fluss/src/row/compacted/compacted_row_reader.rs 
b/crates/fluss/src/row/compacted/compacted_row_reader.rs
index 00d94ad..c053d4e 100644
--- a/crates/fluss/src/row/compacted/compacted_row_reader.rs
+++ b/crates/fluss/src/row/compacted/compacted_row_reader.rs
@@ -15,54 +15,75 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use bytes::Bytes;
-use std::borrow::Cow;
-
+use crate::row::compacted::compacted_row::calculate_bit_set_width_in_bytes;
 use crate::{
     metadata::DataType,
-    row::{
-        Datum, GenericRow,
-        compacted::{compacted_row::CompactedRow, 
compacted_row_writer::CompactedRowWriter},
-    },
+    row::{Datum, GenericRow, 
compacted::compacted_row_writer::CompactedRowWriter},
 };
+use std::str::from_utf8;
 
 #[allow(dead_code)]
-pub struct CompactedRowDeserializer {
-    schema: Vec<DataType>,
+pub struct CompactedRowDeserializer<'a> {
+    schema: &'a [DataType],
 }
 
 #[allow(dead_code)]
-impl CompactedRowDeserializer {
-    pub fn new(schema: Vec<DataType>) -> Self {
+impl<'a> CompactedRowDeserializer<'a> {
+    pub fn new(schema: &'a [DataType]) -> Self {
         Self { schema }
     }
 
-    pub fn deserialize(&self, reader: &mut CompactedRowReader) -> 
GenericRow<'static> {
+    pub fn deserialize(&self, reader: &CompactedRowReader<'a>) -> 
GenericRow<'a> {
         let mut row = GenericRow::new();
-        for (pos, dtype) in self.schema.iter().enumerate() {
-            if reader.is_null_at(pos) {
-                row.set_field(pos, Datum::Null);
+        let mut cursor = reader.initial_position();
+        for (col_pos, dtype) in self.schema.iter().enumerate() {
+            if dtype.is_nullable() && reader.is_null_at(col_pos) {
+                row.set_field(col_pos, Datum::Null);
                 continue;
             }
-            let datum = match dtype {
-                DataType::Boolean(_) => Datum::Bool(reader.read_boolean()),
-                DataType::TinyInt(_) => Datum::Int8(reader.read_byte() as i8),
-                DataType::SmallInt(_) => Datum::Int16(reader.read_short()),
-                DataType::Int(_) => Datum::Int32(reader.read_int()),
-                DataType::BigInt(_) => Datum::Int64(reader.read_long()),
-                DataType::Float(_) => 
Datum::Float32(reader.read_float().into()),
-                DataType::Double(_) => 
Datum::Float64(reader.read_double().into()),
+            let (datum, next_cursor) = match dtype {
+                DataType::Boolean(_) => {
+                    let (val, next) = reader.read_boolean(cursor);
+                    (Datum::Bool(val), next)
+                }
+                DataType::TinyInt(_) => {
+                    let (val, next) = reader.read_byte(cursor);
+                    (Datum::Int8(val as i8), next)
+                }
+                DataType::SmallInt(_) => {
+                    let (val, next) = reader.read_short(cursor);
+                    (Datum::Int16(val), next)
+                }
+                DataType::Int(_) => {
+                    let (val, next) = reader.read_int(cursor);
+                    (Datum::Int32(val), next)
+                }
+                DataType::BigInt(_) => {
+                    let (val, next) = reader.read_long(cursor);
+                    (Datum::Int64(val), next)
+                }
+                DataType::Float(_) => {
+                    let (val, next) = reader.read_float(cursor);
+                    (Datum::Float32(val.into()), next)
+                }
+                DataType::Double(_) => {
+                    let (val, next) = reader.read_double(cursor);
+                    (Datum::Float64(val.into()), next)
+                }
                 // TODO: use read_char(length) in the future, but need to keep 
compatibility
                 DataType::Char(_) | DataType::String(_) => {
-                    Datum::String(Cow::Owned(reader.read_string()))
+                    let (val, next) = reader.read_string(cursor);
+                    (Datum::String(val.into()), next)
                 }
                 // TODO: use read_binary(length) in the future, but need to 
keep compatibility
                 DataType::Bytes(_) | DataType::Binary(_) => {
-                    Datum::Blob(Cow::Owned(reader.read_bytes().into_vec()))
+                    let (val, next) = reader.read_bytes(cursor);
+                    (Datum::Blob(val.into()), next)
                 }
                 _ => panic!("unsupported DataType in 
CompactedRowDeserializer"),
             };
-            row.set_field(pos, datum);
+            cursor = next_cursor;
+            row.set_field(col_pos, datum);
         }
         row
     }
@@ -71,151 +92,133 @@ impl CompactedRowDeserializer {
 // Reference implementation:
 // 
https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowReader.java
 #[allow(dead_code)]
-pub struct CompactedRowReader {
-    segment: Bytes,
+pub struct CompactedRowReader<'a> {
+    segment: &'a [u8],
     offset: usize,
-    position: usize,
     limit: usize,
     header_size_in_bytes: usize,
 }
 
 #[allow(dead_code)]
-impl CompactedRowReader {
-    pub fn new(field_count: usize) -> Self {
-        let header = 
CompactedRow::calculate_bit_set_width_in_bytes(field_count);
-        Self {
-            header_size_in_bytes: header,
-            segment: Bytes::new(),
-            offset: 0,
-            position: 0,
-            limit: 0,
-        }
-    }
-
-    pub fn point_to(&mut self, data: Bytes, offset: usize, length: usize) {
+impl<'a> CompactedRowReader<'a> {
+    pub fn new(field_count: usize, data: &'a [u8], offset: usize, length: 
usize) -> Self {
+        let header_size_in_bytes = 
calculate_bit_set_width_in_bytes(field_count);
         let limit = offset + length;
-        let position = offset + self.header_size_in_bytes;
-
+        let position = offset + header_size_in_bytes;
         debug_assert!(limit <= data.len());
         debug_assert!(position <= limit);
 
-        self.segment = data;
-        self.offset = offset;
-        self.position = position;
-        self.limit = limit;
+        CompactedRowReader {
+            segment: data,
+            offset,
+            limit,
+            header_size_in_bytes,
+        }
     }
 
-    pub fn is_null_at(&self, pos: usize) -> bool {
-        let byte_index = pos >> 3;
-        let bit = pos & 7;
+    fn initial_position(&self) -> usize {
+        self.offset + self.header_size_in_bytes
+    }
+
+    pub fn is_null_at(&self, col_pos: usize) -> bool {
+        let byte_index = col_pos >> 3;
+        let bit = col_pos & 7;
         debug_assert!(byte_index < self.header_size_in_bytes);
         let idx = self.offset + byte_index;
         (self.segment[idx] & (1u8 << bit)) != 0
     }
 
-    pub fn read_boolean(&mut self) -> bool {
-        self.read_byte() != 0
+    pub fn read_boolean(&self, pos: usize) -> (bool, usize) {
+        let (val, next) = self.read_byte(pos);
+        (val != 0, next)
     }
 
-    pub fn read_byte(&mut self) -> u8 {
-        debug_assert!(self.position < self.limit);
-        let b = self.segment[self.position];
-        self.position += 1;
-        b
+    pub fn read_byte(&self, pos: usize) -> (u8, usize) {
+        debug_assert!(pos < self.limit);
+        (self.segment[pos], pos + 1)
     }
 
-    pub fn read_short(&mut self) -> i16 {
-        debug_assert!(self.position + 2 <= self.limit);
-        let bytes_slice = &self.segment[self.position..self.position + 2];
-        let byte_array: [u8; 2] = bytes_slice
-            .try_into()
-            .expect("Slice must be exactly 2 bytes long");
-
-        self.position += 2;
-        i16::from_ne_bytes(byte_array)
+    pub fn read_short(&self, pos: usize) -> (i16, usize) {
+        let next_pos = pos + 2;
+        debug_assert!(next_pos <= self.limit);
+        let bytes_slice = &self.segment[pos..pos + 2];
+        let val = i16::from_ne_bytes(
+            bytes_slice
+                .try_into()
+                .expect("Slice must be exactly 2 bytes long"),
+        );
+        (val, next_pos)
     }
 
-    pub fn read_int(&mut self) -> i32 {
+    pub fn read_int(&self, mut pos: usize) -> (i32, usize) {
         let mut result: u32 = 0;
         let mut shift = 0;
 
         for _ in 0..CompactedRowWriter::MAX_INT_SIZE {
-            let b = self.read_byte();
+            let (b, next_pos) = self.read_byte(pos);
+            pos = next_pos;
             result |= ((b & 0x7F) as u32) << shift;
             if (b & 0x80) == 0 {
-                return result as i32;
+                return (result as i32, pos);
             }
             shift += 7;
         }
-
-        panic!("Invalid input stream.");
+        panic!("Invalid VarInt32 input stream.");
     }
 
-    pub fn read_long(&mut self) -> i64 {
+    pub fn read_long(&self, mut pos: usize) -> (i64, usize) {
         let mut result: u64 = 0;
         let mut shift = 0;
 
         for _ in 0..CompactedRowWriter::MAX_LONG_SIZE {
-            let b = self.read_byte();
+            let (b, next_pos) = self.read_byte(pos);
+            pos = next_pos;
             result |= ((b & 0x7F) as u64) << shift;
             if (b & 0x80) == 0 {
-                return result as i64;
+                return (result as i64, pos);
             }
             shift += 7;
         }
-
-        panic!("Invalid input stream.");
+        panic!("Invalid VarInt64 input stream.");
     }
 
-    pub fn read_float(&mut self) -> f32 {
-        debug_assert!(self.position + 4 <= self.limit);
-        let bytes_slice = &self.segment[self.position..self.position + 4];
-        let byte_array: [u8; 4] = bytes_slice
-            .try_into()
-            .expect("Slice must be exactly 4 bytes long");
-
-        self.position += 4;
-        f32::from_ne_bytes(byte_array)
+    pub fn read_float(&self, pos: usize) -> (f32, usize) {
+        let next_pos = pos + 4;
+        debug_assert!(next_pos <= self.limit);
+        let val = f32::from_ne_bytes(
+            self.segment[pos..pos + 4]
+                .try_into()
+                .expect("Slice must be exactly 4 bytes long"),
+        );
+        (val, next_pos)
     }
 
-    pub fn read_double(&mut self) -> f64 {
-        debug_assert!(self.position + 8 <= self.limit);
-        let bytes_slice = &self.segment[self.position..self.position + 8];
-        let byte_array: [u8; 8] = bytes_slice
-            .try_into()
-            .expect("Slice must be exactly 8 bytes long");
-
-        self.position += 8;
-        f64::from_ne_bytes(byte_array)
+    pub fn read_double(&self, pos: usize) -> (f64, usize) {
+        let next_pos = pos + 8;
+        debug_assert!(next_pos <= self.limit);
+        let val = f64::from_ne_bytes(
+            self.segment[pos..pos + 8]
+                .try_into()
+                .expect("Slice must be exactly 8 bytes long"),
+        );
+        (val, next_pos)
     }
 
-    pub fn read_binary(&mut self, length: usize) -> Bytes {
-        debug_assert!(self.position + length <= self.limit);
-
-        let start = self.position;
-        let end = start + length;
-        self.position = end;
-
-        self.segment.slice(start..end)
+    pub fn read_binary(&self, pos: usize) -> (&'a [u8], usize) {
+        self.read_bytes(pos)
     }
 
-    pub fn read_bytes(&mut self) -> Box<[u8]> {
-        let len = self.read_int();
-        debug_assert!(len >= 0);
-
+    pub fn read_bytes(&self, pos: usize) -> (&'a [u8], usize) {
+        let (len, data_pos) = self.read_int(pos);
         let len = len as usize;
-        debug_assert!(self.position + len <= self.limit);
-
-        let start = self.position;
-        let end = start + len;
-        self.position = end;
-
-        self.segment[start..end].to_vec().into_boxed_slice()
+        let next_pos = data_pos + len;
+        debug_assert!(next_pos <= self.limit);
+        (&self.segment[data_pos..next_pos], next_pos)
     }
 
-    pub fn read_string(&mut self) -> String {
-        let bytes = self.read_bytes();
-        String::from_utf8(bytes.into_vec())
-            .unwrap_or_else(|e| panic!("Invalid UTF-8 in string data: {e}"))
+    pub fn read_string(&self, pos: usize) -> (&'a str, usize) {
+        let (bytes, next_pos) = self.read_bytes(pos);
+        let s = from_utf8(bytes).expect("Invalid UTF-8 when reading string");
+        (s, next_pos)
     }
 }
diff --git a/crates/fluss/src/row/compacted/compacted_row_writer.rs 
b/crates/fluss/src/row/compacted/compacted_row_writer.rs
index 8345123..4f535c6 100644
--- a/crates/fluss/src/row/compacted/compacted_row_writer.rs
+++ b/crates/fluss/src/row/compacted/compacted_row_writer.rs
@@ -18,7 +18,7 @@
 use bytes::{Bytes, BytesMut};
 use std::cmp;
 
-use crate::row::compacted::compacted_row::CompactedRow;
+use crate::row::compacted::compacted_row::calculate_bit_set_width_in_bytes;
 
 // Writer for CompactedRow
 // Reference implementation:
@@ -36,7 +36,7 @@ impl CompactedRowWriter {
     pub const MAX_LONG_SIZE: usize = 10;
 
     pub fn new(field_count: usize) -> Self {
-        let header_size = 
CompactedRow::calculate_bit_set_width_in_bytes(field_count);
+        let header_size = calculate_bit_set_width_in_bytes(field_count);
         let cap = cmp::max(64, header_size);
 
         let mut buffer = BytesMut::with_capacity(cap);
diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs
index fa85ded..ad7948d 100644
--- a/crates/fluss/src/row/datum.rs
+++ b/crates/fluss/src/row/datum.rs
@@ -403,7 +403,7 @@ mod tests {
 
     #[test]
     fn datum_accessors_and_conversions() {
-        let datum = Datum::String("value");
+        let datum = Datum::String("value".into());
         assert_eq!(datum.as_str(), "value");
         assert!(!datum.is_null());
 

Reply via email to