fresh-borzoni commented on code in PR #138:
URL: https://github.com/apache/fluss-rust/pull/138#discussion_r2679482817


##########
crates/fluss/src/row/compacted/compacted_row_reader.rs:
##########
@@ -71,151 +92,133 @@ impl CompactedRowDeserializer {
 // Reference implementation:
 // 
https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowReader.java
 #[allow(dead_code)]
-pub struct CompactedRowReader {
-    segment: Bytes,
+pub struct CompactedRowReader<'a> {
+    segment: &'a [u8],
     offset: usize,
-    position: usize,
     limit: usize,
     header_size_in_bytes: usize,
 }
 
 #[allow(dead_code)]
-impl CompactedRowReader {
-    pub fn new(field_count: usize) -> Self {
-        let header = 
CompactedRow::calculate_bit_set_width_in_bytes(field_count);
-        Self {
-            header_size_in_bytes: header,
-            segment: Bytes::new(),
-            offset: 0,
-            position: 0,
-            limit: 0,
-        }
-    }
-
-    pub fn point_to(&mut self, data: Bytes, offset: usize, length: usize) {
+impl<'a> CompactedRowReader<'a> {
+    pub fn new(field_count: usize, data: &'a [u8], offset: usize, length: 
usize) -> Self {
+        let header_size_in_bytes = 
calculate_bit_set_width_in_bytes(field_count);
         let limit = offset + length;
-        let position = offset + self.header_size_in_bytes;
-
+        let position = offset + header_size_in_bytes;
         debug_assert!(limit <= data.len());
         debug_assert!(position <= limit);
 
-        self.segment = data;
-        self.offset = offset;
-        self.position = position;
-        self.limit = limit;
+        CompactedRowReader {
+            segment: data,
+            offset,
+            limit,
+            header_size_in_bytes,
+        }
     }
 
-    pub fn is_null_at(&self, pos: usize) -> bool {
-        let byte_index = pos >> 3;
-        let bit = pos & 7;
+    fn initial_position(&self) -> usize {
+        self.offset + self.header_size_in_bytes
+    }
+
+    pub fn is_null_at(&self, col_pos: usize) -> bool {
+        let byte_index = col_pos >> 3;
+        let bit = col_pos & 7;
         debug_assert!(byte_index < self.header_size_in_bytes);
         let idx = self.offset + byte_index;
         (self.segment[idx] & (1u8 << bit)) != 0
     }
 
-    pub fn read_boolean(&mut self) -> bool {
-        self.read_byte() != 0
+    pub fn read_boolean(&self, pos: usize) -> (bool, usize) {
+        let (val, next) = self.read_byte(pos);
+        (val != 0, next)
     }
 
-    pub fn read_byte(&mut self) -> u8 {
-        debug_assert!(self.position < self.limit);
-        let b = self.segment[self.position];
-        self.position += 1;
-        b
+    pub fn read_byte(&self, pos: usize) -> (u8, usize) {
+        debug_assert!(pos < self.limit);
+        (self.segment[pos], pos + 1)
     }
 
-    pub fn read_short(&mut self) -> i16 {
-        debug_assert!(self.position + 2 <= self.limit);
-        let bytes_slice = &self.segment[self.position..self.position + 2];
-        let byte_array: [u8; 2] = bytes_slice
-            .try_into()
-            .expect("Slice must be exactly 2 bytes long");
-
-        self.position += 2;
-        i16::from_ne_bytes(byte_array)
+    pub fn read_short(&self, pos: usize) -> (i16, usize) {
+        let next_pos = pos + 2;
+        debug_assert!(next_pos <= self.limit);
+        let bytes_slice = &self.segment[pos..pos + 2];
+        let val = i16::from_ne_bytes(
+            bytes_slice
+                .try_into()
+                .expect("Slice must be exactly 2 bytes long"),
+        );
+        (val, next_pos)
     }
 
-    pub fn read_int(&mut self) -> i32 {
+    pub fn read_int(&self, mut pos: usize) -> (i32, usize) {
         let mut result: u32 = 0;
         let mut shift = 0;
 
         for _ in 0..CompactedRowWriter::MAX_INT_SIZE {
-            let b = self.read_byte();
+            let (b, next_pos) = self.read_byte(pos);
+            pos = next_pos;
             result |= ((b & 0x7F) as u32) << shift;
             if (b & 0x80) == 0 {
-                return result as i32;
+                return (result as i32, pos);
             }
             shift += 7;
         }
-
-        panic!("Invalid input stream.");
+        panic!("Invalid VarInt32 input stream.");
     }
 
-    pub fn read_long(&mut self) -> i64 {
+    pub fn read_long(&self, mut pos: usize) -> (i64, usize) {
         let mut result: u64 = 0;
         let mut shift = 0;
 
         for _ in 0..CompactedRowWriter::MAX_LONG_SIZE {
-            let b = self.read_byte();
+            let (b, next_pos) = self.read_byte(pos);
+            pos = next_pos;
             result |= ((b & 0x7F) as u64) << shift;
             if (b & 0x80) == 0 {
-                return result as i64;
+                return (result as i64, pos);
             }
             shift += 7;
         }
-
-        panic!("Invalid input stream.");
+        panic!("Invalid VarInt64 input stream.");
     }
 
-    pub fn read_float(&mut self) -> f32 {
-        debug_assert!(self.position + 4 <= self.limit);
-        let bytes_slice = &self.segment[self.position..self.position + 4];
-        let byte_array: [u8; 4] = bytes_slice
-            .try_into()
-            .expect("Slice must be exactly 4 bytes long");
-
-        self.position += 4;
-        f32::from_ne_bytes(byte_array)
+    pub fn read_float(&self, pos: usize) -> (f32, usize) {
+        let next_pos = pos + 4;
+        debug_assert!(next_pos <= self.limit);
+        let val = f32::from_ne_bytes(
+            self.segment[pos..pos + 4]
+                .try_into()
+                .expect("Slice must be exactly 4 bytes long"),
+        );
+        (val, next_pos)
     }
 
-    pub fn read_double(&mut self) -> f64 {
-        debug_assert!(self.position + 8 <= self.limit);
-        let bytes_slice = &self.segment[self.position..self.position + 8];
-        let byte_array: [u8; 8] = bytes_slice
-            .try_into()
-            .expect("Slice must be exactly 8 bytes long");
-
-        self.position += 8;
-        f64::from_ne_bytes(byte_array)
+    pub fn read_double(&self, pos: usize) -> (f64, usize) {
+        let next_pos = pos + 8;
+        debug_assert!(next_pos <= self.limit);
+        let val = f64::from_ne_bytes(
+            self.segment[pos..pos + 8]
+                .try_into()
+                .expect("Slice must be exactly 8 bytes long"),
+        );
+        (val, next_pos)
     }
 
-    pub fn read_binary(&mut self, length: usize) -> Bytes {
-        debug_assert!(self.position + length <= self.limit);
-
-        let start = self.position;
-        let end = start + length;
-        self.position = end;
-
-        self.segment.slice(start..end)
+    pub fn read_binary(&self, length: usize) -> (&'a [u8], usize) {
+        self.read_bytes(length)

Review Comment:
   Why do we pass length when position is expected?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to