This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch not-copy-bytes
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git

commit d5b00855abd675fc5f2c4a15392231e8131a79bc
Author: luoyuxia <[email protected]>
AuthorDate: Sat Jan 10 23:50:55 2026 +0800

    chore: implement zero copy in CompactedRowReader
---
 bindings/cpp/src/types.rs                          |  3 +-
 crates/fluss/src/row/binary/binary_writer.rs       | 10 +--
 crates/fluss/src/row/compacted/compacted_row.rs    | 37 ++++++---
 .../src/row/compacted/compacted_row_reader.rs      | 91 ++++++++++++----------
 crates/fluss/src/row/datum.rs                      | 91 +++-------------------
 5 files changed, 92 insertions(+), 140 deletions(-)

diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs
index d95da14..24193d0 100644
--- a/bindings/cpp/src/types.rs
+++ b/bindings/cpp/src/types.rs
@@ -219,8 +219,7 @@ pub fn ffi_row_to_core(row: &ffi::FfiGenericRow) -> fcore::row::GenericRow<'_> {
             DATUM_TYPE_FLOAT32 => Datum::Float32(field.f32_val.into()),
             DATUM_TYPE_FLOAT64 => Datum::Float64(field.f64_val.into()),
             DATUM_TYPE_STRING => Datum::String(field.string_val.as_str()),
-            // todo: avoid copy bytes for blob
-            DATUM_TYPE_BYTES => Datum::Blob(field.bytes_val.clone().into()),
+            DATUM_TYPE_BYTES => Datum::Blob(field.bytes_val.as_slice()),
             _ => Datum::Null,
         };
         generic_row.set_field(idx, datum);
