This is an automated email from the ASF dual-hosted git repository. yuxia pushed a commit to branch not-copy-bytes in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
commit d5b00855abd675fc5f2c4a15392231e8131a79bc Author: luoyuxia <[email protected]> AuthorDate: Sat Jan 10 23:50:55 2026 +0800 chore: implement zero copy in CompactedRowReader --- bindings/cpp/src/types.rs | 3 +- crates/fluss/src/row/binary/binary_writer.rs | 10 +-- crates/fluss/src/row/compacted/compacted_row.rs | 37 ++++++--- .../src/row/compacted/compacted_row_reader.rs | 91 ++++++++++++---------- crates/fluss/src/row/datum.rs | 91 +++------------------- 5 files changed, 92 insertions(+), 140 deletions(-) diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs index d95da14..24193d0 100644 --- a/bindings/cpp/src/types.rs +++ b/bindings/cpp/src/types.rs @@ -219,8 +219,7 @@ pub fn ffi_row_to_core(row: &ffi::FfiGenericRow) -> fcore::row::GenericRow<'_> { DATUM_TYPE_FLOAT32 => Datum::Float32(field.f32_val.into()), DATUM_TYPE_FLOAT64 => Datum::Float64(field.f64_val.into()), DATUM_TYPE_STRING => Datum::String(field.string_val.as_str()), - // todo: avoid copy bytes for blob - DATUM_TYPE_BYTES => Datum::Blob(field.bytes_val.clone().into()), + DATUM_TYPE_BYTES => Datum::Blob(field.bytes_val.as_slice()), _ => Datum::Null, }; generic_row.set_field(idx, datum); diff --git a/crates/fluss/src/row/binary/binary_writer.rs b/crates/fluss/src/row/binary/binary_writer.rs index 44f10b6..c819f3f 100644 --- a/crates/fluss/src/row/binary/binary_writer.rs +++ b/crates/fluss/src/row/binary/binary_writer.rs @@ -170,16 +170,10 @@ impl InnerValueWriter { writer.write_boolean(*v); } (InnerValueWriter::Binary, Datum::Blob(v)) => { - writer.write_binary(v.as_ref(), v.len()); - } - (InnerValueWriter::Binary, Datum::BorrowedBlob(v)) => { - writer.write_binary(v.as_ref(), v.len()); + writer.write_binary(v, v.len()); } (InnerValueWriter::Bytes, Datum::Blob(v)) => { - writer.write_bytes(v.as_ref()); - } - (InnerValueWriter::Bytes, Datum::BorrowedBlob(v)) => { - writer.write_bytes(v.as_ref()); + writer.write_bytes(v); } (InnerValueWriter::TinyInt, Datum::Int8(v)) => { writer.write_byte(*v as u8); diff --git a/crates/fluss/src/row/compacted/compacted_row.rs b/crates/fluss/src/row/compacted/compacted_row.rs index fca41c6..ba14862 100644 --- a/crates/fluss/src/row/compacted/compacted_row.rs +++ b/crates/fluss/src/row/compacted/compacted_row.rs @@ -16,6 +16,7 @@ // under the License. use bytes::Bytes; +use std::mem; use crate::metadata::DataType; use crate::row::compacted::compacted_row_reader::{CompactedRowDeserializer, CompactedRowReader}; @@ -24,19 +25,19 @@ use crate::row::{GenericRow, InternalRow}; // Reference implementation: // https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRow.java #[allow(dead_code)] -pub struct CompactedRow { +pub struct CompactedRow<'a> { arity: usize, segment: Bytes, offset: usize, size_in_bytes: usize, decoded: bool, - decoded_row: GenericRow<'static>, - reader: CompactedRowReader, - deserializer: CompactedRowDeserializer, + decoded_row: GenericRow<'a>, + reader: CompactedRowReader<'a>, + data_types: Vec<DataType>, } #[allow(dead_code)] -impl CompactedRow { +impl<'a> CompactedRow<'a> { pub fn calculate_bit_set_width_in_bytes(arity: usize) -> usize { arity.div_ceil(8) } @@ -51,7 +52,7 @@ impl CompactedRow { decoded: false, decoded_row: GenericRow::new(), reader: CompactedRowReader::new(arity), - deserializer: CompactedRowDeserializer::new(types), + data_types: types, } } @@ -66,7 +67,7 @@ impl CompactedRow { decoded: false, decoded_row: GenericRow::new(), reader: CompactedRowReader::new(arity), - deserializer: CompactedRowDeserializer::new(types), + data_types: types, } } @@ -100,11 +101,29 @@ impl CompactedRow { (self.segment[idx] & (1u8 << bit)) != 0 } - fn decoded_row(&mut self) -> &GenericRow<'static> { + fn decoded_row(&mut self) -> &GenericRow<'a> { if !self.decoded { + let deserializer = CompactedRowDeserializer::new(self.data_types.clone()); self.reader .point_to(self.segment.clone(), self.offset, self.size_in_bytes); - self.decoded_row = self.deserializer.deserialize(&mut self.reader); + + // Safety: + // We use transmute to extend the lifetime of the borrow of self.reader to 'a. + // This is safe because: + // 1. self.reader internally holds the data via `Bytes`, which is heap-allocated. + // The heap address remains stable throughout the lifetime 'a. + // 2. The deserialized `GenericRow` is stored in `self.decoded_row`, which shares + // the same lifetime as `self`. + // 3. As long as the `CompactedRow` instance is not dropped, the heap references + // produced by the reader remain valid. + // 4. While Rust normally forbids self-referential borrows due to move-safety concerns, + // this specific implementation is move-safe because the pointers within `GenericRow` + // reference the stable heap memory of the `Bytes` segment, not the stack address + // of the fields themselves. + let long_lived_reader: &'a CompactedRowReader<'a> = + unsafe { mem::transmute(&self.reader) }; + self.decoded_row = deserializer.deserialize(long_lived_reader); + self.decoded = true; } &self.decoded_row diff --git a/crates/fluss/src/row/compacted/compacted_row_reader.rs b/crates/fluss/src/row/compacted/compacted_row_reader.rs index 19afe88..70300cb 100644 --- a/crates/fluss/src/row/compacted/compacted_row_reader.rs +++ b/crates/fluss/src/row/compacted/compacted_row_reader.rs @@ -15,7 +15,11 @@ // specific language governing permissions and limitations // under the License. +use arrow::datatypes::ToByteSlice; use bytes::Bytes; +use std::cell::Cell; +use std::marker::PhantomData; +use std::str::from_utf8; use crate::{ metadata::DataType, @@ -36,7 +40,7 @@ impl CompactedRowDeserializer { Self { schema } } - pub fn deserialize(&self, reader: &mut CompactedRowReader) -> GenericRow<'static> { + pub fn deserialize<'a>(&self, reader: &'a CompactedRowReader) -> GenericRow<'a> { let mut row = GenericRow::new(); for (pos, dtype) in self.schema.iter().enumerate() { if reader.is_null_at(pos) { @@ -52,11 +56,9 @@ impl CompactedRowDeserializer { DataType::Float(_) => Datum::Float32(reader.read_float().into()), DataType::Double(_) => Datum::Float64(reader.read_double().into()), // TODO: use read_char(length) in the future, but need to keep compatibility - DataType::Char(_) | DataType::String(_) => Datum::OwnedString(reader.read_string()), + DataType::Char(_) | DataType::String(_) => Datum::String(reader.read_string()), // TODO: use read_binary(length) in the future, but need to keep compatibility - DataType::Bytes(_) | DataType::Binary(_) => { - Datum::Blob(reader.read_bytes().into_vec().into()) - } + DataType::Bytes(_) | DataType::Binary(_) => Datum::Blob(reader.read_bytes()), _ => panic!("unsupported DataType in CompactedRowDeserializer"), }; row.set_field(pos, datum); @@ -68,24 +70,26 @@ impl CompactedRowDeserializer { // Reference implementation: // https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowReader.java #[allow(dead_code)] -pub struct CompactedRowReader { +pub struct CompactedRowReader<'a> { segment: Bytes, offset: usize, - position: usize, + position: Cell<usize>, limit: usize, header_size_in_bytes: usize, + phantom_data: PhantomData<&'a ()>, } #[allow(dead_code)] -impl CompactedRowReader { +impl<'a> CompactedRowReader<'a> { pub fn new(field_count: usize) -> Self { let header = CompactedRow::calculate_bit_set_width_in_bytes(field_count); Self { header_size_in_bytes: header, segment: Bytes::new(), offset: 0, - position: 0, + position: Cell::new(0), limit: 0, + phantom_data: PhantomData, } } @@ -98,7 +102,7 @@ impl CompactedRowReader { self.segment = data; self.offset = offset; - self.position = position; + self.position = Cell::new(position); self.limit = limit; } @@ -110,29 +114,31 @@ impl CompactedRowReader { (self.segment[idx] & (1u8 << bit)) != 0 } - pub fn read_boolean(&mut self) -> bool { + pub fn read_boolean(&self) -> bool { self.read_byte() != 0 } - pub fn read_byte(&mut self) -> u8 { - debug_assert!(self.position < self.limit); - let b = self.segment[self.position]; - self.position += 1; + pub fn read_byte(&self) -> u8 { + let pos = self.position.get(); + debug_assert!(pos < self.limit); + let b = self.segment[pos]; + self.position.set(pos + 1); b } - pub fn read_short(&mut self) -> i16 { - debug_assert!(self.position + 2 <= self.limit); - let bytes_slice = &self.segment[self.position..self.position + 2]; + pub fn read_short(&self) -> i16 { + let pos = self.position.get(); + debug_assert!(pos + 2 <= self.limit); + let bytes_slice = &self.segment[pos..pos + 2]; let byte_array: [u8; 2] = bytes_slice .try_into() .expect("Slice must be exactly 2 bytes long"); - self.position += 2; + self.position.set(pos + 2); i16::from_ne_bytes(byte_array) } - pub fn read_int(&mut self) -> i32 { + pub fn read_int(&self) -> i32 { let mut result: u32 = 0; let mut shift = 0; @@ -148,7 +154,7 @@ impl CompactedRowReader { panic!("Invalid input stream."); } - pub fn read_long(&mut self) -> i64 { + pub fn read_long(&self) -> i64 { let mut result: u64 = 0; let mut shift = 0; @@ -164,55 +170,58 @@ impl CompactedRowReader { panic!("Invalid input stream."); } - pub fn read_float(&mut self) -> f32 { - debug_assert!(self.position + 4 <= self.limit); - let bytes_slice = &self.segment[self.position..self.position + 4]; + pub fn read_float(&self) -> f32 { + let pos = self.position.get(); + debug_assert!(pos + 4 <= self.limit); + let bytes_slice = &self.segment[pos..pos + 4]; let byte_array: [u8; 4] = bytes_slice .try_into() .expect("Slice must be exactly 4 bytes long"); - self.position += 4; + self.position.set(pos + 4); f32::from_ne_bytes(byte_array) } - pub fn read_double(&mut self) -> f64 { - debug_assert!(self.position + 8 <= self.limit); - let bytes_slice = &self.segment[self.position..self.position + 8]; + pub fn read_double(&self) -> f64 { + let pos = self.position.get(); + debug_assert!(pos + 8 <= self.limit); + let bytes_slice = &self.segment[pos..pos + 8]; let byte_array: [u8; 8] = bytes_slice .try_into() .expect("Slice must be exactly 8 bytes long"); - self.position += 8; + self.position.set(pos + 8); f64::from_ne_bytes(byte_array) } - pub fn read_binary(&mut self, length: usize) -> Bytes { - debug_assert!(self.position + length <= self.limit); + pub fn read_binary(&self, length: usize) -> Bytes { + let pos = self.position.get(); + debug_assert!(pos + length <= self.limit); - let start = self.position; + let start = pos; let end = start + length; - self.position = end; + self.position.set(end); self.segment.slice(start..end) } - pub fn read_bytes(&mut self) -> Box<[u8]> { + pub fn read_bytes(&self) -> &[u8] { let len = self.read_int(); debug_assert!(len >= 0); + let pos = self.position.get(); let len = len as usize; - debug_assert!(self.position + len <= self.limit); + debug_assert!(pos + len <= self.limit); - let start = self.position; + let start = pos; let end = start + len; - self.position = end; + self.position.set(end); - self.segment[start..end].to_vec().into_boxed_slice() + self.segment[start..end].to_byte_slice() } - pub fn read_string(&mut self) -> String { + pub fn read_string(&self) -> &str { let bytes = self.read_bytes(); - String::from_utf8(bytes.into_vec()) - .unwrap_or_else(|e| panic!("Invalid UTF-8 in string data: {e}")) + from_utf8(bytes).expect("Invalid UTF-8 when reading string from compacted row") } } diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs index 78872a9..6540618 100644 --- a/crates/fluss/src/row/datum.rs +++ b/crates/fluss/src/row/datum.rs @@ -24,11 +24,8 @@ use arrow::array::{ use jiff::ToSpan; use ordered_float::OrderedFloat; use parse_display::Display; -use ref_cast::RefCast; use rust_decimal::Decimal; -use serde::{Deserialize, Serialize}; -use std::fmt; -use std::ops::Deref; +use serde::Serialize; #[allow(dead_code)] const THIRTY_YEARS_MICROSECONDS: i64 = 946_684_800_000_000; @@ -53,13 +50,8 @@ pub enum Datum<'a> { Float64(F64), #[display("'{0}'")] String(&'a str), - /// Owned string - #[display("'{0}'")] - OwnedString(String), - #[display("{0}")] - Blob(Blob), #[display("{:?}")] - BorrowedBlob(&'a [u8]), + Blob(&'a [u8]), #[display("{0}")] Decimal(Decimal), #[display("{0}")] @@ -78,15 +70,13 @@ impl Datum<'_> { pub fn as_str(&self) -> &str { match self { Self::String(s) => s, - Self::OwnedString(s) => s.as_str(), _ => panic!("not a string: {self:?}"), } } pub fn as_blob(&self) -> &[u8] { match self { - Self::Blob(blob) => blob.as_ref(), - Self::BorrowedBlob(blob) => blob, + Self::Blob(blob) => blob, _ => panic!("not a blob: {self:?}"), } } @@ -128,6 +118,13 @@ impl<'a> From<&'a str> for Datum<'a> { } } +impl<'a> From<&'a [u8]> for Datum<'a> { + #[inline] + fn from(value: &'a [u8]) -> Self { + Datum::Blob(value) + } +} + impl From<Option<&()>> for Datum<'_> { fn from(_: Option<&()>) -> Self { Self::Null @@ -227,7 +224,6 @@ impl<'b, 'a: 'b> TryFrom<&'b Datum<'a>> for &'b str { fn try_from(from: &'b Datum<'a>) -> std::result::Result<Self, Self::Error> { match from { Datum::String(i) => Ok(*i), - Datum::OwnedString(s) => Ok(s.as_str()), _ => Err(()), } } @@ -296,9 +292,7 @@ impl Datum<'_> { Datum::Float32(v) => append_value_to_arrow!(Float32Builder, v.into_inner()), Datum::Float64(v) => append_value_to_arrow!(Float64Builder, v.into_inner()), Datum::String(v) => append_value_to_arrow!(StringBuilder, *v), - Datum::OwnedString(v) => append_value_to_arrow!(StringBuilder, v.as_str()), - Datum::Blob(v) => append_value_to_arrow!(BinaryBuilder, v.as_ref()), - Datum::BorrowedBlob(v) => append_value_to_arrow!(BinaryBuilder, *v), + Datum::Blob(v) => append_value_to_arrow!(BinaryBuilder, v), Datum::Decimal(_) | Datum::Date(_) | Datum::Timestamp(_) | Datum::TimestampTz(_) => { return Err(RowConvertError { message: format!( @@ -349,57 +343,6 @@ impl_to_arrow!(&str, StringBuilder); pub type F32 = OrderedFloat<f32>; pub type F64 = OrderedFloat<f64>; -#[allow(dead_code)] -pub type Str = Box<str>; - -#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize, Default)] -pub struct Blob(Box<[u8]>); - -impl Deref for Blob { - type Target = BlobRef; - - fn deref(&self) -> &Self::Target { - BlobRef::new(&self.0) - } -} - -impl BlobRef { - pub fn new(bytes: &[u8]) -> &Self { - // SAFETY: `&BlobRef` and `&[u8]` have the same layout. - BlobRef::ref_cast(bytes) - } -} - -/// A slice of a blob. -#[repr(transparent)] -#[derive(PartialEq, Eq, PartialOrd, Ord, RefCast, Hash)] -pub struct BlobRef([u8]); - -impl fmt::Debug for Blob { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{:?}", self.as_ref()) - } -} - -impl fmt::Display for Blob { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{:?}", self.as_ref()) - } -} - -impl AsRef<[u8]> for BlobRef { - fn as_ref(&self) -> &[u8] { - &self.0 - } -} - -impl Deref for BlobRef { - type Target = [u8]; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} #[derive(PartialOrd, Ord, Display, PartialEq, Eq, Debug, Copy, Clone, Default, Hash, Serialize)] pub struct Date(i32); @@ -410,18 +353,6 @@ pub struct Timestamp(i64); #[derive(PartialOrd, Ord, Display, PartialEq, Eq, Debug, Copy, Clone, Default, Hash, Serialize)] pub struct TimestampLtz(i64); -impl From<Vec<u8>> for Blob { - fn from(vec: Vec<u8>) -> Self { - Blob(vec.into()) - } -} - -impl<'a> From<&'a [u8]> for Datum<'a> { - fn from(bytes: &'a [u8]) -> Datum<'a> { - Datum::BorrowedBlob(bytes) - } -} - const UNIX_EPOCH_DAY: jiff::civil::Date = jiff::civil::date(1970, 1, 1); impl Date {
