luoyuxia commented on code in PR #124:
URL: https://github.com/apache/fluss-rust/pull/124#discussion_r2675649341


##########
crates/fluss/src/row/encode/compacted_key_encoder.rs:
##########
@@ -0,0 +1,330 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
+use crate::metadata::RowType;
+use crate::row::binary::ValueWriter;
+use crate::row::compacted::CompactedKeyWriter;
+use crate::row::encode::KeyEncoder;
+use crate::row::field_getter::FieldGetter;
+use crate::row::{Datum, InternalRow};
+use bytes::Bytes;
+
+#[allow(dead_code)]
+pub struct CompactedKeyEncoder {
+    field_getters: Vec<FieldGetter>,
+    field_encoders: Vec<ValueWriter>,
+    compacted_encoder: CompactedKeyWriter,
+}
+
+impl CompactedKeyEncoder {
+    /// Create a key encoder to encode the key of the input row.
+    ///
+    /// # Arguments
+    /// * `row_type` - the row type of the input row
+    /// * `keys` - the key fields to encode
+    ///
+    /// # Returns
+    /// * key_encoder - the [`KeyEncoder`]
+    pub fn create_key_encoder(row_type: &RowType, keys: &[String]) -> 
Result<CompactedKeyEncoder> {
+        let mut encode_col_indexes = Vec::with_capacity(keys.len());
+
+        for key in keys {
+            match row_type.get_field_index(key) {
+                Some(idx) => encode_col_indexes.push(idx),
+                None => {
+                    return Err(IllegalArgument {
+                        message: format!(
+                            "Field {:?} not found in input row type {:?}",
+                            key, row_type
+                        ),
+                    });
+                }
+            }
+        }
+
+        Self::new(row_type, encode_col_indexes)
+    }
+
+    #[cfg(test)]
+    pub fn for_test_row_type(row_type: &RowType) -> Self {
+        Self::new(row_type, (0..row_type.fields().len()).collect())
+            .expect("CompactedKeyEncoder initialization failed")
+    }
+
+    pub fn new(row_type: &RowType, encode_field_pos: Vec<usize>) -> 
Result<CompactedKeyEncoder> {
+        let mut field_getters: Vec<FieldGetter> = 
Vec::with_capacity(encode_field_pos.len());
+        let mut field_encoders: Vec<ValueWriter> = 
Vec::with_capacity(encode_field_pos.len());
+
+        for pos in &encode_field_pos {
+            let data_type = row_type.fields().get(*pos).unwrap().data_type();
+            field_getters.push(FieldGetter::create(data_type, *pos));
+            
field_encoders.push(CompactedKeyWriter::create_value_writer(data_type)?);
+        }
+
+        Ok(CompactedKeyEncoder {
+            field_encoders,
+            field_getters,
+            compacted_encoder: CompactedKeyWriter::new(),
+        })
+    }
+}
+
+#[allow(dead_code)]
+impl KeyEncoder for CompactedKeyEncoder {
+    fn encode_key(&mut self, row: &dyn InternalRow) -> Result<Bytes> {
+        self.compacted_encoder.reset();
+
+        // iterate all the fields of the row, and encode each field
+        for (pos, field_getter) in self.field_getters.iter().enumerate() {
+            match &field_getter.get_field(row) {
+                Datum::Null => {
+                    return Err(IllegalArgument {
+                        message: format!(
+                            "Cannot encode key with null value at position: 
{:?}",
+                            pos
+                        ),
+                    });
+                }
+                value => self.field_encoders.get(pos).unwrap().write_value(
+                    &mut self.compacted_encoder,
+                    pos,
+                    value,
+                )?,
+            }
+        }
+
+        Ok(self.compacted_encoder.to_bytes())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::metadata::DataTypes;
+    use crate::row::{Datum, GenericRow};
+
+    #[test]
+    fn test_encode_key() {

Review Comment:
   Just want to double-check: have you verified that the result is the same as
on the Java side?



##########
crates/fluss/src/row/encode/compacted_key_encoder.rs:
##########
@@ -0,0 +1,330 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
+use crate::metadata::RowType;
+use crate::row::binary::ValueWriter;
+use crate::row::compacted::CompactedKeyWriter;
+use crate::row::encode::KeyEncoder;
+use crate::row::field_getter::FieldGetter;
+use crate::row::{Datum, InternalRow};
+use bytes::Bytes;
+
+#[allow(dead_code)]
+pub struct CompactedKeyEncoder {
+    field_getters: Vec<FieldGetter>,
+    field_encoders: Vec<ValueWriter>,
+    compacted_encoder: CompactedKeyWriter,
+}
+
+impl CompactedKeyEncoder {
+    /// Create a key encoder to encode the key of the input row.
+    ///
+    /// # Arguments
+    /// * `row_type` - the row type of the input row
+    /// * `keys` - the key fields to encode
+    ///
+    /// # Returns
+    /// * key_encoder - the [`KeyEncoder`]
+    pub fn create_key_encoder(row_type: &RowType, keys: &[String]) -> 
Result<CompactedKeyEncoder> {
+        let mut encode_col_indexes = Vec::with_capacity(keys.len());
+
+        for key in keys {
+            match row_type.get_field_index(key) {
+                Some(idx) => encode_col_indexes.push(idx),
+                None => {
+                    return Err(IllegalArgument {
+                        message: format!(
+                            "Field {:?} not found in input row type {:?}",
+                            key, row_type
+                        ),
+                    });
+                }
+            }
+        }
+
+        Self::new(row_type, encode_col_indexes)
+    }
+
+    #[cfg(test)]
+    pub fn for_test_row_type(row_type: &RowType) -> Self {

Review Comment:
   Can we move this into the following `tests` module?



##########
crates/fluss/src/row/datum.rs:
##########
@@ -55,6 +55,8 @@ pub enum Datum<'a> {
     String(&'a str),
     #[display("{0}")]
     Blob(Blob),
+    #[display("{:?}")]
+    BorrowedBlob(&'a [u8]),

Review Comment:
   Great. I have always wanted this to be a reference.
   
   Could you please remove `Blob(Blob)` and rename `BorrowedBlob` to `Blob`?



##########
crates/fluss/src/metadata/datatype.rs:
##########
@@ -852,6 +852,37 @@ impl RowType {
     pub fn fields(&self) -> &Vec<DataField> {
         &self.fields
     }
+
+    pub fn get_field_index(&self, field_name: &str) -> Option<usize> {
+        self.fields.iter().position(|f| f.name == field_name)
+    }
+
+    #[cfg(test)]
+    pub fn with_data_types(data_types: Vec<DataType>) -> Self {
+        let mut fields: Vec<DataField> = Vec::new();
+        data_types.iter().enumerate().for_each(|(idx, data_type)| {
+            fields.push(DataField::new(format!("f{}", idx), data_type.clone(), 
None));
+        });
+
+        Self::with_nullable(true, fields)
+    }
+
+    #[cfg(test)]
+    pub fn with_data_types_and_field_names(
+        data_types: Vec<DataType>,
+        field_names: Vec<&str>,
+    ) -> Self {
+        let mut fields: Vec<DataField> = Vec::new();
+        data_types.iter().enumerate().for_each(|(idx, data_type)| {

Review Comment:
   nit: we can use `zip`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to