This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new dab09b4 chore: implement zero copy in CompactedRowReader (#138)
dab09b4 is described below
commit dab09b4268ea700341f405ddef27274d8d69d1bd
Author: yuxia Luo <[email protected]>
AuthorDate: Sun Jan 11 21:36:46 2026 +0800
chore: implement zero copy in CompactedRowReader (#138)
---
crates/fluss/src/row/compacted/compacted_row.rs | 137 +++++-------
.../src/row/compacted/compacted_row_reader.rs | 239 +++++++++++----------
.../src/row/compacted/compacted_row_writer.rs | 4 +-
crates/fluss/src/row/datum.rs | 2 +-
4 files changed, 178 insertions(+), 204 deletions(-)
diff --git a/crates/fluss/src/row/compacted/compacted_row.rs
b/crates/fluss/src/row/compacted/compacted_row.rs
index fca41c6..481f9be 100644
--- a/crates/fluss/src/row/compacted/compacted_row.rs
+++ b/crates/fluss/src/row/compacted/compacted_row.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use bytes::Bytes;
+use std::sync::OnceLock;
use crate::metadata::DataType;
use crate::row::compacted::compacted_row_reader::{CompactedRowDeserializer,
CompactedRowReader};
@@ -24,125 +24,95 @@ use crate::row::{GenericRow, InternalRow};
// Reference implementation:
//
https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRow.java
#[allow(dead_code)]
-pub struct CompactedRow {
+pub struct CompactedRow<'a> {
arity: usize,
- segment: Bytes,
- offset: usize,
size_in_bytes: usize,
- decoded: bool,
- decoded_row: GenericRow<'static>,
- reader: CompactedRowReader,
- deserializer: CompactedRowDeserializer,
+ decoded_row: OnceLock<GenericRow<'a>>,
+ deserializer: CompactedRowDeserializer<'a>,
+ reader: CompactedRowReader<'a>,
+ data_types: &'a [DataType],
}
-#[allow(dead_code)]
-impl CompactedRow {
- pub fn calculate_bit_set_width_in_bytes(arity: usize) -> usize {
- arity.div_ceil(8)
- }
-
- pub fn new(types: Vec<DataType>) -> Self {
- let arity = types.len();
- Self {
- arity,
- segment: Bytes::new(),
- offset: 0,
- size_in_bytes: 0,
- decoded: false,
- decoded_row: GenericRow::new(),
- reader: CompactedRowReader::new(arity),
- deserializer: CompactedRowDeserializer::new(types),
- }
- }
+pub fn calculate_bit_set_width_in_bytes(arity: usize) -> usize {
+ arity.div_ceil(8)
+}
- pub fn from_bytes(types: Vec<DataType>, data: Bytes) -> Self {
- let arity = types.len();
+#[allow(dead_code)]
+impl<'a> CompactedRow<'a> {
+ pub fn from_bytes(data_types: &'a [DataType], data: &'a [u8]) -> Self {
+ let arity = data_types.len();
let size = data.len();
Self {
arity,
- segment: data,
- offset: 0,
size_in_bytes: size,
- decoded: false,
- decoded_row: GenericRow::new(),
- reader: CompactedRowReader::new(arity),
- deserializer: CompactedRowDeserializer::new(types),
+ decoded_row: OnceLock::new(),
+ deserializer: CompactedRowDeserializer::new(data_types),
+ reader: CompactedRowReader::new(arity, data, 0, size),
+ data_types,
}
}
- pub fn point_to(&mut self, segment: Bytes, offset: usize, size_in_bytes:
usize) {
- self.segment = segment;
- self.offset = offset;
- self.size_in_bytes = size_in_bytes;
- self.decoded = false;
- }
-
- pub fn get_segment(&self) -> &Bytes {
- &self.segment
- }
-
- pub fn get_offset(&self) -> usize {
- self.offset
- }
-
pub fn get_size_in_bytes(&self) -> usize {
self.size_in_bytes
}
- pub fn get_field_count(&self) -> usize {
- self.arity
+ fn decoded_row(&self) -> &GenericRow<'_> {
+ self.decoded_row
+ .get_or_init(|| self.deserializer.deserialize(&self.reader))
}
+}
- pub fn is_null_at(&self, pos: usize) -> bool {
- let byte_index = pos >> 3;
- let bit = pos & 7;
- let idx = self.offset + byte_index;
- (self.segment[idx] & (1u8 << bit)) != 0
+#[allow(dead_code)]
+impl<'a> InternalRow for CompactedRow<'a> {
+ fn get_field_count(&self) -> usize {
+ self.arity
}
- fn decoded_row(&mut self) -> &GenericRow<'static> {
- if !self.decoded {
- self.reader
- .point_to(self.segment.clone(), self.offset,
self.size_in_bytes);
- self.decoded_row = self.deserializer.deserialize(&mut self.reader);
- self.decoded = true;
- }
- &self.decoded_row
+ fn is_null_at(&self, pos: usize) -> bool {
+ self.data_types[pos].is_nullable() && self.reader.is_null_at(pos)
}
- pub fn get_boolean(&mut self, pos: usize) -> bool {
+ fn get_boolean(&self, pos: usize) -> bool {
self.decoded_row().get_boolean(pos)
}
- pub fn get_byte(&mut self, pos: usize) -> i8 {
+ fn get_byte(&self, pos: usize) -> i8 {
self.decoded_row().get_byte(pos)
}
- pub fn get_short(&mut self, pos: usize) -> i16 {
+ fn get_short(&self, pos: usize) -> i16 {
self.decoded_row().get_short(pos)
}
- pub fn get_int(&mut self, pos: usize) -> i32 {
+ fn get_int(&self, pos: usize) -> i32 {
self.decoded_row().get_int(pos)
}
- pub fn get_long(&mut self, pos: usize) -> i64 {
+ fn get_long(&self, pos: usize) -> i64 {
self.decoded_row().get_long(pos)
}
- pub fn get_float(&mut self, pos: usize) -> f32 {
+ fn get_float(&self, pos: usize) -> f32 {
self.decoded_row().get_float(pos)
}
- pub fn get_double(&mut self, pos: usize) -> f64 {
+ fn get_double(&self, pos: usize) -> f64 {
self.decoded_row().get_double(pos)
}
- pub fn get_string(&mut self, pos: usize) -> &str {
+ fn get_char(&self, pos: usize, length: usize) -> &str {
+ self.decoded_row().get_char(pos, length)
+ }
+
+ fn get_string(&self, pos: usize) -> &str {
self.decoded_row().get_string(pos)
}
- pub fn get_bytes(&mut self, pos: usize) -> &[u8] {
+ fn get_binary(&self, pos: usize, length: usize) -> &[u8] {
+ self.decoded_row().get_binary(pos, length)
+ }
+
+ fn get_bytes(&self, pos: usize) -> &[u8] {
self.decoded_row().get_bytes(pos)
}
}
@@ -171,7 +141,6 @@ mod tests {
DataType::Bytes(BytesType::new()),
];
- let mut row = CompactedRow::new(types.clone());
let mut writer = CompactedRowWriter::new(types.len());
writer.write_boolean(true);
@@ -184,7 +153,8 @@ mod tests {
writer.write_string("Hello World");
writer.write_bytes(&[1, 2, 3, 4, 5]);
- row.point_to(writer.to_bytes(), 0, writer.position());
+ let bytes = writer.to_bytes();
+ let mut row = CompactedRow::from_bytes(types.as_slice(),
bytes.as_ref());
assert_eq!(row.get_field_count(), 9);
assert!(row.get_boolean(0));
@@ -204,14 +174,14 @@ mod tests {
DataType::Double(DoubleType::new()),
];
- let mut row = CompactedRow::new(types.clone());
let mut writer = CompactedRowWriter::new(types.len());
writer.write_int(100);
writer.set_null_at(1);
writer.write_double(2.71);
- row.point_to(writer.to_bytes(), 0, writer.position());
+ let bytes = writer.to_bytes();
+ row = CompactedRow::from_bytes(types.as_slice(), bytes.as_ref());
assert!(!row.is_null_at(0));
assert!(row.is_null_at(1));
@@ -230,12 +200,13 @@ mod tests {
];
let mut writer = CompactedRowWriter::new(types.len());
- writer.write_int(42);
+ writer.write_int(-1);
writer.write_string("test");
- let mut row = CompactedRow::from_bytes(types, writer.to_bytes());
+ let bytes = writer.to_bytes();
+ let mut row = CompactedRow::from_bytes(types.as_slice(),
bytes.as_ref());
- assert_eq!(row.get_int(0), 42);
+ assert_eq!(row.get_int(0), -1);
assert_eq!(row.get_string(1), "test");
// Test large row
@@ -244,14 +215,14 @@ mod tests {
.map(|_| DataType::Int(IntType::new()))
.collect();
- let mut row = CompactedRow::new(types.clone());
let mut writer = CompactedRowWriter::new(num_fields);
for i in 0..num_fields {
writer.write_int((i * 10) as i32);
}
- row.point_to(writer.to_bytes(), 0, writer.position());
+ let bytes = writer.to_bytes();
+ row = CompactedRow::from_bytes(types.as_slice(), bytes.as_ref());
for i in 0..num_fields {
assert_eq!(row.get_int(i), (i * 10) as i32);
diff --git a/crates/fluss/src/row/compacted/compacted_row_reader.rs
b/crates/fluss/src/row/compacted/compacted_row_reader.rs
index 00d94ad..c053d4e 100644
--- a/crates/fluss/src/row/compacted/compacted_row_reader.rs
+++ b/crates/fluss/src/row/compacted/compacted_row_reader.rs
@@ -15,54 +15,75 @@
// specific language governing permissions and limitations
// under the License.
-use bytes::Bytes;
-use std::borrow::Cow;
-
+use crate::row::compacted::compacted_row::calculate_bit_set_width_in_bytes;
use crate::{
metadata::DataType,
- row::{
- Datum, GenericRow,
- compacted::{compacted_row::CompactedRow,
compacted_row_writer::CompactedRowWriter},
- },
+ row::{Datum, GenericRow,
compacted::compacted_row_writer::CompactedRowWriter},
};
+use std::str::from_utf8;
#[allow(dead_code)]
-pub struct CompactedRowDeserializer {
- schema: Vec<DataType>,
+pub struct CompactedRowDeserializer<'a> {
+ schema: &'a [DataType],
}
#[allow(dead_code)]
-impl CompactedRowDeserializer {
- pub fn new(schema: Vec<DataType>) -> Self {
+impl<'a> CompactedRowDeserializer<'a> {
+ pub fn new(schema: &'a [DataType]) -> Self {
Self { schema }
}
- pub fn deserialize(&self, reader: &mut CompactedRowReader) ->
GenericRow<'static> {
+ pub fn deserialize(&self, reader: &CompactedRowReader<'a>) ->
GenericRow<'a> {
let mut row = GenericRow::new();
- for (pos, dtype) in self.schema.iter().enumerate() {
- if reader.is_null_at(pos) {
- row.set_field(pos, Datum::Null);
+ let mut cursor = reader.initial_position();
+ for (col_pos, dtype) in self.schema.iter().enumerate() {
+ if dtype.is_nullable() && reader.is_null_at(col_pos) {
+ row.set_field(col_pos, Datum::Null);
continue;
}
- let datum = match dtype {
- DataType::Boolean(_) => Datum::Bool(reader.read_boolean()),
- DataType::TinyInt(_) => Datum::Int8(reader.read_byte() as i8),
- DataType::SmallInt(_) => Datum::Int16(reader.read_short()),
- DataType::Int(_) => Datum::Int32(reader.read_int()),
- DataType::BigInt(_) => Datum::Int64(reader.read_long()),
- DataType::Float(_) =>
Datum::Float32(reader.read_float().into()),
- DataType::Double(_) =>
Datum::Float64(reader.read_double().into()),
+ let (datum, next_cursor) = match dtype {
+ DataType::Boolean(_) => {
+ let (val, next) = reader.read_boolean(cursor);
+ (Datum::Bool(val), next)
+ }
+ DataType::TinyInt(_) => {
+ let (val, next) = reader.read_byte(cursor);
+ (Datum::Int8(val as i8), next)
+ }
+ DataType::SmallInt(_) => {
+ let (val, next) = reader.read_short(cursor);
+ (Datum::Int16(val), next)
+ }
+ DataType::Int(_) => {
+ let (val, next) = reader.read_int(cursor);
+ (Datum::Int32(val), next)
+ }
+ DataType::BigInt(_) => {
+ let (val, next) = reader.read_long(cursor);
+ (Datum::Int64(val), next)
+ }
+ DataType::Float(_) => {
+ let (val, next) = reader.read_float(cursor);
+ (Datum::Float32(val.into()), next)
+ }
+ DataType::Double(_) => {
+ let (val, next) = reader.read_double(cursor);
+ (Datum::Float64(val.into()), next)
+ }
// TODO: use read_char(length) in the future, but need to keep
compatibility
DataType::Char(_) | DataType::String(_) => {
- Datum::String(Cow::Owned(reader.read_string()))
+ let (val, next) = reader.read_string(cursor);
+ (Datum::String(val.into()), next)
}
// TODO: use read_binary(length) in the future, but need to
keep compatibility
DataType::Bytes(_) | DataType::Binary(_) => {
- Datum::Blob(Cow::Owned(reader.read_bytes().into_vec()))
+ let (val, next) = reader.read_bytes(cursor);
+ (Datum::Blob(val.into()), next)
}
_ => panic!("unsupported DataType in
CompactedRowDeserializer"),
};
- row.set_field(pos, datum);
+ cursor = next_cursor;
+ row.set_field(col_pos, datum);
}
row
}
@@ -71,151 +92,133 @@ impl CompactedRowDeserializer {
// Reference implementation:
//
https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowReader.java
#[allow(dead_code)]
-pub struct CompactedRowReader {
- segment: Bytes,
+pub struct CompactedRowReader<'a> {
+ segment: &'a [u8],
offset: usize,
- position: usize,
limit: usize,
header_size_in_bytes: usize,
}
#[allow(dead_code)]
-impl CompactedRowReader {
- pub fn new(field_count: usize) -> Self {
- let header =
CompactedRow::calculate_bit_set_width_in_bytes(field_count);
- Self {
- header_size_in_bytes: header,
- segment: Bytes::new(),
- offset: 0,
- position: 0,
- limit: 0,
- }
- }
-
- pub fn point_to(&mut self, data: Bytes, offset: usize, length: usize) {
+impl<'a> CompactedRowReader<'a> {
+ pub fn new(field_count: usize, data: &'a [u8], offset: usize, length:
usize) -> Self {
+ let header_size_in_bytes =
calculate_bit_set_width_in_bytes(field_count);
let limit = offset + length;
- let position = offset + self.header_size_in_bytes;
-
+ let position = offset + header_size_in_bytes;
debug_assert!(limit <= data.len());
debug_assert!(position <= limit);
- self.segment = data;
- self.offset = offset;
- self.position = position;
- self.limit = limit;
+ CompactedRowReader {
+ segment: data,
+ offset,
+ limit,
+ header_size_in_bytes,
+ }
}
- pub fn is_null_at(&self, pos: usize) -> bool {
- let byte_index = pos >> 3;
- let bit = pos & 7;
+ fn initial_position(&self) -> usize {
+ self.offset + self.header_size_in_bytes
+ }
+
+ pub fn is_null_at(&self, col_pos: usize) -> bool {
+ let byte_index = col_pos >> 3;
+ let bit = col_pos & 7;
debug_assert!(byte_index < self.header_size_in_bytes);
let idx = self.offset + byte_index;
(self.segment[idx] & (1u8 << bit)) != 0
}
- pub fn read_boolean(&mut self) -> bool {
- self.read_byte() != 0
+ pub fn read_boolean(&self, pos: usize) -> (bool, usize) {
+ let (val, next) = self.read_byte(pos);
+ (val != 0, next)
}
- pub fn read_byte(&mut self) -> u8 {
- debug_assert!(self.position < self.limit);
- let b = self.segment[self.position];
- self.position += 1;
- b
+ pub fn read_byte(&self, pos: usize) -> (u8, usize) {
+ debug_assert!(pos < self.limit);
+ (self.segment[pos], pos + 1)
}
- pub fn read_short(&mut self) -> i16 {
- debug_assert!(self.position + 2 <= self.limit);
- let bytes_slice = &self.segment[self.position..self.position + 2];
- let byte_array: [u8; 2] = bytes_slice
- .try_into()
- .expect("Slice must be exactly 2 bytes long");
-
- self.position += 2;
- i16::from_ne_bytes(byte_array)
+ pub fn read_short(&self, pos: usize) -> (i16, usize) {
+ let next_pos = pos + 2;
+ debug_assert!(next_pos <= self.limit);
+ let bytes_slice = &self.segment[pos..pos + 2];
+ let val = i16::from_ne_bytes(
+ bytes_slice
+ .try_into()
+ .expect("Slice must be exactly 2 bytes long"),
+ );
+ (val, next_pos)
}
- pub fn read_int(&mut self) -> i32 {
+ pub fn read_int(&self, mut pos: usize) -> (i32, usize) {
let mut result: u32 = 0;
let mut shift = 0;
for _ in 0..CompactedRowWriter::MAX_INT_SIZE {
- let b = self.read_byte();
+ let (b, next_pos) = self.read_byte(pos);
+ pos = next_pos;
result |= ((b & 0x7F) as u32) << shift;
if (b & 0x80) == 0 {
- return result as i32;
+ return (result as i32, pos);
}
shift += 7;
}
-
- panic!("Invalid input stream.");
+ panic!("Invalid VarInt32 input stream.");
}
- pub fn read_long(&mut self) -> i64 {
+ pub fn read_long(&self, mut pos: usize) -> (i64, usize) {
let mut result: u64 = 0;
let mut shift = 0;
for _ in 0..CompactedRowWriter::MAX_LONG_SIZE {
- let b = self.read_byte();
+ let (b, next_pos) = self.read_byte(pos);
+ pos = next_pos;
result |= ((b & 0x7F) as u64) << shift;
if (b & 0x80) == 0 {
- return result as i64;
+ return (result as i64, pos);
}
shift += 7;
}
-
- panic!("Invalid input stream.");
+ panic!("Invalid VarInt64 input stream.");
}
- pub fn read_float(&mut self) -> f32 {
- debug_assert!(self.position + 4 <= self.limit);
- let bytes_slice = &self.segment[self.position..self.position + 4];
- let byte_array: [u8; 4] = bytes_slice
- .try_into()
- .expect("Slice must be exactly 4 bytes long");
-
- self.position += 4;
- f32::from_ne_bytes(byte_array)
+ pub fn read_float(&self, pos: usize) -> (f32, usize) {
+ let next_pos = pos + 4;
+ debug_assert!(next_pos <= self.limit);
+ let val = f32::from_ne_bytes(
+ self.segment[pos..pos + 4]
+ .try_into()
+ .expect("Slice must be exactly 4 bytes long"),
+ );
+ (val, next_pos)
}
- pub fn read_double(&mut self) -> f64 {
- debug_assert!(self.position + 8 <= self.limit);
- let bytes_slice = &self.segment[self.position..self.position + 8];
- let byte_array: [u8; 8] = bytes_slice
- .try_into()
- .expect("Slice must be exactly 8 bytes long");
-
- self.position += 8;
- f64::from_ne_bytes(byte_array)
+ pub fn read_double(&self, pos: usize) -> (f64, usize) {
+ let next_pos = pos + 8;
+ debug_assert!(next_pos <= self.limit);
+ let val = f64::from_ne_bytes(
+ self.segment[pos..pos + 8]
+ .try_into()
+ .expect("Slice must be exactly 8 bytes long"),
+ );
+ (val, next_pos)
}
- pub fn read_binary(&mut self, length: usize) -> Bytes {
- debug_assert!(self.position + length <= self.limit);
-
- let start = self.position;
- let end = start + length;
- self.position = end;
-
- self.segment.slice(start..end)
+ pub fn read_binary(&self, pos: usize) -> (&'a [u8], usize) {
+ self.read_bytes(pos)
}
- pub fn read_bytes(&mut self) -> Box<[u8]> {
- let len = self.read_int();
- debug_assert!(len >= 0);
-
+ pub fn read_bytes(&self, pos: usize) -> (&'a [u8], usize) {
+ let (len, data_pos) = self.read_int(pos);
let len = len as usize;
- debug_assert!(self.position + len <= self.limit);
-
- let start = self.position;
- let end = start + len;
- self.position = end;
-
- self.segment[start..end].to_vec().into_boxed_slice()
+ let next_pos = data_pos + len;
+ debug_assert!(next_pos <= self.limit);
+ (&self.segment[data_pos..next_pos], next_pos)
}
- pub fn read_string(&mut self) -> String {
- let bytes = self.read_bytes();
- String::from_utf8(bytes.into_vec())
- .unwrap_or_else(|e| panic!("Invalid UTF-8 in string data: {e}"))
+ pub fn read_string(&self, pos: usize) -> (&'a str, usize) {
+ let (bytes, next_pos) = self.read_bytes(pos);
+ let s = from_utf8(bytes).expect("Invalid UTF-8 when reading string");
+ (s, next_pos)
}
}
diff --git a/crates/fluss/src/row/compacted/compacted_row_writer.rs
b/crates/fluss/src/row/compacted/compacted_row_writer.rs
index 8345123..4f535c6 100644
--- a/crates/fluss/src/row/compacted/compacted_row_writer.rs
+++ b/crates/fluss/src/row/compacted/compacted_row_writer.rs
@@ -18,7 +18,7 @@
use bytes::{Bytes, BytesMut};
use std::cmp;
-use crate::row::compacted::compacted_row::CompactedRow;
+use crate::row::compacted::compacted_row::calculate_bit_set_width_in_bytes;
// Writer for CompactedRow
// Reference implementation:
@@ -36,7 +36,7 @@ impl CompactedRowWriter {
pub const MAX_LONG_SIZE: usize = 10;
pub fn new(field_count: usize) -> Self {
- let header_size =
CompactedRow::calculate_bit_set_width_in_bytes(field_count);
+ let header_size = calculate_bit_set_width_in_bytes(field_count);
let cap = cmp::max(64, header_size);
let mut buffer = BytesMut::with_capacity(cap);
diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs
index fa85ded..ad7948d 100644
--- a/crates/fluss/src/row/datum.rs
+++ b/crates/fluss/src/row/datum.rs
@@ -403,7 +403,7 @@ mod tests {
#[test]
fn datum_accessors_and_conversions() {
- let datum = Datum::String("value");
+ let datum = Datum::String("value".into());
assert_eq!(datum.as_str(), "value");
assert!(!datum.is_null());