This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 4b95fc2 feat: introduce CompactedKeyEncoder (#124)
4b95fc2 is described below
commit 4b95fc2fc3b3c8462cfca0c65c6bf00eb95ba51e
Author: Keith Lee <[email protected]>
AuthorDate: Fri Jan 9 12:56:59 2026 +0000
feat: introduce CompactedKeyEncoder (#124)
---
crates/fluss/Cargo.toml | 1 +
crates/fluss/src/metadata/datatype.rs | 30 ++
crates/fluss/src/row/binary/binary_writer.rs | 210 +++++++++++++
crates/fluss/src/row/{compacted => binary}/mod.rs | 12 +-
.../src/row/compacted/compacted_key_writer.rs | 97 ++++++
crates/fluss/src/row/compacted/mod.rs | 3 +
crates/fluss/src/row/datum.rs | 10 +
.../fluss/src/row/encode/compacted_key_encoder.rs | 329 +++++++++++++++++++++
crates/fluss/src/row/encode/mod.rs | 64 ++++
crates/fluss/src/row/field_getter.rs | 116 ++++++++
crates/fluss/src/row/mod.rs | 9 +
11 files changed, 880 insertions(+), 1 deletion(-)
diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml
index 27604ee..e8c851f 100644
--- a/crates/fluss/Cargo.toml
+++ b/crates/fluss/Cargo.toml
@@ -59,6 +59,7 @@ uuid = { version = "1.10", features = ["v4"] }
tempfile = "3.23.0"
snafu = "0.8.3"
scopeguard = "1.2.0"
+delegate = "0.13.5"
[target.'cfg(target_arch = "wasm32")'.dependencies]
jiff = { workspace = true, features = ["js"] }
diff --git a/crates/fluss/src/metadata/datatype.rs b/crates/fluss/src/metadata/datatype.rs
index 8ad4f7e..e5ccb9a 100644
--- a/crates/fluss/src/metadata/datatype.rs
+++ b/crates/fluss/src/metadata/datatype.rs
@@ -852,6 +852,36 @@ impl RowType {
+ /// Returns the fields of this row type.
pub fn fields(&self) -> &Vec<DataField> {
&self.fields
}
+
+    /// Returns the index of the first field named `field_name`, or `None` when no
+    /// field has that name.
+    pub fn get_field_index(&self, field_name: &str) -> Option<usize> {
+        self.fields
+            .iter()
+            .enumerate()
+            .find_map(|(idx, field)| (field.name == field_name).then_some(idx))
+    }
+
+    /// Test-only helper: builds a row type (via `with_nullable(true, ..)`) whose
+    /// fields are named `f0`, `f1`, ... after their position in `data_types`.
+    #[cfg(test)]
+    pub fn with_data_types(data_types: Vec<DataType>) -> Self {
+        // `data_types` is owned, so each type is moved into its field instead of cloned.
+        let fields = data_types
+            .into_iter()
+            .enumerate()
+            .map(|(idx, data_type)| DataField::new(format!("f{idx}"), data_type, None))
+            .collect::<Vec<DataField>>();
+
+        Self::with_nullable(true, fields)
+    }
+
+    /// Test-only helper: builds a row type (via `with_nullable(true, ..)`) from
+    /// parallel lists of data types and field names.
+    ///
+    /// # Panics
+    /// Panics if `data_types` and `field_names` have different lengths; without this
+    /// check `zip` would silently drop the unmatched entries.
+    #[cfg(test)]
+    pub fn with_data_types_and_field_names(
+        data_types: Vec<DataType>,
+        field_names: Vec<&str>,
+    ) -> Self {
+        assert_eq!(
+            data_types.len(),
+            field_names.len(),
+            "data_types and field_names must have the same length"
+        );
+
+        // Both vectors are owned, so each data type is moved into its field.
+        let fields = data_types
+            .into_iter()
+            .zip(field_names)
+            .map(|(data_type, field_name)| DataField::new(field_name.to_string(), data_type, None))
+            .collect::<Vec<_>>();
+
+        Self::with_nullable(true, fields)
+    }
}
impl Display for RowType {
diff --git a/crates/fluss/src/row/binary/binary_writer.rs b/crates/fluss/src/row/binary/binary_writer.rs
new file mode 100644
index 0000000..a296777
--- /dev/null
+++ b/crates/fluss/src/row/binary/binary_writer.rs
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
+use crate::metadata::DataType;
+use crate::row::Datum;
+use crate::row::binary::BinaryRowFormat;
+
+/// Writer for a composite binary data format, such as a row or an array.
+///
+/// Values are written through the `write_*` methods, and
+/// [`BinaryWriter::complete`] is called at the end of the write.
+// NOTE(review): `allow(dead_code)` presumably because not every method is called
+// yet — confirm before removing.
+#[allow(dead_code)]
+pub trait BinaryWriter {
+ /// Resets the writer to prepare for the next write.
+ fn reset(&mut self);
+
+ /// Marks the field at `pos` as null.
+ fn set_null_at(&mut self, pos: usize);
+
+ /// Writes a BOOLEAN value.
+ fn write_boolean(&mut self, value: bool);
+
+ /// Writes a single byte; TINYINT values are passed as their `u8` bit pattern.
+ fn write_byte(&mut self, value: u8);
+
+ /// Writes a BYTES value.
+ fn write_bytes(&mut self, value: &[u8]);
+
+ /// Writes a CHAR value; `length` is a length supplied by the caller.
+ fn write_char(&mut self, value: &str, length: usize);
+
+ /// Writes a STRING value.
+ fn write_string(&mut self, value: &str);
+
+ /// Writes a 16-bit integer (SMALLINT).
+ fn write_short(&mut self, value: i16);
+
+ /// Writes a 32-bit integer (INT).
+ fn write_int(&mut self, value: i32);
+
+ /// Writes a 64-bit integer (BIGINT).
+ fn write_long(&mut self, value: i64);
+
+ /// Writes a 32-bit float (FLOAT).
+ fn write_float(&mut self, value: f32);
+
+ /// Writes a 64-bit float (DOUBLE).
+ fn write_double(&mut self, value: f64);
+
+ /// Writes a BINARY value; `length` is a length supplied by the caller.
+ fn write_binary(&mut self, bytes: &[u8], length: usize);
+
+ // TODO Decimal type
+ // fn write_decimal(&mut self, pos: i32, value: f64);
+
+ // TODO Timestamp type
+ // fn write_timestamp_ntz(&mut self, pos: i32, value: i64);
+
+ // TODO Timestamp type
+ // fn write_timestamp_ltz(&mut self, pos: i32, value: i64);
+
+ // TODO InternalArray, ArraySerializer
+ // fn write_array(&mut self, pos: i32, value: i64);
+
+ // TODO Row serializer
+ // fn write_row(&mut self, pos: i32, value: &InternalRow);
+
+ /// Completes the write, e.g. to record the final size of the binary output.
+ /// Implementations may make this a no-op.
+ fn complete(&mut self);
+}
+
+/// Writes a single field value into a [`BinaryWriter`], taking nullability into account.
+///
+/// The `Nullable` variant records [`Datum::Null`] with [`BinaryWriter::set_null_at`].
+/// The `NonNullable` variant forwards every value, including `Datum::Null`, to the
+/// inner writer, which rejects values it does not accept.
+pub enum ValueWriter {
+ /// Chosen when the data type is nullable.
+ Nullable(InnerValueWriter),
+ /// Chosen when the data type is not nullable.
+ NonNullable(InnerValueWriter),
+}
+
+impl ValueWriter {
+    /// Builds a [`ValueWriter`] for `element_type`.
+    ///
+    /// The variant is chosen from [`DataType::is_nullable`].
+    ///
+    /// # Errors
+    /// Propagates any error from [`InnerValueWriter::create_inner_value_writer`].
+    pub fn create_value_writer(
+        element_type: &DataType,
+        binary_row_format: Option<&BinaryRowFormat>,
+    ) -> Result<ValueWriter> {
+        let inner = InnerValueWriter::create_inner_value_writer(element_type, binary_row_format)?;
+        let writer = if element_type.is_nullable() {
+            Self::Nullable(inner)
+        } else {
+            Self::NonNullable(inner)
+        };
+        Ok(writer)
+    }
+
+    /// Writes `value` at position `pos` through `writer`.
+    ///
+    /// For the `Nullable` variant, [`Datum::Null`] is recorded with
+    /// [`BinaryWriter::set_null_at`]. Every other combination is forwarded to the
+    /// inner writer.
+    ///
+    /// # Errors
+    /// Returns the inner writer's error when it does not accept `value`.
+    pub fn write_value<W: BinaryWriter>(
+        &self,
+        writer: &mut W,
+        pos: usize,
+        value: &Datum,
+    ) -> Result<()> {
+        match (self, value) {
+            (Self::Nullable(_), Datum::Null) => {
+                writer.set_null_at(pos);
+                Ok(())
+            }
+            (Self::Nullable(inner) | Self::NonNullable(inner), _) => {
+                inner.write_value(writer, pos, value)
+            }
+        }
+    }
+}
+
+/// Type-specific value writer, selected from a [`DataType`] by
+/// [`InnerValueWriter::create_inner_value_writer`].
+///
+/// Each variant accepts only the matching [`Datum`] variant(s) in
+/// [`InnerValueWriter::write_value`].
+#[derive(Debug)]
+pub enum InnerValueWriter {
+ Char,
+ String,
+ Boolean,
+ Binary,
+ Bytes,
+ TinyInt,
+ SmallInt,
+ Int,
+ BigInt,
+ Float,
+ Double,
+ // TODO Decimal, Date, TimeWithoutTimeZone, TimestampWithoutTimeZone, TimestampWithLocalTimeZone, Array, Row
+}
+
+/// Accessor for writing the fields/elements of a binary writer at runtime; the
+/// fields/elements must be written in order.
+impl InnerValueWriter {
+    /// Selects the writer for `data_type`.
+    ///
+    /// The [`BinaryRowFormat`] argument is currently ignored.
+    ///
+    /// # Errors
+    /// Returns [`IllegalArgument`] for data types that do not have a writer yet,
+    /// such as decimal, date/time, timestamp, array and row types.
+    pub fn create_inner_value_writer(
+        data_type: &DataType,
+        _: Option<&BinaryRowFormat>,
+    ) -> Result<InnerValueWriter> {
+        let writer = match data_type {
+            DataType::Char(_) => InnerValueWriter::Char,
+            DataType::String(_) => InnerValueWriter::String,
+            DataType::Boolean(_) => InnerValueWriter::Boolean,
+            DataType::Binary(_) => InnerValueWriter::Binary,
+            DataType::Bytes(_) => InnerValueWriter::Bytes,
+            DataType::TinyInt(_) => InnerValueWriter::TinyInt,
+            DataType::SmallInt(_) => InnerValueWriter::SmallInt,
+            DataType::Int(_) => InnerValueWriter::Int,
+            DataType::BigInt(_) => InnerValueWriter::BigInt,
+            DataType::Float(_) => InnerValueWriter::Float,
+            DataType::Double(_) => InnerValueWriter::Double,
+            // This function already returns `Result`, so an unsupported type is reported
+            // as an error for the caller to handle rather than aborting with a panic.
+            _ => {
+                return Err(IllegalArgument {
+                    message: format!(
+                        "ValueWriter for DataType {:?} is currently not implemented",
+                        data_type
+                    ),
+                });
+            }
+        };
+        Ok(writer)
+    }
+
+    /// Writes `value` through `writer` using the method that matches this variant.
+    ///
+    /// `_pos` is not used by this method.
+    ///
+    /// # Errors
+    /// Returns [`IllegalArgument`] when `value` is not a [`Datum`] variant this
+    /// writer accepts. This includes [`Datum::Null`].
+    pub fn write_value<W: BinaryWriter>(
+        &self,
+        writer: &mut W,
+        _pos: usize,
+        value: &Datum,
+    ) -> Result<()> {
+        match (self, value) {
+            // NOTE(review): CHAR and BINARY pass the value's own length, not the declared
+            // CHAR(n)/BINARY(n) length; confirm every BinaryWriter implementation is fine
+            // with that.
+            (InnerValueWriter::Char, Datum::String(v)) => writer.write_char(v, v.len()),
+            (InnerValueWriter::String, Datum::String(v)) => writer.write_string(v),
+            (InnerValueWriter::Boolean, Datum::Bool(v)) => writer.write_boolean(*v),
+            (InnerValueWriter::Binary, Datum::Blob(v)) => {
+                writer.write_binary(v.as_ref(), v.len())
+            }
+            (InnerValueWriter::Binary, Datum::BorrowedBlob(v)) => {
+                writer.write_binary(v.as_ref(), v.len())
+            }
+            (InnerValueWriter::Bytes, Datum::Blob(v)) => writer.write_bytes(v.as_ref()),
+            (InnerValueWriter::Bytes, Datum::BorrowedBlob(v)) => writer.write_bytes(v.as_ref()),
+            // TINYINT is stored as the raw bit pattern of the i8.
+            (InnerValueWriter::TinyInt, Datum::Int8(v)) => writer.write_byte(*v as u8),
+            (InnerValueWriter::SmallInt, Datum::Int16(v)) => writer.write_short(*v),
+            (InnerValueWriter::Int, Datum::Int32(v)) => writer.write_int(*v),
+            (InnerValueWriter::BigInt, Datum::Int64(v)) => writer.write_long(*v),
+            (InnerValueWriter::Float, Datum::Float32(v)) => writer.write_float(v.into_inner()),
+            (InnerValueWriter::Double, Datum::Float64(v)) => writer.write_double(v.into_inner()),
+            _ => {
+                return Err(IllegalArgument {
+                    message: format!("{:?} used to write value {:?}", self, value),
+                });
+            }
+        }
+        Ok(())
+    }
+}
diff --git a/crates/fluss/src/row/compacted/mod.rs b/crates/fluss/src/row/binary/mod.rs
similarity index 76%
copy from crates/fluss/src/row/compacted/mod.rs
copy to crates/fluss/src/row/binary/mod.rs
index 695cdad..c31cbd5 100644
--- a/crates/fluss/src/row/compacted/mod.rs
+++ b/crates/fluss/src/row/binary/mod.rs
@@ -15,4 +15,14 @@
// specific language governing permissions and limitations
// under the License.
-mod compacted_row_writer;
+mod binary_writer;
+
+pub use binary_writer::*;
+
+/// The binary row format produced by a [`BinaryWriter`].
+// NOTE(review): only `Compacted` appears to be used so far, which presumably explains
+// the `allow(dead_code)` — confirm.
+#[allow(dead_code)]
+pub enum BinaryRowFormat {
+ Compacted,
+ Aligned,
+ Indexed,
+}
diff --git a/crates/fluss/src/row/compacted/compacted_key_writer.rs b/crates/fluss/src/row/compacted/compacted_key_writer.rs
new file mode 100644
index 0000000..84a6b22
--- /dev/null
+++ b/crates/fluss/src/row/compacted/compacted_key_writer.rs
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::row::compacted::compacted_row_writer::CompactedRowWriter;
+use bytes::Bytes;
+
+use crate::error::Result;
+use crate::metadata::DataType;
+use crate::row::binary::{BinaryRowFormat, BinaryWriter, ValueWriter};
+use delegate::delegate;
+
+/// A wrapper around [`CompactedRowWriter`] used to encode key columns.
+///
+/// The encoding is the same as [`CompactedRowWriter`], except that there is no
+/// header of null bits marking which fields are null, because key columns must
+/// not be null.
+pub struct CompactedKeyWriter {
+ /// The underlying row writer that every write is forwarded to.
+ delegate: CompactedRowWriter,
+}
+
+impl CompactedKeyWriter {
+    /// Creates a key writer backed by a [`CompactedRowWriter`] with a field count of 0.
+    pub fn new() -> CompactedKeyWriter {
+        // Key columns are never null, so no null bits are needed. Creating the
+        // underlying writer with a field count of 0 keeps the null-bit header empty.
+        let inner = CompactedRowWriter::new(0);
+        Self { delegate: inner }
+    }
+
+    /// Creates the [`ValueWriter`] for a key column of type `field_type`, using
+    /// [`BinaryRowFormat::Compacted`].
+    ///
+    /// # Errors
+    /// Propagates any error from [`ValueWriter::create_value_writer`].
+    pub fn create_value_writer(field_type: &DataType) -> Result<ValueWriter> {
+        let format = BinaryRowFormat::Compacted;
+        ValueWriter::create_value_writer(field_type, Some(&format))
+    }
+
+    // Inherent forwards to the wrapped `CompactedRowWriter`.
+    delegate! {
+        to self.delegate {
+            pub fn reset(&mut self);
+
+            pub fn to_bytes(&self) -> Bytes;
+
+            #[allow(dead_code)]
+            pub fn position(&self) -> usize;
+
+            #[allow(dead_code)]
+            pub fn buffer(&self) -> &[u8];
+        }
+    }
+}
+
+impl BinaryWriter for CompactedKeyWriter {
+    // Every write method is forwarded unchanged to the wrapped `CompactedRowWriter`.
+    delegate! {
+        to self.delegate {
+            fn reset(&mut self);
+            fn set_null_at(&mut self, pos: usize);
+            fn write_boolean(&mut self, value: bool);
+            fn write_byte(&mut self, value: u8);
+            fn write_bytes(&mut self, value: &[u8]);
+            fn write_char(&mut self, value: &str, length: usize);
+            fn write_string(&mut self, value: &str);
+            fn write_short(&mut self, value: i16);
+            fn write_int(&mut self, value: i32);
+            fn write_long(&mut self, value: i64);
+            fn write_float(&mut self, value: f32);
+            fn write_double(&mut self, value: f64);
+            fn write_binary(&mut self, bytes: &[u8], length: usize);
+        }
+    }
+
+    /// Intentionally a no-op for the key writer.
+    fn complete(&mut self) {}
+}
diff --git a/crates/fluss/src/row/compacted/mod.rs b/crates/fluss/src/row/compacted/mod.rs
index 695cdad..c81eb5a 100644
--- a/crates/fluss/src/row/compacted/mod.rs
+++ b/crates/fluss/src/row/compacted/mod.rs
@@ -15,4 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+mod compacted_key_writer;
mod compacted_row_writer;
+
+pub use compacted_key_writer::CompactedKeyWriter;
diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs
index 1ea3933..28a378f 100644
--- a/crates/fluss/src/row/datum.rs
+++ b/crates/fluss/src/row/datum.rs
@@ -55,6 +55,8 @@ pub enum Datum<'a> {
String(&'a str),
#[display("{0}")]
Blob(Blob),
+ #[display("{:?}")]
+ BorrowedBlob(&'a [u8]),
#[display("{0}")]
Decimal(Decimal),
#[display("{0}")]
@@ -80,6 +82,7 @@ impl Datum<'_> {
+ /// Returns the bytes of a [`Datum::Blob`] or [`Datum::BorrowedBlob`].
+ ///
+ /// # Panics
+ /// Panics if the datum is any other variant.
pub fn as_blob(&self) -> &[u8] {
match self {
Self::Blob(blob) => blob.as_ref(),
+ Self::BorrowedBlob(blob) => blob,
_ => panic!("not a blob: {self:?}"),
}
}
@@ -289,6 +292,7 @@ impl Datum<'_> {
Datum::Float64(v) => append_value_to_arrow!(Float64Builder,
v.into_inner()),
Datum::String(v) => append_value_to_arrow!(StringBuilder, *v),
Datum::Blob(v) => append_value_to_arrow!(BinaryBuilder,
v.as_ref()),
+ Datum::BorrowedBlob(v) => append_value_to_arrow!(BinaryBuilder,
*v),
Datum::Decimal(_) | Datum::Date(_) | Datum::Timestamp(_) |
Datum::TimestampTz(_) => {
return Err(RowConvertError {
message: format!(
@@ -406,6 +410,12 @@ impl From<Vec<u8>> for Blob {
}
}
+/// Wraps a borrowed byte slice as a [`Datum::BorrowedBlob`] without copying it.
+impl<'a> From<&'a [u8]> for Datum<'a> {
+    fn from(bytes: &'a [u8]) -> Self {
+        Self::BorrowedBlob(bytes)
+    }
+}
+
+/// The Unix epoch, 1970-01-01, as a civil date.
const UNIX_EPOCH_DAY: jiff::civil::Date = jiff::civil::date(1970, 1, 1);
impl Date {
diff --git a/crates/fluss/src/row/encode/compacted_key_encoder.rs b/crates/fluss/src/row/encode/compacted_key_encoder.rs
new file mode 100644
index 0000000..b9335a3
--- /dev/null
+++ b/crates/fluss/src/row/encode/compacted_key_encoder.rs
@@ -0,0 +1,329 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
+use crate::metadata::RowType;
+use crate::row::binary::ValueWriter;
+use crate::row::compacted::CompactedKeyWriter;
+use crate::row::encode::KeyEncoder;
+use crate::row::field_getter::FieldGetter;
+use crate::row::{Datum, InternalRow};
+use bytes::Bytes;
+
+/// Encodes selected key fields of an [`InternalRow`] in the compacted binary format.
+///
+/// `field_getters[i]` and `field_encoders[i]` together handle the `i`-th key field.
+// NOTE(review): `allow(dead_code)` presumably because the encoder is only reached from
+// code that is itself `allow(dead_code)` so far — confirm.
+#[allow(dead_code)]
+pub struct CompactedKeyEncoder {
+ /// Reads each key field from the input row, in key order.
+ field_getters: Vec<FieldGetter>,
+ /// Writes each key field value, in key order.
+ field_encoders: Vec<ValueWriter>,
+ /// Writer that accumulates the encoded key bytes; reset before every row.
+ compacted_encoder: CompactedKeyWriter,
+}
+
+impl CompactedKeyEncoder {
+    /// Creates a key encoder that encodes the fields named in `keys`.
+    ///
+    /// # Arguments
+    /// * `row_type` - the row type of the input row
+    /// * `keys` - the names of the key fields to encode, in encoding order
+    ///
+    /// # Errors
+    /// Returns [`IllegalArgument`] if a key name is not a field of `row_type`.
+    /// Also propagates any error from [`CompactedKeyEncoder::new`].
+    pub fn create_key_encoder(row_type: &RowType, keys: &[String]) -> Result<CompactedKeyEncoder> {
+        // Resolve every key name to its field index, stopping at the first unknown name.
+        let encode_col_indexes = keys
+            .iter()
+            .map(|key| {
+                row_type.get_field_index(key).ok_or_else(|| IllegalArgument {
+                    message: format!(
+                        "Field {:?} not found in input row type {:?}",
+                        key, row_type
+                    ),
+                })
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        Self::new(row_type, encode_col_indexes)
+    }
+
+    /// Creates a key encoder for the fields at `encode_field_pos`.
+    ///
+    /// Each entry indexes into `row_type`'s fields, and fields are encoded in the
+    /// order given.
+    ///
+    /// # Errors
+    /// Returns [`IllegalArgument`] if a position is out of range for `row_type`.
+    /// Also propagates any error from [`CompactedKeyWriter::create_value_writer`].
+    pub fn new(row_type: &RowType, encode_field_pos: Vec<usize>) -> Result<CompactedKeyEncoder> {
+        let fields = row_type.fields();
+        let mut field_getters: Vec<FieldGetter> = Vec::with_capacity(encode_field_pos.len());
+        let mut field_encoders: Vec<ValueWriter> = Vec::with_capacity(encode_field_pos.len());
+
+        for pos in encode_field_pos {
+            // A bad position is a caller error; report it instead of panicking.
+            let field = fields.get(pos).ok_or_else(|| IllegalArgument {
+                message: format!(
+                    "Field position {} is out of range for row type with {} fields",
+                    pos,
+                    fields.len()
+                ),
+            })?;
+            let data_type = field.data_type();
+            field_getters.push(FieldGetter::create(data_type, pos));
+            field_encoders.push(CompactedKeyWriter::create_value_writer(data_type)?);
+        }
+
+        Ok(CompactedKeyEncoder {
+            field_getters,
+            field_encoders,
+            compacted_encoder: CompactedKeyWriter::new(),
+        })
+    }
+}
+
+impl KeyEncoder for CompactedKeyEncoder {
+    /// Encodes the key fields of `row` and returns them as [`Bytes`].
+    ///
+    /// The internal writer is reset first, so the encoder can be reused across rows.
+    ///
+    /// # Errors
+    /// Returns [`IllegalArgument`] if any key field is null. Also propagates the
+    /// error of a field writer that rejects a value.
+    fn encode_key(&mut self, row: &dyn InternalRow) -> Result<Bytes> {
+        self.compacted_encoder.reset();
+
+        // Getters and encoders are built pairwise in `new`, so zipping them keeps each
+        // field's reader and writer together. `pos` is the position within the key,
+        // not within the input row.
+        for (pos, (field_getter, field_encoder)) in self
+            .field_getters
+            .iter()
+            .zip(&self.field_encoders)
+            .enumerate()
+        {
+            let value = field_getter.get_field(row);
+            if matches!(value, Datum::Null) {
+                return Err(IllegalArgument {
+                    message: format!(
+                        "Cannot encode key with null value at position: {:?}",
+                        pos
+                    ),
+                });
+            }
+            field_encoder.write_value(&mut self.compacted_encoder, pos, &value)?;
+        }
+
+        Ok(self.compacted_encoder.to_bytes())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::metadata::DataTypes;
+ use crate::row::{Datum, GenericRow};
+
+ pub fn for_test_row_type(row_type: &RowType) -> CompactedKeyEncoder {
+ CompactedKeyEncoder::new(row_type,
(0..row_type.fields().len()).collect())
+ .expect("CompactedKeyEncoder initialization failed")
+ }
+
+ #[test]
+ fn test_encode_key() {
+ let row_type = RowType::with_data_types(vec![
+ DataTypes::int(),
+ DataTypes::bigint(),
+ DataTypes::int(),
+ ]);
+ let row = GenericRow::from_data(vec![
+ Datum::from(1i32),
+ Datum::from(3i64),
+ Datum::from(2i32),
+ ]);
+
+ let mut encoder = for_test_row_type(&row_type);
+
+ assert_eq!(
+ encoder.encode_key(&row).unwrap().iter().as_slice(),
+ [1u8, 3u8, 2u8]
+ );
+
+ let row = GenericRow::from_data(vec![
+ Datum::from(2i32),
+ Datum::from(5i64),
+ Datum::from(6i32),
+ ]);
+
+ assert_eq!(
+ encoder.encode_key(&row).unwrap().iter().as_slice(),
+ [2u8, 5u8, 6u8]
+ );
+ }
+
+ #[test]
+ fn test_encode_key_with_key_names() {
+ let data_types = vec![
+ DataTypes::string(),
+ DataTypes::bigint(),
+ DataTypes::string(),
+ ];
+ let field_names = vec!["partition", "f1", "f2"];
+
+ let row_type = RowType::with_data_types_and_field_names(data_types,
field_names);
+
+ let primary_keys = &["f2".to_string()];
+
+ let mut encoder = CompactedKeyEncoder::create_key_encoder(&row_type,
primary_keys).unwrap();
+
+ let row = GenericRow::from_data(vec![
+ Datum::from("p1"),
+ Datum::from(1i64),
+ Datum::from("a2"),
+ ]);
+
+ // should only get "a2" 's ASCII representation
+ assert_eq!(
+ encoder.encode_key(&row).unwrap().iter().as_slice(),
+ // 2 (start of text), 97 (the letter a), 50 (the number 2)
+ [2u8, 97u8, 50u8]
+ );
+ }
+
+ #[test]
+ #[should_panic(expected = "Cannot encode key with null value at position:
2")]
+ fn test_null_primary_key() {
+ let row_type = RowType::with_data_types(vec![
+ DataTypes::int(),
+ DataTypes::bigint(),
+ DataTypes::int(),
+ DataTypes::string(),
+ ]);
+
+ let primary_key_indices = vec![0, 1, 2];
+
+ let mut encoder = CompactedKeyEncoder::new(&row_type,
primary_key_indices)
+ .expect("CompactedKeyEncoder initialization failed");
+
+ let row = GenericRow::from_data(vec![
+ Datum::from(1i32),
+ Datum::from(3i64),
+ Datum::from(2i32),
+ Datum::from("a2"),
+ ]);
+
+ assert_eq!(
+ encoder.encode_key(&row).unwrap().iter().as_slice(),
+ [1u8, 3u8, 2u8]
+ );
+
+ let row = GenericRow::from_data(vec![
+ Datum::from(1i32),
+ Datum::from(3i64),
+ Datum::Null,
+ Datum::from("a2"),
+ ]);
+
+ encoder.encode_key(&row).unwrap();
+ }
+
+ #[test]
+ fn test_int_string_as_primary_key() {
+ let row_type = RowType::with_data_types(vec![
+ DataTypes::string(),
+ DataTypes::int(),
+ DataTypes::string(),
+ DataTypes::string(),
+ ]);
+
+ let primary_key_indices = vec![1, 2];
+ let mut encoder = CompactedKeyEncoder::new(&row_type,
primary_key_indices)
+ .expect("CompactedKeyEncoder initialization failed");
+
+ let row = GenericRow::from_data(vec![
+ Datum::from("a1"),
+ Datum::from(1i32),
+ Datum::from("a2"),
+ Datum::from("a3"),
+ ]);
+
+ assert_eq!(
+ encoder.encode_key(&row).unwrap().iter().as_slice(),
+ // 1 (1i32), 2 (start of text), 97 (the letter a), 50 (the number
2)
+ [1u8, 2u8, 97u8, 50u8]
+ );
+ }
+
+ #[test]
+ fn test_all_data_types() {
+ let row_type = RowType::with_data_types(vec![
+ DataTypes::boolean(),
+ DataTypes::tinyint(),
+ DataTypes::smallint(),
+ DataTypes::int(),
+ DataTypes::bigint(),
+ DataTypes::float(),
+ DataTypes::double(),
+ // TODO Date
+ // TODO Time
+ DataTypes::binary(20),
+ DataTypes::bytes(),
+ DataTypes::char(2),
+ DataTypes::string(),
+ // TODO Decimal
+ // TODO Timestamp
+ // TODO Timestamp LTZ
+ // TODO Array of Int
+ // TODO Array of Float
+ // TODO Array of String
+ // TODO: Add Map and Row fields in Issue #1973
+ ]);
+
+ let row = GenericRow::from_data(vec![
+ Datum::from(true),
+ Datum::from(2i8),
+ Datum::from(10i16),
+ Datum::from(100i32),
+ Datum::from(-6101065172474983726i64), // from Java test case: new
BigInteger("12345678901234567890").longValue()
+ Datum::from(13.2f32),
+ Datum::from(15.21f64),
+ // TODO Date
+ // TODO Time
+ Datum::from("1234567890".as_bytes()),
+ Datum::from("20".as_bytes()),
+ Datum::from("1"),
+ Datum::from("hello"),
+ // TODO Decimal
+ // TODO Timestamp
+ // TODO Timestamp LTZ
+ // TODO Array of Int
+ // TODO Array of Float
+ // TODO Array of String
+ // TODO: Add Map and Row fields in Issue #1973
+ ]);
+
+ let mut encoder = for_test_row_type(&row_type);
+
+ let mut expected: Vec<u8> = Vec::new();
+ // BOOLEAN: true
+ expected.extend(vec![0x01]);
+ // TINYINT: 2
+ expected.extend(vec![0x02]);
+ // SMALLINT: 10
+ expected.extend(vec![0x0A]);
+ // INT: 100
+ expected.extend(vec![0x00, 0x64]);
+ // BIGINT: -6101065172474983726
+ expected.extend(vec![
+ 0xD2, 0x95, 0xFC, 0xD8, 0xCE, 0xB1, 0xAA, 0xAA, 0xAB, 0x01,
+ ]);
+ // FLOAT: 13.2
+ expected.extend(vec![0x33, 0x33, 0x53, 0x41]);
+ // DOUBLE: 15.21
+ expected.extend(vec![0xEC, 0x51, 0xB8, 0x1E, 0x85, 0x6B, 0x2E, 0x40]);
+ // BINARY(20): "1234567890".getBytes()
+ expected.extend(vec![
+ 0x0A, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x30,
+ ]);
+
+ // BYTES: "20".getBytes()
+ expected.extend(vec![0x02, 0x32, 0x30]);
+ // CHAR(2): "1"
+ expected.extend(vec![0x01, 0x31]);
+ // STRING: String: "hello"
+ expected.extend(vec![0x05, 0x68, 0x65, 0x6C, 0x6C, 0x6F]);
+ assert_eq!(
+ encoder.encode_key(&row).unwrap().iter().as_slice(),
+ expected.as_slice()
+ );
+ }
+}
diff --git a/crates/fluss/src/row/encode/mod.rs b/crates/fluss/src/row/encode/mod.rs
new file mode 100644
index 0000000..6c6eed9
--- /dev/null
+++ b/crates/fluss/src/row/encode/mod.rs
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod compacted_key_encoder;
+
+use crate::error::Result;
+use crate::metadata::{DataLakeFormat, RowType};
+use crate::row::InternalRow;
+use crate::row::encode::compacted_key_encoder::CompactedKeyEncoder;
+use bytes::Bytes;
+
+/// An interface for encoding the key of a row into bytes.
+// NOTE(review): `allow(dead_code)` presumably because there is no production caller
+// yet — confirm.
+#[allow(dead_code)]
+pub trait KeyEncoder {
+ /// Encodes the key columns of `row` and returns the encoded bytes.
+ ///
+ /// Takes `&mut self` so implementations can keep reusable state between calls.
+ fn encode_key(&mut self, row: &dyn InternalRow) -> Result<Bytes>;
+}
+
+#[allow(dead_code)]
+impl dyn KeyEncoder {
+    /// Creates a key encoder that encodes the key bytes of the input row.
+    ///
+    /// # Arguments
+    /// * `row_type` - the row type of the input row
+    /// * `key_fields` - the names of the key fields to encode
+    /// * `data_lake_format` - the data lake format, if any
+    ///
+    /// # Returns
+    /// The key encoder. With no data lake format, or with Lance, the compacted key
+    /// encoding is used.
+    ///
+    /// # Errors
+    /// Returns an error if a key field is missing from `row_type`. Also returns an
+    /// error if `data_lake_format` is Paimon or Iceberg, whose key encoders are not
+    /// implemented yet.
+    pub fn of(
+        row_type: &RowType,
+        key_fields: Vec<String>,
+        data_lake_format: Option<DataLakeFormat>,
+    ) -> Result<Box<dyn KeyEncoder>> {
+        match data_lake_format {
+            None | Some(DataLakeFormat::Lance) => Ok(Box::new(
+                CompactedKeyEncoder::create_key_encoder(row_type, key_fields.as_slice())?,
+            )),
+            // This API already returns `Result`, so unsupported formats are reported as
+            // errors rather than panicking the caller.
+            Some(DataLakeFormat::Paimon) => Err(crate::error::Error::IllegalArgument {
+                message: "KeyEncoder for Paimon format is currently unimplemented".to_string(),
+            }),
+            Some(DataLakeFormat::Iceberg) => Err(crate::error::Error::IllegalArgument {
+                message: "KeyEncoder for Iceberg format is currently unimplemented".to_string(),
+            }),
+        }
+    }
+}
diff --git a/crates/fluss/src/row/field_getter.rs b/crates/fluss/src/row/field_getter.rs
new file mode 100644
index 0000000..3a9cf0f
--- /dev/null
+++ b/crates/fluss/src/row/field_getter.rs
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::DataType;
+use crate::row::{Datum, InternalRow};
+
+/// Reads a single field of an [`InternalRow`] as a [`Datum`].
+///
+/// The variant records whether the field's data type is nullable; only `Nullable`
+/// getters check for null before reading.
+pub enum FieldGetter {
+ /// Getter for a nullable field; yields `Datum::Null` for null values.
+ Nullable(InnerFieldGetter),
+ /// Getter for a non-nullable field; reads the value directly.
+ NonNullable(InnerFieldGetter),
+}
+impl FieldGetter {
+    /// Reads this getter's field from `row`.
+    ///
+    /// The `Nullable` variant checks [`InternalRow::is_null_at`] first and returns
+    /// [`Datum::Null`] for a null field. The `NonNullable` variant reads the value
+    /// directly.
+    pub fn get_field<'a>(&self, row: &'a dyn InternalRow) -> Datum<'a> {
+        let (inner, nullable) = match self {
+            FieldGetter::Nullable(inner) => (inner, true),
+            FieldGetter::NonNullable(inner) => (inner, false),
+        };
+        if nullable && row.is_null_at(inner.pos()) {
+            return Datum::Null;
+        }
+        inner.get_field(row)
+    }
+
+    /// Builds the getter for the field at `pos` whose type is `data_type`.
+    ///
+    /// The `Nullable` variant is chosen when [`DataType::is_nullable`] is true.
+    ///
+    /// # Panics
+    /// Panics (via `unimplemented!`) for data types that have no getter yet.
+    pub fn create(data_type: &DataType, pos: usize) -> FieldGetter {
+        use InnerFieldGetter as G;
+
+        let inner = match data_type {
+            DataType::Char(t) => G::Char {
+                pos,
+                len: t.length() as usize,
+            },
+            DataType::String(_) => G::String { pos },
+            DataType::Boolean(_) => G::Bool { pos },
+            DataType::Binary(t) => G::Binary {
+                pos,
+                len: t.length(),
+            },
+            DataType::Bytes(_) => G::Bytes { pos },
+            DataType::TinyInt(_) => G::TinyInt { pos },
+            DataType::SmallInt(_) => G::SmallInt { pos },
+            DataType::Int(_) => G::Int { pos },
+            DataType::BigInt(_) => G::BigInt { pos },
+            DataType::Float(_) => G::Float { pos },
+            DataType::Double(_) => G::Double { pos },
+            _ => unimplemented!("DataType {:?} is currently unimplemented", data_type),
+        };
+
+        if data_type.is_nullable() {
+            Self::Nullable(inner)
+        } else {
+            Self::NonNullable(inner)
+        }
+    }
+}
+
+/// Type-specific field accessor.
+///
+/// Every variant stores the field position `pos`. CHAR and BINARY also store the
+/// length `len` taken from their data type.
+pub enum InnerFieldGetter {
+ Char { pos: usize, len: usize },
+ String { pos: usize },
+ Bool { pos: usize },
+ Binary { pos: usize, len: usize },
+ Bytes { pos: usize },
+ TinyInt { pos: usize },
+ SmallInt { pos: usize },
+ Int { pos: usize },
+ BigInt { pos: usize },
+ Float { pos: usize },
+ Double { pos: usize },
+}
+
+impl InnerFieldGetter {
+    /// Reads the value at this getter's position from `row`.
+    ///
+    /// No null check is done here; see [`FieldGetter::get_field`].
+    pub fn get_field<'a>(&self, row: &'a dyn InternalRow) -> Datum<'a> {
+        let pos = self.pos();
+        match *self {
+            Self::Char { len, .. } => Datum::String(row.get_char(pos, len)),
+            Self::String { .. } => Datum::from(row.get_string(pos)),
+            Self::Bool { .. } => Datum::from(row.get_boolean(pos)),
+            Self::Binary { len, .. } => Datum::from(row.get_binary(pos, len)),
+            Self::Bytes { .. } => Datum::from(row.get_bytes(pos)),
+            Self::TinyInt { .. } => Datum::from(row.get_byte(pos)),
+            Self::SmallInt { .. } => Datum::from(row.get_short(pos)),
+            Self::Int { .. } => Datum::from(row.get_int(pos)),
+            Self::BigInt { .. } => Datum::from(row.get_long(pos)),
+            Self::Float { .. } => Datum::from(row.get_float(pos)),
+            Self::Double { .. } => Datum::from(row.get_double(pos)),
+            // TODO Decimal, Date, Time, Timestamp, TimestampLTZ, Array, Map, Row
+        }
+    }
+
+    /// Returns the position of the field this getter reads.
+    pub fn pos(&self) -> usize {
+        match *self {
+            Self::Char { pos, .. } | Self::Binary { pos, .. } => pos,
+            Self::String { pos }
+            | Self::Bool { pos }
+            | Self::Bytes { pos }
+            | Self::TinyInt { pos }
+            | Self::SmallInt { pos }
+            | Self::Int { pos }
+            | Self::BigInt { pos }
+            | Self::Float { pos }
+            | Self::Double { pos } => pos,
+        }
+    }
+}
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index 86fdf90..c321ab9 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -19,11 +19,15 @@ mod column;
mod datum;
+mod binary;
mod compacted;
+mod encode;
+mod field_getter;
pub use column::*;
pub use datum::*;
+// TODO make functions return Result<?> for better error handling
pub trait InternalRow {
/// Returns the number of fields in this row
fn get_field_count(&self) -> usize;
@@ -143,6 +147,11 @@ impl<'a> Default for GenericRow<'a> {
}
impl<'a> GenericRow<'a> {
+    /// Creates a row from a sequence of values, converting each one into a [`Datum`].
+    ///
+    /// Accepts any iterable of items convertible into a [`Datum`], e.g. a `Vec`,
+    /// an array, or an iterator.
+    pub fn from_data(data: impl IntoIterator<Item = impl Into<Datum<'a>>>) -> GenericRow<'a> {
+        GenericRow {
+            values: data.into_iter().map(Into::into).collect(),
+        }
+    }
+ /// Creates an empty row with no values.
pub fn new() -> GenericRow<'a> {
GenericRow { values: vec![] }
}