fresh-borzoni commented on code in PR #138:
URL: https://github.com/apache/fluss-rust/pull/138#discussion_r2679497893


##########
crates/fluss/src/row/compacted/compacted_row.rs:
##########
@@ -24,125 +24,95 @@ use crate::row::{GenericRow, InternalRow};
 // Reference implementation:
 // 
https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRow.java
 #[allow(dead_code)]
-pub struct CompactedRow {
+pub struct CompactedRow<'a> {
     arity: usize,
-    segment: Bytes,
-    offset: usize,
+    segment: &'a [u8],
     size_in_bytes: usize,
-    decoded: bool,
-    decoded_row: GenericRow<'static>,
-    reader: CompactedRowReader,
-    deserializer: CompactedRowDeserializer,
+    decoded_row: OnceLock<GenericRow<'a>>,
+    deserializer: CompactedRowDeserializer<'a>,
+    reader: CompactedRowReader<'a>,
 }
 
-#[allow(dead_code)]
-impl CompactedRow {
-    pub fn calculate_bit_set_width_in_bytes(arity: usize) -> usize {
-        arity.div_ceil(8)
-    }
-
-    pub fn new(types: Vec<DataType>) -> Self {
-        let arity = types.len();
-        Self {
-            arity,
-            segment: Bytes::new(),
-            offset: 0,
-            size_in_bytes: 0,
-            decoded: false,
-            decoded_row: GenericRow::new(),
-            reader: CompactedRowReader::new(arity),
-            deserializer: CompactedRowDeserializer::new(types),
-        }
-    }
+pub fn calculate_bit_set_width_in_bytes(arity: usize) -> usize {
+    arity.div_ceil(8)
+}
 
-    pub fn from_bytes(types: Vec<DataType>, data: Bytes) -> Self {
+#[allow(dead_code)]
+impl<'a> CompactedRow<'a> {
+    pub fn from_bytes(types: &'a [DataType], data: &'a [u8]) -> Self {
         let arity = types.len();
         let size = data.len();
         Self {
             arity,
             segment: data,
-            offset: 0,
             size_in_bytes: size,
-            decoded: false,
-            decoded_row: GenericRow::new(),
-            reader: CompactedRowReader::new(arity),
+            decoded_row: OnceLock::new(),
             deserializer: CompactedRowDeserializer::new(types),
+            reader: CompactedRowReader::new(arity, data, 0, size),
         }
     }
 
-    pub fn point_to(&mut self, segment: Bytes, offset: usize, size_in_bytes: 
usize) {
-        self.segment = segment;
-        self.offset = offset;
-        self.size_in_bytes = size_in_bytes;
-        self.decoded = false;
-    }
-
-    pub fn get_segment(&self) -> &Bytes {
-        &self.segment
-    }
-
-    pub fn get_offset(&self) -> usize {
-        self.offset
-    }
-
     pub fn get_size_in_bytes(&self) -> usize {
         self.size_in_bytes
     }
 
-    pub fn get_field_count(&self) -> usize {
-        self.arity
+    fn decoded_row(&self) -> &GenericRow<'_> {
+        self.decoded_row
+            .get_or_init(|| self.deserializer.deserialize(&self.reader))
     }
+}
 
-    pub fn is_null_at(&self, pos: usize) -> bool {
-        let byte_index = pos >> 3;
-        let bit = pos & 7;
-        let idx = self.offset + byte_index;
-        (self.segment[idx] & (1u8 << bit)) != 0
+#[allow(dead_code)]
+impl<'a> InternalRow for CompactedRow<'a> {
+    fn get_field_count(&self) -> usize {
+        self.arity
     }
 
-    fn decoded_row(&mut self) -> &GenericRow<'static> {
-        if !self.decoded {
-            self.reader
-                .point_to(self.segment.clone(), self.offset, 
self.size_in_bytes);
-            self.decoded_row = self.deserializer.deserialize(&mut self.reader);
-            self.decoded = true;
-        }
-        &self.decoded_row
+    fn is_null_at(&self, pos: usize) -> bool {
+        self.decoded_row().is_null_at(pos)

Review Comment:
   We shouldn't pay deserialization price for just checking for null bitmap.
   
   The only problem is semantical one, as we use this in deserialize:
   ```rust
   if dtype.is_nullable() && reader.is_null_at(col_pos)
   ```
   



##########
crates/fluss/src/row/compacted/compacted_row.rs:
##########
@@ -24,125 +24,95 @@ use crate::row::{GenericRow, InternalRow};
 // Reference implementation:
 // 
https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRow.java
 #[allow(dead_code)]
-pub struct CompactedRow {
+pub struct CompactedRow<'a> {
     arity: usize,
-    segment: Bytes,
-    offset: usize,
+    segment: &'a [u8],

Review Comment:
   Do we still use it?



##########
crates/fluss/src/row/compacted/compacted_row_reader.rs:
##########
@@ -71,151 +92,133 @@ impl CompactedRowDeserializer {
 // Reference implementation:
 // 
https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowReader.java
 #[allow(dead_code)]
-pub struct CompactedRowReader {
-    segment: Bytes,
+pub struct CompactedRowReader<'a> {
+    segment: &'a [u8],
     offset: usize,
-    position: usize,
     limit: usize,
     header_size_in_bytes: usize,
 }
 
 #[allow(dead_code)]
-impl CompactedRowReader {
-    pub fn new(field_count: usize) -> Self {
-        let header = 
CompactedRow::calculate_bit_set_width_in_bytes(field_count);
-        Self {
-            header_size_in_bytes: header,
-            segment: Bytes::new(),
-            offset: 0,
-            position: 0,
-            limit: 0,
-        }
-    }
-
-    pub fn point_to(&mut self, data: Bytes, offset: usize, length: usize) {
+impl<'a> CompactedRowReader<'a> {
+    pub fn new(field_count: usize, data: &'a [u8], offset: usize, length: 
usize) -> Self {
+        let header_size_in_bytes = 
calculate_bit_set_width_in_bytes(field_count);
         let limit = offset + length;
-        let position = offset + self.header_size_in_bytes;
-
+        let position = offset + header_size_in_bytes;
         debug_assert!(limit <= data.len());
         debug_assert!(position <= limit);
 
-        self.segment = data;
-        self.offset = offset;
-        self.position = position;
-        self.limit = limit;
+        CompactedRowReader {
+            segment: data,
+            offset,
+            limit,
+            header_size_in_bytes,
+        }
     }
 
-    pub fn is_null_at(&self, pos: usize) -> bool {
-        let byte_index = pos >> 3;
-        let bit = pos & 7;
+    fn initial_position(&self) -> usize {
+        self.offset + self.header_size_in_bytes
+    }
+
+    pub fn is_null_at(&self, col_pos: usize) -> bool {
+        let byte_index = col_pos >> 3;
+        let bit = col_pos & 7;
         debug_assert!(byte_index < self.header_size_in_bytes);
         let idx = self.offset + byte_index;
         (self.segment[idx] & (1u8 << bit)) != 0
     }
 
-    pub fn read_boolean(&mut self) -> bool {
-        self.read_byte() != 0
+    pub fn read_boolean(&self, pos: usize) -> (bool, usize) {
+        let (val, next) = self.read_byte(pos);
+        (val != 0, next)
     }
 
-    pub fn read_byte(&mut self) -> u8 {
-        debug_assert!(self.position < self.limit);
-        let b = self.segment[self.position];
-        self.position += 1;
-        b
+    pub fn read_byte(&self, pos: usize) -> (u8, usize) {
+        debug_assert!(pos < self.limit);
+        (self.segment[pos], pos + 1)
     }
 
-    pub fn read_short(&mut self) -> i16 {
-        debug_assert!(self.position + 2 <= self.limit);
-        let bytes_slice = &self.segment[self.position..self.position + 2];
-        let byte_array: [u8; 2] = bytes_slice
-            .try_into()
-            .expect("Slice must be exactly 2 bytes long");
-
-        self.position += 2;
-        i16::from_ne_bytes(byte_array)
+    pub fn read_short(&self, pos: usize) -> (i16, usize) {
+        let next_pos = pos + 2;
+        debug_assert!(next_pos <= self.limit);
+        let bytes_slice = &self.segment[pos..pos + 2];
+        let val = i16::from_ne_bytes(
+            bytes_slice
+                .try_into()
+                .expect("Slice must be exactly 2 bytes long"),
+        );
+        (val, next_pos)
     }
 
-    pub fn read_int(&mut self) -> i32 {
+    pub fn read_int(&self, mut pos: usize) -> (i32, usize) {
         let mut result: u32 = 0;
         let mut shift = 0;
 
         for _ in 0..CompactedRowWriter::MAX_INT_SIZE {
-            let b = self.read_byte();
+            let (b, next_pos) = self.read_byte(pos);
+            pos = next_pos;
             result |= ((b & 0x7F) as u32) << shift;
             if (b & 0x80) == 0 {
-                return result as i32;
+                return (result as i32, pos);
             }
             shift += 7;
         }
-
-        panic!("Invalid input stream.");
+        panic!("Invalid VarInt32 input stream.");
     }
 
-    pub fn read_long(&mut self) -> i64 {
+    pub fn read_long(&self, mut pos: usize) -> (i64, usize) {
         let mut result: u64 = 0;
         let mut shift = 0;
 
         for _ in 0..CompactedRowWriter::MAX_LONG_SIZE {
-            let b = self.read_byte();
+            let (b, next_pos) = self.read_byte(pos);
+            pos = next_pos;
             result |= ((b & 0x7F) as u64) << shift;
             if (b & 0x80) == 0 {
-                return result as i64;
+                return (result as i64, pos);
             }
             shift += 7;
         }
-
-        panic!("Invalid input stream.");
+        panic!("Invalid VarInt64 input stream.");
     }
 
-    pub fn read_float(&mut self) -> f32 {
-        debug_assert!(self.position + 4 <= self.limit);
-        let bytes_slice = &self.segment[self.position..self.position + 4];
-        let byte_array: [u8; 4] = bytes_slice
-            .try_into()
-            .expect("Slice must be exactly 4 bytes long");
-
-        self.position += 4;
-        f32::from_ne_bytes(byte_array)
+    pub fn read_float(&self, pos: usize) -> (f32, usize) {
+        let next_pos = pos + 4;
+        debug_assert!(next_pos <= self.limit);
+        let val = f32::from_ne_bytes(
+            self.segment[pos..pos + 4]
+                .try_into()
+                .expect("Slice must be exactly 4 bytes long"),
+        );
+        (val, next_pos)
     }
 
-    pub fn read_double(&mut self) -> f64 {
-        debug_assert!(self.position + 8 <= self.limit);
-        let bytes_slice = &self.segment[self.position..self.position + 8];
-        let byte_array: [u8; 8] = bytes_slice
-            .try_into()
-            .expect("Slice must be exactly 8 bytes long");
-
-        self.position += 8;
-        f64::from_ne_bytes(byte_array)
+    pub fn read_double(&self, pos: usize) -> (f64, usize) {
+        let next_pos = pos + 8;
+        debug_assert!(next_pos <= self.limit);
+        let val = f64::from_ne_bytes(
+            self.segment[pos..pos + 8]
+                .try_into()
+                .expect("Slice must be exactly 8 bytes long"),
+        );
+        (val, next_pos)
     }
 
-    pub fn read_binary(&mut self, length: usize) -> Bytes {
-        debug_assert!(self.position + length <= self.limit);
-
-        let start = self.position;
-        let end = start + length;
-        self.position = end;
-
-        self.segment.slice(start..end)
+    pub fn read_binary(&self, length: usize) -> (&'a [u8], usize) {
+        self.read_bytes(length)

Review Comment:
   Why do we path length when position is expected?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to