Copilot commented on code in PR #124: URL: https://github.com/apache/fluss-rust/pull/124#discussion_r2661382582
########## crates/fluss/src/row/binary/binary_writer.rs: ########## @@ -0,0 +1,327 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::Error::{IllegalArgument, IoUnsupported}; +use crate::error::Result; +use crate::metadata::DataType; +use crate::row::Datum; +use crate::row::binary::BinaryRowFormat; + +/// Writer to write a composite data format, like row, array, +#[allow(dead_code)] +pub trait BinaryWriter { + /// Reset writer to prepare next write + fn reset(&mut self); + + /// Set null to this field + fn set_null_at(&mut self, pos: usize); + + fn write_boolean(&mut self, value: bool); + + fn write_byte(&mut self, value: u8); + + fn write_bytes(&mut self, value: &[u8]); + + fn write_char(&mut self, value: &str, length: usize); + + fn write_string(&mut self, value: &str); + + fn write_short(&mut self, value: i16); + + fn write_int(&mut self, value: i32); + + fn write_long(&mut self, value: i64); + + fn write_float(&mut self, value: f32); + + fn write_double(&mut self, value: f64); + + fn write_binary(&mut self, bytes: &[u8], length: usize); + + // TODO Decimal type + // fn write_decimal(&mut self, pos: i32, value: f64); + + // TODO Timestamp type + // fn write_timestamp_ntz(&mut self, pos: i32, 
value: i64); + + // TODO Timestamp type + // fn write_timestamp_ltz(&mut self, pos: i32, value: i64); + + // TODO InternalArray, ArraySerializer + // fn write_array(&mut self, pos: i32, value: i64); + + // TODO Row serializer + // fn write_row(&mut self, pos: i32, value: &InternalRow); + + /// Finally, complete write to set real size to binary. + fn complete(&mut self); +} + +/// Accessor for writing the fields/elements of a binary writer during runtime, the +/// fields/elements must be written in the order. +pub trait ValueWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, pos: usize, value: &Datum) -> Result<()>; +} + +#[allow(dead_code)] +impl dyn ValueWriter { + /// Creates an accessor for setting the elements of a binary writer during runtime. + pub fn create_value_writer( + element_type: &DataType, + binary_row_format: &BinaryRowFormat, + ) -> Box<dyn ValueWriter> { + let value_writer = + Self::create_not_null_value_writer(element_type, Some(binary_row_format)).unwrap(); + + if !element_type.is_nullable() { + value_writer + } else { + Box::new(NullWriter { + delegate: value_writer, + }) Review Comment: create_value_writer calls unwrap() on the Result returned by create_not_null_value_writer, which will panic whenever the element type is unsupported (for example, a Map type yields an IoUnsupported error, and other unhandled types yield IllegalArgument). Since this factory can fail, create_value_writer should return Result<Box<dyn ValueWriter>> and propagate the error with the ? operator instead of panicking. ```suggestion ) -> Result<Box<dyn ValueWriter>> { let value_writer = Self::create_not_null_value_writer(element_type, Some(binary_row_format))?; if !element_type.is_nullable() { Ok(value_writer) } else { Ok(Box::new(NullWriter { delegate: value_writer, })) ``` ########## crates/fluss/src/row/binary/binary_writer.rs: ########## @@ -0,0 +1,327 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::Error::{IllegalArgument, IoUnsupported}; +use crate::error::Result; +use crate::metadata::DataType; +use crate::row::Datum; +use crate::row::binary::BinaryRowFormat; + +/// Writer to write a composite data format, like row, array, +#[allow(dead_code)] +pub trait BinaryWriter { + /// Reset writer to prepare next write + fn reset(&mut self); + + /// Set null to this field + fn set_null_at(&mut self, pos: usize); + + fn write_boolean(&mut self, value: bool); + + fn write_byte(&mut self, value: u8); + + fn write_bytes(&mut self, value: &[u8]); + + fn write_char(&mut self, value: &str, length: usize); + + fn write_string(&mut self, value: &str); + + fn write_short(&mut self, value: i16); + + fn write_int(&mut self, value: i32); + + fn write_long(&mut self, value: i64); + + fn write_float(&mut self, value: f32); + + fn write_double(&mut self, value: f64); + + fn write_binary(&mut self, bytes: &[u8], length: usize); + + // TODO Decimal type + // fn write_decimal(&mut self, pos: i32, value: f64); + + // TODO Timestamp type + // fn write_timestamp_ntz(&mut self, pos: i32, value: i64); + + // TODO Timestamp type + // fn write_timestamp_ltz(&mut self, pos: i32, value: i64); + + // TODO InternalArray, ArraySerializer + // fn write_array(&mut self, pos: i32, value: i64); + + // TODO Row serializer + // fn write_row(&mut self, pos: i32, value: &InternalRow); + + /// Finally, 
complete write to set real size to binary. + fn complete(&mut self); +} + +/// Accessor for writing the fields/elements of a binary writer during runtime, the +/// fields/elements must be written in the order. +pub trait ValueWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, pos: usize, value: &Datum) -> Result<()>; +} + +#[allow(dead_code)] +impl dyn ValueWriter { + /// Creates an accessor for setting the elements of a binary writer during runtime. + pub fn create_value_writer( + element_type: &DataType, + binary_row_format: &BinaryRowFormat, + ) -> Box<dyn ValueWriter> { + let value_writer = + Self::create_not_null_value_writer(element_type, Some(binary_row_format)).unwrap(); + + if !element_type.is_nullable() { + value_writer + } else { + Box::new(NullWriter { + delegate: value_writer, + }) + } + } + + /// Creates an accessor for setting the elements of a binary writer during runtime. + pub fn create_not_null_value_writer( + element_type: &DataType, + _: Option<&BinaryRowFormat>, + ) -> Result<Box<dyn ValueWriter>> { + match element_type { + DataType::Char(_) => Ok(Box::new(CharWriter)), + DataType::String(_) => Ok(Box::new(StringWriter)), + DataType::Boolean(_) => Ok(Box::new(BoolWriter)), + DataType::Binary(_) => Ok(Box::new(BinaryValueWriter)), + DataType::Bytes(_) => Ok(Box::new(BytesWriter)), + // TODO DECIMAL + DataType::TinyInt(_) => Ok(Box::new(TinyIntWriter)), + DataType::SmallInt(_) => Ok(Box::new(SmallIntWriter)), + DataType::Int(_) => Ok(Box::new(IntWriter)), + // TODO DATE + // TODO TIME_WITHOUT_TIME_ZONE + DataType::BigInt(_) => Ok(Box::new(LongWriter)), + DataType::Float(_) => Ok(Box::new(FloatWriter)), + DataType::Double(_) => Ok(Box::new(DoubleWriter)), + // TODO TIMESTAMP_WITHOUT_TIME_ZONE + // TODO TIMESTAMP_WITH_LOCAL_TIME_ZONE + // TODO ARRAY + DataType::Map(_) => Err(IoUnsupported { + message: "Map type is not supported yet. 
Will be added in Issue #1973.".to_string(), + }), + // TODO ROW + _ => Err(IllegalArgument { + message: format!("Type {} is not supported yet", element_type), + }), + } + } +} + +#[derive(Default)] +struct CharWriter; +impl ValueWriter for CharWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::String(v) = value { + writer.write_char(v, v.len()); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct StringWriter; +impl ValueWriter for StringWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::String(v) = value { + writer.write_string(v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct BoolWriter; +impl ValueWriter for BoolWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Bool(v) = value { + writer.write_boolean(*v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct BinaryValueWriter; +impl ValueWriter for BinaryValueWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + match value { + Datum::Blob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + Datum::BorrowedBlob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + _ => Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }), + } + } +} + +#[derive(Default)] +struct BytesWriter; +impl ValueWriter for BytesWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + match value { + 
Datum::Blob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + Datum::BorrowedBlob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + value => Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }), + } + } +} + +// TODO DecimalWriter + +#[derive(Default)] +struct TinyIntWriter; +impl ValueWriter for TinyIntWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Int8(v) = value { + writer.write_byte(*v as u8); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct SmallIntWriter; +impl ValueWriter for SmallIntWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Int16(v) = value { + writer.write_short(*v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) Review Comment: The error message "Wrong ValueWriter used to write value" is unclear about what went wrong. It should specify the expected data type and what was actually provided to help users debug the issue more effectively. ########## crates/fluss/src/row/binary/binary_writer.rs: ########## @@ -0,0 +1,327 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::Error::{IllegalArgument, IoUnsupported}; +use crate::error::Result; +use crate::metadata::DataType; +use crate::row::Datum; +use crate::row::binary::BinaryRowFormat; + +/// Writer to write a composite data format, like row, array, +#[allow(dead_code)] +pub trait BinaryWriter { + /// Reset writer to prepare next write + fn reset(&mut self); + + /// Set null to this field + fn set_null_at(&mut self, pos: usize); + + fn write_boolean(&mut self, value: bool); + + fn write_byte(&mut self, value: u8); + + fn write_bytes(&mut self, value: &[u8]); + + fn write_char(&mut self, value: &str, length: usize); + + fn write_string(&mut self, value: &str); + + fn write_short(&mut self, value: i16); + + fn write_int(&mut self, value: i32); + + fn write_long(&mut self, value: i64); + + fn write_float(&mut self, value: f32); + + fn write_double(&mut self, value: f64); + + fn write_binary(&mut self, bytes: &[u8], length: usize); + + // TODO Decimal type + // fn write_decimal(&mut self, pos: i32, value: f64); + + // TODO Timestamp type + // fn write_timestamp_ntz(&mut self, pos: i32, value: i64); + + // TODO Timestamp type + // fn write_timestamp_ltz(&mut self, pos: i32, value: i64); + + // TODO InternalArray, ArraySerializer + // fn write_array(&mut self, pos: i32, value: i64); + + // TODO Row serializer + // fn write_row(&mut self, pos: i32, value: &InternalRow); + + /// Finally, complete write to set real size to binary. 
+ fn complete(&mut self); +} + +/// Accessor for writing the fields/elements of a binary writer during runtime, the +/// fields/elements must be written in the order. +pub trait ValueWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, pos: usize, value: &Datum) -> Result<()>; +} + +#[allow(dead_code)] +impl dyn ValueWriter { + /// Creates an accessor for setting the elements of a binary writer during runtime. + pub fn create_value_writer( + element_type: &DataType, + binary_row_format: &BinaryRowFormat, + ) -> Box<dyn ValueWriter> { + let value_writer = + Self::create_not_null_value_writer(element_type, Some(binary_row_format)).unwrap(); + + if !element_type.is_nullable() { + value_writer + } else { + Box::new(NullWriter { + delegate: value_writer, + }) + } + } + + /// Creates an accessor for setting the elements of a binary writer during runtime. + pub fn create_not_null_value_writer( + element_type: &DataType, + _: Option<&BinaryRowFormat>, + ) -> Result<Box<dyn ValueWriter>> { + match element_type { + DataType::Char(_) => Ok(Box::new(CharWriter)), + DataType::String(_) => Ok(Box::new(StringWriter)), + DataType::Boolean(_) => Ok(Box::new(BoolWriter)), + DataType::Binary(_) => Ok(Box::new(BinaryValueWriter)), + DataType::Bytes(_) => Ok(Box::new(BytesWriter)), + // TODO DECIMAL + DataType::TinyInt(_) => Ok(Box::new(TinyIntWriter)), + DataType::SmallInt(_) => Ok(Box::new(SmallIntWriter)), + DataType::Int(_) => Ok(Box::new(IntWriter)), + // TODO DATE + // TODO TIME_WITHOUT_TIME_ZONE + DataType::BigInt(_) => Ok(Box::new(LongWriter)), + DataType::Float(_) => Ok(Box::new(FloatWriter)), + DataType::Double(_) => Ok(Box::new(DoubleWriter)), + // TODO TIMESTAMP_WITHOUT_TIME_ZONE + // TODO TIMESTAMP_WITH_LOCAL_TIME_ZONE + // TODO ARRAY + DataType::Map(_) => Err(IoUnsupported { + message: "Map type is not supported yet. 
Will be added in Issue #1973.".to_string(), + }), + // TODO ROW + _ => Err(IllegalArgument { + message: format!("Type {} is not supported yet", element_type), + }), + } + } +} + +#[derive(Default)] +struct CharWriter; +impl ValueWriter for CharWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::String(v) = value { + writer.write_char(v, v.len()); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) Review Comment: The error message "Wrong ValueWriter used to write value" is unclear about what went wrong. It should specify the expected data type and what was actually provided to help users debug the issue more effectively. ########## crates/fluss/src/row/binary/binary_writer.rs: ########## @@ -0,0 +1,327 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::error::Error::{IllegalArgument, IoUnsupported}; +use crate::error::Result; +use crate::metadata::DataType; +use crate::row::Datum; +use crate::row::binary::BinaryRowFormat; + +/// Writer to write a composite data format, like row, array, +#[allow(dead_code)] +pub trait BinaryWriter { + /// Reset writer to prepare next write + fn reset(&mut self); + + /// Set null to this field + fn set_null_at(&mut self, pos: usize); + + fn write_boolean(&mut self, value: bool); + + fn write_byte(&mut self, value: u8); + + fn write_bytes(&mut self, value: &[u8]); + + fn write_char(&mut self, value: &str, length: usize); + + fn write_string(&mut self, value: &str); + + fn write_short(&mut self, value: i16); + + fn write_int(&mut self, value: i32); + + fn write_long(&mut self, value: i64); + + fn write_float(&mut self, value: f32); + + fn write_double(&mut self, value: f64); + + fn write_binary(&mut self, bytes: &[u8], length: usize); + + // TODO Decimal type + // fn write_decimal(&mut self, pos: i32, value: f64); + + // TODO Timestamp type + // fn write_timestamp_ntz(&mut self, pos: i32, value: i64); + + // TODO Timestamp type + // fn write_timestamp_ltz(&mut self, pos: i32, value: i64); + + // TODO InternalArray, ArraySerializer + // fn write_array(&mut self, pos: i32, value: i64); + + // TODO Row serializer + // fn write_row(&mut self, pos: i32, value: &InternalRow); + + /// Finally, complete write to set real size to binary. + fn complete(&mut self); +} + +/// Accessor for writing the fields/elements of a binary writer during runtime, the +/// fields/elements must be written in the order. +pub trait ValueWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, pos: usize, value: &Datum) -> Result<()>; +} + +#[allow(dead_code)] +impl dyn ValueWriter { + /// Creates an accessor for setting the elements of a binary writer during runtime. 
+ pub fn create_value_writer( + element_type: &DataType, + binary_row_format: &BinaryRowFormat, + ) -> Box<dyn ValueWriter> { + let value_writer = + Self::create_not_null_value_writer(element_type, Some(binary_row_format)).unwrap(); + + if !element_type.is_nullable() { + value_writer + } else { + Box::new(NullWriter { + delegate: value_writer, + }) + } + } + + /// Creates an accessor for setting the elements of a binary writer during runtime. + pub fn create_not_null_value_writer( + element_type: &DataType, + _: Option<&BinaryRowFormat>, + ) -> Result<Box<dyn ValueWriter>> { + match element_type { + DataType::Char(_) => Ok(Box::new(CharWriter)), + DataType::String(_) => Ok(Box::new(StringWriter)), + DataType::Boolean(_) => Ok(Box::new(BoolWriter)), + DataType::Binary(_) => Ok(Box::new(BinaryValueWriter)), + DataType::Bytes(_) => Ok(Box::new(BytesWriter)), + // TODO DECIMAL + DataType::TinyInt(_) => Ok(Box::new(TinyIntWriter)), + DataType::SmallInt(_) => Ok(Box::new(SmallIntWriter)), + DataType::Int(_) => Ok(Box::new(IntWriter)), + // TODO DATE + // TODO TIME_WITHOUT_TIME_ZONE + DataType::BigInt(_) => Ok(Box::new(LongWriter)), + DataType::Float(_) => Ok(Box::new(FloatWriter)), + DataType::Double(_) => Ok(Box::new(DoubleWriter)), + // TODO TIMESTAMP_WITHOUT_TIME_ZONE + // TODO TIMESTAMP_WITH_LOCAL_TIME_ZONE + // TODO ARRAY + DataType::Map(_) => Err(IoUnsupported { + message: "Map type is not supported yet. 
Will be added in Issue #1973.".to_string(), + }), + // TODO ROW + _ => Err(IllegalArgument { + message: format!("Type {} is not supported yet", element_type), + }), + } + } +} + +#[derive(Default)] +struct CharWriter; +impl ValueWriter for CharWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::String(v) = value { + writer.write_char(v, v.len()); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct StringWriter; +impl ValueWriter for StringWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::String(v) = value { + writer.write_string(v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct BoolWriter; +impl ValueWriter for BoolWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Bool(v) = value { + writer.write_boolean(*v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct BinaryValueWriter; +impl ValueWriter for BinaryValueWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + match value { + Datum::Blob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + Datum::BorrowedBlob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + _ => Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }), + } + } +} + +#[derive(Default)] +struct BytesWriter; +impl ValueWriter for BytesWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + match value { + 
Datum::Blob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + Datum::BorrowedBlob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + value => Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }), + } + } +} + +// TODO DecimalWriter + +#[derive(Default)] +struct TinyIntWriter; +impl ValueWriter for TinyIntWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Int8(v) = value { + writer.write_byte(*v as u8); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct SmallIntWriter; +impl ValueWriter for SmallIntWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Int16(v) = value { + writer.write_short(*v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct IntWriter; +impl ValueWriter for IntWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Int32(v) = value { + writer.write_int(*v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) Review Comment: The error message "Wrong ValueWriter used to write value" is unclear about what went wrong. It should specify the expected data type and what was actually provided to help users debug the issue more effectively. ########## crates/fluss/src/row/binary/binary_writer.rs: ########## @@ -0,0 +1,327 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::Error::{IllegalArgument, IoUnsupported}; +use crate::error::Result; +use crate::metadata::DataType; +use crate::row::Datum; +use crate::row::binary::BinaryRowFormat; + +/// Writer to write a composite data format, like row, array, +#[allow(dead_code)] +pub trait BinaryWriter { + /// Reset writer to prepare next write + fn reset(&mut self); + + /// Set null to this field + fn set_null_at(&mut self, pos: usize); + + fn write_boolean(&mut self, value: bool); + + fn write_byte(&mut self, value: u8); + + fn write_bytes(&mut self, value: &[u8]); + + fn write_char(&mut self, value: &str, length: usize); + + fn write_string(&mut self, value: &str); + + fn write_short(&mut self, value: i16); + + fn write_int(&mut self, value: i32); + + fn write_long(&mut self, value: i64); + + fn write_float(&mut self, value: f32); + + fn write_double(&mut self, value: f64); + + fn write_binary(&mut self, bytes: &[u8], length: usize); + + // TODO Decimal type + // fn write_decimal(&mut self, pos: i32, value: f64); + + // TODO Timestamp type + // fn write_timestamp_ntz(&mut self, pos: i32, value: i64); + + // TODO Timestamp type + // fn write_timestamp_ltz(&mut self, pos: i32, value: i64); + + // TODO InternalArray, ArraySerializer + // fn write_array(&mut self, pos: i32, value: i64); + + // TODO Row serializer + // fn write_row(&mut self, pos: i32, value: &InternalRow); + + /// Finally, 
complete write to set real size to binary. + fn complete(&mut self); +} + +/// Accessor for writing the fields/elements of a binary writer during runtime, the +/// fields/elements must be written in the order. +pub trait ValueWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, pos: usize, value: &Datum) -> Result<()>; +} + +#[allow(dead_code)] +impl dyn ValueWriter { + /// Creates an accessor for setting the elements of a binary writer during runtime. + pub fn create_value_writer( + element_type: &DataType, + binary_row_format: &BinaryRowFormat, + ) -> Box<dyn ValueWriter> { + let value_writer = + Self::create_not_null_value_writer(element_type, Some(binary_row_format)).unwrap(); + + if !element_type.is_nullable() { + value_writer + } else { + Box::new(NullWriter { + delegate: value_writer, + }) + } + } + + /// Creates an accessor for setting the elements of a binary writer during runtime. + pub fn create_not_null_value_writer( + element_type: &DataType, + _: Option<&BinaryRowFormat>, + ) -> Result<Box<dyn ValueWriter>> { + match element_type { + DataType::Char(_) => Ok(Box::new(CharWriter)), + DataType::String(_) => Ok(Box::new(StringWriter)), + DataType::Boolean(_) => Ok(Box::new(BoolWriter)), + DataType::Binary(_) => Ok(Box::new(BinaryValueWriter)), + DataType::Bytes(_) => Ok(Box::new(BytesWriter)), + // TODO DECIMAL + DataType::TinyInt(_) => Ok(Box::new(TinyIntWriter)), + DataType::SmallInt(_) => Ok(Box::new(SmallIntWriter)), + DataType::Int(_) => Ok(Box::new(IntWriter)), + // TODO DATE + // TODO TIME_WITHOUT_TIME_ZONE + DataType::BigInt(_) => Ok(Box::new(LongWriter)), + DataType::Float(_) => Ok(Box::new(FloatWriter)), + DataType::Double(_) => Ok(Box::new(DoubleWriter)), + // TODO TIMESTAMP_WITHOUT_TIME_ZONE + // TODO TIMESTAMP_WITH_LOCAL_TIME_ZONE + // TODO ARRAY + DataType::Map(_) => Err(IoUnsupported { + message: "Map type is not supported yet. 
Will be added in Issue #1973.".to_string(), + }), + // TODO ROW + _ => Err(IllegalArgument { + message: format!("Type {} is not supported yet", element_type), + }), + } + } +} + +#[derive(Default)] +struct CharWriter; +impl ValueWriter for CharWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::String(v) = value { + writer.write_char(v, v.len()); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct StringWriter; +impl ValueWriter for StringWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::String(v) = value { + writer.write_string(v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct BoolWriter; +impl ValueWriter for BoolWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Bool(v) = value { + writer.write_boolean(*v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct BinaryValueWriter; +impl ValueWriter for BinaryValueWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + match value { + Datum::Blob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + Datum::BorrowedBlob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + _ => Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }), + } + } +} + +#[derive(Default)] +struct BytesWriter; +impl ValueWriter for BytesWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + match value { + 
Datum::Blob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + Datum::BorrowedBlob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + value => Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }), + } + } +} + +// TODO DecimalWriter + +#[derive(Default)] +struct TinyIntWriter; +impl ValueWriter for TinyIntWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Int8(v) = value { + writer.write_byte(*v as u8); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) Review Comment: The error message "Wrong ValueWriter used to write value" is unclear about what went wrong. It should specify the expected data type and what was actually provided to help users debug the issue more effectively. ########## crates/fluss/src/row/binary/binary_writer.rs: ########## @@ -0,0 +1,327 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::error::Error::{IllegalArgument, IoUnsupported}; +use crate::error::Result; +use crate::metadata::DataType; +use crate::row::Datum; +use crate::row::binary::BinaryRowFormat; + +/// Writer to write a composite data format, like row, array, +#[allow(dead_code)] +pub trait BinaryWriter { + /// Reset writer to prepare next write + fn reset(&mut self); + + /// Set null to this field + fn set_null_at(&mut self, pos: usize); + + fn write_boolean(&mut self, value: bool); + + fn write_byte(&mut self, value: u8); + + fn write_bytes(&mut self, value: &[u8]); + + fn write_char(&mut self, value: &str, length: usize); + + fn write_string(&mut self, value: &str); + + fn write_short(&mut self, value: i16); + + fn write_int(&mut self, value: i32); + + fn write_long(&mut self, value: i64); + + fn write_float(&mut self, value: f32); + + fn write_double(&mut self, value: f64); + + fn write_binary(&mut self, bytes: &[u8], length: usize); + + // TODO Decimal type + // fn write_decimal(&mut self, pos: i32, value: f64); + + // TODO Timestamp type + // fn write_timestamp_ntz(&mut self, pos: i32, value: i64); + + // TODO Timestamp type + // fn write_timestamp_ltz(&mut self, pos: i32, value: i64); + + // TODO InternalArray, ArraySerializer + // fn write_array(&mut self, pos: i32, value: i64); + + // TODO Row serializer + // fn write_row(&mut self, pos: i32, value: &InternalRow); + + /// Finally, complete write to set real size to binary. + fn complete(&mut self); +} + +/// Accessor for writing the fields/elements of a binary writer during runtime, the +/// fields/elements must be written in the order. +pub trait ValueWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, pos: usize, value: &Datum) -> Result<()>; +} + +#[allow(dead_code)] +impl dyn ValueWriter { + /// Creates an accessor for setting the elements of a binary writer during runtime. 
+ pub fn create_value_writer( + element_type: &DataType, + binary_row_format: &BinaryRowFormat, + ) -> Box<dyn ValueWriter> { + let value_writer = + Self::create_not_null_value_writer(element_type, Some(binary_row_format)).unwrap(); + + if !element_type.is_nullable() { + value_writer + } else { + Box::new(NullWriter { + delegate: value_writer, + }) + } + } + + /// Creates an accessor for setting the elements of a binary writer during runtime. + pub fn create_not_null_value_writer( + element_type: &DataType, + _: Option<&BinaryRowFormat>, + ) -> Result<Box<dyn ValueWriter>> { + match element_type { + DataType::Char(_) => Ok(Box::new(CharWriter)), + DataType::String(_) => Ok(Box::new(StringWriter)), + DataType::Boolean(_) => Ok(Box::new(BoolWriter)), + DataType::Binary(_) => Ok(Box::new(BinaryValueWriter)), + DataType::Bytes(_) => Ok(Box::new(BytesWriter)), + // TODO DECIMAL + DataType::TinyInt(_) => Ok(Box::new(TinyIntWriter)), + DataType::SmallInt(_) => Ok(Box::new(SmallIntWriter)), + DataType::Int(_) => Ok(Box::new(IntWriter)), + // TODO DATE + // TODO TIME_WITHOUT_TIME_ZONE + DataType::BigInt(_) => Ok(Box::new(LongWriter)), + DataType::Float(_) => Ok(Box::new(FloatWriter)), + DataType::Double(_) => Ok(Box::new(DoubleWriter)), + // TODO TIMESTAMP_WITHOUT_TIME_ZONE + // TODO TIMESTAMP_WITH_LOCAL_TIME_ZONE + // TODO ARRAY + DataType::Map(_) => Err(IoUnsupported { + message: "Map type is not supported yet. 
Will be added in Issue #1973.".to_string(), + }), + // TODO ROW + _ => Err(IllegalArgument { + message: format!("Type {} is not supported yet", element_type), + }), + } + } +} + +#[derive(Default)] +struct CharWriter; +impl ValueWriter for CharWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::String(v) = value { + writer.write_char(v, v.len()); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct StringWriter; +impl ValueWriter for StringWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::String(v) = value { + writer.write_string(v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct BoolWriter; +impl ValueWriter for BoolWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Bool(v) = value { + writer.write_boolean(*v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct BinaryValueWriter; +impl ValueWriter for BinaryValueWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + match value { + Datum::Blob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + Datum::BorrowedBlob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + _ => Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }), + } + } +} + +#[derive(Default)] +struct BytesWriter; +impl ValueWriter for BytesWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + match value { + 
Datum::Blob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + Datum::BorrowedBlob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + value => Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }), + } + } +} Review Comment: The BinaryValueWriter and BytesWriter implementations are nearly identical. However, BytesWriter incorrectly calls write_binary instead of write_bytes. Beyond fixing that bug, consider if these should remain separate or be consolidated given that they handle the same Datum variants and have the same structure. ########## crates/fluss/src/row/encode/compacted_key_encoder.rs: ########## @@ -0,0 +1,313 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::error::Error::IllegalArgument; +use crate::error::Result; +use crate::metadata::RowType; +use crate::row::InternalRow; +use crate::row::binary::ValueWriter; +use crate::row::compacted::CompactedKeyWriter; +use crate::row::encode::KeyEncoder; +use crate::row::field_getter::FieldGetter; +use bytes::Bytes; + +#[allow(dead_code)] +pub struct CompactedKeyEncoder { + field_getters: Vec<Box<dyn FieldGetter>>, + field_encoders: Vec<Box<dyn ValueWriter>>, + compacted_encoder: CompactedKeyWriter, +} + +impl CompactedKeyEncoder { + /// Create a key encoder to encode the key of the input row. + /// + /// # Arguments + /// * `row_type` - the row type of the input row + /// * `keys` - the key fields to encode + /// + /// # Returns + /// * key_encoder - the [`KeyEncoder`] + pub fn create_key_encoder(row_type: &RowType, keys: &[String]) -> Result<CompactedKeyEncoder> { + let mut encode_col_indexes = Vec::with_capacity(keys.len()); + + for key in keys { + match row_type.get_field_index(key) { + Some(idx) => encode_col_indexes.push(idx), + None => { + return Err(IllegalArgument { + message: format!( + "Field {} not found in input row type {:?}", + key, row_type + ), + }); + } + } + } + + Ok(Self::new(row_type, encode_col_indexes)) + } + + #[cfg(test)] + pub fn for_test_row_type(row_type: &RowType) -> Self { + Self::new(row_type, (0..row_type.fields().len()).collect()) + } + + pub fn new(row_type: &RowType, encode_field_pos: Vec<usize>) -> CompactedKeyEncoder { + let mut field_getters: Vec<Box<dyn FieldGetter>> = + Vec::with_capacity(encode_field_pos.len()); + let mut field_encoders: Vec<Box<dyn ValueWriter>> = + Vec::with_capacity(encode_field_pos.len()); + + for pos in &encode_field_pos { + let data_type = row_type.fields().get(*pos).unwrap().data_type(); + field_getters.push(<dyn FieldGetter>::create_field_getter(data_type, *pos)); + field_encoders.push(CompactedKeyWriter::create_value_writer(data_type)); + } + + CompactedKeyEncoder { + field_encoders, + 
+ field_getters, + compacted_encoder: CompactedKeyWriter::new(), + } + } +} + +#[allow(dead_code)] +impl KeyEncoder for CompactedKeyEncoder { + fn encode_key(&mut self, row: &dyn InternalRow) -> Result<Bytes> { + self.compacted_encoder.reset(); + + // iterate all the fields of the row, and encode each field + self.field_getters + .iter() + .enumerate() + .for_each(|(pos, field_getter)| { + self.field_encoders + .get(pos) + .unwrap() + .write_value( + &mut self.compacted_encoder, + pos, + &field_getter.get_field(row), + ) + .unwrap(); + }); Review Comment: The trailing unwrap() on write_value will panic whenever write_value returns an error (for example, when the Datum variant does not match the field's ValueWriter). That error should instead be propagated with the ? operator so that encode_key reports the failure through its Result like the rest of the encoding path. Because ? cannot propagate out of the for_each closure, the closure needs to become a plain for loop, as in the suggestion below. ```suggestion for (pos, field_getter) in self.field_getters.iter().enumerate() { self.field_encoders .get(pos) .unwrap() .write_value( &mut self.compacted_encoder, pos, &field_getter.get_field(row), )?; } ``` ########## crates/fluss/src/row/binary/binary_writer.rs: ########## @@ -0,0 +1,327 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +use crate::error::Error::{IllegalArgument, IoUnsupported}; +use crate::error::Result; +use crate::metadata::DataType; +use crate::row::Datum; +use crate::row::binary::BinaryRowFormat; + +/// Writer to write a composite data format, like row, array, +#[allow(dead_code)] +pub trait BinaryWriter { + /// Reset writer to prepare next write + fn reset(&mut self); + + /// Set null to this field + fn set_null_at(&mut self, pos: usize); + + fn write_boolean(&mut self, value: bool); + + fn write_byte(&mut self, value: u8); + + fn write_bytes(&mut self, value: &[u8]); + + fn write_char(&mut self, value: &str, length: usize); + + fn write_string(&mut self, value: &str); + + fn write_short(&mut self, value: i16); + + fn write_int(&mut self, value: i32); + + fn write_long(&mut self, value: i64); + + fn write_float(&mut self, value: f32); + + fn write_double(&mut self, value: f64); + + fn write_binary(&mut self, bytes: &[u8], length: usize); + + // TODO Decimal type + // fn write_decimal(&mut self, pos: i32, value: f64); + + // TODO Timestamp type + // fn write_timestamp_ntz(&mut self, pos: i32, value: i64); + + // TODO Timestamp type + // fn write_timestamp_ltz(&mut self, pos: i32, value: i64); + + // TODO InternalArray, ArraySerializer + // fn write_array(&mut self, pos: i32, value: i64); + + // TODO Row serializer + // fn write_row(&mut self, pos: i32, value: &InternalRow); + + /// Finally, complete write to set real size to binary. + fn complete(&mut self); +} + +/// Accessor for writing the fields/elements of a binary writer during runtime, the +/// fields/elements must be written in the order. +pub trait ValueWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, pos: usize, value: &Datum) -> Result<()>; +} + +#[allow(dead_code)] +impl dyn ValueWriter { + /// Creates an accessor for setting the elements of a binary writer during runtime. 
+ pub fn create_value_writer( + element_type: &DataType, + binary_row_format: &BinaryRowFormat, + ) -> Box<dyn ValueWriter> { + let value_writer = + Self::create_not_null_value_writer(element_type, Some(binary_row_format)).unwrap(); + + if !element_type.is_nullable() { + value_writer + } else { + Box::new(NullWriter { + delegate: value_writer, + }) + } + } + + /// Creates an accessor for setting the elements of a binary writer during runtime. + pub fn create_not_null_value_writer( + element_type: &DataType, + _: Option<&BinaryRowFormat>, + ) -> Result<Box<dyn ValueWriter>> { + match element_type { + DataType::Char(_) => Ok(Box::new(CharWriter)), + DataType::String(_) => Ok(Box::new(StringWriter)), + DataType::Boolean(_) => Ok(Box::new(BoolWriter)), + DataType::Binary(_) => Ok(Box::new(BinaryValueWriter)), + DataType::Bytes(_) => Ok(Box::new(BytesWriter)), + // TODO DECIMAL + DataType::TinyInt(_) => Ok(Box::new(TinyIntWriter)), + DataType::SmallInt(_) => Ok(Box::new(SmallIntWriter)), + DataType::Int(_) => Ok(Box::new(IntWriter)), + // TODO DATE + // TODO TIME_WITHOUT_TIME_ZONE + DataType::BigInt(_) => Ok(Box::new(LongWriter)), + DataType::Float(_) => Ok(Box::new(FloatWriter)), + DataType::Double(_) => Ok(Box::new(DoubleWriter)), + // TODO TIMESTAMP_WITHOUT_TIME_ZONE + // TODO TIMESTAMP_WITH_LOCAL_TIME_ZONE + // TODO ARRAY + DataType::Map(_) => Err(IoUnsupported { + message: "Map type is not supported yet. 
Will be added in Issue #1973.".to_string(), + }), + // TODO ROW + _ => Err(IllegalArgument { + message: format!("Type {} is not supported yet", element_type), + }), + } + } +} + +#[derive(Default)] +struct CharWriter; +impl ValueWriter for CharWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::String(v) = value { + writer.write_char(v, v.len()); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct StringWriter; +impl ValueWriter for StringWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::String(v) = value { + writer.write_string(v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct BoolWriter; +impl ValueWriter for BoolWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Bool(v) = value { + writer.write_boolean(*v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct BinaryValueWriter; +impl ValueWriter for BinaryValueWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + match value { + Datum::Blob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + Datum::BorrowedBlob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + _ => Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }), + } + } +} + +#[derive(Default)] +struct BytesWriter; +impl ValueWriter for BytesWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + match value { + 
Datum::Blob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + Datum::BorrowedBlob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + value => Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }), + } + } +} + +// TODO DecimalWriter + +#[derive(Default)] +struct TinyIntWriter; +impl ValueWriter for TinyIntWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Int8(v) = value { + writer.write_byte(*v as u8); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct SmallIntWriter; +impl ValueWriter for SmallIntWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Int16(v) = value { + writer.write_short(*v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct IntWriter; +impl ValueWriter for IntWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Int32(v) = value { + writer.write_int(*v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct LongWriter; +impl ValueWriter for LongWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Int64(v) = value { + writer.write_long(*v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct FloatWriter; +impl ValueWriter for FloatWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let 
Datum::Float32(v) = value { + writer.write_float(v.into_inner()); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) Review Comment: The error message "Wrong ValueWriter used to write value" is unclear about what went wrong. It should specify the expected data type and what was actually provided to help users debug the issue more effectively. ########## crates/fluss/src/row/compacted/compacted_key_writer.rs: ########## @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::row::compacted::compacted_row_writer::CompactedRowWriter; +use bytes::Bytes; + +use crate::error::Error::IllegalArgument; +use crate::error::Result; +use crate::metadata::DataType; +use crate::row::Datum; +use crate::row::binary::{BinaryRowFormat, BinaryWriter, ValueWriter}; +use delegate::delegate; + +/// A wrapping of [`CompactedRowWriter`] used to encode key columns. +/// The encoding is the same as [`CompactedRowWriter`], but is without header of null bits to +// represent whether the field value is null or not since the key columns must be not null. 
+pub struct CompactedKeyWriter { + delegate: CompactedRowWriter, +} + +impl CompactedKeyWriter { + pub fn new() -> CompactedKeyWriter { + CompactedKeyWriter { + // in compacted key encoder, we don't need to set null bits as the key columns must be not + // null, to use field count 0 to init to make the null bits 0 + delegate: CompactedRowWriter::new(0), + } + } + + pub fn create_value_writer(field_type: &DataType) -> Box<dyn ValueWriter> { + RejectNullValueWriter::wrap( + field_type.clone(), + <dyn ValueWriter>::create_not_null_value_writer( + field_type, + Some(&BinaryRowFormat::Compacted), + ) + .unwrap(), + ) Review Comment: create_value_writer calls unwrap() on the Result returned by create_not_null_value_writer, so it panics for any type that create_not_null_value_writer rejects (for example Map, or any other type it does not yet support). The same pattern appears in <dyn ValueWriter>::create_value_writer. The error should instead be propagated with the ? operator, with create_value_writer returning a Result, to keep error handling consistent. Note that callers such as CompactedKeyEncoder::new will then need to handle that Result as well. ```suggestion pub fn create_value_writer(field_type: &DataType) -> Result<Box<dyn ValueWriter>> { let inner = <dyn ValueWriter>::create_not_null_value_writer( field_type, Some(&BinaryRowFormat::Compacted), )?; Ok(RejectNullValueWriter::wrap(field_type.clone(), inner)) ``` ########## crates/fluss/src/row/binary/binary_writer.rs: ########## @@ -0,0 +1,327 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.
See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::Error::{IllegalArgument, IoUnsupported}; +use crate::error::Result; +use crate::metadata::DataType; +use crate::row::Datum; +use crate::row::binary::BinaryRowFormat; + +/// Writer to write a composite data format, like row, array, +#[allow(dead_code)] +pub trait BinaryWriter { + /// Reset writer to prepare next write + fn reset(&mut self); + + /// Set null to this field + fn set_null_at(&mut self, pos: usize); + + fn write_boolean(&mut self, value: bool); + + fn write_byte(&mut self, value: u8); + + fn write_bytes(&mut self, value: &[u8]); + + fn write_char(&mut self, value: &str, length: usize); + + fn write_string(&mut self, value: &str); + + fn write_short(&mut self, value: i16); + + fn write_int(&mut self, value: i32); + + fn write_long(&mut self, value: i64); + + fn write_float(&mut self, value: f32); + + fn write_double(&mut self, value: f64); + + fn write_binary(&mut self, bytes: &[u8], length: usize); + + // TODO Decimal type + // fn write_decimal(&mut self, pos: i32, value: f64); + + // TODO Timestamp type + // fn write_timestamp_ntz(&mut self, pos: i32, value: i64); + + // TODO Timestamp type + // fn write_timestamp_ltz(&mut self, pos: i32, value: i64); + + // TODO InternalArray, ArraySerializer + // fn write_array(&mut self, pos: i32, value: i64); + + // TODO Row serializer + // fn write_row(&mut self, pos: i32, value: &InternalRow); + + /// Finally, complete write to set real size to binary. + fn complete(&mut self); +} + +/// Accessor for writing the fields/elements of a binary writer during runtime, the +/// fields/elements must be written in the order. +pub trait ValueWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, pos: usize, value: &Datum) -> Result<()>; +} + +#[allow(dead_code)] +impl dyn ValueWriter { + /// Creates an accessor for setting the elements of a binary writer during runtime. 
+ pub fn create_value_writer( + element_type: &DataType, + binary_row_format: &BinaryRowFormat, + ) -> Box<dyn ValueWriter> { + let value_writer = + Self::create_not_null_value_writer(element_type, Some(binary_row_format)).unwrap(); + + if !element_type.is_nullable() { + value_writer + } else { + Box::new(NullWriter { + delegate: value_writer, + }) + } + } + + /// Creates an accessor for setting the elements of a binary writer during runtime. + pub fn create_not_null_value_writer( + element_type: &DataType, + _: Option<&BinaryRowFormat>, + ) -> Result<Box<dyn ValueWriter>> { + match element_type { + DataType::Char(_) => Ok(Box::new(CharWriter)), + DataType::String(_) => Ok(Box::new(StringWriter)), + DataType::Boolean(_) => Ok(Box::new(BoolWriter)), + DataType::Binary(_) => Ok(Box::new(BinaryValueWriter)), + DataType::Bytes(_) => Ok(Box::new(BytesWriter)), + // TODO DECIMAL + DataType::TinyInt(_) => Ok(Box::new(TinyIntWriter)), + DataType::SmallInt(_) => Ok(Box::new(SmallIntWriter)), + DataType::Int(_) => Ok(Box::new(IntWriter)), + // TODO DATE + // TODO TIME_WITHOUT_TIME_ZONE + DataType::BigInt(_) => Ok(Box::new(LongWriter)), + DataType::Float(_) => Ok(Box::new(FloatWriter)), + DataType::Double(_) => Ok(Box::new(DoubleWriter)), + // TODO TIMESTAMP_WITHOUT_TIME_ZONE + // TODO TIMESTAMP_WITH_LOCAL_TIME_ZONE + // TODO ARRAY + DataType::Map(_) => Err(IoUnsupported { + message: "Map type is not supported yet. 
Will be added in Issue #1973.".to_string(), + }), + // TODO ROW + _ => Err(IllegalArgument { + message: format!("Type {} is not supported yet", element_type), + }), + } + } +} + +#[derive(Default)] +struct CharWriter; +impl ValueWriter for CharWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::String(v) = value { + writer.write_char(v, v.len()); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct StringWriter; +impl ValueWriter for StringWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::String(v) = value { + writer.write_string(v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) Review Comment: The error message "Wrong ValueWriter used to write value" is unclear about what went wrong. It should specify the expected data type and what was actually provided to help users debug the issue more effectively. ########## crates/fluss/src/row/encode/mod.rs: ########## @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +mod compacted_key_encoder; + +use crate::error::Result; +use crate::metadata::{DataLakeFormat, RowType}; +use crate::row::InternalRow; +use crate::row::encode::compacted_key_encoder::CompactedKeyEncoder; +use bytes::Bytes; + +/// An interface for encoding key of row into bytes. +#[allow(dead_code)] +pub trait KeyEncoder { + fn encode_key(&mut self, row: &dyn InternalRow) -> Result<Bytes>; +} + +#[allow(dead_code)] +impl dyn KeyEncoder { + /// Create a key encoder to encode the key array bytes of the input row. Review Comment: The documentation says "key array bytes" but should say "key bytes" since the method encodes key fields into bytes, not an array of keys. The documentation should be clarified to accurately describe what is being encoded. ```suggestion /// Create a key encoder to encode the key bytes of the input row. ``` ########## crates/fluss/src/row/binary/binary_writer.rs: ########## @@ -0,0 +1,327 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::error::Error::{IllegalArgument, IoUnsupported}; +use crate::error::Result; +use crate::metadata::DataType; +use crate::row::Datum; +use crate::row::binary::BinaryRowFormat; + +/// Writer to write a composite data format, like row, array, +#[allow(dead_code)] +pub trait BinaryWriter { + /// Reset writer to prepare next write + fn reset(&mut self); + + /// Set null to this field + fn set_null_at(&mut self, pos: usize); + + fn write_boolean(&mut self, value: bool); + + fn write_byte(&mut self, value: u8); + + fn write_bytes(&mut self, value: &[u8]); + + fn write_char(&mut self, value: &str, length: usize); + + fn write_string(&mut self, value: &str); + + fn write_short(&mut self, value: i16); + + fn write_int(&mut self, value: i32); + + fn write_long(&mut self, value: i64); + + fn write_float(&mut self, value: f32); + + fn write_double(&mut self, value: f64); + + fn write_binary(&mut self, bytes: &[u8], length: usize); + + // TODO Decimal type + // fn write_decimal(&mut self, pos: i32, value: f64); + + // TODO Timestamp type + // fn write_timestamp_ntz(&mut self, pos: i32, value: i64); + + // TODO Timestamp type + // fn write_timestamp_ltz(&mut self, pos: i32, value: i64); + + // TODO InternalArray, ArraySerializer + // fn write_array(&mut self, pos: i32, value: i64); + + // TODO Row serializer + // fn write_row(&mut self, pos: i32, value: &InternalRow); + + /// Finally, complete write to set real size to binary. + fn complete(&mut self); +} + +/// Accessor for writing the fields/elements of a binary writer during runtime, the +/// fields/elements must be written in the order. +pub trait ValueWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, pos: usize, value: &Datum) -> Result<()>; +} + +#[allow(dead_code)] +impl dyn ValueWriter { + /// Creates an accessor for setting the elements of a binary writer during runtime. 
+ pub fn create_value_writer( + element_type: &DataType, + binary_row_format: &BinaryRowFormat, + ) -> Box<dyn ValueWriter> { + let value_writer = + Self::create_not_null_value_writer(element_type, Some(binary_row_format)).unwrap(); + + if !element_type.is_nullable() { + value_writer + } else { + Box::new(NullWriter { + delegate: value_writer, + }) + } + } + + /// Creates an accessor for setting the elements of a binary writer during runtime. + pub fn create_not_null_value_writer( + element_type: &DataType, + _: Option<&BinaryRowFormat>, + ) -> Result<Box<dyn ValueWriter>> { + match element_type { + DataType::Char(_) => Ok(Box::new(CharWriter)), + DataType::String(_) => Ok(Box::new(StringWriter)), + DataType::Boolean(_) => Ok(Box::new(BoolWriter)), + DataType::Binary(_) => Ok(Box::new(BinaryValueWriter)), + DataType::Bytes(_) => Ok(Box::new(BytesWriter)), + // TODO DECIMAL + DataType::TinyInt(_) => Ok(Box::new(TinyIntWriter)), + DataType::SmallInt(_) => Ok(Box::new(SmallIntWriter)), + DataType::Int(_) => Ok(Box::new(IntWriter)), + // TODO DATE + // TODO TIME_WITHOUT_TIME_ZONE + DataType::BigInt(_) => Ok(Box::new(LongWriter)), + DataType::Float(_) => Ok(Box::new(FloatWriter)), + DataType::Double(_) => Ok(Box::new(DoubleWriter)), + // TODO TIMESTAMP_WITHOUT_TIME_ZONE + // TODO TIMESTAMP_WITH_LOCAL_TIME_ZONE + // TODO ARRAY + DataType::Map(_) => Err(IoUnsupported { + message: "Map type is not supported yet. 
Will be added in Issue #1973.".to_string(), + }), + // TODO ROW + _ => Err(IllegalArgument { + message: format!("Type {} is not supported yet", element_type), + }), + } + } +} + +#[derive(Default)] +struct CharWriter; +impl ValueWriter for CharWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::String(v) = value { + writer.write_char(v, v.len()); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct StringWriter; +impl ValueWriter for StringWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::String(v) = value { + writer.write_string(v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct BoolWriter; +impl ValueWriter for BoolWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Bool(v) = value { + writer.write_boolean(*v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct BinaryValueWriter; +impl ValueWriter for BinaryValueWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + match value { + Datum::Blob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + Datum::BorrowedBlob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + _ => Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }), Review Comment: The error message "Wrong ValueWriter used to write value" is unclear about what went wrong. 
It should specify the expected data type and what was actually provided to help users debug the issue more effectively. ########## crates/fluss/src/row/compacted/compacted_key_writer.rs: ########## @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::row::compacted::compacted_row_writer::CompactedRowWriter; +use bytes::Bytes; + +use crate::error::Error::IllegalArgument; +use crate::error::Result; +use crate::metadata::DataType; +use crate::row::Datum; +use crate::row::binary::{BinaryRowFormat, BinaryWriter, ValueWriter}; +use delegate::delegate; + +/// A wrapping of [`CompactedRowWriter`] used to encode key columns. +/// The encoding is the same as [`CompactedRowWriter`], but is without header of null bits to +// represent whether the field value is null or not since the key columns must be not null. Review Comment: Line 30 of this doc comment starts with "//" instead of "///", so Rust treats it as an ordinary comment. rustdoc drops it, and the generated documentation ends mid-sentence ("...without header of null bits to"). Use "///" so the line continues the doc comment. ```suggestion /// represent whether the field value is null or not since the key columns must be not null.
``` ########## crates/fluss/src/row/binary/binary_writer.rs: ########## @@ -0,0 +1,327 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::Error::{IllegalArgument, IoUnsupported}; +use crate::error::Result; +use crate::metadata::DataType; +use crate::row::Datum; +use crate::row::binary::BinaryRowFormat; + +/// Writer to write a composite data format, like row, array, +#[allow(dead_code)] +pub trait BinaryWriter { + /// Reset writer to prepare next write + fn reset(&mut self); + + /// Set null to this field + fn set_null_at(&mut self, pos: usize); + + fn write_boolean(&mut self, value: bool); + + fn write_byte(&mut self, value: u8); + + fn write_bytes(&mut self, value: &[u8]); + + fn write_char(&mut self, value: &str, length: usize); + + fn write_string(&mut self, value: &str); + + fn write_short(&mut self, value: i16); + + fn write_int(&mut self, value: i32); + + fn write_long(&mut self, value: i64); + + fn write_float(&mut self, value: f32); + + fn write_double(&mut self, value: f64); + + fn write_binary(&mut self, bytes: &[u8], length: usize); + + // TODO Decimal type + // fn write_decimal(&mut self, pos: i32, value: f64); + + // TODO Timestamp type + // fn write_timestamp_ntz(&mut self, pos: 
i32, value: i64); + + // TODO Timestamp type + // fn write_timestamp_ltz(&mut self, pos: i32, value: i64); + + // TODO InternalArray, ArraySerializer + // fn write_array(&mut self, pos: i32, value: i64); + + // TODO Row serializer + // fn write_row(&mut self, pos: i32, value: &InternalRow); + + /// Finally, complete write to set real size to binary. + fn complete(&mut self); +} + +/// Accessor for writing the fields/elements of a binary writer during runtime, the +/// fields/elements must be written in the order. +pub trait ValueWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, pos: usize, value: &Datum) -> Result<()>; +} + +#[allow(dead_code)] +impl dyn ValueWriter { + /// Creates an accessor for setting the elements of a binary writer during runtime. + pub fn create_value_writer( + element_type: &DataType, + binary_row_format: &BinaryRowFormat, + ) -> Box<dyn ValueWriter> { + let value_writer = + Self::create_not_null_value_writer(element_type, Some(binary_row_format)).unwrap(); + + if !element_type.is_nullable() { + value_writer + } else { + Box::new(NullWriter { + delegate: value_writer, + }) + } + } + + /// Creates an accessor for setting the elements of a binary writer during runtime. 
+ pub fn create_not_null_value_writer( + element_type: &DataType, + _: Option<&BinaryRowFormat>, + ) -> Result<Box<dyn ValueWriter>> { + match element_type { + DataType::Char(_) => Ok(Box::new(CharWriter)), + DataType::String(_) => Ok(Box::new(StringWriter)), + DataType::Boolean(_) => Ok(Box::new(BoolWriter)), + DataType::Binary(_) => Ok(Box::new(BinaryValueWriter)), + DataType::Bytes(_) => Ok(Box::new(BytesWriter)), + // TODO DECIMAL + DataType::TinyInt(_) => Ok(Box::new(TinyIntWriter)), + DataType::SmallInt(_) => Ok(Box::new(SmallIntWriter)), + DataType::Int(_) => Ok(Box::new(IntWriter)), + // TODO DATE + // TODO TIME_WITHOUT_TIME_ZONE + DataType::BigInt(_) => Ok(Box::new(LongWriter)), + DataType::Float(_) => Ok(Box::new(FloatWriter)), + DataType::Double(_) => Ok(Box::new(DoubleWriter)), + // TODO TIMESTAMP_WITHOUT_TIME_ZONE + // TODO TIMESTAMP_WITH_LOCAL_TIME_ZONE + // TODO ARRAY + DataType::Map(_) => Err(IoUnsupported { + message: "Map type is not supported yet. Will be added in Issue #1973.".to_string(), + }), + // TODO ROW + _ => Err(IllegalArgument { + message: format!("Type {} is not supported yet", element_type), + }), + } + } +} + +#[derive(Default)] +struct CharWriter; +impl ValueWriter for CharWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::String(v) = value { + writer.write_char(v, v.len()); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct StringWriter; +impl ValueWriter for StringWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::String(v) = value { + writer.write_string(v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct BoolWriter; +impl ValueWriter for BoolWriter { + fn 
write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Bool(v) = value { + writer.write_boolean(*v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct BinaryValueWriter; +impl ValueWriter for BinaryValueWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + match value { + Datum::Blob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + Datum::BorrowedBlob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + _ => Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }), + } + } +} + +#[derive(Default)] +struct BytesWriter; +impl ValueWriter for BytesWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + match value { + Datum::Blob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + Datum::BorrowedBlob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + value => Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }), + } + } +} Review Comment: The BytesWriter implementation calls write_binary instead of write_bytes. According to the BinaryWriter trait and CompactedRowWriter implementation, write_bytes encodes the length followed by the data, while write_binary takes a fixed length parameter. For the Bytes data type, the length-prefixed encoding should be used by calling write_bytes instead. ########## crates/fluss/src/row/binary/binary_writer.rs: ########## @@ -0,0 +1,327 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::Error::{IllegalArgument, IoUnsupported}; +use crate::error::Result; +use crate::metadata::DataType; +use crate::row::Datum; +use crate::row::binary::BinaryRowFormat; + +/// Writer to write a composite data format, like row, array, +#[allow(dead_code)] +pub trait BinaryWriter { + /// Reset writer to prepare next write + fn reset(&mut self); + + /// Set null to this field + fn set_null_at(&mut self, pos: usize); + + fn write_boolean(&mut self, value: bool); + + fn write_byte(&mut self, value: u8); + + fn write_bytes(&mut self, value: &[u8]); + + fn write_char(&mut self, value: &str, length: usize); + + fn write_string(&mut self, value: &str); + + fn write_short(&mut self, value: i16); + + fn write_int(&mut self, value: i32); + + fn write_long(&mut self, value: i64); + + fn write_float(&mut self, value: f32); + + fn write_double(&mut self, value: f64); + + fn write_binary(&mut self, bytes: &[u8], length: usize); + + // TODO Decimal type + // fn write_decimal(&mut self, pos: i32, value: f64); + + // TODO Timestamp type + // fn write_timestamp_ntz(&mut self, pos: i32, value: i64); + + // TODO Timestamp type + // fn write_timestamp_ltz(&mut self, pos: i32, value: i64); + + // TODO InternalArray, ArraySerializer + // fn write_array(&mut self, pos: i32, value: i64); + + // TODO Row serializer + // fn write_row(&mut self, pos: i32, value: &InternalRow); + + /// Finally, 
complete write to set real size to binary. + fn complete(&mut self); +} + +/// Accessor for writing the fields/elements of a binary writer during runtime, the +/// fields/elements must be written in the order. +pub trait ValueWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, pos: usize, value: &Datum) -> Result<()>; +} + +#[allow(dead_code)] +impl dyn ValueWriter { + /// Creates an accessor for setting the elements of a binary writer during runtime. + pub fn create_value_writer( + element_type: &DataType, + binary_row_format: &BinaryRowFormat, + ) -> Box<dyn ValueWriter> { + let value_writer = + Self::create_not_null_value_writer(element_type, Some(binary_row_format)).unwrap(); + + if !element_type.is_nullable() { + value_writer + } else { + Box::new(NullWriter { + delegate: value_writer, + }) + } + } + + /// Creates an accessor for setting the elements of a binary writer during runtime. + pub fn create_not_null_value_writer( + element_type: &DataType, + _: Option<&BinaryRowFormat>, + ) -> Result<Box<dyn ValueWriter>> { + match element_type { + DataType::Char(_) => Ok(Box::new(CharWriter)), + DataType::String(_) => Ok(Box::new(StringWriter)), + DataType::Boolean(_) => Ok(Box::new(BoolWriter)), + DataType::Binary(_) => Ok(Box::new(BinaryValueWriter)), + DataType::Bytes(_) => Ok(Box::new(BytesWriter)), + // TODO DECIMAL + DataType::TinyInt(_) => Ok(Box::new(TinyIntWriter)), + DataType::SmallInt(_) => Ok(Box::new(SmallIntWriter)), + DataType::Int(_) => Ok(Box::new(IntWriter)), + // TODO DATE + // TODO TIME_WITHOUT_TIME_ZONE + DataType::BigInt(_) => Ok(Box::new(LongWriter)), + DataType::Float(_) => Ok(Box::new(FloatWriter)), + DataType::Double(_) => Ok(Box::new(DoubleWriter)), + // TODO TIMESTAMP_WITHOUT_TIME_ZONE + // TODO TIMESTAMP_WITH_LOCAL_TIME_ZONE + // TODO ARRAY + DataType::Map(_) => Err(IoUnsupported { + message: "Map type is not supported yet. 
Will be added in Issue #1973.".to_string(), + }), + // TODO ROW + _ => Err(IllegalArgument { + message: format!("Type {} is not supported yet", element_type), + }), + } + } +} + +#[derive(Default)] +struct CharWriter; +impl ValueWriter for CharWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::String(v) = value { + writer.write_char(v, v.len()); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct StringWriter; +impl ValueWriter for StringWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::String(v) = value { + writer.write_string(v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct BoolWriter; +impl ValueWriter for BoolWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Bool(v) = value { + writer.write_boolean(*v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct BinaryValueWriter; +impl ValueWriter for BinaryValueWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + match value { + Datum::Blob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + Datum::BorrowedBlob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + _ => Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }), + } + } +} + +#[derive(Default)] +struct BytesWriter; +impl ValueWriter for BytesWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + match value { + 
Datum::Blob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + Datum::BorrowedBlob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + value => Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }), Review Comment: The error message "Wrong ValueWriter used to write value" is unclear about what went wrong. It should specify the expected data type and what was actually provided to help users debug the issue more effectively. ########## crates/fluss/src/row/binary/binary_writer.rs: ########## @@ -0,0 +1,327 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::error::Error::{IllegalArgument, IoUnsupported}; +use crate::error::Result; +use crate::metadata::DataType; +use crate::row::Datum; +use crate::row::binary::BinaryRowFormat; + +/// Writer to write a composite data format, like row, array, +#[allow(dead_code)] +pub trait BinaryWriter { + /// Reset writer to prepare next write + fn reset(&mut self); + + /// Set null to this field + fn set_null_at(&mut self, pos: usize); + + fn write_boolean(&mut self, value: bool); + + fn write_byte(&mut self, value: u8); + + fn write_bytes(&mut self, value: &[u8]); + + fn write_char(&mut self, value: &str, length: usize); + + fn write_string(&mut self, value: &str); + + fn write_short(&mut self, value: i16); + + fn write_int(&mut self, value: i32); + + fn write_long(&mut self, value: i64); + + fn write_float(&mut self, value: f32); + + fn write_double(&mut self, value: f64); + + fn write_binary(&mut self, bytes: &[u8], length: usize); + + // TODO Decimal type + // fn write_decimal(&mut self, pos: i32, value: f64); + + // TODO Timestamp type + // fn write_timestamp_ntz(&mut self, pos: i32, value: i64); + + // TODO Timestamp type + // fn write_timestamp_ltz(&mut self, pos: i32, value: i64); + + // TODO InternalArray, ArraySerializer + // fn write_array(&mut self, pos: i32, value: i64); + + // TODO Row serializer + // fn write_row(&mut self, pos: i32, value: &InternalRow); + + /// Finally, complete write to set real size to binary. + fn complete(&mut self); +} + +/// Accessor for writing the fields/elements of a binary writer during runtime, the +/// fields/elements must be written in the order. +pub trait ValueWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, pos: usize, value: &Datum) -> Result<()>; +} + +#[allow(dead_code)] +impl dyn ValueWriter { + /// Creates an accessor for setting the elements of a binary writer during runtime. 
+ pub fn create_value_writer( + element_type: &DataType, + binary_row_format: &BinaryRowFormat, + ) -> Box<dyn ValueWriter> { + let value_writer = + Self::create_not_null_value_writer(element_type, Some(binary_row_format)).unwrap(); + + if !element_type.is_nullable() { + value_writer + } else { + Box::new(NullWriter { + delegate: value_writer, + }) + } + } + + /// Creates an accessor for setting the elements of a binary writer during runtime. + pub fn create_not_null_value_writer( + element_type: &DataType, + _: Option<&BinaryRowFormat>, + ) -> Result<Box<dyn ValueWriter>> { + match element_type { + DataType::Char(_) => Ok(Box::new(CharWriter)), + DataType::String(_) => Ok(Box::new(StringWriter)), + DataType::Boolean(_) => Ok(Box::new(BoolWriter)), + DataType::Binary(_) => Ok(Box::new(BinaryValueWriter)), + DataType::Bytes(_) => Ok(Box::new(BytesWriter)), + // TODO DECIMAL + DataType::TinyInt(_) => Ok(Box::new(TinyIntWriter)), + DataType::SmallInt(_) => Ok(Box::new(SmallIntWriter)), + DataType::Int(_) => Ok(Box::new(IntWriter)), + // TODO DATE + // TODO TIME_WITHOUT_TIME_ZONE + DataType::BigInt(_) => Ok(Box::new(LongWriter)), + DataType::Float(_) => Ok(Box::new(FloatWriter)), + DataType::Double(_) => Ok(Box::new(DoubleWriter)), + // TODO TIMESTAMP_WITHOUT_TIME_ZONE + // TODO TIMESTAMP_WITH_LOCAL_TIME_ZONE + // TODO ARRAY + DataType::Map(_) => Err(IoUnsupported { + message: "Map type is not supported yet. 
Will be added in Issue #1973.".to_string(), + }), + // TODO ROW + _ => Err(IllegalArgument { + message: format!("Type {} is not supported yet", element_type), + }), + } + } +} + +#[derive(Default)] +struct CharWriter; +impl ValueWriter for CharWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::String(v) = value { + writer.write_char(v, v.len()); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct StringWriter; +impl ValueWriter for StringWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::String(v) = value { + writer.write_string(v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct BoolWriter; +impl ValueWriter for BoolWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Bool(v) = value { + writer.write_boolean(*v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct BinaryValueWriter; +impl ValueWriter for BinaryValueWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + match value { + Datum::Blob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + Datum::BorrowedBlob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + _ => Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }), + } + } +} + +#[derive(Default)] +struct BytesWriter; +impl ValueWriter for BytesWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + match value { + 
Datum::Blob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + Datum::BorrowedBlob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + value => Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }), + } + } +} + +// TODO DecimalWriter + +#[derive(Default)] +struct TinyIntWriter; +impl ValueWriter for TinyIntWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Int8(v) = value { + writer.write_byte(*v as u8); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct SmallIntWriter; +impl ValueWriter for SmallIntWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Int16(v) = value { + writer.write_short(*v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct IntWriter; +impl ValueWriter for IntWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Int32(v) = value { + writer.write_int(*v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct LongWriter; +impl ValueWriter for LongWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Int64(v) = value { + writer.write_long(*v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) Review Comment: The error message "Wrong ValueWriter used to write value" is unclear about what went wrong. 
It should specify the expected data type and what was actually provided to help users debug the issue more effectively. ########## crates/fluss/src/row/encode/mod.rs: ########## @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod compacted_key_encoder; + +use crate::error::Result; +use crate::metadata::{DataLakeFormat, RowType}; +use crate::row::InternalRow; +use crate::row::encode::compacted_key_encoder::CompactedKeyEncoder; +use bytes::Bytes; + +/// An interface for encoding key of row into bytes. +#[allow(dead_code)] +pub trait KeyEncoder { + fn encode_key(&mut self, row: &dyn InternalRow) -> Result<Bytes>; +} + +#[allow(dead_code)] +impl dyn KeyEncoder { + /// Create a key encoder to encode the key array bytes of the input row. + /// # Arguments + /// * `row_type` - the row type of the input row + /// * `key_fields` - the key fields to encode + /// * `lake_format` - the datalake format Review Comment: This documentation entry names the parameter `lake_format`, but the parameter is `data_lake_format`. It also spells the description "datalake format" without a space. For consistency and readability, the entry should use the actual parameter name and say "data lake format" (with a space), matching the parameter naming convention.
```suggestion /// * `data_lake_format` - the data lake format ``` ########## crates/fluss/src/row/binary/binary_writer.rs: ########## @@ -0,0 +1,327 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::Error::{IllegalArgument, IoUnsupported}; +use crate::error::Result; +use crate::metadata::DataType; +use crate::row::Datum; +use crate::row::binary::BinaryRowFormat; + +/// Writer to write a composite data format, like row, array, +#[allow(dead_code)] +pub trait BinaryWriter { + /// Reset writer to prepare next write + fn reset(&mut self); + + /// Set null to this field + fn set_null_at(&mut self, pos: usize); + + fn write_boolean(&mut self, value: bool); + + fn write_byte(&mut self, value: u8); + + fn write_bytes(&mut self, value: &[u8]); + + fn write_char(&mut self, value: &str, length: usize); + + fn write_string(&mut self, value: &str); + + fn write_short(&mut self, value: i16); + + fn write_int(&mut self, value: i32); + + fn write_long(&mut self, value: i64); + + fn write_float(&mut self, value: f32); + + fn write_double(&mut self, value: f64); + + fn write_binary(&mut self, bytes: &[u8], length: usize); + + // TODO Decimal type + // fn write_decimal(&mut self, pos: i32, value: f64); + + // TODO 
Timestamp type + // fn write_timestamp_ntz(&mut self, pos: i32, value: i64); + + // TODO Timestamp type + // fn write_timestamp_ltz(&mut self, pos: i32, value: i64); + + // TODO InternalArray, ArraySerializer + // fn write_array(&mut self, pos: i32, value: i64); + + // TODO Row serializer + // fn write_row(&mut self, pos: i32, value: &InternalRow); + + /// Finally, complete write to set real size to binary. + fn complete(&mut self); +} + +/// Accessor for writing the fields/elements of a binary writer during runtime, the +/// fields/elements must be written in the order. +pub trait ValueWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, pos: usize, value: &Datum) -> Result<()>; +} + +#[allow(dead_code)] +impl dyn ValueWriter { + /// Creates an accessor for setting the elements of a binary writer during runtime. + pub fn create_value_writer( + element_type: &DataType, + binary_row_format: &BinaryRowFormat, + ) -> Box<dyn ValueWriter> { + let value_writer = + Self::create_not_null_value_writer(element_type, Some(binary_row_format)).unwrap(); + + if !element_type.is_nullable() { + value_writer + } else { + Box::new(NullWriter { + delegate: value_writer, + }) + } + } + + /// Creates an accessor for setting the elements of a binary writer during runtime. 
+ pub fn create_not_null_value_writer( + element_type: &DataType, + _: Option<&BinaryRowFormat>, + ) -> Result<Box<dyn ValueWriter>> { + match element_type { + DataType::Char(_) => Ok(Box::new(CharWriter)), + DataType::String(_) => Ok(Box::new(StringWriter)), + DataType::Boolean(_) => Ok(Box::new(BoolWriter)), + DataType::Binary(_) => Ok(Box::new(BinaryValueWriter)), + DataType::Bytes(_) => Ok(Box::new(BytesWriter)), + // TODO DECIMAL + DataType::TinyInt(_) => Ok(Box::new(TinyIntWriter)), + DataType::SmallInt(_) => Ok(Box::new(SmallIntWriter)), + DataType::Int(_) => Ok(Box::new(IntWriter)), + // TODO DATE + // TODO TIME_WITHOUT_TIME_ZONE + DataType::BigInt(_) => Ok(Box::new(LongWriter)), + DataType::Float(_) => Ok(Box::new(FloatWriter)), + DataType::Double(_) => Ok(Box::new(DoubleWriter)), + // TODO TIMESTAMP_WITHOUT_TIME_ZONE + // TODO TIMESTAMP_WITH_LOCAL_TIME_ZONE + // TODO ARRAY + DataType::Map(_) => Err(IoUnsupported { + message: "Map type is not supported yet. Will be added in Issue #1973.".to_string(), + }), + // TODO ROW + _ => Err(IllegalArgument { + message: format!("Type {} is not supported yet", element_type), + }), + } + } +} + +#[derive(Default)] +struct CharWriter; +impl ValueWriter for CharWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::String(v) = value { + writer.write_char(v, v.len()); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct StringWriter; +impl ValueWriter for StringWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::String(v) = value { + writer.write_string(v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct BoolWriter; +impl ValueWriter for BoolWriter { + fn 
write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Bool(v) = value { + writer.write_boolean(*v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct BinaryValueWriter; +impl ValueWriter for BinaryValueWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + match value { + Datum::Blob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + Datum::BorrowedBlob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + _ => Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }), + } + } +} + +#[derive(Default)] +struct BytesWriter; +impl ValueWriter for BytesWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + match value { + Datum::Blob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + Datum::BorrowedBlob(v) => { + writer.write_binary(v.as_ref(), v.len()); + Ok(()) + } + value => Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }), + } + } +} + +// TODO DecimalWriter + +#[derive(Default)] +struct TinyIntWriter; +impl ValueWriter for TinyIntWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Int8(v) = value { + writer.write_byte(*v as u8); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct SmallIntWriter; +impl ValueWriter for SmallIntWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Int16(v) = value { + writer.write_short(*v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter 
used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct IntWriter; +impl ValueWriter for IntWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Int32(v) = value { + writer.write_int(*v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct LongWriter; +impl ValueWriter for LongWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Int64(v) = value { + writer.write_long(*v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct FloatWriter; +impl ValueWriter for FloatWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Float32(v) = value { + writer.write_float(v.into_inner()); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct DoubleWriter; +impl ValueWriter for DoubleWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Float64(v) = value { + writer.write_double(v.into_inner()); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) Review Comment: The error message "Wrong ValueWriter used to write value" is unclear about what went wrong. It should specify the expected data type and what was actually provided to help users debug the issue more effectively. ########## crates/fluss/src/row/binary/binary_writer.rs: ########## @@ -0,0 +1,327 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::Error::{IllegalArgument, IoUnsupported}; +use crate::error::Result; +use crate::metadata::DataType; +use crate::row::Datum; +use crate::row::binary::BinaryRowFormat; + +/// Writer to write a composite data format, like row, array, +#[allow(dead_code)] +pub trait BinaryWriter { + /// Reset writer to prepare next write + fn reset(&mut self); + + /// Set null to this field + fn set_null_at(&mut self, pos: usize); + + fn write_boolean(&mut self, value: bool); + + fn write_byte(&mut self, value: u8); + + fn write_bytes(&mut self, value: &[u8]); + + fn write_char(&mut self, value: &str, length: usize); + + fn write_string(&mut self, value: &str); + + fn write_short(&mut self, value: i16); + + fn write_int(&mut self, value: i32); + + fn write_long(&mut self, value: i64); + + fn write_float(&mut self, value: f32); + + fn write_double(&mut self, value: f64); + + fn write_binary(&mut self, bytes: &[u8], length: usize); + + // TODO Decimal type + // fn write_decimal(&mut self, pos: i32, value: f64); + + // TODO Timestamp type + // fn write_timestamp_ntz(&mut self, pos: i32, value: i64); + + // TODO Timestamp type + // fn write_timestamp_ltz(&mut self, pos: i32, value: i64); + + // TODO InternalArray, ArraySerializer + // fn write_array(&mut self, pos: i32, value: 
i64); + + // TODO Row serializer + // fn write_row(&mut self, pos: i32, value: &InternalRow); + + /// Finally, complete write to set real size to binary. + fn complete(&mut self); +} + +/// Accessor for writing the fields/elements of a binary writer during runtime, the +/// fields/elements must be written in the order. +pub trait ValueWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, pos: usize, value: &Datum) -> Result<()>; +} + +#[allow(dead_code)] +impl dyn ValueWriter { + /// Creates an accessor for setting the elements of a binary writer during runtime. + pub fn create_value_writer( + element_type: &DataType, + binary_row_format: &BinaryRowFormat, + ) -> Box<dyn ValueWriter> { + let value_writer = + Self::create_not_null_value_writer(element_type, Some(binary_row_format)).unwrap(); + + if !element_type.is_nullable() { + value_writer + } else { + Box::new(NullWriter { + delegate: value_writer, + }) + } + } + + /// Creates an accessor for setting the elements of a binary writer during runtime. 
+ pub fn create_not_null_value_writer( + element_type: &DataType, + _: Option<&BinaryRowFormat>, + ) -> Result<Box<dyn ValueWriter>> { + match element_type { + DataType::Char(_) => Ok(Box::new(CharWriter)), + DataType::String(_) => Ok(Box::new(StringWriter)), + DataType::Boolean(_) => Ok(Box::new(BoolWriter)), + DataType::Binary(_) => Ok(Box::new(BinaryValueWriter)), + DataType::Bytes(_) => Ok(Box::new(BytesWriter)), + // TODO DECIMAL + DataType::TinyInt(_) => Ok(Box::new(TinyIntWriter)), + DataType::SmallInt(_) => Ok(Box::new(SmallIntWriter)), + DataType::Int(_) => Ok(Box::new(IntWriter)), + // TODO DATE + // TODO TIME_WITHOUT_TIME_ZONE + DataType::BigInt(_) => Ok(Box::new(LongWriter)), + DataType::Float(_) => Ok(Box::new(FloatWriter)), + DataType::Double(_) => Ok(Box::new(DoubleWriter)), + // TODO TIMESTAMP_WITHOUT_TIME_ZONE + // TODO TIMESTAMP_WITH_LOCAL_TIME_ZONE + // TODO ARRAY + DataType::Map(_) => Err(IoUnsupported { + message: "Map type is not supported yet. Will be added in Issue #1973.".to_string(), + }), + // TODO ROW + _ => Err(IllegalArgument { + message: format!("Type {} is not supported yet", element_type), + }), + } + } +} + +#[derive(Default)] +struct CharWriter; +impl ValueWriter for CharWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::String(v) = value { + writer.write_char(v, v.len()); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct StringWriter; +impl ValueWriter for StringWriter { + fn write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::String(v) = value { + writer.write_string(v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) + } +} + +#[derive(Default)] +struct BoolWriter; +impl ValueWriter for BoolWriter { + fn 
write_value(&self, writer: &mut dyn BinaryWriter, _pos: usize, value: &Datum) -> Result<()> { + if let Datum::Bool(v) = value { + writer.write_boolean(*v); + return Ok(()); + } + + Err(IllegalArgument { + message: format!("Wrong ValueWriter used to write value: {:?}", value), + }) Review Comment: The error message "Wrong ValueWriter used to write value" is unclear about what went wrong. It should specify the expected data type and what was actually provided to help users debug the issue more effectively. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
