leekeiabstraction commented on code in PR #138:
URL: https://github.com/apache/fluss-rust/pull/138#discussion_r2679321030


##########
crates/fluss/src/row/compacted/compacted_row_reader.rs:
##########
@@ -15,54 +15,75 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use bytes::Bytes;
-use std::borrow::Cow;
-
+use crate::row::compacted::compacted_row::calculate_bit_set_width_in_bytes;
 use crate::{
     metadata::DataType,
-    row::{
-        Datum, GenericRow,
-        compacted::{compacted_row::CompactedRow, 
compacted_row_writer::CompactedRowWriter},
-    },
+    row::{Datum, GenericRow, 
compacted::compacted_row_writer::CompactedRowWriter},
 };
+use std::str::from_utf8;
 
 #[allow(dead_code)]
-pub struct CompactedRowDeserializer {
-    schema: Vec<DataType>,
+pub struct CompactedRowDeserializer<'a> {
+    schema: &'a [DataType],
 }
 
 #[allow(dead_code)]
-impl CompactedRowDeserializer {
-    pub fn new(schema: Vec<DataType>) -> Self {
+impl<'a> CompactedRowDeserializer<'a> {
+    pub fn new(schema: &'a [DataType]) -> Self {
         Self { schema }
     }
 
-    pub fn deserialize(&self, reader: &mut CompactedRowReader) -> 
GenericRow<'static> {
+    pub fn deserialize(&self, reader: &CompactedRowReader<'a>) -> 
GenericRow<'a> {
         let mut row = GenericRow::new();
-        for (pos, dtype) in self.schema.iter().enumerate() {
-            if reader.is_null_at(pos) {
-                row.set_field(pos, Datum::Null);
+        let mut cursor = reader.initial_position();
+        for (col_pos, dtype) in self.schema.iter().enumerate() {
+            if reader.is_null_at(col_pos) {

Review Comment:
   Let's either address this now or add a TODO: we should return an error when the column is flagged as null but its data type is non-nullable.



##########
crates/fluss/src/row/compacted/compacted_row_reader.rs:
##########
@@ -71,151 +92,130 @@ impl CompactedRowDeserializer {
 // Reference implementation:
 // 
https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowReader.java
 #[allow(dead_code)]
-pub struct CompactedRowReader {
-    segment: Bytes,
+pub struct CompactedRowReader<'a> {
+    segment: &'a [u8],
     offset: usize,
-    position: usize,
     limit: usize,
     header_size_in_bytes: usize,
 }
 
 #[allow(dead_code)]
-impl CompactedRowReader {
-    pub fn new(field_count: usize) -> Self {
-        let header = 
CompactedRow::calculate_bit_set_width_in_bytes(field_count);
-        Self {
-            header_size_in_bytes: header,
-            segment: Bytes::new(),
-            offset: 0,
-            position: 0,
-            limit: 0,
-        }
-    }
-
-    pub fn point_to(&mut self, data: Bytes, offset: usize, length: usize) {
+impl<'a> CompactedRowReader<'a> {
+    pub fn new(field_count: usize, data: &'a [u8], offset: usize, length: 
usize) -> Self {
+        let header_size_in_bytes = 
calculate_bit_set_width_in_bytes(field_count);
         let limit = offset + length;
-        let position = offset + self.header_size_in_bytes;
-
+        let position = offset + header_size_in_bytes;
         debug_assert!(limit <= data.len());
         debug_assert!(position <= limit);
 
-        self.segment = data;
-        self.offset = offset;
-        self.position = position;
-        self.limit = limit;
+        CompactedRowReader {
+            segment: data,
+            offset,
+            limit,
+            header_size_in_bytes,
+        }
     }
 
-    pub fn is_null_at(&self, pos: usize) -> bool {
-        let byte_index = pos >> 3;
-        let bit = pos & 7;
+    fn initial_position(&self) -> usize {
+        self.offset + self.header_size_in_bytes
+    }
+
+    pub fn is_null_at(&self, col_pos: usize) -> bool {
+        let byte_index = col_pos >> 3;
+        let bit = col_pos & 7;
         debug_assert!(byte_index < self.header_size_in_bytes);
         let idx = self.offset + byte_index;
         (self.segment[idx] & (1u8 << bit)) != 0
     }
 
-    pub fn read_boolean(&mut self) -> bool {
-        self.read_byte() != 0
+    pub fn read_boolean(&self, pos: usize) -> (bool, usize) {
+        let (val, next) = self.read_byte(pos);
+        (val != 0, next)
     }
 
-    pub fn read_byte(&mut self) -> u8 {
-        debug_assert!(self.position < self.limit);
-        let b = self.segment[self.position];
-        self.position += 1;
-        b
+    pub fn read_byte(&self, pos: usize) -> (u8, usize) {
+        debug_assert!(pos < self.limit);
+        (self.segment[pos], pos + 1)
     }
 
-    pub fn read_short(&mut self) -> i16 {
-        debug_assert!(self.position + 2 <= self.limit);
-        let bytes_slice = &self.segment[self.position..self.position + 2];
-        let byte_array: [u8; 2] = bytes_slice
-            .try_into()
-            .expect("Slice must be exactly 2 bytes long");
-
-        self.position += 2;
-        i16::from_ne_bytes(byte_array)
+    pub fn read_short(&self, pos: usize) -> (i16, usize) {
+        debug_assert!(pos + 2 <= self.limit);

Review Comment:
   We can calculate `pos + x` once and assign it to a variable. The same
   applies to the other methods.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to