This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 6fc63e4 feat: introduce CompactedRowReader, CompactedRow,
CompactedRowDeserializer (#131)
6fc63e4 is described below
commit 6fc63e465bf6187ee6aa9e7c87a982030bdf44d6
Author: Kelvin Wu <[email protected]>
AuthorDate: Sat Jan 10 22:01:24 2026 +0800
feat: introduce CompactedRowReader, CompactedRow, CompactedRowDeserializer
(#131)
---
crates/fluss/src/metadata/datatype.rs | 2 +-
crates/fluss/src/row/binary/binary_writer.rs | 2 +-
crates/fluss/src/row/compacted/compacted_row.rs | 260 +++++++++++++++++++++
.../src/row/compacted/compacted_row_reader.rs | 218 +++++++++++++++++
.../src/row/compacted/compacted_row_writer.rs | 5 +-
crates/fluss/src/row/compacted/mod.rs | 9 +
crates/fluss/src/row/datum.rs | 10 +-
.../fluss/src/row/encode/compacted_key_encoder.rs | 10 +-
8 files changed, 502 insertions(+), 14 deletions(-)
diff --git a/crates/fluss/src/metadata/datatype.rs
b/crates/fluss/src/metadata/datatype.rs
index e5ccb9a..c53cd27 100644
--- a/crates/fluss/src/metadata/datatype.rs
+++ b/crates/fluss/src/metadata/datatype.rs
@@ -861,7 +861,7 @@ impl RowType {
pub fn with_data_types(data_types: Vec<DataType>) -> Self {
let mut fields: Vec<DataField> = Vec::new();
data_types.iter().enumerate().for_each(|(idx, data_type)| {
- fields.push(DataField::new(format!("f{}", idx), data_type.clone(),
None));
+ fields.push(DataField::new(format!("f{idx}"), data_type.clone(),
None));
});
Self::with_nullable(true, fields)
diff --git a/crates/fluss/src/row/binary/binary_writer.rs
b/crates/fluss/src/row/binary/binary_writer.rs
index a296777..44f10b6 100644
--- a/crates/fluss/src/row/binary/binary_writer.rs
+++ b/crates/fluss/src/row/binary/binary_writer.rs
@@ -201,7 +201,7 @@ impl InnerValueWriter {
}
_ => {
return Err(IllegalArgument {
- message: format!("{:?} used to write value {:?}", self,
value),
+ message: format!("{self:?} used to write value {value:?}"),
});
}
}
diff --git a/crates/fluss/src/row/compacted/compacted_row.rs
b/crates/fluss/src/row/compacted/compacted_row.rs
new file mode 100644
index 0000000..fca41c6
--- /dev/null
+++ b/crates/fluss/src/row/compacted/compacted_row.rs
@@ -0,0 +1,260 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Bytes;
+
+use crate::metadata::DataType;
+use crate::row::compacted::compacted_row_reader::{CompactedRowDeserializer,
CompactedRowReader};
+use crate::row::{GenericRow, InternalRow};
+
+// Reference implementation:
+// https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRow.java
+/// A row in the compacted binary format: a null bitset header (one bit per
+/// field, set = null) followed by the encoded non-null field values.
+/// Field values are decoded lazily, all at once, on the first typed getter
+/// call after the row is (re)pointed at new bytes.
+#[allow(dead_code)]
+pub struct CompactedRow {
+ /// Number of fields in the row.
+ arity: usize,
+ /// Backing bytes; this row occupies `offset..offset + size_in_bytes`.
+ segment: Bytes,
+ /// Start of the row (and of its null bitset) within `segment`.
+ offset: usize,
+ /// Length of the row in bytes, including the null bitset.
+ size_in_bytes: usize,
+ /// Whether `decoded_row` reflects the current `segment`/`offset`.
+ decoded: bool,
+ /// Cache of the fully decoded field values.
+ decoded_row: GenericRow<'static>,
+ reader: CompactedRowReader,
+ deserializer: CompactedRowDeserializer,
+}
+
+#[allow(dead_code)]
+impl CompactedRow {
+    /// Returns the width in bytes of the null bitset for a row with `arity`
+    /// fields: one bit per field, rounded up to a whole byte.
+    pub fn calculate_bit_set_width_in_bytes(arity: usize) -> usize {
+        arity.div_ceil(8)
+    }
+
+    /// Creates a row for the given field types with no backing bytes yet;
+    /// call [`CompactedRow::point_to`] before reading any field.
+    pub fn new(types: Vec<DataType>) -> Self {
+        // An empty buffer yields offset 0 and size 0, i.e. an unpointed row.
+        Self::from_bytes(types, Bytes::new())
+    }
+
+    /// Creates a row that views the whole of `data` (offset 0, size
+    /// `data.len()`).
+    pub fn from_bytes(types: Vec<DataType>, data: Bytes) -> Self {
+        let arity = types.len();
+        let size_in_bytes = data.len();
+        Self {
+            arity,
+            segment: data,
+            offset: 0,
+            size_in_bytes,
+            decoded: false,
+            decoded_row: GenericRow::new(),
+            reader: CompactedRowReader::new(arity),
+            deserializer: CompactedRowDeserializer::new(types),
+        }
+    }
+
+    /// Re-points this row at `size_in_bytes` bytes of `segment` starting at
+    /// `offset`, invalidating any previously decoded values.
+    pub fn point_to(&mut self, segment: Bytes, offset: usize, size_in_bytes: usize) {
+        self.segment = segment;
+        self.offset = offset;
+        self.size_in_bytes = size_in_bytes;
+        self.decoded = false;
+    }
+
+    /// Returns the backing bytes this row currently points at.
+    pub fn get_segment(&self) -> &Bytes {
+        &self.segment
+    }
+
+    /// Returns the start of this row within its segment.
+    pub fn get_offset(&self) -> usize {
+        self.offset
+    }
+
+    /// Returns the length of this row in bytes, including the null bitset.
+    pub fn get_size_in_bytes(&self) -> usize {
+        self.size_in_bytes
+    }
+
+    /// Returns the number of fields in this row.
+    pub fn get_field_count(&self) -> usize {
+        self.arity
+    }
+
+    /// Returns `true` if the field at `pos` is null. This reads the null
+    /// bitset directly and does not decode the row.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the backing bytes do not cover the bitset byte for `pos`.
+    /// In debug builds, also panics if `pos` is not a valid field index.
+    pub fn is_null_at(&self, pos: usize) -> bool {
+        // Without this check an out-of-range `pos` would silently read padding
+        // bits or payload bytes as if they were part of the null bitset.
+        debug_assert!(
+            pos < self.arity,
+            "field index {pos} out of range for arity {}",
+            self.arity
+        );
+        let byte_index = pos >> 3;
+        let bit = pos & 7;
+        (self.segment[self.offset + byte_index] & (1u8 << bit)) != 0
+    }
+
+    /// Decodes all fields on the first access after `point_to` and caches
+    /// them; later calls return the cached row.
+    fn decoded_row(&mut self) -> &GenericRow<'static> {
+        if !self.decoded {
+            // Cloning `Bytes` shares the buffer rather than copying it.
+            self.reader
+                .point_to(self.segment.clone(), self.offset, self.size_in_bytes);
+            self.decoded_row = self.deserializer.deserialize(&mut self.reader);
+            self.decoded = true;
+        }
+        &self.decoded_row
+    }
+
+    // The typed getters below delegate to the decoded `GenericRow`; their
+    // behavior for null or mismatched fields is whatever `GenericRow` does.
+
+    /// Returns the boolean field at `pos`.
+    pub fn get_boolean(&mut self, pos: usize) -> bool {
+        self.decoded_row().get_boolean(pos)
+    }
+
+    /// Returns the tinyint field at `pos`.
+    pub fn get_byte(&mut self, pos: usize) -> i8 {
+        self.decoded_row().get_byte(pos)
+    }
+
+    /// Returns the smallint field at `pos`.
+    pub fn get_short(&mut self, pos: usize) -> i16 {
+        self.decoded_row().get_short(pos)
+    }
+
+    /// Returns the int field at `pos`.
+    pub fn get_int(&mut self, pos: usize) -> i32 {
+        self.decoded_row().get_int(pos)
+    }
+
+    /// Returns the bigint field at `pos`.
+    pub fn get_long(&mut self, pos: usize) -> i64 {
+        self.decoded_row().get_long(pos)
+    }
+
+    /// Returns the float field at `pos`.
+    pub fn get_float(&mut self, pos: usize) -> f32 {
+        self.decoded_row().get_float(pos)
+    }
+
+    /// Returns the double field at `pos`.
+    pub fn get_double(&mut self, pos: usize) -> f64 {
+        self.decoded_row().get_double(pos)
+    }
+
+    /// Returns the string/char field at `pos`.
+    pub fn get_string(&mut self, pos: usize) -> &str {
+        self.decoded_row().get_string(pos)
+    }
+
+    /// Returns the bytes/binary field at `pos`.
+    pub fn get_bytes(&mut self, pos: usize) -> &[u8] {
+        self.decoded_row().get_bytes(pos)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::metadata::{
+        BigIntType, BooleanType, BytesType, DoubleType, FloatType, IntType, SmallIntType,
+        StringType, TinyIntType,
+    };
+    use crate::row::compacted::compacted_row_writer::CompactedRowWriter;
+
+    #[test]
+    fn test_bit_set_width_in_bytes() {
+        // One null bit per field, rounded up to whole bytes.
+        for (arity, expected) in [(0, 0), (1, 1), (8, 1), (9, 2), (100, 13)] {
+            assert_eq!(CompactedRow::calculate_bit_set_width_in_bytes(arity), expected);
+        }
+    }
+
+    #[test]
+    fn test_all_primitive_types() {
+        let types = vec![
+            DataType::Boolean(BooleanType::new()),
+            DataType::TinyInt(TinyIntType::new()),
+            DataType::SmallInt(SmallIntType::new()),
+            DataType::Int(IntType::new()),
+            DataType::BigInt(BigIntType::new()),
+            DataType::Float(FloatType::new()),
+            DataType::Double(DoubleType::new()),
+            DataType::String(StringType::new()),
+            DataType::Bytes(BytesType::new()),
+        ];
+
+        let mut row = CompactedRow::new(types.clone());
+        let mut writer = CompactedRowWriter::new(types.len());
+        writer.write_boolean(true);
+        writer.write_byte(1);
+        writer.write_short(100);
+        writer.write_int(1000);
+        writer.write_long(10000);
+        writer.write_float(1.5);
+        writer.write_double(2.5);
+        writer.write_string("Hello World");
+        writer.write_bytes(&[1, 2, 3, 4, 5]);
+        row.point_to(writer.to_bytes(), 0, writer.position());
+
+        assert_eq!(row.get_field_count(), 9);
+        assert!(row.get_boolean(0));
+        assert_eq!(row.get_byte(1), 1);
+        assert_eq!(row.get_short(2), 100);
+        assert_eq!(row.get_int(3), 1000);
+        assert_eq!(row.get_long(4), 10000);
+        assert_eq!(row.get_float(5), 1.5);
+        assert_eq!(row.get_double(6), 2.5);
+        assert_eq!(row.get_string(7), "Hello World");
+        assert_eq!(row.get_bytes(8), &[1, 2, 3, 4, 5]);
+    }
+
+    #[test]
+    fn test_nulls_and_cached_reads() {
+        let types = vec![
+            DataType::Int(IntType::new()),
+            DataType::String(StringType::new()),
+            DataType::Double(DoubleType::new()),
+        ];
+
+        let mut row = CompactedRow::new(types.clone());
+        let mut writer = CompactedRowWriter::new(types.len());
+        writer.write_int(100);
+        writer.set_null_at(1);
+        writer.write_double(2.71);
+        row.point_to(writer.to_bytes(), 0, writer.position());
+
+        assert!(!row.is_null_at(0));
+        assert!(row.is_null_at(1));
+        assert!(!row.is_null_at(2));
+        assert_eq!(row.get_int(0), 100);
+        assert_eq!(row.get_double(2), 2.71);
+
+        // Repeated reads are served from the decoded cache.
+        assert_eq!(row.get_int(0), 100);
+        assert_eq!(row.get_int(0), 100);
+    }
+
+    #[test]
+    fn test_from_bytes() {
+        let types = vec![
+            DataType::Int(IntType::new()),
+            DataType::String(StringType::new()),
+        ];
+
+        let mut writer = CompactedRowWriter::new(types.len());
+        writer.write_int(42);
+        writer.write_string("test");
+
+        let bytes = writer.to_bytes();
+        let len = bytes.len();
+        let mut row = CompactedRow::from_bytes(types, bytes);
+
+        assert_eq!(row.get_field_count(), 2);
+        assert_eq!(row.get_offset(), 0);
+        assert_eq!(row.get_size_in_bytes(), len);
+        assert_eq!(row.get_int(0), 42);
+        assert_eq!(row.get_string(1), "test");
+    }
+
+    #[test]
+    fn test_point_to_honours_offset_and_invalidates_cache() {
+        let types = vec![
+            DataType::Int(IntType::new()),
+            DataType::String(StringType::new()),
+        ];
+        let mut row = CompactedRow::new(types.clone());
+
+        let mut first = CompactedRowWriter::new(types.len());
+        first.write_int(1);
+        first.write_string("one");
+        row.point_to(first.to_bytes(), 0, first.position());
+        assert_eq!(row.get_int(0), 1);
+        assert_eq!(row.get_string(1), "one");
+
+        // Embed the second row after a prefix of 0xFF bytes: if the offset were
+        // ignored, the prefix would be read as an all-null bitset.
+        let mut second = CompactedRowWriter::new(types.len());
+        second.write_int(2);
+        second.write_string("two");
+        let prefix_len = 3;
+        let mut buf = vec![0xFFu8; prefix_len];
+        buf.extend_from_slice(&second.to_bytes());
+        row.point_to(Bytes::from(buf), prefix_len, second.position());
+
+        assert_eq!(row.get_offset(), prefix_len);
+        assert_eq!(row.get_size_in_bytes(), second.position());
+        assert!(!row.is_null_at(0));
+        assert!(!row.is_null_at(1));
+        // Re-pointing must discard the values cached from the first row.
+        assert_eq!(row.get_int(0), 2);
+        assert_eq!(row.get_string(1), "two");
+    }
+
+    #[test]
+    fn test_large_row() {
+        let num_fields = 100;
+        let types: Vec<DataType> = (0..num_fields)
+            .map(|_| DataType::Int(IntType::new()))
+            .collect();
+
+        let mut row = CompactedRow::new(types.clone());
+        let mut writer = CompactedRowWriter::new(num_fields);
+        for i in 0..num_fields {
+            writer.write_int((i * 10) as i32);
+        }
+        row.point_to(writer.to_bytes(), 0, writer.position());
+
+        for i in 0..num_fields {
+            assert_eq!(row.get_int(i), (i * 10) as i32);
+        }
+    }
+}
diff --git a/crates/fluss/src/row/compacted/compacted_row_reader.rs
b/crates/fluss/src/row/compacted/compacted_row_reader.rs
new file mode 100644
index 0000000..19afe88
--- /dev/null
+++ b/crates/fluss/src/row/compacted/compacted_row_reader.rs
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Bytes;
+
+use crate::{
+ metadata::DataType,
+ row::{
+ Datum, GenericRow,
+ compacted::{compacted_row::CompactedRow,
compacted_row_writer::CompactedRowWriter},
+ },
+};
+
+/// Decodes a compacted row into a [`GenericRow`], using `schema` to decide
+/// how to read each non-null field.
+#[allow(dead_code)]
+pub struct CompactedRowDeserializer {
+ /// Field types, in field order.
+ schema: Vec<DataType>,
+}
+
+#[allow(dead_code)]
+impl CompactedRowDeserializer {
+    /// Creates a deserializer for rows whose fields have the given types, in order.
+    pub fn new(schema: Vec<DataType>) -> Self {
+        Self { schema }
+    }
+
+    /// Decodes every field of the row that `reader` currently points at.
+    ///
+    /// Non-null field values are consumed from `reader` sequentially. The
+    /// reader's cursor is only reset by `CompactedRowReader::point_to`, so
+    /// call this once per `point_to`.
+    ///
+    /// # Panics
+    ///
+    /// Panics in either of these cases:
+    /// - the schema contains a type not handled here (decimal, date, timestamp, ...);
+    /// - the encoded bytes are malformed.
+    pub fn deserialize(&self, reader: &mut CompactedRowReader) -> GenericRow<'static> {
+        let mut row = GenericRow::new();
+        for (pos, dtype) in self.schema.iter().enumerate() {
+            // Null fields have no payload bytes, so nothing is read for them.
+            let datum = if reader.is_null_at(pos) {
+                Datum::Null
+            } else {
+                Self::read_value(reader, dtype)
+            };
+            row.set_field(pos, datum);
+        }
+        row
+    }
+
+    /// Reads one non-null value of type `dtype` at the reader's cursor.
+    fn read_value(reader: &mut CompactedRowReader, dtype: &DataType) -> Datum<'static> {
+        match dtype {
+            DataType::Boolean(_) => Datum::Bool(reader.read_boolean()),
+            DataType::TinyInt(_) => Datum::Int8(reader.read_byte() as i8),
+            DataType::SmallInt(_) => Datum::Int16(reader.read_short()),
+            DataType::Int(_) => Datum::Int32(reader.read_int()),
+            DataType::BigInt(_) => Datum::Int64(reader.read_long()),
+            DataType::Float(_) => Datum::Float32(reader.read_float().into()),
+            DataType::Double(_) => Datum::Float64(reader.read_double().into()),
+            // TODO: use read_char(length) in the future, but need to keep compatibility
+            DataType::Char(_) | DataType::String(_) => Datum::OwnedString(reader.read_string()),
+            // TODO: use read_binary(length) in the future, but need to keep compatibility
+            DataType::Bytes(_) | DataType::Binary(_) => {
+                Datum::Blob(reader.read_bytes().into_vec().into())
+            }
+            // Name the offending type so schema mismatches are diagnosable.
+            other => panic!("unsupported DataType in CompactedRowDeserializer: {other:?}"),
+        }
+    }
+}
+
+// Reference implementation:
+// https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowReader.java
+/// Sequential cursor over one compacted row: the null bitset is read by
+/// field index, and field values are read in order from just after it.
+#[allow(dead_code)]
+pub struct CompactedRowReader {
+ /// Backing bytes of the current row.
+ segment: Bytes,
+ /// Start of the row (and of its null bitset) within `segment`.
+ offset: usize,
+ /// Index in `segment` of the next byte to read.
+ position: usize,
+ /// Exclusive end of the row within `segment`.
+ limit: usize,
+ /// Width of the null bitset in bytes.
+ header_size_in_bytes: usize,
+}
+
+#[allow(dead_code)]
+impl CompactedRowReader {
+    /// Creates a reader for rows with `field_count` fields. It has no backing
+    /// bytes until [`CompactedRowReader::point_to`] is called.
+    pub fn new(field_count: usize) -> Self {
+        let header = CompactedRow::calculate_bit_set_width_in_bytes(field_count);
+        Self {
+            header_size_in_bytes: header,
+            segment: Bytes::new(),
+            offset: 0,
+            position: 0,
+            limit: 0,
+        }
+    }
+
+    /// Points the reader at the `length`-byte row starting at `offset` in
+    /// `data`, with the cursor placed just after the null bitset.
+    pub fn point_to(&mut self, data: Bytes, offset: usize, length: usize) {
+        let limit = offset + length;
+        let position = offset + self.header_size_in_bytes;
+
+        debug_assert!(limit <= data.len());
+        debug_assert!(position <= limit);
+
+        self.segment = data;
+        self.offset = offset;
+        self.position = position;
+        self.limit = limit;
+    }
+
+    /// Returns `true` if bit `pos` of the row's null bitset is set.
+    pub fn is_null_at(&self, pos: usize) -> bool {
+        let byte_index = pos >> 3;
+        let bit = pos & 7;
+        debug_assert!(byte_index < self.header_size_in_bytes);
+        let idx = self.offset + byte_index;
+        (self.segment[idx] & (1u8 << bit)) != 0
+    }
+
+    /// Reads one byte; any non-zero value is `true`.
+    pub fn read_boolean(&mut self) -> bool {
+        self.read_byte() != 0
+    }
+
+    /// Reads one raw byte and advances the cursor.
+    pub fn read_byte(&mut self) -> u8 {
+        debug_assert!(self.position < self.limit);
+        let b = self.segment[self.position];
+        self.position += 1;
+        b
+    }
+
+    /// Copies the next `N` bytes into an array and advances the cursor past
+    /// them. This is the shared implementation of the fixed-width reads.
+    fn read_fixed<const N: usize>(&mut self) -> [u8; N] {
+        debug_assert!(self.position + N <= self.limit);
+        let mut buf = [0u8; N];
+        buf.copy_from_slice(&self.segment[self.position..self.position + N]);
+        self.position += N;
+        buf
+    }
+
+    // NOTE(review): the fixed-width reads below decode in native byte order
+    // (`from_ne_bytes`); presumably this mirrors `CompactedRowWriter` — confirm,
+    // since native order is not portable across architectures.
+
+    /// Reads a 2-byte `i16`.
+    pub fn read_short(&mut self) -> i16 {
+        i16::from_ne_bytes(self.read_fixed())
+    }
+
+    /// Reads a variable-length `i32`.
+    ///
+    /// The encoding uses 7 data bits per byte, least-significant group first.
+    /// A set high bit means another byte follows.
+    ///
+    /// # Panics
+    ///
+    /// Panics if no terminating byte appears within `MAX_INT_SIZE` bytes.
+    pub fn read_int(&mut self) -> i32 {
+        let mut result: u32 = 0;
+        let mut shift = 0;
+
+        for _ in 0..CompactedRowWriter::MAX_INT_SIZE {
+            let b = self.read_byte();
+            result |= ((b & 0x7F) as u32) << shift;
+            if (b & 0x80) == 0 {
+                return result as i32;
+            }
+            shift += 7;
+        }
+
+        panic!("Invalid input stream.");
+    }
+
+    /// Reads a variable-length `i64` using the same encoding as `read_int`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if no terminating byte appears within `MAX_LONG_SIZE` bytes.
+    pub fn read_long(&mut self) -> i64 {
+        let mut result: u64 = 0;
+        let mut shift = 0;
+
+        for _ in 0..CompactedRowWriter::MAX_LONG_SIZE {
+            let b = self.read_byte();
+            result |= ((b & 0x7F) as u64) << shift;
+            if (b & 0x80) == 0 {
+                return result as i64;
+            }
+            shift += 7;
+        }
+
+        panic!("Invalid input stream.");
+    }
+
+    /// Reads a 4-byte `f32`.
+    pub fn read_float(&mut self) -> f32 {
+        f32::from_ne_bytes(self.read_fixed())
+    }
+
+    /// Reads an 8-byte `f64`.
+    pub fn read_double(&mut self) -> f64 {
+        f64::from_ne_bytes(self.read_fixed())
+    }
+
+    /// Returns the next `length` bytes as a slice of the segment (no copy) and
+    /// advances the cursor past them.
+    pub fn read_binary(&mut self, length: usize) -> Bytes {
+        debug_assert!(self.position + length <= self.limit);
+
+        let start = self.position;
+        let end = start + length;
+        self.position = end;
+
+        self.segment.slice(start..end)
+    }
+
+    /// Reads a varint length prefix, then copies that many bytes out.
+    pub fn read_bytes(&mut self) -> Box<[u8]> {
+        let len = self.read_int();
+        debug_assert!(len >= 0);
+        self.read_binary(len as usize).to_vec().into_boxed_slice()
+    }
+
+    /// Reads a length-prefixed UTF-8 string.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the bytes are not valid UTF-8.
+    pub fn read_string(&mut self) -> String {
+        let bytes = self.read_bytes();
+        String::from_utf8(bytes.into_vec())
+            .unwrap_or_else(|e| panic!("Invalid UTF-8 in string data: {e}"))
+    }
+}
diff --git a/crates/fluss/src/row/compacted/compacted_row_writer.rs
b/crates/fluss/src/row/compacted/compacted_row_writer.rs
index 2debab1..8345123 100644
--- a/crates/fluss/src/row/compacted/compacted_row_writer.rs
+++ b/crates/fluss/src/row/compacted/compacted_row_writer.rs
@@ -18,6 +18,8 @@
use bytes::{Bytes, BytesMut};
use std::cmp;
+use crate::row::compacted::compacted_row::CompactedRow;
+
// Writer for CompactedRow
// Reference implementation:
//
https://github.com/apache/fluss/blob/d4a72fad240d4b81563aaf83fa3b09b5058674ed/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowWriter.java#L71
@@ -34,8 +36,7 @@ impl CompactedRowWriter {
pub const MAX_LONG_SIZE: usize = 10;
pub fn new(field_count: usize) -> Self {
- // bitset width in bytes, it should be in CompactedRow
- let header_size = field_count.div_ceil(8);
+ let header_size =
CompactedRow::calculate_bit_set_width_in_bytes(field_count);
let cap = cmp::max(64, header_size);
let mut buffer = BytesMut::with_capacity(cap);
diff --git a/crates/fluss/src/row/compacted/mod.rs
b/crates/fluss/src/row/compacted/mod.rs
index c81eb5a..3361078 100644
--- a/crates/fluss/src/row/compacted/mod.rs
+++ b/crates/fluss/src/row/compacted/mod.rs
@@ -16,6 +16,15 @@
// under the License.
mod compacted_key_writer;
+
+mod compacted_row;
+mod compacted_row_reader;
mod compacted_row_writer;
pub use compacted_key_writer::CompactedKeyWriter;
+#[allow(unused_imports)]
+pub use compacted_row::CompactedRow;
+#[allow(unused_imports)]
+pub use compacted_row_reader::{CompactedRowDeserializer, CompactedRowReader};
+#[allow(unused_imports)]
+pub use compacted_row_writer::CompactedRowWriter;
diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs
index 28a378f..78872a9 100644
--- a/crates/fluss/src/row/datum.rs
+++ b/crates/fluss/src/row/datum.rs
@@ -53,6 +53,9 @@ pub enum Datum<'a> {
Float64(F64),
#[display("'{0}'")]
String(&'a str),
+ /// Owned string
+ #[display("'{0}'")]
+ OwnedString(String),
#[display("{0}")]
Blob(Blob),
#[display("{:?}")]
@@ -75,6 +78,7 @@ impl Datum<'_> {
pub fn as_str(&self) -> &str {
match self {
Self::String(s) => s,
+ Self::OwnedString(s) => s.as_str(),
_ => panic!("not a string: {self:?}"),
}
}
@@ -216,13 +220,14 @@ impl TryFrom<&Datum<'_>> for bool {
}
}
-impl<'a> TryFrom<&Datum<'a>> for &'a str {
+impl<'b, 'a: 'b> TryFrom<&'b Datum<'a>> for &'b str {
type Error = ();
#[inline]
- fn try_from(from: &Datum<'a>) -> std::result::Result<Self, Self::Error> {
+ fn try_from(from: &'b Datum<'a>) -> std::result::Result<Self, Self::Error>
{
match from {
Datum::String(i) => Ok(*i),
+ Datum::OwnedString(s) => Ok(s.as_str()),
_ => Err(()),
}
}
@@ -291,6 +296,7 @@ impl Datum<'_> {
Datum::Float32(v) => append_value_to_arrow!(Float32Builder,
v.into_inner()),
Datum::Float64(v) => append_value_to_arrow!(Float64Builder,
v.into_inner()),
Datum::String(v) => append_value_to_arrow!(StringBuilder, *v),
+ Datum::OwnedString(v) => append_value_to_arrow!(StringBuilder,
v.as_str()),
Datum::Blob(v) => append_value_to_arrow!(BinaryBuilder,
v.as_ref()),
Datum::BorrowedBlob(v) => append_value_to_arrow!(BinaryBuilder,
*v),
Datum::Decimal(_) | Datum::Date(_) | Datum::Timestamp(_) |
Datum::TimestampTz(_) => {
diff --git a/crates/fluss/src/row/encode/compacted_key_encoder.rs
b/crates/fluss/src/row/encode/compacted_key_encoder.rs
index b9335a3..ebe3da2 100644
--- a/crates/fluss/src/row/encode/compacted_key_encoder.rs
+++ b/crates/fluss/src/row/encode/compacted_key_encoder.rs
@@ -49,10 +49,7 @@ impl CompactedKeyEncoder {
Some(idx) => encode_col_indexes.push(idx),
None => {
return Err(IllegalArgument {
- message: format!(
- "Field {:?} not found in input row type {:?}",
- key, row_type
- ),
+ message: format!("Field {key:?} not found in input row
type {row_type:?}"),
});
}
}
@@ -89,10 +86,7 @@ impl KeyEncoder for CompactedKeyEncoder {
match &field_getter.get_field(row) {
Datum::Null => {
return Err(IllegalArgument {
- message: format!(
- "Cannot encode key with null value at position:
{:?}",
- pos
- ),
+ message: format!("Cannot encode key with null value at
position: {pos:?}"),
});
}
value => self.field_encoders.get(pos).unwrap().write_value(