Copilot commented on code in PR #138:
URL: https://github.com/apache/fluss-rust/pull/138#discussion_r2678773995


##########
crates/fluss/src/row/compacted/compacted_row_reader.rs:
##########
@@ -164,55 +170,58 @@ impl CompactedRowReader {
         panic!("Invalid input stream.");
     }
 
-    pub fn read_float(&mut self) -> f32 {
-        debug_assert!(self.position + 4 <= self.limit);
-        let bytes_slice = &self.segment[self.position..self.position + 4];
+    pub fn read_float(&self) -> f32 {
+        let pos = self.position.get();
+        debug_assert!(pos + 4 <= self.limit);
+        let bytes_slice = &self.segment[pos..pos + 4];
         let byte_array: [u8; 4] = bytes_slice
             .try_into()
             .expect("Slice must be exactly 4 bytes long");
 
-        self.position += 4;
+        self.position.set(pos + 4);
         f32::from_ne_bytes(byte_array)
     }
 
-    pub fn read_double(&mut self) -> f64 {
-        debug_assert!(self.position + 8 <= self.limit);
-        let bytes_slice = &self.segment[self.position..self.position + 8];
+    pub fn read_double(&self) -> f64 {
+        let pos = self.position.get();
+        debug_assert!(pos + 8 <= self.limit);
+        let bytes_slice = &self.segment[pos..pos + 8];
         let byte_array: [u8; 8] = bytes_slice
             .try_into()
             .expect("Slice must be exactly 8 bytes long");
 
-        self.position += 8;
+        self.position.set(pos + 8);
         f64::from_ne_bytes(byte_array)
     }
 
-    pub fn read_binary(&mut self, length: usize) -> Bytes {
-        debug_assert!(self.position + length <= self.limit);
+    pub fn read_binary(&self, length: usize) -> Bytes {
+        let pos = self.position.get();
+        debug_assert!(pos + length <= self.limit);
 
-        let start = self.position;
+        let start = pos;
         let end = start + length;
-        self.position = end;
+        self.position.set(end);
 
         self.segment.slice(start..end)
     }
 
-    pub fn read_bytes(&mut self) -> Box<[u8]> {
+    pub fn read_bytes(&self) -> &[u8] {
+        let pos = self.position.get();
         let len = self.read_int();
         debug_assert!(len >= 0);
 
         let len = len as usize;
-        debug_assert!(self.position + len <= self.limit);
+        debug_assert!(pos + len <= self.limit);
 
-        let start = self.position;
+        let start = pos;
         let end = start + len;
-        self.position = end;
+        self.position.set(end);
 
-        self.segment[start..end].to_vec().into_boxed_slice()
+        self.segment[start..end].to_byte_slice()
     }
 
-    pub fn read_string(&mut self) -> String {
+    pub fn read_string(&self) -> &str {
         let bytes = self.read_bytes();
-        String::from_utf8(bytes.into_vec())
-            .unwrap_or_else(|e| panic!("Invalid UTF-8 in string data: {e}"))
+        from_utf8(bytes).unwrap()

Review Comment:
   The from_utf8() call uses unwrap() which will panic on invalid UTF-8 data. 
This can crash the application if the compacted row data is corrupted or 
malformed. Consider using from_utf8() with proper error handling instead, or at 
least provide a more descriptive panic message with context about which field 
failed.
   ```suggestion
           from_utf8(bytes).expect("Invalid UTF-8 when reading string from compacted row")
   ```



##########
crates/fluss/src/row/compacted/compacted_row.rs:
##########
@@ -100,11 +101,29 @@ impl CompactedRow {
         (self.segment[idx] & (1u8 << bit)) != 0
     }
 
-    fn decoded_row(&mut self) -> &GenericRow<'static> {
+    fn decoded_row(&mut self) -> &GenericRow<'a> {
         if !self.decoded {
+            let deserializer = 
CompactedRowDeserializer::new(self.data_types.clone());
             self.reader
                 .point_to(self.segment.clone(), self.offset, 
self.size_in_bytes);
-            self.decoded_row = self.deserializer.deserialize(&mut self.reader);
+
+            // Safety:
+            // We use transmute to extend the lifetime of the borrow of 
self.reader to 'a.
+            // This is safe because:
+            // 1. self.reader internally holds the data via `Bytes`, which is 
heap-allocated.
+            //    The heap address remains stable throughout the lifetime 'a.
+            // 2. The deserialized `GenericRow` is stored in 
`self.decoded_row`, which shares
+            //    the same lifetime as `self`.
+            // 3. As long as the `CompactedRow` instance is not dropped, the 
heap references
+            //    produced by the reader remain valid.
+            // 4. While Rust normally forbids self-referential borrows due to 
move-safety concerns,
+            //    this specific implementation is move-safe because the 
pointers within `GenericRow`
+            //    reference the stable heap memory of the `Bytes` segment, not 
the stack address
+            //    of the fields themselves.
+            let long_lived_reader: &'a CompactedRowReader<'a> =
+                unsafe { mem::transmute(&self.reader) };
+            self.decoded_row = deserializer.deserialize(long_lived_reader);

Review Comment:
   The unsafe transmute to extend the lifetime is fundamentally unsound. The 
code creates a self-referential structure where decoded_row contains references 
to data in self.reader.segment. If CompactedRow is moved (e.g., returned from a 
function, stored in a Vec, or passed by value), the self.reader field moves to 
a new memory location, but decoded_row still holds references that point to the 
old location of the Bytes within reader. While Bytes itself is heap-allocated 
and stable, the references in decoded_row are tied to the lifetime 'a which is 
being artificially extended beyond the actual borrow. This violates Rust's 
safety guarantees. Consider using a proper self-referential struct library like 
ouroboros or rental, or restructuring to avoid self-referential borrowing.



##########
bindings/python/src/table.rs:
##########
@@ -266,8 +267,9 @@ impl AppendWriter {
         }
 
         if let Ok(bytes_val) = value.extract::<Vec<u8>>(py) {
-            let blob = Blob::from(bytes_val);
-            return Ok(Datum::Blob(blob));
+            // todo: don't use leak
+            let leaked_bytes: &'static [u8] = 
Box::leak(bytes_val.into_boxed_slice());
+            return Ok(Blob(leaked_bytes));

Review Comment:
   Using Box::leak here intentionally leaks memory on every call to convert 
Python bytes values. This memory will never be reclaimed, leading to unbounded 
memory growth. The comment acknowledges this is a problem ("todo: don't use 
leak"), but the implementation still has this critical memory leak. Consider 
using an arena allocator or restructuring the API to accept owned data.
   ```suggestion
               // Convert Python bytes to an owned buffer for Datum::Blob without leaking memory
               return Ok(Blob(bytes_val));
   ```



##########
crates/fluss/src/row/binary/binary_writer.rs:
##########
@@ -172,14 +172,8 @@ impl InnerValueWriter {
             (InnerValueWriter::Binary, Datum::Blob(v)) => {
                 writer.write_binary(v.as_ref(), v.len());

Review Comment:
   Since Datum::Blob now contains &[u8] directly, calling as_ref() on it is 
unnecessary. The value v is already a &[u8], so you can pass *v directly to 
write_binary.
   ```suggestion
                   writer.write_binary(*v, v.len());
   ```



##########
crates/fluss/src/row/compacted/compacted_row_reader.rs:
##########
@@ -164,55 +170,58 @@ impl CompactedRowReader {
         panic!("Invalid input stream.");
     }
 
-    pub fn read_float(&mut self) -> f32 {
-        debug_assert!(self.position + 4 <= self.limit);
-        let bytes_slice = &self.segment[self.position..self.position + 4];
+    pub fn read_float(&self) -> f32 {
+        let pos = self.position.get();
+        debug_assert!(pos + 4 <= self.limit);
+        let bytes_slice = &self.segment[pos..pos + 4];
         let byte_array: [u8; 4] = bytes_slice
             .try_into()
             .expect("Slice must be exactly 4 bytes long");
 
-        self.position += 4;
+        self.position.set(pos + 4);
         f32::from_ne_bytes(byte_array)
     }
 
-    pub fn read_double(&mut self) -> f64 {
-        debug_assert!(self.position + 8 <= self.limit);
-        let bytes_slice = &self.segment[self.position..self.position + 8];
+    pub fn read_double(&self) -> f64 {
+        let pos = self.position.get();
+        debug_assert!(pos + 8 <= self.limit);
+        let bytes_slice = &self.segment[pos..pos + 8];
         let byte_array: [u8; 8] = bytes_slice
             .try_into()
             .expect("Slice must be exactly 8 bytes long");
 
-        self.position += 8;
+        self.position.set(pos + 8);
         f64::from_ne_bytes(byte_array)
     }
 
-    pub fn read_binary(&mut self, length: usize) -> Bytes {
-        debug_assert!(self.position + length <= self.limit);
+    pub fn read_binary(&self, length: usize) -> Bytes {
+        let pos = self.position.get();
+        debug_assert!(pos + length <= self.limit);
 
-        let start = self.position;
+        let start = pos;
         let end = start + length;
-        self.position = end;
+        self.position.set(end);
 
         self.segment.slice(start..end)
     }
 
-    pub fn read_bytes(&mut self) -> Box<[u8]> {
+    pub fn read_bytes(&self) -> &[u8] {
+        let pos = self.position.get();
         let len = self.read_int();
         debug_assert!(len >= 0);
 
         let len = len as usize;
-        debug_assert!(self.position + len <= self.limit);
+        debug_assert!(pos + len <= self.limit);
 
-        let start = self.position;
+        let start = pos;
         let end = start + len;
-        self.position = end;
+        self.position.set(end);
 
-        self.segment[start..end].to_vec().into_boxed_slice()
+        self.segment[start..end].to_byte_slice()
     }

Review Comment:
   The position is captured before calling read_int(), but read_int() advances 
the position through multiple read_byte() calls. This means the captured pos 
variable is stale (it still refers to the start of the length prefix) and will 
cause an incorrect byte range calculation. The position should be captured 
after read_int() completes: move the `let pos = self.position.get();` statement 
(line 209) to after the `let len = self.read_int();` call (line 210).



##########
crates/fluss/src/row/compacted/compacted_row_reader.rs:
##########
@@ -52,11 +56,9 @@ impl CompactedRowDeserializer {
                 DataType::Float(_) => 
Datum::Float32(reader.read_float().into()),
                 DataType::Double(_) => 
Datum::Float64(reader.read_double().into()),
                 // TODO: use read_char(length) in the future, but need to keep 
compatibility
-                DataType::Char(_) | DataType::String(_) => 
Datum::OwnedString(reader.read_string()),
-                // TODO: use read_binary(length) in the future, but need to 
keep compatibility
-                DataType::Bytes(_) | DataType::Binary(_) => {
-                    Datum::Blob(reader.read_bytes().into_vec().into())
-                }
+                DataType::Char(_) | DataType::String(_) => 
Datum::String(reader.read_string()),
+                // // TODO: use read_binary(length) in the future, but need to 
keep compatibility

Review Comment:
   This comment contains a duplicated comment marker (`// // TODO`), making it 
look like a commented-out comment rather than a regular TODO note. This appears 
to be a typo - it should use a single `//` prefix to maintain consistency with 
standard Rust comment style.
   ```suggestion
                   // TODO: use read_binary(length) in the future, but need to keep compatibility
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to