This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b95fc2  feat:  introduce CompactedKeyEncoder (#124)
4b95fc2 is described below

commit 4b95fc2fc3b3c8462cfca0c65c6bf00eb95ba51e
Author: Keith Lee <[email protected]>
AuthorDate: Fri Jan 9 12:56:59 2026 +0000

    feat:  introduce CompactedKeyEncoder (#124)
---
 crates/fluss/Cargo.toml                            |   1 +
 crates/fluss/src/metadata/datatype.rs              |  30 ++
 crates/fluss/src/row/binary/binary_writer.rs       | 210 +++++++++++++
 crates/fluss/src/row/{compacted => binary}/mod.rs  |  12 +-
 .../src/row/compacted/compacted_key_writer.rs      |  97 ++++++
 crates/fluss/src/row/compacted/mod.rs              |   3 +
 crates/fluss/src/row/datum.rs                      |  10 +
 .../fluss/src/row/encode/compacted_key_encoder.rs  | 329 +++++++++++++++++++++
 crates/fluss/src/row/encode/mod.rs                 |  64 ++++
 crates/fluss/src/row/field_getter.rs               | 116 ++++++++
 crates/fluss/src/row/mod.rs                        |   9 +
 11 files changed, 880 insertions(+), 1 deletion(-)

diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml
index 27604ee..e8c851f 100644
--- a/crates/fluss/Cargo.toml
+++ b/crates/fluss/Cargo.toml
@@ -59,6 +59,7 @@ uuid = { version = "1.10", features = ["v4"] }
 tempfile = "3.23.0"
 snafu = "0.8.3"
 scopeguard = "1.2.0"
+delegate = "0.13.5"
 
 [target.'cfg(target_arch = "wasm32")'.dependencies]
 jiff = { workspace = true, features = ["js"] }
diff --git a/crates/fluss/src/metadata/datatype.rs b/crates/fluss/src/metadata/datatype.rs
index 8ad4f7e..e5ccb9a 100644
--- a/crates/fluss/src/metadata/datatype.rs
+++ b/crates/fluss/src/metadata/datatype.rs
@@ -852,6 +852,36 @@ impl RowType {
     pub fn fields(&self) -> &Vec<DataField> {
         &self.fields
     }
+
+    /// Looks up a field by name.
+    ///
+    /// Returns the index of the first field called `field_name`, or `None`
+    /// when this row type has no such field.
+    pub fn get_field_index(&self, field_name: &str) -> Option<usize> {
+        self.fields
+            .iter()
+            .enumerate()
+            .find_map(|(index, field)| (field.name == field_name).then_some(index))
+    }
+
+    /// Test helper: builds a row type whose fields are named `f0`, `f1`, ...
+    /// following the order of `data_types`, via `RowType::with_nullable(true, ..)`.
+    #[cfg(test)]
+    pub fn with_data_types(data_types: Vec<DataType>) -> Self {
+        // Consuming the vector moves each data type into its field, so no clone is needed.
+        let fields: Vec<DataField> = data_types
+            .into_iter()
+            .enumerate()
+            .map(|(idx, data_type)| DataField::new(format!("f{idx}"), data_type, None))
+            .collect();
+
+        Self::with_nullable(true, fields)
+    }
+
+    /// Test helper: builds a row type from parallel lists of data types and
+    /// field names (the i-th data type gets the i-th name), via
+    /// `RowType::with_nullable(true, ..)`.
+    ///
+    /// # Panics
+    /// Panics if `data_types` and `field_names` have different lengths; `zip`
+    /// would otherwise silently drop the unmatched entries.
+    #[cfg(test)]
+    pub fn with_data_types_and_field_names(
+        data_types: Vec<DataType>,
+        field_names: Vec<&str>,
+    ) -> Self {
+        assert_eq!(
+            data_types.len(),
+            field_names.len(),
+            "data_types and field_names must have the same length"
+        );
+        let fields: Vec<DataField> = data_types
+            .into_iter()
+            .zip(field_names)
+            // `data_type` is already owned here, so it is moved rather than cloned.
+            .map(|(data_type, field_name)| {
+                DataField::new(field_name.to_string(), data_type, None)
+            })
+            .collect();
+
+        Self::with_nullable(true, fields)
+    }
 }
 
 impl Display for RowType {
diff --git a/crates/fluss/src/row/binary/binary_writer.rs b/crates/fluss/src/row/binary/binary_writer.rs
new file mode 100644
index 0000000..a296777
--- /dev/null
+++ b/crates/fluss/src/row/binary/binary_writer.rs
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
+use crate::metadata::DataType;
+use crate::row::Datum;
+use crate::row::binary::BinaryRowFormat;
+
+/// Writer for a composite binary data format, such as a row or an array.
+///
+/// Values are appended through the `write_*` methods. [`BinaryWriter::reset`]
+/// prepares the writer for the next write and [`BinaryWriter::complete`]
+/// finishes the current one.
+// NOTE(review): presumably allowed because not every method is called yet; verify before removing.
+#[allow(dead_code)]
+pub trait BinaryWriter {
+    /// Resets the writer so it can be reused for the next write.
+    fn reset(&mut self);
+
+    /// Marks the field at `pos` as null.
+    fn set_null_at(&mut self, pos: usize);
+
+    /// Writes a boolean value.
+    fn write_boolean(&mut self, value: bool);
+
+    /// Writes a single byte (used for TINYINT values by `InnerValueWriter`).
+    fn write_byte(&mut self, value: u8);
+
+    /// Writes a variable-length byte sequence (BYTES).
+    fn write_bytes(&mut self, value: &[u8]);
+
+    /// Writes a CHAR value. `length` is supplied by the caller; implementations
+    /// may ignore it.
+    fn write_char(&mut self, value: &str, length: usize);
+
+    /// Writes a STRING value.
+    fn write_string(&mut self, value: &str);
+
+    /// Writes a 16-bit integer (SMALLINT).
+    fn write_short(&mut self, value: i16);
+
+    /// Writes a 32-bit integer (INT).
+    fn write_int(&mut self, value: i32);
+
+    /// Writes a 64-bit integer (BIGINT).
+    fn write_long(&mut self, value: i64);
+
+    /// Writes a 32-bit float (FLOAT).
+    fn write_float(&mut self, value: f32);
+
+    /// Writes a 64-bit float (DOUBLE).
+    fn write_double(&mut self, value: f64);
+
+    /// Writes a BINARY value given its bytes and a caller-supplied length.
+    fn write_binary(&mut self, bytes: &[u8], length: usize);
+
+    // TODO Decimal type
+    // fn write_decimal(&mut self, pos: i32, value: f64);
+
+    // TODO Timestamp type
+    // fn write_timestamp_ntz(&mut self, pos: i32, value: i64);
+
+    // TODO Timestamp type
+    // fn write_timestamp_ltz(&mut self, pos: i32, value: i64);
+
+    // TODO InternalArray, ArraySerializer
+    // fn write_array(&mut self, pos: i32, value: i64);
+
+    // TODO Row serializer
+    // fn write_row(&mut self, pos: i32, value: &InternalRow);
+
+    /// Completes the current write (for example, to record the final size of
+    /// the binary). Some implementations, such as the compacted key writer,
+    /// do nothing here.
+    fn complete(&mut self);
+}
+
+/// Writes a single [`Datum`] into a [`BinaryWriter`] according to the
+/// nullability of the element type the writer was created for.
+pub enum ValueWriter {
+    /// The element type is nullable: [`Datum::Null`] is recorded with
+    /// [`BinaryWriter::set_null_at`]; other values go to the inner writer.
+    Nullable(InnerValueWriter),
+    /// The element type is not nullable: every value, including
+    /// [`Datum::Null`], is handed to the inner writer, which reports an error
+    /// for values it does not accept.
+    NonNullable(InnerValueWriter),
+}
+
+impl ValueWriter {
+    /// Creates a value writer for `element_type`, picking the nullable or
+    /// non-nullable variant from `element_type.is_nullable()`.
+    ///
+    /// # Errors
+    /// Propagates any error from [`InnerValueWriter::create_inner_value_writer`].
+    pub fn create_value_writer(
+        element_type: &DataType,
+        binary_row_format: Option<&BinaryRowFormat>,
+    ) -> Result<ValueWriter> {
+        let value_writer =
+            InnerValueWriter::create_inner_value_writer(element_type, binary_row_format)?;
+        if element_type.is_nullable() {
+            Ok(Self::Nullable(value_writer))
+        } else {
+            Ok(Self::NonNullable(value_writer))
+        }
+    }
+
+    /// Writes `value` at position `pos` of `writer`.
+    ///
+    /// # Errors
+    /// Returns an error when the inner writer does not accept `value` (a type
+    /// mismatch, or a null for a non-nullable type).
+    pub fn write_value<W: BinaryWriter>(
+        &self,
+        writer: &mut W,
+        pos: usize,
+        value: &Datum,
+    ) -> Result<()> {
+        match self {
+            Self::Nullable(_) if matches!(value, Datum::Null) => {
+                writer.set_null_at(pos);
+                Ok(())
+            }
+            Self::Nullable(inner) | Self::NonNullable(inner) => {
+                inner.write_value(writer, pos, value)
+            }
+        }
+    }
+}
+
+/// Type-specific writer that forwards a [`Datum`] to the matching
+/// [`BinaryWriter`] method; there is one variant per supported [`DataType`].
+#[derive(Debug)]
+pub enum InnerValueWriter {
+    Char,
+    String,
+    Boolean,
+    Binary,
+    Bytes,
+    TinyInt,
+    SmallInt,
+    Int,
+    BigInt,
+    Float,
+    Double,
+    // TODO Decimal, Date, TimeWithoutTimeZone, TimestampWithoutTimeZone,
+    // TimestampWithLocalTimeZone, Array, Row
+}
+
+/// Accessor for writing the fields/elements of a binary writer at runtime.
+/// The fields/elements must be written in order.
+impl InnerValueWriter {
+    /// Creates the inner writer for `data_type`. The [`BinaryRowFormat`]
+    /// argument is currently unused.
+    ///
+    /// # Errors
+    /// Returns [`IllegalArgument`] for data types that have no value writer
+    /// yet. The caller receives an error rather than a panic.
+    pub fn create_inner_value_writer(
+        data_type: &DataType,
+        _: Option<&BinaryRowFormat>,
+    ) -> Result<InnerValueWriter> {
+        let writer = match data_type {
+            DataType::Char(_) => InnerValueWriter::Char,
+            DataType::String(_) => InnerValueWriter::String,
+            DataType::Boolean(_) => InnerValueWriter::Boolean,
+            DataType::Binary(_) => InnerValueWriter::Binary,
+            DataType::Bytes(_) => InnerValueWriter::Bytes,
+            DataType::TinyInt(_) => InnerValueWriter::TinyInt,
+            DataType::SmallInt(_) => InnerValueWriter::SmallInt,
+            DataType::Int(_) => InnerValueWriter::Int,
+            DataType::BigInt(_) => InnerValueWriter::BigInt,
+            DataType::Float(_) => InnerValueWriter::Float,
+            DataType::Double(_) => InnerValueWriter::Double,
+            _ => {
+                return Err(IllegalArgument {
+                    message: format!(
+                        "ValueWriter for DataType {:?} is currently not implemented",
+                        data_type
+                    ),
+                });
+            }
+        };
+        Ok(writer)
+    }
+
+    /// Writes `value` through the [`BinaryWriter`] method that matches this
+    /// writer's type. `_pos` is accepted for symmetry with
+    /// [`BinaryWriter::set_null_at`] but is not used.
+    ///
+    /// # Errors
+    /// Returns [`IllegalArgument`] when `value` does not match this writer's
+    /// type (including [`Datum::Null`]).
+    pub fn write_value<W: BinaryWriter>(
+        &self,
+        writer: &mut W,
+        _pos: usize,
+        value: &Datum,
+    ) -> Result<()> {
+        match (self, value) {
+            // NOTE(review): the string's byte length is passed as the CHAR length
+            // because this variant does not carry the declared CHAR(n) length;
+            // confirm this is intended.
+            (InnerValueWriter::Char, Datum::String(v)) => writer.write_char(v, v.len()),
+            (InnerValueWriter::String, Datum::String(v)) => writer.write_string(v),
+            (InnerValueWriter::Boolean, Datum::Bool(v)) => writer.write_boolean(*v),
+            // Owned and borrowed blobs are both exposed as `&[u8]` by `Datum::as_blob`.
+            (InnerValueWriter::Binary, Datum::Blob(_) | Datum::BorrowedBlob(_)) => {
+                let bytes = value.as_blob();
+                writer.write_binary(bytes, bytes.len());
+            }
+            (InnerValueWriter::Bytes, Datum::Blob(_) | Datum::BorrowedBlob(_)) => {
+                writer.write_bytes(value.as_blob());
+            }
+            // Reinterpret the i8 bit pattern as a raw byte.
+            (InnerValueWriter::TinyInt, Datum::Int8(v)) => writer.write_byte(*v as u8),
+            (InnerValueWriter::SmallInt, Datum::Int16(v)) => writer.write_short(*v),
+            (InnerValueWriter::Int, Datum::Int32(v)) => writer.write_int(*v),
+            (InnerValueWriter::BigInt, Datum::Int64(v)) => writer.write_long(*v),
+            (InnerValueWriter::Float, Datum::Float32(v)) => writer.write_float(v.into_inner()),
+            (InnerValueWriter::Double, Datum::Float64(v)) => writer.write_double(v.into_inner()),
+            _ => {
+                return Err(IllegalArgument {
+                    message: format!("{:?} used to write value {:?}", self, value),
+                });
+            }
+        }
+        Ok(())
+    }
+}
diff --git a/crates/fluss/src/row/compacted/mod.rs b/crates/fluss/src/row/binary/mod.rs
similarity index 76%
copy from crates/fluss/src/row/compacted/mod.rs
copy to crates/fluss/src/row/binary/mod.rs
index 695cdad..c31cbd5 100644
--- a/crates/fluss/src/row/compacted/mod.rs
+++ b/crates/fluss/src/row/binary/mod.rs
@@ -15,4 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod compacted_row_writer;
+mod binary_writer;
+
+pub use binary_writer::*;
+
+/// The binary row format types; each one identifies the binary row layout
+/// produced by a [`BinaryWriter`].
+// NOTE(review): presumably allowed because not every variant is constructed yet.
+#[allow(dead_code)]
+pub enum BinaryRowFormat {
+    Compacted,
+    Aligned,
+    Indexed,
+}
diff --git a/crates/fluss/src/row/compacted/compacted_key_writer.rs b/crates/fluss/src/row/compacted/compacted_key_writer.rs
new file mode 100644
index 0000000..84a6b22
--- /dev/null
+++ b/crates/fluss/src/row/compacted/compacted_key_writer.rs
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::row::compacted::compacted_row_writer::CompactedRowWriter;
+use bytes::Bytes;
+
+use crate::error::Result;
+use crate::metadata::DataType;
+use crate::row::binary::{BinaryRowFormat, BinaryWriter, ValueWriter};
+use delegate::delegate;
+
+/// A wrapper around [`CompactedRowWriter`] used to encode key columns.
+///
+/// The encoding is the same as [`CompactedRowWriter`], but without the header
+/// of null bits that records whether each field value is null, since key
+/// columns must not be null.
+pub struct CompactedKeyWriter {
+    delegate: CompactedRowWriter,
+}
+
+impl CompactedKeyWriter {
+    /// Creates a key writer backed by a [`CompactedRowWriter`] with zero fields.
+    pub fn new() -> CompactedKeyWriter {
+        CompactedKeyWriter {
+            // Key columns must not be null, so no null bits are needed: a field
+            // count of 0 initialises the underlying writer with zero null bits.
+            delegate: CompactedRowWriter::new(0),
+        }
+    }
+
+    /// Creates a [`ValueWriter`] for `field_type` targeting the compacted format.
+    ///
+    /// # Errors
+    /// Propagates any error from [`ValueWriter::create_value_writer`].
+    pub fn create_value_writer(field_type: &DataType) -> Result<ValueWriter> {
+        ValueWriter::create_value_writer(field_type, Some(&BinaryRowFormat::Compacted))
+    }
+
+    delegate! {
+        to self.delegate {
+            pub fn reset(&mut self);
+
+            #[allow(dead_code)]
+            pub fn position(&self) -> usize;
+
+            #[allow(dead_code)]
+            pub fn buffer(&self) -> &[u8];
+
+            pub fn to_bytes(&self) -> Bytes;
+        }
+    }
+}
+
+// `new` takes no arguments, so `Default` is provided as well (clippy `new_without_default`).
+impl Default for CompactedKeyWriter {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl BinaryWriter for CompactedKeyWriter {
+    // Every method except `complete` is forwarded unchanged to the wrapped
+    // `CompactedRowWriter`.
+    delegate! {
+        to self.delegate {
+            fn reset(&mut self);
+            fn set_null_at(&mut self, pos: usize);
+            fn write_boolean(&mut self, value: bool);
+            fn write_byte(&mut self, value: u8);
+            fn write_bytes(&mut self, value: &[u8]);
+            fn write_char(&mut self, value: &str, _length: usize);
+            fn write_string(&mut self, value: &str);
+            fn write_short(&mut self, value: i16);
+            fn write_int(&mut self, value: i32);
+            fn write_long(&mut self, value: i64);
+            fn write_float(&mut self, value: f32);
+            fn write_double(&mut self, value: f64);
+            fn write_binary(&mut self, bytes: &[u8], length: usize);
+        }
+    }
+
+    /// Intentionally a no-op for key encoding.
+    fn complete(&mut self) {}
+}
diff --git a/crates/fluss/src/row/compacted/mod.rs b/crates/fluss/src/row/compacted/mod.rs
index 695cdad..c81eb5a 100644
--- a/crates/fluss/src/row/compacted/mod.rs
+++ b/crates/fluss/src/row/compacted/mod.rs
@@ -15,4 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+mod compacted_key_writer;
 mod compacted_row_writer;
+
+pub use compacted_key_writer::CompactedKeyWriter;
diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs
index 1ea3933..28a378f 100644
--- a/crates/fluss/src/row/datum.rs
+++ b/crates/fluss/src/row/datum.rs
@@ -55,6 +55,8 @@ pub enum Datum<'a> {
     String(&'a str),
     #[display("{0}")]
     Blob(Blob),
+    #[display("{:?}")]
+    BorrowedBlob(&'a [u8]),
     #[display("{0}")]
     Decimal(Decimal),
     #[display("{0}")]
@@ -80,6 +82,7 @@ impl Datum<'_> {
     pub fn as_blob(&self) -> &[u8] {
         match self {
             Self::Blob(blob) => blob.as_ref(),
+            Self::BorrowedBlob(blob) => blob, // borrowed bytes are returned without copying
             _ => panic!("not a blob: {self:?}"),
         }
     }
@@ -289,6 +292,7 @@ impl Datum<'_> {
             Datum::Float64(v) => append_value_to_arrow!(Float64Builder, 
v.into_inner()),
             Datum::String(v) => append_value_to_arrow!(StringBuilder, *v),
             Datum::Blob(v) => append_value_to_arrow!(BinaryBuilder, 
v.as_ref()),
+            Datum::BorrowedBlob(v) => append_value_to_arrow!(BinaryBuilder, 
*v),
             Datum::Decimal(_) | Datum::Date(_) | Datum::Timestamp(_) | 
Datum::TimestampTz(_) => {
                 return Err(RowConvertError {
                     message: format!(
@@ -406,6 +410,12 @@ impl From<Vec<u8>> for Blob {
     }
 }
 
+impl<'a> From<&'a [u8]> for Datum<'a> {
+    fn from(value: &'a [u8]) -> Self {
+        Self::BorrowedBlob(value) // zero-copy: the datum borrows the caller's slice
+    }
+}
+
 const UNIX_EPOCH_DAY: jiff::civil::Date = jiff::civil::date(1970, 1, 1);
 
 impl Date {
diff --git a/crates/fluss/src/row/encode/compacted_key_encoder.rs b/crates/fluss/src/row/encode/compacted_key_encoder.rs
new file mode 100644
index 0000000..b9335a3
--- /dev/null
+++ b/crates/fluss/src/row/encode/compacted_key_encoder.rs
@@ -0,0 +1,329 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
+use crate::metadata::RowType;
+use crate::row::binary::ValueWriter;
+use crate::row::compacted::CompactedKeyWriter;
+use crate::row::encode::KeyEncoder;
+use crate::row::field_getter::FieldGetter;
+use crate::row::{Datum, InternalRow};
+use bytes::Bytes;
+
+/// Encodes selected key fields of an [`InternalRow`] with a [`CompactedKeyWriter`].
+#[allow(dead_code)]
+pub struct CompactedKeyEncoder {
+    /// Reader for each key field, in key order.
+    field_getters: Vec<FieldGetter>,
+    /// Value writer for each key field, parallel to `field_getters`.
+    field_encoders: Vec<ValueWriter>,
+    /// Writer that accumulates the encoded key; it is reset before every key.
+    compacted_encoder: CompactedKeyWriter,
+}
+
+impl CompactedKeyEncoder {
+    /// Create a key encoder to encode the key of the input row.
+    ///
+    /// # Arguments
+    /// * `row_type` - the row type of the input row
+    /// * `keys` - the names of the key fields to encode, in encoding order
+    ///
+    /// # Errors
+    /// Returns [`IllegalArgument`] if a key name is not a field of `row_type`,
+    /// and propagates any error from [`CompactedKeyEncoder::new`].
+    pub fn create_key_encoder(row_type: &RowType, keys: &[String]) -> Result<CompactedKeyEncoder> {
+        let encode_col_indexes = keys
+            .iter()
+            .map(|key| {
+                row_type.get_field_index(key).ok_or_else(|| IllegalArgument {
+                    message: format!(
+                        "Field {:?} not found in input row type {:?}",
+                        key, row_type
+                    ),
+                })
+            })
+            .collect::<Result<Vec<usize>>>()?;
+
+        Self::new(row_type, encode_col_indexes)
+    }
+
+    /// Creates an encoder for the fields at `encode_field_pos` of `row_type`,
+    /// encoded in the given order.
+    ///
+    /// # Errors
+    /// Returns [`IllegalArgument`] if a position is out of range for
+    /// `row_type`. The caller receives an error rather than a panic. Errors
+    /// from creating a field's value writer are propagated.
+    pub fn new(row_type: &RowType, encode_field_pos: Vec<usize>) -> Result<CompactedKeyEncoder> {
+        let mut field_getters: Vec<FieldGetter> = Vec::with_capacity(encode_field_pos.len());
+        let mut field_encoders: Vec<ValueWriter> = Vec::with_capacity(encode_field_pos.len());
+
+        for pos in encode_field_pos {
+            let data_type = row_type
+                .fields()
+                .get(pos)
+                .ok_or_else(|| IllegalArgument {
+                    message: format!(
+                        "Field position {} out of range for input row type {:?}",
+                        pos, row_type
+                    ),
+                })?
+                .data_type();
+            field_getters.push(FieldGetter::create(data_type, pos));
+            field_encoders.push(CompactedKeyWriter::create_value_writer(data_type)?);
+        }
+
+        Ok(CompactedKeyEncoder {
+            field_getters,
+            field_encoders,
+            compacted_encoder: CompactedKeyWriter::new(),
+        })
+    }
+}
+
+impl KeyEncoder for CompactedKeyEncoder {
+    /// Encodes the configured key fields of `row` into bytes.
+    ///
+    /// # Errors
+    /// Returns [`IllegalArgument`] if a key field is null. The reported
+    /// position is the index within the key, not within the row. Errors from
+    /// writing a field value are propagated.
+    fn encode_key(&mut self, row: &dyn InternalRow) -> Result<Bytes> {
+        // The writer is reused across calls, so clear any previous key first.
+        self.compacted_encoder.reset();
+
+        // Getters and encoders are pushed in lockstep when the encoder is built,
+        // so zipping pairs each key field with its writer without indexing.
+        for (pos, (field_getter, field_encoder)) in self
+            .field_getters
+            .iter()
+            .zip(&self.field_encoders)
+            .enumerate()
+        {
+            let value = field_getter.get_field(row);
+            if let Datum::Null = value {
+                return Err(IllegalArgument {
+                    message: format!("Cannot encode key with null value at position: {:?}", pos),
+                });
+            }
+            field_encoder.write_value(&mut self.compacted_encoder, pos, &value)?;
+        }
+
+        Ok(self.compacted_encoder.to_bytes())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::metadata::DataTypes;
+    use crate::row::{Datum, GenericRow};
+
+    /// Builds an encoder that encodes every field of `row_type`, in order.
+    fn for_test_row_type(row_type: &RowType) -> CompactedKeyEncoder {
+        CompactedKeyEncoder::new(row_type, (0..row_type.fields().len()).collect())
+            .expect("CompactedKeyEncoder initialization failed")
+    }
+
+    #[test]
+    fn test_encode_key() {
+        let row_type = RowType::with_data_types(vec![
+            DataTypes::int(),
+            DataTypes::bigint(),
+            DataTypes::int(),
+        ]);
+        let row = GenericRow::from_data(vec![
+            Datum::from(1i32),
+            Datum::from(3i64),
+            Datum::from(2i32),
+        ]);
+
+        let mut encoder = for_test_row_type(&row_type);
+
+        assert_eq!(
+            encoder.encode_key(&row).unwrap().iter().as_slice(),
+            [1u8, 3u8, 2u8]
+        );
+
+        // Reusing the encoder must not leak bytes from the previous key.
+        let row = GenericRow::from_data(vec![
+            Datum::from(2i32),
+            Datum::from(5i64),
+            Datum::from(6i32),
+        ]);
+
+        assert_eq!(
+            encoder.encode_key(&row).unwrap().iter().as_slice(),
+            [2u8, 5u8, 6u8]
+        );
+    }
+
+    #[test]
+    fn test_encode_key_with_key_names() {
+        let data_types = vec![
+            DataTypes::string(),
+            DataTypes::bigint(),
+            DataTypes::string(),
+        ];
+        let field_names = vec!["partition", "f1", "f2"];
+
+        let row_type = RowType::with_data_types_and_field_names(data_types, field_names);
+
+        let primary_keys = &["f2".to_string()];
+
+        let mut encoder =
+            CompactedKeyEncoder::create_key_encoder(&row_type, primary_keys).unwrap();
+
+        let row = GenericRow::from_data(vec![
+            Datum::from("p1"),
+            Datum::from(1i64),
+            Datum::from("a2"),
+        ]);
+
+        // should only get "a2" 's ASCII representation
+        assert_eq!(
+            encoder.encode_key(&row).unwrap().iter().as_slice(),
+            //  2 (start of text), 97 (the letter a), 50 (the number 2)
+            [2u8, 97u8, 50u8]
+        );
+    }
+
+    #[test]
+    fn test_unknown_key_name() {
+        let row_type = RowType::with_data_types_and_field_names(
+            vec![DataTypes::int(), DataTypes::string()],
+            vec!["id", "name"],
+        );
+
+        assert!(
+            CompactedKeyEncoder::create_key_encoder(&row_type, &["missing".to_string()])
+                .is_err()
+        );
+    }
+
+    #[test]
+    fn test_null_primary_key() {
+        let row_type = RowType::with_data_types(vec![
+            DataTypes::int(),
+            DataTypes::bigint(),
+            DataTypes::int(),
+            DataTypes::string(),
+        ]);
+
+        let primary_key_indices = vec![0, 1, 2];
+
+        let mut encoder = CompactedKeyEncoder::new(&row_type, primary_key_indices)
+            .expect("CompactedKeyEncoder initialization failed");
+
+        let row = GenericRow::from_data(vec![
+            Datum::from(1i32),
+            Datum::from(3i64),
+            Datum::from(2i32),
+            Datum::from("a2"),
+        ]);
+
+        assert_eq!(
+            encoder.encode_key(&row).unwrap().iter().as_slice(),
+            [1u8, 3u8, 2u8]
+        );
+
+        let row = GenericRow::from_data(vec![
+            Datum::from(1i32),
+            Datum::from(3i64),
+            Datum::Null,
+            Datum::from("a2"),
+        ]);
+
+        // Assert on the returned error itself rather than on panic text.
+        match encoder.encode_key(&row) {
+            Err(crate::error::Error::IllegalArgument { message, .. }) => assert_eq!(
+                message,
+                "Cannot encode key with null value at position: 2"
+            ),
+            Err(other) => panic!("unexpected error: {other:?}"),
+            Ok(bytes) => panic!("expected an error for a null key, got {bytes:?}"),
+        }
+    }
+
+    #[test]
+    fn test_int_string_as_primary_key() {
+        let row_type = RowType::with_data_types(vec![
+            DataTypes::string(),
+            DataTypes::int(),
+            DataTypes::string(),
+            DataTypes::string(),
+        ]);
+
+        let primary_key_indices = vec![1, 2];
+        let mut encoder = CompactedKeyEncoder::new(&row_type, primary_key_indices)
+            .expect("CompactedKeyEncoder initialization failed");
+
+        let row = GenericRow::from_data(vec![
+            Datum::from("a1"),
+            Datum::from(1i32),
+            Datum::from("a2"),
+            Datum::from("a3"),
+        ]);
+
+        assert_eq!(
+            encoder.encode_key(&row).unwrap().iter().as_slice(),
+            // 1 (1i32), 2 (start of text), 97 (the letter a), 50 (the number 2)
+            [1u8, 2u8, 97u8, 50u8]
+        );
+    }
+
+    #[test]
+    fn test_all_data_types() {
+        let row_type = RowType::with_data_types(vec![
+            DataTypes::boolean(),
+            DataTypes::tinyint(),
+            DataTypes::smallint(),
+            DataTypes::int(),
+            DataTypes::bigint(),
+            DataTypes::float(),
+            DataTypes::double(),
+            // TODO Date
+            // TODO Time
+            DataTypes::binary(20),
+            DataTypes::bytes(),
+            DataTypes::char(2),
+            DataTypes::string(),
+            // TODO Decimal
+            // TODO Timestamp
+            // TODO Timestamp LTZ
+            // TODO Array of Int
+            // TODO Array of Float
+            // TODO Array of String
+            // TODO: Add Map and Row fields in Issue #1973
+        ]);
+
+        let row = GenericRow::from_data(vec![
+            Datum::from(true),
+            Datum::from(2i8),
+            Datum::from(10i16),
+            Datum::from(100i32),
+            // From the Java test case: new BigInteger("12345678901234567890").longValue()
+            Datum::from(-6101065172474983726i64),
+            Datum::from(13.2f32),
+            Datum::from(15.21f64),
+            // TODO Date
+            // TODO Time
+            Datum::from("1234567890".as_bytes()),
+            Datum::from("20".as_bytes()),
+            Datum::from("1"),
+            Datum::from("hello"),
+            // TODO Decimal
+            // TODO Timestamp
+            // TODO Timestamp LTZ
+            // TODO Array of Int
+            // TODO Array of Float
+            // TODO Array of String
+            // TODO: Add Map and Row fields in Issue #1973
+        ]);
+
+        let mut encoder = for_test_row_type(&row_type);
+
+        let mut expected: Vec<u8> = Vec::new();
+        // BOOLEAN: true
+        expected.extend_from_slice(&[0x01]);
+        // TINYINT: 2
+        expected.extend_from_slice(&[0x02]);
+        // SMALLINT: 10
+        expected.extend_from_slice(&[0x0A]);
+        // INT: 100
+        expected.extend_from_slice(&[0x00, 0x64]);
+        // BIGINT: -6101065172474983726
+        expected.extend_from_slice(&[
+            0xD2, 0x95, 0xFC, 0xD8, 0xCE, 0xB1, 0xAA, 0xAA, 0xAB, 0x01,
+        ]);
+        // FLOAT: 13.2
+        expected.extend_from_slice(&[0x33, 0x33, 0x53, 0x41]);
+        // DOUBLE: 15.21
+        expected.extend_from_slice(&[0xEC, 0x51, 0xB8, 0x1E, 0x85, 0x6B, 0x2E, 0x40]);
+        // BINARY(20): "1234567890".getBytes()
+        expected.extend_from_slice(&[
+            0x0A, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x30,
+        ]);
+        // BYTES: "20".getBytes()
+        expected.extend_from_slice(&[0x02, 0x32, 0x30]);
+        // CHAR(2): "1"
+        expected.extend_from_slice(&[0x01, 0x31]);
+        // STRING: "hello"
+        expected.extend_from_slice(&[0x05, 0x68, 0x65, 0x6C, 0x6C, 0x6F]);
+
+        assert_eq!(
+            encoder.encode_key(&row).unwrap().iter().as_slice(),
+            expected.as_slice()
+        );
+    }
+}
diff --git a/crates/fluss/src/row/encode/mod.rs b/crates/fluss/src/row/encode/mod.rs
new file mode 100644
index 0000000..6c6eed9
--- /dev/null
+++ b/crates/fluss/src/row/encode/mod.rs
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod compacted_key_encoder;
+
+use crate::error::Result;
+use crate::metadata::{DataLakeFormat, RowType};
+use crate::row::InternalRow;
+use crate::row::encode::compacted_key_encoder::CompactedKeyEncoder;
+use bytes::Bytes;
+
+/// An interface for encoding the key of a row into bytes.
+#[allow(dead_code)]
+pub trait KeyEncoder {
+    /// Encodes the key of `row` into bytes.
+    ///
+    /// # Errors
+    /// Implementations return an error when the key cannot be encoded.
+    fn encode_key(&mut self, row: &dyn InternalRow) -> Result<Bytes>;
+}
+
+#[allow(dead_code)]
+impl dyn KeyEncoder {
+    /// Create a key encoder to encode the key bytes of the input row.
+    ///
+    /// # Arguments
+    /// * `row_type` - the row type of the input row
+    /// * `key_fields` - the key fields to encode
+    /// * `data_lake_format` - the data lake format; `None` and Lance both use
+    ///   the compacted key encoding
+    ///
+    /// # Errors
+    /// Returns an error if a key field is not part of `row_type`, or if no key
+    /// encoder is implemented yet for `data_lake_format` (Paimon, Iceberg).
+    /// The caller receives an error rather than a panic.
+    pub fn of(
+        row_type: &RowType,
+        key_fields: Vec<String>,
+        data_lake_format: Option<DataLakeFormat>,
+    ) -> Result<Box<dyn KeyEncoder>> {
+        match data_lake_format {
+            Some(DataLakeFormat::Lance) | None => Ok(Box::new(
+                CompactedKeyEncoder::create_key_encoder(row_type, &key_fields)?,
+            )),
+            Some(DataLakeFormat::Paimon) => Err(crate::error::Error::IllegalArgument {
+                message: "KeyEncoder for Paimon format is currently unimplemented".to_string(),
+            }),
+            Some(DataLakeFormat::Iceberg) => Err(crate::error::Error::IllegalArgument {
+                message: "KeyEncoder for Iceberg format is currently unimplemented".to_string(),
+            }),
+        }
+    }
+}
diff --git a/crates/fluss/src/row/field_getter.rs b/crates/fluss/src/row/field_getter.rs
new file mode 100644
index 0000000..3a9cf0f
--- /dev/null
+++ b/crates/fluss/src/row/field_getter.rs
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::DataType;
+use crate::row::{Datum, InternalRow};
+
+/// Reads one field of an [`InternalRow`] as a [`Datum`].
+///
+/// The variant records whether the column is nullable. Only nullable getters consult
+/// `InternalRow::is_null_at` before reading the value.
+pub enum FieldGetter {
+    /// Getter for a nullable column; null fields are reported as [`Datum::Null`].
+    Nullable(InnerFieldGetter),
+    /// Getter for a non-nullable column; the value is always read directly.
+    NonNullable(InnerFieldGetter),
+}
+
+impl FieldGetter {
+    /// Returns the value of this getter's field in `row`.
+    ///
+    /// For a `Nullable` getter, [`Datum::Null`] is returned when the field is null.
+    pub fn get_field<'a>(&self, row: &'a dyn InternalRow) -> Datum<'a> {
+        let (inner, may_be_null) = match self {
+            FieldGetter::Nullable(inner) => (inner, true),
+            FieldGetter::NonNullable(inner) => (inner, false),
+        };
+        if may_be_null && row.is_null_at(inner.pos()) {
+            return Datum::Null;
+        }
+        inner.get_field(row)
+    }
+
+    /// Builds a getter that reads the field at `pos` according to `data_type`.
+    ///
+    /// The result is `Nullable` exactly when `data_type.is_nullable()` returns true.
+    ///
+    /// # Panics
+    /// Panics via `unimplemented!` for data types that have no getter yet.
+    pub fn create(data_type: &DataType, pos: usize) -> FieldGetter {
+        let inner = match data_type {
+            DataType::Boolean(_) => InnerFieldGetter::Bool { pos },
+            DataType::TinyInt(_) => InnerFieldGetter::TinyInt { pos },
+            DataType::SmallInt(_) => InnerFieldGetter::SmallInt { pos },
+            DataType::Int(_) => InnerFieldGetter::Int { pos },
+            DataType::BigInt(_) => InnerFieldGetter::BigInt { pos },
+            DataType::Float(_) => InnerFieldGetter::Float { pos },
+            DataType::Double(_) => InnerFieldGetter::Double { pos },
+            DataType::Char(char_type) => {
+                let len = char_type.length() as usize;
+                InnerFieldGetter::Char { pos, len }
+            }
+            DataType::String(_) => InnerFieldGetter::String { pos },
+            DataType::Binary(binary_type) => {
+                let len = binary_type.length();
+                InnerFieldGetter::Binary { pos, len }
+            }
+            DataType::Bytes(_) => InnerFieldGetter::Bytes { pos },
+            _ => unimplemented!("DataType {:?} is currently unimplemented", data_type),
+        };
+
+        match data_type.is_nullable() {
+            true => FieldGetter::Nullable(inner),
+            false => FieldGetter::NonNullable(inner),
+        }
+    }
+}
+
+/// Type-specific accessor for a single field of an [`InternalRow`], addressed by position.
+///
+/// This getter performs no null check; `FieldGetter` wraps it to add null handling for
+/// nullable columns. `Char` and `Binary` also carry the length that is passed to
+/// `InternalRow::get_char` / `InternalRow::get_binary`.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum InnerFieldGetter {
+    Char { pos: usize, len: usize },
+    String { pos: usize },
+    Bool { pos: usize },
+    Binary { pos: usize, len: usize },
+    Bytes { pos: usize },
+    TinyInt { pos: usize },
+    SmallInt { pos: usize },
+    Int { pos: usize },
+    BigInt { pos: usize },
+    Float { pos: usize },
+    Double { pos: usize },
+}
+
+impl InnerFieldGetter {
+    /// Reads the field from `row` using the `InternalRow` accessor that matches this getter.
+    ///
+    /// No null check is done here. Callers that may encounter null fields should use
+    /// `FieldGetter::get_field` instead.
+    pub fn get_field<'a>(&self, row: &'a dyn InternalRow) -> Datum<'a> {
+        match *self {
+            InnerFieldGetter::Char { pos, len } => Datum::String(row.get_char(pos, len)),
+            InnerFieldGetter::String { pos } => Datum::from(row.get_string(pos)),
+            InnerFieldGetter::Bool { pos } => Datum::from(row.get_boolean(pos)),
+            InnerFieldGetter::Binary { pos, len } => Datum::from(row.get_binary(pos, len)),
+            InnerFieldGetter::Bytes { pos } => Datum::from(row.get_bytes(pos)),
+            InnerFieldGetter::TinyInt { pos } => Datum::from(row.get_byte(pos)),
+            InnerFieldGetter::SmallInt { pos } => Datum::from(row.get_short(pos)),
+            InnerFieldGetter::Int { pos } => Datum::from(row.get_int(pos)),
+            InnerFieldGetter::BigInt { pos } => Datum::from(row.get_long(pos)),
+            InnerFieldGetter::Float { pos } => Datum::from(row.get_float(pos)),
+            InnerFieldGetter::Double { pos } => Datum::from(row.get_double(pos)),
+            // TODO: Decimal, Date, Time, Timestamp, TimestampLTZ, Array, Map, Row
+        }
+    }
+
+    /// Returns the position of this getter's field within the row.
+    pub fn pos(&self) -> usize {
+        match *self {
+            // Variants that also carry a length.
+            Self::Char { pos, .. } | Self::Binary { pos, .. } => pos,
+            Self::String { pos }
+            | Self::Bool { pos }
+            | Self::Bytes { pos }
+            | Self::TinyInt { pos }
+            | Self::SmallInt { pos }
+            | Self::Int { pos }
+            | Self::BigInt { pos }
+            | Self::Float { pos }
+            | Self::Double { pos } => pos,
+        }
+    }
+}
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index 86fdf90..c321ab9 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -19,11 +19,15 @@ mod column;
 
 mod datum;
 
+mod binary;
 mod compacted;
+mod encode;
+mod field_getter;
 
 pub use column::*;
 pub use datum::*;
 
+// TODO: make these functions return Result<T> for better error handling
 pub trait InternalRow {
     /// Returns the number of fields in this row
     fn get_field_count(&self) -> usize;
@@ -143,6 +147,11 @@ impl<'a> Default for GenericRow<'a> {
 }
 
 impl<'a> GenericRow<'a> {
+    /// Creates a row whose values are `data`, each converted into a [`Datum`], in order.
+    pub fn from_data(data: Vec<impl Into<Datum<'a>>>) -> GenericRow<'a> {
+        let values = data.into_iter().map(|value| value.into()).collect();
+        GenericRow { values }
+    }
     pub fn new() -> GenericRow<'a> {
         GenericRow { values: vec![] }
     }


Reply via email to