diff --git a/crates/fluss/src/row/binary/binary_writer.rs b/crates/fluss/src/row/binary/binary_writer.rs
index 44f10b6..c819f3f 100644
--- a/crates/fluss/src/row/binary/binary_writer.rs
+++ b/crates/fluss/src/row/binary/binary_writer.rs
@@ -170,16 +170,10 @@ impl InnerValueWriter {
                 writer.write_boolean(*v);
             }
             (InnerValueWriter::Binary, Datum::Blob(v)) => {
-                writer.write_binary(v.as_ref(), v.len());
-            }
-            (InnerValueWriter::Binary, Datum::BorrowedBlob(v)) => {
-                writer.write_binary(v.as_ref(), v.len());
+                writer.write_binary(v, v.len());
             }
             (InnerValueWriter::Bytes, Datum::Blob(v)) => {
-                writer.write_bytes(v.as_ref());
-            }
-            (InnerValueWriter::Bytes, Datum::BorrowedBlob(v)) => {
-                writer.write_bytes(v.as_ref());
+                writer.write_bytes(v);
             }
             (InnerValueWriter::TinyInt, Datum::Int8(v)) => {
                 writer.write_byte(*v as u8);
diff --git a/crates/fluss/src/row/compacted/compacted_row.rs b/crates/fluss/src/row/compacted/compacted_row.rs
index fca41c6..ba14862 100644
--- a/crates/fluss/src/row/compacted/compacted_row.rs
+++ b/crates/fluss/src/row/compacted/compacted_row.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use bytes::Bytes;
+use std::mem;
 
 use crate::metadata::DataType;
 use crate::row::compacted::compacted_row_reader::{CompactedRowDeserializer, CompactedRowReader};
@@ -24,19 +25,19 @@ use crate::row::{GenericRow, InternalRow};
 // Reference implementation:
 // https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRow.java
 #[allow(dead_code)]
-pub struct CompactedRow {
+pub struct CompactedRow<'a> {
     arity: usize,
     segment: Bytes,
     offset: usize,
     size_in_bytes: usize,
     decoded: bool,
-    decoded_row: GenericRow<'static>,
-    reader: CompactedRowReader,
-    deserializer: CompactedRowDeserializer,
+    decoded_row: GenericRow<'a>,
+    reader: CompactedRowReader<'a>,
+    data_types: Vec<DataType>,
 }
 
 #[allow(dead_code)]
-impl CompactedRow {
+impl<'a> CompactedRow<'a> {
     pub fn calculate_bit_set_width_in_bytes(arity: usize) -> usize {
         arity.div_ceil(8)
     }
@@ -51,7 +52,7 @@ impl CompactedRow {
             decoded: false,
             decoded_row: GenericRow::new(),
             reader: CompactedRowReader::new(arity),
-            deserializer: CompactedRowDeserializer::new(types),
+            data_types: types,
         }
     }
 
@@ -66,7 +67,7 @@ impl CompactedRow {
             decoded: false,
             decoded_row: GenericRow::new(),
             reader: CompactedRowReader::new(arity),
-            deserializer: CompactedRowDeserializer::new(types),
+            data_types: types,
         }
     }
 
@@ -100,11 +101,29 @@ impl CompactedRow {
         (self.segment[idx] & (1u8 << bit)) != 0
     }
 
-    fn decoded_row(&mut self) -> &GenericRow<'static> {
+    fn decoded_row(&mut self) -> &GenericRow<'a> {
         if !self.decoded {
+            let deserializer = CompactedRowDeserializer::new(self.data_types.clone());
             self.reader
                 .point_to(self.segment.clone(), self.offset, self.size_in_bytes);
-            self.decoded_row = self.deserializer.deserialize(&mut self.reader);
+
+            // Safety:
+            // We use transmute to extend the lifetime of the borrow of self.reader to 'a.
+            // This is safe because:
+            // 1. self.reader internally holds the data via `Bytes`, which is heap-allocated.
+            //    The heap address remains stable throughout the lifetime 'a.
+            // 2. The deserialized `GenericRow` is stored in `self.decoded_row`, which shares
+            //    the same lifetime as `self`.
+            // 3. As long as the `CompactedRow` instance is not dropped, the heap references
+            //    produced by the reader remain valid.
+            // 4. While Rust normally forbids self-referential borrows due to move-safety concerns,
+            //    this specific implementation is move-safe because the pointers within `GenericRow`
+            //    reference the stable heap memory of the `Bytes` segment, not the stack address
+            //    of the fields themselves.
+            let long_lived_reader: &'a CompactedRowReader<'a> =
+                unsafe { mem::transmute(&self.reader) };
+            self.decoded_row = deserializer.deserialize(long_lived_reader);
+
             self.decoded = true;
         }
         &self.decoded_row
diff --git a/crates/fluss/src/row/compacted/compacted_row_reader.rs b/crates/fluss/src/row/compacted/compacted_row_reader.rs
index 19afe88..70300cb 100644
--- a/crates/fluss/src/row/compacted/compacted_row_reader.rs
+++ b/crates/fluss/src/row/compacted/compacted_row_reader.rs
@@ -15,7 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use arrow::datatypes::ToByteSlice;
 use bytes::Bytes;
+use std::cell::Cell;
+use std::marker::PhantomData;
+use std::str::from_utf8;
 
 use crate::{
     metadata::DataType,
@@ -36,7 +40,7 @@ impl CompactedRowDeserializer {
         Self { schema }
     }
 
-    pub fn deserialize(&self, reader: &mut CompactedRowReader) -> GenericRow<'static> {
+    pub fn deserialize<'a>(&self, reader: &'a CompactedRowReader) -> GenericRow<'a> {
         let mut row = GenericRow::new();
         for (pos, dtype) in self.schema.iter().enumerate() {
             if reader.is_null_at(pos) {
@@ -52,11 +56,9 @@ impl CompactedRowDeserializer {
                 DataType::Float(_) => Datum::Float32(reader.read_float().into()),
                 DataType::Double(_) => Datum::Float64(reader.read_double().into()),
                 // TODO: use read_char(length) in the future, but need to keep compatibility
-                DataType::Char(_) | DataType::String(_) => Datum::OwnedString(reader.read_string()),
+                DataType::Char(_) | DataType::String(_) => Datum::String(reader.read_string()),
                 // TODO: use read_binary(length) in the future, but need to keep compatibility
-                DataType::Bytes(_) | DataType::Binary(_) => {
-                    Datum::Blob(reader.read_bytes().into_vec().into())
-                }
+                DataType::Bytes(_) | DataType::Binary(_) => Datum::Blob(reader.read_bytes()),
                 _ => panic!("unsupported DataType in CompactedRowDeserializer"),
             };
             row.set_field(pos, datum);
@@ -68,24 +70,26 @@ impl CompactedRowDeserializer {
 // Reference implementation:
 // https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowReader.java
 #[allow(dead_code)]
-pub struct CompactedRowReader {
+pub struct CompactedRowReader<'a> {
     segment: Bytes,
     offset: usize,
-    position: usize,
+    position: Cell<usize>,
     limit: usize,
     header_size_in_bytes: usize,
+    phantom_data: PhantomData<&'a ()>,
 }
 
 #[allow(dead_code)]
-impl CompactedRowReader {
+impl<'a> CompactedRowReader<'a> {
     pub fn new(field_count: usize) -> Self {
         let header = CompactedRow::calculate_bit_set_width_in_bytes(field_count);
         Self {
             header_size_in_bytes: header,
             segment: Bytes::new(),
             offset: 0,
-            position: 0,
+            position: Cell::new(0),
             limit: 0,
+            phantom_data: PhantomData,
         }
     }
 
@@ -98,7 +102,7 @@ impl CompactedRowReader {
 
         self.segment = data;
         self.offset = offset;
-        self.position = position;
+        self.position = Cell::new(position);
         self.limit = limit;
     }
 
@@ -110,29 +114,31 @@ impl CompactedRowReader {
         (self.segment[idx] & (1u8 << bit)) != 0
     }
 
-    pub fn read_boolean(&mut self) -> bool {
+    pub fn read_boolean(&self) -> bool {
         self.read_byte() != 0
     }
 
-    pub fn read_byte(&mut self) -> u8 {
-        debug_assert!(self.position < self.limit);
-        let b = self.segment[self.position];
-        self.position += 1;
+    pub fn read_byte(&self) -> u8 {
+        let pos = self.position.get();
+        debug_assert!(pos < self.limit);
+        let b = self.segment[pos];
+        self.position.set(pos + 1);
         b
     }
 
-    pub fn read_short(&mut self) -> i16 {
-        debug_assert!(self.position + 2 <= self.limit);
-        let bytes_slice = &self.segment[self.position..self.position + 2];
+    pub fn read_short(&self) -> i16 {
+        let pos = self.position.get();
+        debug_assert!(pos + 2 <= self.limit);
+        let bytes_slice = &self.segment[pos..pos + 2];
         let byte_array: [u8; 2] = bytes_slice
             .try_into()
             .expect("Slice must be exactly 2 bytes long");
 
-        self.position += 2;
+        self.position.set(pos + 2);
         i16::from_ne_bytes(byte_array)
     }
 
-    pub fn read_int(&mut self) -> i32 {
+    pub fn read_int(&self) -> i32 {
         let mut result: u32 = 0;
         let mut shift = 0;
 
@@ -148,7 +154,7 @@ impl CompactedRowReader {
         panic!("Invalid input stream.");
     }
 
-    pub fn read_long(&mut self) -> i64 {
+    pub fn read_long(&self) -> i64 {
         let mut result: u64 = 0;
         let mut shift = 0;
 
@@ -164,55 +170,58 @@ impl CompactedRowReader {
         panic!("Invalid input stream.");
     }
 
-    pub fn read_float(&mut self) -> f32 {
-        debug_assert!(self.position + 4 <= self.limit);
-        let bytes_slice = &self.segment[self.position..self.position + 4];
+    pub fn read_float(&self) -> f32 {
+        let pos = self.position.get();
+        debug_assert!(pos + 4 <= self.limit);
+        let bytes_slice = &self.segment[pos..pos + 4];
         let byte_array: [u8; 4] = bytes_slice
             .try_into()
             .expect("Slice must be exactly 4 bytes long");
 
-        self.position += 4;
+        self.position.set(pos + 4);
         f32::from_ne_bytes(byte_array)
     }
 
-    pub fn read_double(&mut self) -> f64 {
-        debug_assert!(self.position + 8 <= self.limit);
-        let bytes_slice = &self.segment[self.position..self.position + 8];
+    pub fn read_double(&self) -> f64 {
+        let pos = self.position.get();
+        debug_assert!(pos + 8 <= self.limit);
+        let bytes_slice = &self.segment[pos..pos + 8];
         let byte_array: [u8; 8] = bytes_slice
             .try_into()
             .expect("Slice must be exactly 8 bytes long");
 
-        self.position += 8;
+        self.position.set(pos + 8);
         f64::from_ne_bytes(byte_array)
     }
 
-    pub fn read_binary(&mut self, length: usize) -> Bytes {
-        debug_assert!(self.position + length <= self.limit);
+    pub fn read_binary(&self, length: usize) -> Bytes {
+        let pos = self.position.get();
+        debug_assert!(pos + length <= self.limit);
 
-        let start = self.position;
+        let start = pos;
         let end = start + length;
-        self.position = end;
+        self.position.set(end);
 
         self.segment.slice(start..end)
     }
 
-    pub fn read_bytes(&mut self) -> Box<[u8]> {
+    pub fn read_bytes(&self) -> &[u8] {
         let len = self.read_int();
         debug_assert!(len >= 0);
+        let pos = self.position.get();
 
         let len = len as usize;
-        debug_assert!(self.position + len <= self.limit);
+        debug_assert!(pos + len <= self.limit);
 
-        let start = self.position;
+        let start = pos;
         let end = start + len;
-        self.position = end;
+        self.position.set(end);
 
-        self.segment[start..end].to_vec().into_boxed_slice()
+        self.segment[start..end].to_byte_slice()
     }
 
-    pub fn read_string(&mut self) -> String {
+    pub fn read_string(&self) -> &str {
         let bytes = self.read_bytes();
-        String::from_utf8(bytes.into_vec())
-            .unwrap_or_else(|e| panic!("Invalid UTF-8 in string data: {e}"))
+        from_utf8(bytes).expect("Invalid UTF-8 when reading string from compacted row")
     }
 }
diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs
index 78872a9..6540618 100644
--- a/crates/fluss/src/row/datum.rs
+++ b/crates/fluss/src/row/datum.rs
@@ -24,11 +24,8 @@ use arrow::array::{
 use jiff::ToSpan;
 use ordered_float::OrderedFloat;
 use parse_display::Display;
-use ref_cast::RefCast;
 use rust_decimal::Decimal;
-use serde::{Deserialize, Serialize};
-use std::fmt;
-use std::ops::Deref;
+use serde::Serialize;
 
 #[allow(dead_code)]
 const THIRTY_YEARS_MICROSECONDS: i64 = 946_684_800_000_000;
@@ -53,13 +50,8 @@ pub enum Datum<'a> {
     Float64(F64),
     #[display("'{0}'")]
     String(&'a str),
-    /// Owned string
-    #[display("'{0}'")]
-    OwnedString(String),
-    #[display("{0}")]
-    Blob(Blob),
     #[display("{:?}")]
-    BorrowedBlob(&'a [u8]),
+    Blob(&'a [u8]),
     #[display("{0}")]
     Decimal(Decimal),
     #[display("{0}")]
@@ -78,15 +70,13 @@ impl Datum<'_> {
     pub fn as_str(&self) -> &str {
         match self {
             Self::String(s) => s,
-            Self::OwnedString(s) => s.as_str(),
             _ => panic!("not a string: {self:?}"),
         }
     }
 
     pub fn as_blob(&self) -> &[u8] {
         match self {
-            Self::Blob(blob) => blob.as_ref(),
-            Self::BorrowedBlob(blob) => blob,
+            Self::Blob(blob) => blob,
             _ => panic!("not a blob: {self:?}"),
         }
     }
@@ -128,6 +118,13 @@ impl<'a> From<&'a str> for Datum<'a> {
     }
 }
 
+impl<'a> From<&'a [u8]> for Datum<'a> {
+    #[inline]
+    fn from(value: &'a [u8]) -> Self {
+        Datum::Blob(value)
+    }
+}
+
 impl From<Option<&()>> for Datum<'_> {
     fn from(_: Option<&()>) -> Self {
         Self::Null
@@ -227,7 +224,6 @@ impl<'b, 'a: 'b> TryFrom<&'b Datum<'a>> for &'b str {
     fn try_from(from: &'b Datum<'a>) -> std::result::Result<Self, Self::Error> {
         match from {
             Datum::String(i) => Ok(*i),
-            Datum::OwnedString(s) => Ok(s.as_str()),
             _ => Err(()),
         }
     }
@@ -296,9 +292,7 @@ impl Datum<'_> {
             Datum::Float32(v) => append_value_to_arrow!(Float32Builder, v.into_inner()),
             Datum::Float64(v) => append_value_to_arrow!(Float64Builder, v.into_inner()),
             Datum::String(v) => append_value_to_arrow!(StringBuilder, *v),
-            Datum::OwnedString(v) => append_value_to_arrow!(StringBuilder, v.as_str()),
-            Datum::Blob(v) => append_value_to_arrow!(BinaryBuilder, v.as_ref()),
-            Datum::BorrowedBlob(v) => append_value_to_arrow!(BinaryBuilder, *v),
+            Datum::Blob(v) => append_value_to_arrow!(BinaryBuilder, v),
             Datum::Decimal(_) | Datum::Date(_) | Datum::Timestamp(_) | Datum::TimestampTz(_) => {
                 return Err(RowConvertError {
                     message: format!(
@@ -349,57 +343,6 @@ impl_to_arrow!(&str, StringBuilder);
 
 pub type F32 = OrderedFloat<f32>;
 pub type F64 = OrderedFloat<f64>;
-#[allow(dead_code)]
-pub type Str = Box<str>;
-
-#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize, Default)]
-pub struct Blob(Box<[u8]>);
-
-impl Deref for Blob {
-    type Target = BlobRef;
-
-    fn deref(&self) -> &Self::Target {
-        BlobRef::new(&self.0)
-    }
-}
-
-impl BlobRef {
-    pub fn new(bytes: &[u8]) -> &Self {
-        // SAFETY: `&BlobRef` and `&[u8]` have the same layout.
-        BlobRef::ref_cast(bytes)
-    }
-}
-
-/// A slice of a blob.
-#[repr(transparent)]
-#[derive(PartialEq, Eq, PartialOrd, Ord, RefCast, Hash)]
-pub struct BlobRef([u8]);
-
-impl fmt::Debug for Blob {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(f, "{:?}", self.as_ref())
-    }
-}
-
-impl fmt::Display for Blob {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(f, "{:?}", self.as_ref())
-    }
-}
-
-impl AsRef<[u8]> for BlobRef {
-    fn as_ref(&self) -> &[u8] {
-        &self.0
-    }
-}
-
-impl Deref for BlobRef {
-    type Target = [u8];
-
-    fn deref(&self) -> &Self::Target {
-        &self.0
-    }
-}
 
 #[derive(PartialOrd, Ord, Display, PartialEq, Eq, Debug, Copy, Clone, Default, Hash, Serialize)]
 pub struct Date(i32);
@@ -410,18 +353,6 @@ pub struct Timestamp(i64);
 #[derive(PartialOrd, Ord, Display, PartialEq, Eq, Debug, Copy, Clone, Default, Hash, Serialize)]
 pub struct TimestampLtz(i64);
 
-impl From<Vec<u8>> for Blob {
-    fn from(vec: Vec<u8>) -> Self {
-        Blob(vec.into())
-    }
-}
-
-impl<'a> From<&'a [u8]> for Datum<'a> {
-    fn from(bytes: &'a [u8]) -> Datum<'a> {
-        Datum::BorrowedBlob(bytes)
-    }
-}
-
 const UNIX_EPOCH_DAY: jiff::civil::Date = jiff::civil::date(1970, 1, 1);
 
 impl Date {

Reply via email to