Copilot commented on code in PR #308: URL: https://github.com/apache/fluss-rust/pull/308#discussion_r2802987235
########## crates/fluss/src/row/encode/iceberg_key_encoder.rs: ########## @@ -0,0 +1,311 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Iceberg key encoding. +//! +//! This encoder follows Fluss Java's `IcebergKeyEncoder` / `IcebergBinaryRowWriter`: +//! - **Exactly one** key field is supported for Iceberg format. +//! - INT / DATE are encoded as **8-byte little-endian long** (int promoted to long). +//! - TIME is encoded as **microseconds since midnight** in 8-byte little-endian long. +//! - TIMESTAMP (without time zone) is encoded as **microseconds since epoch** in 8-byte +//! little-endian long: `millis * 1000 + (nanos_of_millis / 1000)`. +//! - BIGINT / FLOAT / DOUBLE are encoded as little-endian primitives. +//! - DECIMAL is encoded as **unscaled value bytes** (two's complement big-endian) with **no** +//! length prefix. +//! - STRING / CHAR are encoded as UTF-8 bytes with **no** length prefix. +//! - BYTES / BINARY are encoded as raw bytes with **no** length prefix. + +use crate::error::Error::IllegalArgument; +use crate::error::Result; +use crate::metadata::{DataType, RowType}; +use crate::row::encode::KeyEncoder; +use crate::row::field_getter::FieldGetter; +use crate::row::{Datum, InternalRow}; +use bytes::Bytes; + +#[allow(dead_code)] +pub struct IcebergKeyEncoder { + field_getter: FieldGetter, + field_type: DataType, +} + +impl IcebergKeyEncoder { + pub fn create_key_encoder(row_type: &RowType, keys: &[String]) -> Result<IcebergKeyEncoder> { + if keys.len() != 1 { + return Err(IllegalArgument { + message: format!( + "Key fields must have exactly one field for iceberg format, but got: {keys:?}" + ), + }); + } + + let key = &keys[0]; + let key_index = row_type + .get_field_index(key) + .ok_or_else(|| IllegalArgument { + message: format!("Field {key:?} not found in input row type {row_type:?}"), + })?; + + let field_type = row_type + .fields() + .get(key_index) + .ok_or_else(|| IllegalArgument { + message: format!("Invalid key field index {key_index} for row type {row_type:?}"), + })? + .data_type() + .clone(); + + // Fail fast on unsupported types to match Java behavior. + Self::validate_supported_type(&field_type)?; + + Ok(IcebergKeyEncoder { + field_getter: FieldGetter::create(&field_type, key_index), + field_type, + }) + } + + fn validate_supported_type(field_type: &DataType) -> Result<()> { + match field_type { + DataType::Int(_) + | DataType::BigInt(_) + | DataType::Float(_) + | DataType::Double(_) + | DataType::Date(_) + | DataType::Time(_) + | DataType::Timestamp(_) + | DataType::Decimal(_) + | DataType::String(_) + | DataType::Char(_) + | DataType::Binary(_) + | DataType::Bytes(_) => Ok(()), + + DataType::Array(_) => Err(IllegalArgument { + message: + "Array types cannot be used as bucket keys. 
Bucket keys must be scalar types." + .to_string(), + }), + DataType::Map(_) => Err(IllegalArgument { + message: + "Map types cannot be used as bucket keys. Bucket keys must be scalar types." + .to_string(), + }), + other => Err(IllegalArgument { + message: format!("Unsupported type for Iceberg key encoder: {other}"), + }), + } + } +} + +#[allow(dead_code)] +impl KeyEncoder for IcebergKeyEncoder { + fn encode_key(&mut self, row: &dyn InternalRow) -> Result<Bytes> { + let value = self.field_getter.get_field(row); + if value.is_null() { + return Err(IllegalArgument { + message: "Cannot encode Iceberg key with null value".to_string(), + }); + } + + let bytes: Vec<u8> = match (&self.field_type, value) { + (DataType::Int(_), Datum::Int32(v)) => (v as i64).to_le_bytes().to_vec(), + (DataType::Date(_), Datum::Date(v)) => (v.get_inner() as i64).to_le_bytes().to_vec(), + + (DataType::Time(_), Datum::Time(v)) => { + let micros = v.get_inner() as i64 * 1000; + micros.to_le_bytes().to_vec() + } + + (DataType::BigInt(_), Datum::Int64(v)) => v.to_le_bytes().to_vec(), + (DataType::Float(_), Datum::Float32(v)) => v.0.to_le_bytes().to_vec(), + (DataType::Double(_), Datum::Float64(v)) => v.0.to_le_bytes().to_vec(), + + (DataType::Timestamp(_), Datum::TimestampNtz(ts)) => { + let micros = + ts.get_millisecond() * 1000 + (ts.get_nano_of_millisecond() as i64 / 1000); + micros.to_le_bytes().to_vec() + } + + (DataType::Decimal(_), Datum::Decimal(d)) => d.to_unscaled_bytes(), + (DataType::String(_), Datum::String(s)) => s.as_bytes().to_vec(), + (DataType::Char(_), Datum::String(s)) => s.as_bytes().to_vec(), + (DataType::Binary(_), Datum::Blob(b)) => b.as_ref().to_vec(), + (DataType::Bytes(_), Datum::Blob(b)) => b.as_ref().to_vec(), + + // FieldGetter uses Datum::String for CHAR, Datum::Blob for BINARY/BYTES. + (expected_type, actual) => { + return Err(IllegalArgument { + message: format!( + "IcebergKeyEncoder type mismatch: expected {expected_type}, got {actual:?}" + ), + }); + } + }; + + Ok(Bytes::from(bytes)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::metadata::DataTypes; + use crate::row::datum::{Date, Time, TimestampNtz}; + use crate::row::{Decimal, GenericRow}; + use bigdecimal::BigDecimal; + use std::str::FromStr; + + #[test] + fn test_single_key_field_requirement() { + let row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::int(), DataTypes::string()], + vec!["id", "name"], + ); + + // ok with single key + let _ = IcebergKeyEncoder::create_key_encoder(&row_type, &["id".to_string()]).unwrap(); + + // error with multiple keys + let err = IcebergKeyEncoder::create_key_encoder( + &row_type, + &["id".to_string(), "name".to_string()], + ) + .err() + .unwrap(); + assert!(matches!(err, crate::error::Error::IllegalArgument { .. 
})); + assert!( + err.to_string() + .contains("Key fields must have exactly one field for iceberg format") + ); + } + + #[test] + fn test_integer_encoding() { + let row_type = RowType::with_data_types_and_field_names(vec![DataTypes::int()], vec!["id"]); + let row = GenericRow::from_data(vec![Datum::from(42i32)]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["id".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), (42i64).to_le_bytes().as_slice()); + } + + #[test] + fn test_long_encoding() { + let row_type = + RowType::with_data_types_and_field_names(vec![DataTypes::bigint()], vec!["id"]); + let v = 1234567890123456789i64; + let row = GenericRow::from_data(vec![Datum::from(v)]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["id".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), v.to_le_bytes().as_slice()); + } + + #[test] + fn test_string_encoding() { + let row_type = + RowType::with_data_types_and_field_names(vec![DataTypes::string()], vec!["name"]); + let s = "Hello Iceberg, Fluss this side!"; + let row = GenericRow::from_data(vec![Datum::from(s)]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["name".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), s.as_bytes()); + } + + #[test] + fn test_decimal_encoding() { + let row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::decimal(10, 2)], + vec!["amount"], + ); + + let bd = BigDecimal::from_str("123.45").unwrap(); + let decimal = Decimal::from_big_decimal(bd.clone(), 10, 2).unwrap(); + let row = GenericRow::from_data(vec![Datum::Decimal(decimal.clone())]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["amount".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), decimal.to_unscaled_bytes().as_slice()); + } + + #[test] + fn test_timestamp_encoding() { + let row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::timestamp_with_precision(6)], + vec!["event_time"], + ); + + let millis = 1698235273182i64; + let nanos = 123_456i32; + let ts = TimestampNtz::from_millis_nanos(millis, nanos).unwrap(); + let micros = millis * 1000 + (nanos as i64 / 1000); + + let row = GenericRow::from_data(vec![Datum::TimestampNtz(ts)]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["event_time".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), micros.to_le_bytes().as_slice()); + } + + #[test] + fn test_date_encoding() { + let row_type = + RowType::with_data_types_and_field_names(vec![DataTypes::date()], vec!["date"]); + + let days = 19655i32; // 2023-10-25 + let row = GenericRow::from_data(vec![Datum::Date(Date::new(days))]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["date".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), (days as i64).to_le_bytes().as_slice()); + } + + #[test] + fn test_time_encoding() { + let row_type = + RowType::with_data_types_and_field_names(vec![DataTypes::time()], vec!["time"]); + + let millis_since_midnight = 34_200_000i32; + let micros = millis_since_midnight as i64 * 1000; + let row = GenericRow::from_data(vec![Datum::Time(Time::new(millis_since_midnight))]); + let mut encoder = + 
IcebergKeyEncoder::create_key_encoder(&row_type, &["time".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), micros.to_le_bytes().as_slice()); + } + + #[test] + fn test_binary_encoding() { + let row_type = + RowType::with_data_types_and_field_names(vec![DataTypes::bytes()], vec!["data"]); + + let bytes = b"Hello i only understand binary data".as_slice(); + let row = GenericRow::from_data(vec![Datum::from(bytes)]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["data".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), bytes); + } Review Comment: Missing test coverage for null key handling. The encoder has logic to reject null values at lines 117-121, but there is no test case that verifies this behavior. Consider adding a test that attempts to encode a row with a null key value and verifies that it returns an IllegalArgument error with the message "Cannot encode Iceberg key with null value". ```suggestion } #[test] fn test_null_key_value_rejected() { let row_type = RowType::with_data_types_and_field_names(vec![DataTypes::int()], vec!["id"]); // Create a row with a null value for the key field. let row = GenericRow::from_data(vec![Datum::Null]); let mut encoder = IcebergKeyEncoder::create_key_encoder(&row_type, &["id".to_string()]).unwrap(); let result = encoder.encode_key(&row); match result { Err(crate::error::Error::IllegalArgument { message }) => { assert_eq!(message, "Cannot encode Iceberg key with null value"); } _ => panic!("Expected IllegalArgument error for null key value"), } } ``` ########## crates/fluss/src/row/encode/iceberg_key_encoder.rs: ########## @@ -0,0 +1,311 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Iceberg key encoding. +//! +//! This encoder follows Fluss Java's `IcebergKeyEncoder` / `IcebergBinaryRowWriter`: +//! - **Exactly one** key field is supported for Iceberg format. +//! - INT / DATE are encoded as **8-byte little-endian long** (int promoted to long). +//! - TIME is encoded as **microseconds since midnight** in 8-byte little-endian long. +//! - TIMESTAMP (without time zone) is encoded as **microseconds since epoch** in 8-byte +//! little-endian long: `millis * 1000 + (nanos_of_millis / 1000)`. +//! - BIGINT / FLOAT / DOUBLE are encoded as little-endian primitives. +//! - DECIMAL is encoded as **unscaled value bytes** (two's complement big-endian) with **no** +//! length prefix. +//! - STRING / CHAR are encoded as UTF-8 bytes with **no** length prefix. +//! - BYTES / BINARY are encoded as raw bytes with **no** length prefix. 
+ +use crate::error::Error::IllegalArgument; +use crate::error::Result; +use crate::metadata::{DataType, RowType}; +use crate::row::encode::KeyEncoder; +use crate::row::field_getter::FieldGetter; +use crate::row::{Datum, InternalRow}; +use bytes::Bytes; + +#[allow(dead_code)] +pub struct IcebergKeyEncoder { + field_getter: FieldGetter, + field_type: DataType, +} + +impl IcebergKeyEncoder { + pub fn create_key_encoder(row_type: &RowType, keys: &[String]) -> Result<IcebergKeyEncoder> { + if keys.len() != 1 { + return Err(IllegalArgument { + message: format!( + "Key fields must have exactly one field for iceberg format, but got: {keys:?}" + ), + }); + } + + let key = &keys[0]; + let key_index = row_type + .get_field_index(key) + .ok_or_else(|| IllegalArgument { + message: format!("Field {key:?} not found in input row type {row_type:?}"), + })?; + + let field_type = row_type + .fields() + .get(key_index) + .ok_or_else(|| IllegalArgument { + message: format!("Invalid key field index {key_index} for row type {row_type:?}"), + })? + .data_type() + .clone(); + + // Fail fast on unsupported types to match Java behavior. + Self::validate_supported_type(&field_type)?; + + Ok(IcebergKeyEncoder { + field_getter: FieldGetter::create(&field_type, key_index), + field_type, + }) + } + + fn validate_supported_type(field_type: &DataType) -> Result<()> { + match field_type { + DataType::Int(_) + | DataType::BigInt(_) + | DataType::Float(_) + | DataType::Double(_) + | DataType::Date(_) + | DataType::Time(_) + | DataType::Timestamp(_) + | DataType::Decimal(_) + | DataType::String(_) + | DataType::Char(_) + | DataType::Binary(_) + | DataType::Bytes(_) => Ok(()), + + DataType::Array(_) => Err(IllegalArgument { + message: + "Array types cannot be used as bucket keys. Bucket keys must be scalar types." + .to_string(), + }), + DataType::Map(_) => Err(IllegalArgument { + message: + "Map types cannot be used as bucket keys. Bucket keys must be scalar types." 
+ .to_string(), + }), + other => Err(IllegalArgument { + message: format!("Unsupported type for Iceberg key encoder: {other}"), + }), + } + } +} + +#[allow(dead_code)] +impl KeyEncoder for IcebergKeyEncoder { + fn encode_key(&mut self, row: &dyn InternalRow) -> Result<Bytes> { + let value = self.field_getter.get_field(row); + if value.is_null() { + return Err(IllegalArgument { + message: "Cannot encode Iceberg key with null value".to_string(), + }); + } + + let bytes: Vec<u8> = match (&self.field_type, value) { + (DataType::Int(_), Datum::Int32(v)) => (v as i64).to_le_bytes().to_vec(), + (DataType::Date(_), Datum::Date(v)) => (v.get_inner() as i64).to_le_bytes().to_vec(), + + (DataType::Time(_), Datum::Time(v)) => { + let micros = v.get_inner() as i64 * 1000; + micros.to_le_bytes().to_vec() + } + + (DataType::BigInt(_), Datum::Int64(v)) => v.to_le_bytes().to_vec(), + (DataType::Float(_), Datum::Float32(v)) => v.0.to_le_bytes().to_vec(), + (DataType::Double(_), Datum::Float64(v)) => v.0.to_le_bytes().to_vec(), + + (DataType::Timestamp(_), Datum::TimestampNtz(ts)) => { + let micros = + ts.get_millisecond() * 1000 + (ts.get_nano_of_millisecond() as i64 / 1000); + micros.to_le_bytes().to_vec() + } + + (DataType::Decimal(_), Datum::Decimal(d)) => d.to_unscaled_bytes(), + (DataType::String(_), Datum::String(s)) => s.as_bytes().to_vec(), + (DataType::Char(_), Datum::String(s)) => s.as_bytes().to_vec(), + (DataType::Binary(_), Datum::Blob(b)) => b.as_ref().to_vec(), + (DataType::Bytes(_), Datum::Blob(b)) => b.as_ref().to_vec(), + + // FieldGetter uses Datum::String for CHAR, Datum::Blob for BINARY/BYTES. + (expected_type, actual) => { + return Err(IllegalArgument { + message: format!( + "IcebergKeyEncoder type mismatch: expected {expected_type}, got {actual:?}" + ), + }); + } + }; + + Ok(Bytes::from(bytes)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::metadata::DataTypes; + use crate::row::datum::{Date, Time, TimestampNtz}; + use crate::row::{Decimal, GenericRow}; + use bigdecimal::BigDecimal; + use std::str::FromStr; + + #[test] + fn test_single_key_field_requirement() { + let row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::int(), DataTypes::string()], + vec!["id", "name"], + ); + + // ok with single key + let _ = IcebergKeyEncoder::create_key_encoder(&row_type, &["id".to_string()]).unwrap(); + + // error with multiple keys + let err = IcebergKeyEncoder::create_key_encoder( + &row_type, + &["id".to_string(), "name".to_string()], + ) + .err() + .unwrap(); + assert!(matches!(err, crate::error::Error::IllegalArgument { .. 
})); + assert!( + err.to_string() + .contains("Key fields must have exactly one field for iceberg format") + ); + } + + #[test] + fn test_integer_encoding() { + let row_type = RowType::with_data_types_and_field_names(vec![DataTypes::int()], vec!["id"]); + let row = GenericRow::from_data(vec![Datum::from(42i32)]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["id".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), (42i64).to_le_bytes().as_slice()); + } + + #[test] + fn test_long_encoding() { + let row_type = + RowType::with_data_types_and_field_names(vec![DataTypes::bigint()], vec!["id"]); + let v = 1234567890123456789i64; + let row = GenericRow::from_data(vec![Datum::from(v)]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["id".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), v.to_le_bytes().as_slice()); + } + + #[test] + fn test_string_encoding() { + let row_type = + RowType::with_data_types_and_field_names(vec![DataTypes::string()], vec!["name"]); + let s = "Hello Iceberg, Fluss this side!"; + let row = GenericRow::from_data(vec![Datum::from(s)]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["name".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), s.as_bytes()); + } + + #[test] + fn test_decimal_encoding() { + let row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::decimal(10, 2)], + vec!["amount"], + ); + + let bd = BigDecimal::from_str("123.45").unwrap(); + let decimal = Decimal::from_big_decimal(bd.clone(), 10, 2).unwrap(); + let row = GenericRow::from_data(vec![Datum::Decimal(decimal.clone())]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["amount".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), decimal.to_unscaled_bytes().as_slice()); + } + + #[test] + fn test_timestamp_encoding() { + let row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::timestamp_with_precision(6)], + vec!["event_time"], + ); + + let millis = 1698235273182i64; + let nanos = 123_456i32; + let ts = TimestampNtz::from_millis_nanos(millis, nanos).unwrap(); + let micros = millis * 1000 + (nanos as i64 / 1000); + + let row = GenericRow::from_data(vec![Datum::TimestampNtz(ts)]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["event_time".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), micros.to_le_bytes().as_slice()); + } + + #[test] + fn test_date_encoding() { + let row_type = + RowType::with_data_types_and_field_names(vec![DataTypes::date()], vec!["date"]); + + let days = 19655i32; // 2023-10-25 + let row = GenericRow::from_data(vec![Datum::Date(Date::new(days))]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["date".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), (days as i64).to_le_bytes().as_slice()); + } + + #[test] + fn test_time_encoding() { + let row_type = + RowType::with_data_types_and_field_names(vec![DataTypes::time()], vec!["time"]); + + let millis_since_midnight = 34_200_000i32; + let micros = millis_since_midnight as i64 * 1000; + let row = GenericRow::from_data(vec![Datum::Time(Time::new(millis_since_midnight))]); + let mut encoder = + 
IcebergKeyEncoder::create_key_encoder(&row_type, &["time".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), micros.to_le_bytes().as_slice()); + } + + #[test] + fn test_binary_encoding() { + let row_type = + RowType::with_data_types_and_field_names(vec![DataTypes::bytes()], vec!["data"]); + + let bytes = b"Hello i only understand binary data".as_slice(); + let row = GenericRow::from_data(vec![Datum::from(bytes)]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["data".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), bytes); + } Review Comment: Missing test coverage for Char type. The encoder supports Char type (line 144) which is encoded the same way as String, but there is no dedicated test case for it. Consider adding a test to verify that Char fields are correctly encoded as UTF-8 bytes. ```suggestion } #[test] fn test_char_encoding() { let row_type = RowType::with_data_types_and_field_names( vec![DataTypes::char_type(5)], vec!["char_field"], ); let value = "hello"; let row = GenericRow::from_data(vec![Datum::from(value)]); let mut encoder = IcebergKeyEncoder::create_key_encoder( &row_type, &["char_field".to_string()], ) .unwrap(); let encoded = encoder.encode_key(&row).unwrap(); assert_eq!(encoded.as_ref(), value.as_bytes()); } ``` ########## crates/fluss/src/row/encode/iceberg_key_encoder.rs: ########## @@ -0,0 +1,311 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Iceberg key encoding. +//! +//! This encoder follows Fluss Java's `IcebergKeyEncoder` / `IcebergBinaryRowWriter`: +//! - **Exactly one** key field is supported for Iceberg format. +//! - INT / DATE are encoded as **8-byte little-endian long** (int promoted to long). +//! - TIME is encoded as **microseconds since midnight** in 8-byte little-endian long. +//! - TIMESTAMP (without time zone) is encoded as **microseconds since epoch** in 8-byte +//! little-endian long: `millis * 1000 + (nanos_of_millis / 1000)`. +//! - BIGINT / FLOAT / DOUBLE are encoded as little-endian primitives. +//! - DECIMAL is encoded as **unscaled value bytes** (two's complement big-endian) with **no** +//! length prefix. +//! - STRING / CHAR are encoded as UTF-8 bytes with **no** length prefix. +//! - BYTES / BINARY are encoded as raw bytes with **no** length prefix. 
+ +use crate::error::Error::IllegalArgument; +use crate::error::Result; +use crate::metadata::{DataType, RowType}; +use crate::row::encode::KeyEncoder; +use crate::row::field_getter::FieldGetter; +use crate::row::{Datum, InternalRow}; +use bytes::Bytes; + +#[allow(dead_code)] +pub struct IcebergKeyEncoder { + field_getter: FieldGetter, + field_type: DataType, +} + +impl IcebergKeyEncoder { + pub fn create_key_encoder(row_type: &RowType, keys: &[String]) -> Result<IcebergKeyEncoder> { + if keys.len() != 1 { + return Err(IllegalArgument { + message: format!( + "Key fields must have exactly one field for iceberg format, but got: {keys:?}" + ), + }); + } + + let key = &keys[0]; + let key_index = row_type + .get_field_index(key) + .ok_or_else(|| IllegalArgument { + message: format!("Field {key:?} not found in input row type {row_type:?}"), + })?; + + let field_type = row_type + .fields() + .get(key_index) + .ok_or_else(|| IllegalArgument { + message: format!("Invalid key field index {key_index} for row type {row_type:?}"), + })? + .data_type() + .clone(); + + // Fail fast on unsupported types to match Java behavior. + Self::validate_supported_type(&field_type)?; + + Ok(IcebergKeyEncoder { + field_getter: FieldGetter::create(&field_type, key_index), + field_type, + }) + } + + fn validate_supported_type(field_type: &DataType) -> Result<()> { + match field_type { + DataType::Int(_) + | DataType::BigInt(_) + | DataType::Float(_) + | DataType::Double(_) + | DataType::Date(_) + | DataType::Time(_) + | DataType::Timestamp(_) + | DataType::Decimal(_) + | DataType::String(_) + | DataType::Char(_) + | DataType::Binary(_) + | DataType::Bytes(_) => Ok(()), + + DataType::Array(_) => Err(IllegalArgument { + message: + "Array types cannot be used as bucket keys. Bucket keys must be scalar types." + .to_string(), + }), + DataType::Map(_) => Err(IllegalArgument { + message: + "Map types cannot be used as bucket keys. Bucket keys must be scalar types." 
+ .to_string(), + }), + other => Err(IllegalArgument { + message: format!("Unsupported type for Iceberg key encoder: {other}"), + }), + } + } +} + +#[allow(dead_code)] +impl KeyEncoder for IcebergKeyEncoder { + fn encode_key(&mut self, row: &dyn InternalRow) -> Result<Bytes> { + let value = self.field_getter.get_field(row); + if value.is_null() { + return Err(IllegalArgument { + message: "Cannot encode Iceberg key with null value".to_string(), + }); + } + + let bytes: Vec<u8> = match (&self.field_type, value) { + (DataType::Int(_), Datum::Int32(v)) => (v as i64).to_le_bytes().to_vec(), + (DataType::Date(_), Datum::Date(v)) => (v.get_inner() as i64).to_le_bytes().to_vec(), + + (DataType::Time(_), Datum::Time(v)) => { + let micros = v.get_inner() as i64 * 1000; + micros.to_le_bytes().to_vec() + } + + (DataType::BigInt(_), Datum::Int64(v)) => v.to_le_bytes().to_vec(), + (DataType::Float(_), Datum::Float32(v)) => v.0.to_le_bytes().to_vec(), + (DataType::Double(_), Datum::Float64(v)) => v.0.to_le_bytes().to_vec(), + + (DataType::Timestamp(_), Datum::TimestampNtz(ts)) => { + let micros = + ts.get_millisecond() * 1000 + (ts.get_nano_of_millisecond() as i64 / 1000); + micros.to_le_bytes().to_vec() + } + + (DataType::Decimal(_), Datum::Decimal(d)) => d.to_unscaled_bytes(), + (DataType::String(_), Datum::String(s)) => s.as_bytes().to_vec(), + (DataType::Char(_), Datum::String(s)) => s.as_bytes().to_vec(), + (DataType::Binary(_), Datum::Blob(b)) => b.as_ref().to_vec(), + (DataType::Bytes(_), Datum::Blob(b)) => b.as_ref().to_vec(), + + // FieldGetter uses Datum::String for CHAR, Datum::Blob for BINARY/BYTES. + (expected_type, actual) => { + return Err(IllegalArgument { + message: format!( + "IcebergKeyEncoder type mismatch: expected {expected_type}, got {actual:?}" + ), + }); + } + }; Review Comment: The encode_key function is missing match arms for several data types that could potentially be returned by FieldGetter: - Boolean (DataType::Boolean) - would return Datum::Bool - TinyInt (DataType::TinyInt) - would return Datum::Int8 - SmallInt (DataType::SmallInt) - would return Datum::Int16 - TimestampLtz (DataType::TimestampLTz) - would return Datum::TimestampLtz If these types pass validation (e.g., if validation is updated), the match statement would fail at runtime with an "IcebergKeyEncoder type mismatch" error at line 150-154. Consider adding explicit handling for these types in the match statement, even if only to encode them (for numeric types) or to return a more specific error message explaining they are unsupported for Iceberg keys. ########## crates/fluss/src/row/encode/iceberg_key_encoder.rs: ########## @@ -0,0 +1,311 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Iceberg key encoding. +//! +//! 
This encoder follows Fluss Java's `IcebergKeyEncoder` / `IcebergBinaryRowWriter`: +//! - **Exactly one** key field is supported for Iceberg format. +//! - INT / DATE are encoded as **8-byte little-endian long** (int promoted to long). +//! - TIME is encoded as **microseconds since midnight** in 8-byte little-endian long. +//! - TIMESTAMP (without time zone) is encoded as **microseconds since epoch** in 8-byte +//! little-endian long: `millis * 1000 + (nanos_of_millis / 1000)`. +//! - BIGINT / FLOAT / DOUBLE are encoded as little-endian primitives. +//! - DECIMAL is encoded as **unscaled value bytes** (two's complement big-endian) with **no** +//! length prefix. +//! - STRING / CHAR are encoded as UTF-8 bytes with **no** length prefix. +//! - BYTES / BINARY are encoded as raw bytes with **no** length prefix. + +use crate::error::Error::IllegalArgument; +use crate::error::Result; +use crate::metadata::{DataType, RowType}; +use crate::row::encode::KeyEncoder; +use crate::row::field_getter::FieldGetter; +use crate::row::{Datum, InternalRow}; +use bytes::Bytes; + +#[allow(dead_code)] +pub struct IcebergKeyEncoder { + field_getter: FieldGetter, + field_type: DataType, +} + +impl IcebergKeyEncoder { + pub fn create_key_encoder(row_type: &RowType, keys: &[String]) -> Result<IcebergKeyEncoder> { + if keys.len() != 1 { + return Err(IllegalArgument { + message: format!( + "Key fields must have exactly one field for iceberg format, but got: {keys:?}" + ), + }); + } + + let key = &keys[0]; + let key_index = row_type + .get_field_index(key) + .ok_or_else(|| IllegalArgument { + message: format!("Field {key:?} not found in input row type {row_type:?}"), + })?; + + let field_type = row_type + .fields() + .get(key_index) + .ok_or_else(|| IllegalArgument { + message: format!("Invalid key field index {key_index} for row type {row_type:?}"), + })? + .data_type() + .clone(); + + // Fail fast on unsupported types to match Java behavior. + Self::validate_supported_type(&field_type)?; + + Ok(IcebergKeyEncoder { + field_getter: FieldGetter::create(&field_type, key_index), + field_type, + }) + } + + fn validate_supported_type(field_type: &DataType) -> Result<()> { + match field_type { + DataType::Int(_) + | DataType::BigInt(_) + | DataType::Float(_) + | DataType::Double(_) + | DataType::Date(_) + | DataType::Time(_) + | DataType::Timestamp(_) + | DataType::Decimal(_) + | DataType::String(_) + | DataType::Char(_) + | DataType::Binary(_) + | DataType::Bytes(_) => Ok(()), + + DataType::Array(_) => Err(IllegalArgument { + message: + "Array types cannot be used as bucket keys. Bucket keys must be scalar types." + .to_string(), + }), + DataType::Map(_) => Err(IllegalArgument { + message: + "Map types cannot be used as bucket keys. Bucket keys must be scalar types." + .to_string(), + }), + other => Err(IllegalArgument { + message: format!("Unsupported type for Iceberg key encoder: {other}"), + }), + } + } Review Comment: The type validation is incomplete. Several data types are supported by the FieldGetter but are missing from this validation: - Boolean (DataType::Boolean) - which would be returned as Datum::Bool - TinyInt (DataType::TinyInt) - which would be returned as Datum::Int8 - SmallInt (DataType::SmallInt) - which would be returned as Datum::Int16 - TimestampLtz (DataType::TimestampLTz) - which would be returned as Datum::TimestampLtz Without explicit handling for these types, if a user attempts to use them as key fields, they would get a generic "Unsupported type" error at validation time. 
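One option would be to support these datums end to end, i.e. add the variants to `validate_supported_type` and give them arms in `encode_key`. Below is a standalone, hedged sketch of plausible byte layouts chosen by analogy with the existing arms (small integers promoted to an 8-byte little-endian long, booleans as a single byte); the `SmallScalar` enum is only a stand-in for the real `Datum` variants, and the actual layouts would need to be confirmed against Fluss Java's `IcebergBinaryRowWriter` before wiring anything like this into the encoder.

```rust
// Hypothetical sketch only: SmallScalar stands in for the Datum variants named above
// (Bool / Int8 / Int16); none of this is part of the current patch.
enum SmallScalar {
    Bool(bool),
    Int8(i8),
    Int16(i16),
}

// One plausible layout, mirroring the existing INT arm: promote to i64 and emit
// little-endian bytes; encode bool as a single 0/1 byte.
fn encode_small_scalar(v: SmallScalar) -> Vec<u8> {
    match v {
        SmallScalar::Bool(b) => vec![b as u8],
        SmallScalar::Int8(i) => (i as i64).to_le_bytes().to_vec(),
        SmallScalar::Int16(i) => (i as i64).to_le_bytes().to_vec(),
    }
}

fn main() {
    assert_eq!(encode_small_scalar(SmallScalar::Bool(true)), vec![1u8]);
    assert_eq!(
        encode_small_scalar(SmallScalar::Int16(300)),
        (300i64).to_le_bytes().to_vec()
    );
    // Sign extension is preserved by the i64 promotion.
    assert_eq!(
        encode_small_scalar(SmallScalar::Int8(-1)),
        (-1i64).to_le_bytes().to_vec()
    );
}
```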
However, if this is intentional and matches the Java implementation behavior, then these types should be listed explicitly in validation with appropriate error messages explaining they are not supported for Iceberg keys. ########## crates/fluss/src/row/encode/iceberg_key_encoder.rs: ########## @@ -0,0 +1,311 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Iceberg key encoding. +//! +//! This encoder follows Fluss Java's `IcebergKeyEncoder` / `IcebergBinaryRowWriter`: +//! - **Exactly one** key field is supported for Iceberg format. +//! - INT / DATE are encoded as **8-byte little-endian long** (int promoted to long). +//! - TIME is encoded as **microseconds since midnight** in 8-byte little-endian long. +//! - TIMESTAMP (without time zone) is encoded as **microseconds since epoch** in 8-byte +//! little-endian long: `millis * 1000 + (nanos_of_millis / 1000)`. +//! - BIGINT / FLOAT / DOUBLE are encoded as little-endian primitives. +//! - DECIMAL is encoded as **unscaled value bytes** (two's complement big-endian) with **no** +//! length prefix. +//! - STRING / CHAR are encoded as UTF-8 bytes with **no** length prefix. +//! - BYTES / BINARY are encoded as raw bytes with **no** length prefix. + +use crate::error::Error::IllegalArgument; +use crate::error::Result; +use crate::metadata::{DataType, RowType}; +use crate::row::encode::KeyEncoder; +use crate::row::field_getter::FieldGetter; +use crate::row::{Datum, InternalRow}; +use bytes::Bytes; + +#[allow(dead_code)] +pub struct IcebergKeyEncoder { + field_getter: FieldGetter, + field_type: DataType, +} + +impl IcebergKeyEncoder { + pub fn create_key_encoder(row_type: &RowType, keys: &[String]) -> Result<IcebergKeyEncoder> { + if keys.len() != 1 { + return Err(IllegalArgument { + message: format!( + "Key fields must have exactly one field for iceberg format, but got: {keys:?}" + ), + }); + } + + let key = &keys[0]; + let key_index = row_type + .get_field_index(key) + .ok_or_else(|| IllegalArgument { + message: format!("Field {key:?} not found in input row type {row_type:?}"), + })?; + + let field_type = row_type + .fields() + .get(key_index) + .ok_or_else(|| IllegalArgument { + message: format!("Invalid key field index {key_index} for row type {row_type:?}"), + })? + .data_type() + .clone(); + + // Fail fast on unsupported types to match Java behavior. 
+ Self::validate_supported_type(&field_type)?; + + Ok(IcebergKeyEncoder { + field_getter: FieldGetter::create(&field_type, key_index), + field_type, + }) + } + + fn validate_supported_type(field_type: &DataType) -> Result<()> { + match field_type { + DataType::Int(_) + | DataType::BigInt(_) + | DataType::Float(_) + | DataType::Double(_) + | DataType::Date(_) + | DataType::Time(_) + | DataType::Timestamp(_) + | DataType::Decimal(_) + | DataType::String(_) + | DataType::Char(_) + | DataType::Binary(_) + | DataType::Bytes(_) => Ok(()), + + DataType::Array(_) => Err(IllegalArgument { + message: + "Array types cannot be used as bucket keys. Bucket keys must be scalar types." + .to_string(), + }), + DataType::Map(_) => Err(IllegalArgument { + message: + "Map types cannot be used as bucket keys. Bucket keys must be scalar types." + .to_string(), + }), + other => Err(IllegalArgument { + message: format!("Unsupported type for Iceberg key encoder: {other}"), + }), + } + } +} + +#[allow(dead_code)] +impl KeyEncoder for IcebergKeyEncoder { + fn encode_key(&mut self, row: &dyn InternalRow) -> Result<Bytes> { + let value = self.field_getter.get_field(row); + if value.is_null() { + return Err(IllegalArgument { + message: "Cannot encode Iceberg key with null value".to_string(), + }); + } + + let bytes: Vec<u8> = match (&self.field_type, value) { + (DataType::Int(_), Datum::Int32(v)) => (v as i64).to_le_bytes().to_vec(), + (DataType::Date(_), Datum::Date(v)) => (v.get_inner() as i64).to_le_bytes().to_vec(), + + (DataType::Time(_), Datum::Time(v)) => { + let micros = v.get_inner() as i64 * 1000; + micros.to_le_bytes().to_vec() + } + + (DataType::BigInt(_), Datum::Int64(v)) => v.to_le_bytes().to_vec(), + (DataType::Float(_), Datum::Float32(v)) => v.0.to_le_bytes().to_vec(), + (DataType::Double(_), Datum::Float64(v)) => v.0.to_le_bytes().to_vec(), + + (DataType::Timestamp(_), Datum::TimestampNtz(ts)) => { + let micros = + ts.get_millisecond() * 1000 + (ts.get_nano_of_millisecond() as i64 / 1000); + micros.to_le_bytes().to_vec() + } + + (DataType::Decimal(_), Datum::Decimal(d)) => d.to_unscaled_bytes(), + (DataType::String(_), Datum::String(s)) => s.as_bytes().to_vec(), + (DataType::Char(_), Datum::String(s)) => s.as_bytes().to_vec(), + (DataType::Binary(_), Datum::Blob(b)) => b.as_ref().to_vec(), + (DataType::Bytes(_), Datum::Blob(b)) => b.as_ref().to_vec(), + + // FieldGetter uses Datum::String for CHAR, Datum::Blob for BINARY/BYTES. + (expected_type, actual) => { + return Err(IllegalArgument { + message: format!( + "IcebergKeyEncoder type mismatch: expected {expected_type}, got {actual:?}" + ), + }); + } + }; + + Ok(Bytes::from(bytes)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::metadata::DataTypes; + use crate::row::datum::{Date, Time, TimestampNtz}; + use crate::row::{Decimal, GenericRow}; + use bigdecimal::BigDecimal; + use std::str::FromStr; + + #[test] + fn test_single_key_field_requirement() { + let row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::int(), DataTypes::string()], + vec!["id", "name"], + ); + + // ok with single key + let _ = IcebergKeyEncoder::create_key_encoder(&row_type, &["id".to_string()]).unwrap(); + + // error with multiple keys + let err = IcebergKeyEncoder::create_key_encoder( + &row_type, + &["id".to_string(), "name".to_string()], + ) + .err() + .unwrap(); + assert!(matches!(err, crate::error::Error::IllegalArgument { .. 
})); + assert!( + err.to_string() + .contains("Key fields must have exactly one field for iceberg format") + ); + } + + #[test] + fn test_integer_encoding() { + let row_type = RowType::with_data_types_and_field_names(vec![DataTypes::int()], vec!["id"]); + let row = GenericRow::from_data(vec![Datum::from(42i32)]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["id".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), (42i64).to_le_bytes().as_slice()); + } + + #[test] + fn test_long_encoding() { + let row_type = + RowType::with_data_types_and_field_names(vec![DataTypes::bigint()], vec!["id"]); + let v = 1234567890123456789i64; + let row = GenericRow::from_data(vec![Datum::from(v)]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["id".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), v.to_le_bytes().as_slice()); + } + + #[test] + fn test_string_encoding() { + let row_type = + RowType::with_data_types_and_field_names(vec![DataTypes::string()], vec!["name"]); + let s = "Hello Iceberg, Fluss this side!"; + let row = GenericRow::from_data(vec![Datum::from(s)]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["name".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), s.as_bytes()); + } + + #[test] + fn test_decimal_encoding() { + let row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::decimal(10, 2)], + vec!["amount"], + ); + + let bd = BigDecimal::from_str("123.45").unwrap(); + let decimal = Decimal::from_big_decimal(bd.clone(), 10, 2).unwrap(); + let row = GenericRow::from_data(vec![Datum::Decimal(decimal.clone())]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["amount".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), decimal.to_unscaled_bytes().as_slice()); + } + + #[test] + fn test_timestamp_encoding() { + let row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::timestamp_with_precision(6)], + vec!["event_time"], + ); + + let millis = 1698235273182i64; + let nanos = 123_456i32; + let ts = TimestampNtz::from_millis_nanos(millis, nanos).unwrap(); + let micros = millis * 1000 + (nanos as i64 / 1000); + + let row = GenericRow::from_data(vec![Datum::TimestampNtz(ts)]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["event_time".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), micros.to_le_bytes().as_slice()); + } + + #[test] + fn test_date_encoding() { + let row_type = + RowType::with_data_types_and_field_names(vec![DataTypes::date()], vec!["date"]); + + let days = 19655i32; // 2023-10-25 + let row = GenericRow::from_data(vec![Datum::Date(Date::new(days))]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["date".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), (days as i64).to_le_bytes().as_slice()); + } + + #[test] + fn test_time_encoding() { + let row_type = + RowType::with_data_types_and_field_names(vec![DataTypes::time()], vec!["time"]); + + let millis_since_midnight = 34_200_000i32; + let micros = millis_since_midnight as i64 * 1000; + let row = GenericRow::from_data(vec![Datum::Time(Time::new(millis_since_midnight))]); + let mut encoder = + 
IcebergKeyEncoder::create_key_encoder(&row_type, &["time".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), micros.to_le_bytes().as_slice()); + } + + #[test] + fn test_binary_encoding() { + let row_type = + RowType::with_data_types_and_field_names(vec![DataTypes::bytes()], vec!["data"]); + + let bytes = b"Hello i only understand binary data".as_slice(); + let row = GenericRow::from_data(vec![Datum::from(bytes)]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["data".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), bytes); + } Review Comment: Missing test coverage for Float and Double types. The encoder supports these types (lines 133-134), but there are no test cases that verify their encoding. Consider adding tests for both DataType::Float and DataType::Double to ensure they are correctly encoded as little-endian primitives. ```suggestion } #[test] fn test_float_encoding() { let row_type = RowType::with_data_types_and_field_names(vec![DataTypes::float()], vec!["value"]); let value: f32 = 3.1415927_f32; let row = GenericRow::from_data(vec![Datum::Float(value)]); let mut encoder = IcebergKeyEncoder::create_key_encoder(&row_type, &["value".to_string()]).unwrap(); let encoded = encoder.encode_key(&row).unwrap(); assert_eq!(encoded.as_ref(), value.to_le_bytes().as_slice()); } #[test] fn test_double_encoding() { let row_type = RowType::with_data_types_and_field_names(vec![DataTypes::double()], vec!["value"]); let value: f64 = 3.141592653589793_f64; let row = GenericRow::from_data(vec![Datum::Double(value)]); let mut encoder = IcebergKeyEncoder::create_key_encoder(&row_type, &["value".to_string()]).unwrap(); let encoded = encoder.encode_key(&row).unwrap(); assert_eq!(encoded.as_ref(), value.to_le_bytes().as_slice()); } ``` ########## crates/fluss/src/row/encode/iceberg_key_encoder.rs: ########## @@ -0,0 +1,311 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Iceberg key encoding. +//! +//! This encoder follows Fluss Java's `IcebergKeyEncoder` / `IcebergBinaryRowWriter`: +//! - **Exactly one** key field is supported for Iceberg format. +//! - INT / DATE are encoded as **8-byte little-endian long** (int promoted to long). +//! - TIME is encoded as **microseconds since midnight** in 8-byte little-endian long. +//! - TIMESTAMP (without time zone) is encoded as **microseconds since epoch** in 8-byte +//! little-endian long: `millis * 1000 + (nanos_of_millis / 1000)`. +//! - BIGINT / FLOAT / DOUBLE are encoded as little-endian primitives. +//! - DECIMAL is encoded as **unscaled value bytes** (two's complement big-endian) with **no** +//! length prefix. +//! 
- STRING / CHAR are encoded as UTF-8 bytes with **no** length prefix. +//! - BYTES / BINARY are encoded as raw bytes with **no** length prefix. + +use crate::error::Error::IllegalArgument; +use crate::error::Result; +use crate::metadata::{DataType, RowType}; +use crate::row::encode::KeyEncoder; +use crate::row::field_getter::FieldGetter; +use crate::row::{Datum, InternalRow}; +use bytes::Bytes; + +#[allow(dead_code)] +pub struct IcebergKeyEncoder { + field_getter: FieldGetter, + field_type: DataType, +} + +impl IcebergKeyEncoder { + pub fn create_key_encoder(row_type: &RowType, keys: &[String]) -> Result<IcebergKeyEncoder> { + if keys.len() != 1 { + return Err(IllegalArgument { + message: format!( + "Key fields must have exactly one field for iceberg format, but got: {keys:?}" + ), + }); + } + + let key = &keys[0]; + let key_index = row_type + .get_field_index(key) + .ok_or_else(|| IllegalArgument { + message: format!("Field {key:?} not found in input row type {row_type:?}"), + })?; + + let field_type = row_type + .fields() + .get(key_index) + .ok_or_else(|| IllegalArgument { + message: format!("Invalid key field index {key_index} for row type {row_type:?}"), + })? + .data_type() + .clone(); + + // Fail fast on unsupported types to match Java behavior. + Self::validate_supported_type(&field_type)?; + + Ok(IcebergKeyEncoder { + field_getter: FieldGetter::create(&field_type, key_index), + field_type, + }) + } + + fn validate_supported_type(field_type: &DataType) -> Result<()> { + match field_type { + DataType::Int(_) + | DataType::BigInt(_) + | DataType::Float(_) + | DataType::Double(_) + | DataType::Date(_) + | DataType::Time(_) + | DataType::Timestamp(_) + | DataType::Decimal(_) + | DataType::String(_) + | DataType::Char(_) + | DataType::Binary(_) + | DataType::Bytes(_) => Ok(()), + + DataType::Array(_) => Err(IllegalArgument { + message: + "Array types cannot be used as bucket keys. Bucket keys must be scalar types." + .to_string(), + }), + DataType::Map(_) => Err(IllegalArgument { + message: + "Map types cannot be used as bucket keys. Bucket keys must be scalar types." + .to_string(), + }), + other => Err(IllegalArgument { + message: format!("Unsupported type for Iceberg key encoder: {other}"), + }), + } + } +} + +#[allow(dead_code)] +impl KeyEncoder for IcebergKeyEncoder { + fn encode_key(&mut self, row: &dyn InternalRow) -> Result<Bytes> { + let value = self.field_getter.get_field(row); + if value.is_null() { + return Err(IllegalArgument { + message: "Cannot encode Iceberg key with null value".to_string(), + }); + } + + let bytes: Vec<u8> = match (&self.field_type, value) { + (DataType::Int(_), Datum::Int32(v)) => (v as i64).to_le_bytes().to_vec(), + (DataType::Date(_), Datum::Date(v)) => (v.get_inner() as i64).to_le_bytes().to_vec(), + + (DataType::Time(_), Datum::Time(v)) => { + let micros = v.get_inner() as i64 * 1000; + micros.to_le_bytes().to_vec() + } + + (DataType::BigInt(_), Datum::Int64(v)) => v.to_le_bytes().to_vec(), + (DataType::Float(_), Datum::Float32(v)) => v.0.to_le_bytes().to_vec(), + (DataType::Double(_), Datum::Float64(v)) => v.0.to_le_bytes().to_vec(), + + (DataType::Timestamp(_), Datum::TimestampNtz(ts)) => { + let micros = + ts.get_millisecond() * 1000 + (ts.get_nano_of_millisecond() as i64 / 1000); + micros.to_le_bytes().to_vec() Review Comment: Potential integer overflow in timestamp conversion. At line 138, `ts.get_millisecond() * 1000` could overflow for very large timestamp values. 
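A checked conversion is cheap to add. Below is a minimal standalone sketch, with plain `i64`/`i32` parameters standing in for the `TimestampNtz` getters and the `None` case standing in for the encoder's existing `IllegalArgument` error path; the helper name is illustrative and not part of the patch.

```rust
/// Hypothetical helper: convert a millisecond timestamp plus the intra-millisecond
/// nanos to microseconds, returning None instead of silently wrapping on i64 overflow.
fn timestamp_to_micros_checked(millis: i64, nanos_of_milli: i32) -> Option<i64> {
    millis
        .checked_mul(1_000)
        .and_then(|us| us.checked_add(i64::from(nanos_of_milli) / 1_000))
}

fn main() {
    // Same values as test_timestamp_encoding: 1698235273182 ms plus 123_456 ns.
    assert_eq!(
        timestamp_to_micros_checked(1_698_235_273_182, 123_456),
        Some(1_698_235_273_182_123)
    );
    // An i64::MAX millisecond timestamp cannot be represented as microseconds.
    assert_eq!(timestamp_to_micros_checked(i64::MAX, 0), None);
}
```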
For reference, the maximum i64 value is 9,223,372,036,854,775,807, so the multiplication only overflows once the millisecond timestamp exceeds i64::MAX / 1000, i.e. for dates roughly 292,000 years past the Unix epoch; if that range is acceptable for timestamps in Iceberg format, simply documenting the limitation (rather than adding the checked path) would also be reasonable. ########## crates/fluss/src/row/encode/iceberg_key_encoder.rs: ########## @@ -0,0 +1,311 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Iceberg key encoding. +//! +//! This encoder follows Fluss Java's `IcebergKeyEncoder` / `IcebergBinaryRowWriter`: +//! - **Exactly one** key field is supported for Iceberg format. +//! - INT / DATE are encoded as **8-byte little-endian long** (int promoted to long). +//! - TIME is encoded as **microseconds since midnight** in 8-byte little-endian long. +//! - TIMESTAMP (without time zone) is encoded as **microseconds since epoch** in 8-byte +//! little-endian long: `millis * 1000 + (nanos_of_millis / 1000)`. +//! - BIGINT / FLOAT / DOUBLE are encoded as little-endian primitives. +//! - DECIMAL is encoded as **unscaled value bytes** (two's complement big-endian) with **no** +//! length prefix. +//! - STRING / CHAR are encoded as UTF-8 bytes with **no** length prefix. +//! - BYTES / BINARY are encoded as raw bytes with **no** length prefix. + +use crate::error::Error::IllegalArgument; +use crate::error::Result; +use crate::metadata::{DataType, RowType}; +use crate::row::encode::KeyEncoder; +use crate::row::field_getter::FieldGetter; +use crate::row::{Datum, InternalRow}; +use bytes::Bytes; + +#[allow(dead_code)] +pub struct IcebergKeyEncoder { + field_getter: FieldGetter, + field_type: DataType, +} + +impl IcebergKeyEncoder { + pub fn create_key_encoder(row_type: &RowType, keys: &[String]) -> Result<IcebergKeyEncoder> { + if keys.len() != 1 { + return Err(IllegalArgument { + message: format!( + "Key fields must have exactly one field for iceberg format, but got: {keys:?}" + ), + }); + } + + let key = &keys[0]; + let key_index = row_type + .get_field_index(key) + .ok_or_else(|| IllegalArgument { + message: format!("Field {key:?} not found in input row type {row_type:?}"), + })?; + + let field_type = row_type + .fields() + .get(key_index) + .ok_or_else(|| IllegalArgument { + message: format!("Invalid key field index {key_index} for row type {row_type:?}"), + })? + .data_type() + .clone(); + + // Fail fast on unsupported types to match Java behavior. 
+ Self::validate_supported_type(&field_type)?; + + Ok(IcebergKeyEncoder { + field_getter: FieldGetter::create(&field_type, key_index), + field_type, + }) + } + + fn validate_supported_type(field_type: &DataType) -> Result<()> { + match field_type { + DataType::Int(_) + | DataType::BigInt(_) + | DataType::Float(_) + | DataType::Double(_) + | DataType::Date(_) + | DataType::Time(_) + | DataType::Timestamp(_) + | DataType::Decimal(_) + | DataType::String(_) + | DataType::Char(_) + | DataType::Binary(_) + | DataType::Bytes(_) => Ok(()), + + DataType::Array(_) => Err(IllegalArgument { + message: + "Array types cannot be used as bucket keys. Bucket keys must be scalar types." + .to_string(), + }), + DataType::Map(_) => Err(IllegalArgument { + message: + "Map types cannot be used as bucket keys. Bucket keys must be scalar types." + .to_string(), + }), + other => Err(IllegalArgument { + message: format!("Unsupported type for Iceberg key encoder: {other}"), + }), + } + } +} + +#[allow(dead_code)] +impl KeyEncoder for IcebergKeyEncoder { + fn encode_key(&mut self, row: &dyn InternalRow) -> Result<Bytes> { + let value = self.field_getter.get_field(row); + if value.is_null() { + return Err(IllegalArgument { + message: "Cannot encode Iceberg key with null value".to_string(), + }); + } + + let bytes: Vec<u8> = match (&self.field_type, value) { + (DataType::Int(_), Datum::Int32(v)) => (v as i64).to_le_bytes().to_vec(), + (DataType::Date(_), Datum::Date(v)) => (v.get_inner() as i64).to_le_bytes().to_vec(), + + (DataType::Time(_), Datum::Time(v)) => { + let micros = v.get_inner() as i64 * 1000; + micros.to_le_bytes().to_vec() + } + + (DataType::BigInt(_), Datum::Int64(v)) => v.to_le_bytes().to_vec(), + (DataType::Float(_), Datum::Float32(v)) => v.0.to_le_bytes().to_vec(), + (DataType::Double(_), Datum::Float64(v)) => v.0.to_le_bytes().to_vec(), + + (DataType::Timestamp(_), Datum::TimestampNtz(ts)) => { + let micros = + ts.get_millisecond() * 1000 + (ts.get_nano_of_millisecond() as i64 / 1000); + micros.to_le_bytes().to_vec() + } + + (DataType::Decimal(_), Datum::Decimal(d)) => d.to_unscaled_bytes(), + (DataType::String(_), Datum::String(s)) => s.as_bytes().to_vec(), + (DataType::Char(_), Datum::String(s)) => s.as_bytes().to_vec(), + (DataType::Binary(_), Datum::Blob(b)) => b.as_ref().to_vec(), + (DataType::Bytes(_), Datum::Blob(b)) => b.as_ref().to_vec(), + + // FieldGetter uses Datum::String for CHAR, Datum::Blob for BINARY/BYTES. + (expected_type, actual) => { + return Err(IllegalArgument { + message: format!( + "IcebergKeyEncoder type mismatch: expected {expected_type}, got {actual:?}" + ), + }); + } + }; + + Ok(Bytes::from(bytes)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::metadata::DataTypes; + use crate::row::datum::{Date, Time, TimestampNtz}; + use crate::row::{Decimal, GenericRow}; + use bigdecimal::BigDecimal; + use std::str::FromStr; + + #[test] + fn test_single_key_field_requirement() { + let row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::int(), DataTypes::string()], + vec!["id", "name"], + ); + + // ok with single key + let _ = IcebergKeyEncoder::create_key_encoder(&row_type, &["id".to_string()]).unwrap(); + + // error with multiple keys + let err = IcebergKeyEncoder::create_key_encoder( + &row_type, + &["id".to_string(), "name".to_string()], + ) + .err() + .unwrap(); + assert!(matches!(err, crate::error::Error::IllegalArgument { .. 
})); + assert!( + err.to_string() + .contains("Key fields must have exactly one field for iceberg format") + ); + } + + #[test] + fn test_integer_encoding() { + let row_type = RowType::with_data_types_and_field_names(vec![DataTypes::int()], vec!["id"]); + let row = GenericRow::from_data(vec![Datum::from(42i32)]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["id".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), (42i64).to_le_bytes().as_slice()); + } + + #[test] + fn test_long_encoding() { + let row_type = + RowType::with_data_types_and_field_names(vec![DataTypes::bigint()], vec!["id"]); + let v = 1234567890123456789i64; + let row = GenericRow::from_data(vec![Datum::from(v)]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["id".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), v.to_le_bytes().as_slice()); + } + + #[test] + fn test_string_encoding() { + let row_type = + RowType::with_data_types_and_field_names(vec![DataTypes::string()], vec!["name"]); + let s = "Hello Iceberg, Fluss this side!"; + let row = GenericRow::from_data(vec![Datum::from(s)]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["name".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), s.as_bytes()); + } + + #[test] + fn test_decimal_encoding() { + let row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::decimal(10, 2)], + vec!["amount"], + ); + + let bd = BigDecimal::from_str("123.45").unwrap(); + let decimal = Decimal::from_big_decimal(bd.clone(), 10, 2).unwrap(); + let row = GenericRow::from_data(vec![Datum::Decimal(decimal.clone())]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["amount".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), decimal.to_unscaled_bytes().as_slice()); + } + + #[test] + fn test_timestamp_encoding() { + let row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::timestamp_with_precision(6)], + vec!["event_time"], + ); + + let millis = 1698235273182i64; + let nanos = 123_456i32; + let ts = TimestampNtz::from_millis_nanos(millis, nanos).unwrap(); + let micros = millis * 1000 + (nanos as i64 / 1000); + + let row = GenericRow::from_data(vec![Datum::TimestampNtz(ts)]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["event_time".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), micros.to_le_bytes().as_slice()); + } + + #[test] + fn test_date_encoding() { + let row_type = + RowType::with_data_types_and_field_names(vec![DataTypes::date()], vec!["date"]); + + let days = 19655i32; // 2023-10-25 + let row = GenericRow::from_data(vec![Datum::Date(Date::new(days))]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["date".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), (days as i64).to_le_bytes().as_slice()); + } + + #[test] + fn test_time_encoding() { + let row_type = + RowType::with_data_types_and_field_names(vec![DataTypes::time()], vec!["time"]); + + let millis_since_midnight = 34_200_000i32; + let micros = millis_since_midnight as i64 * 1000; + let row = GenericRow::from_data(vec![Datum::Time(Time::new(millis_since_midnight))]); + let mut encoder = + 
IcebergKeyEncoder::create_key_encoder(&row_type, &["time".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), micros.to_le_bytes().as_slice()); + } + + #[test] + fn test_binary_encoding() { + let row_type = + RowType::with_data_types_and_field_names(vec![DataTypes::bytes()], vec!["data"]); + + let bytes = b"Hello i only understand binary data".as_slice(); + let row = GenericRow::from_data(vec![Datum::from(bytes)]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["data".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), bytes); + } Review Comment: Missing test coverage for Array and Map type validation. The validation function rejects Array and Map types at lines 96-105, but there are no test cases that verify these error messages are returned when attempting to create an encoder with these types. ```suggestion } #[test] fn test_array_type_not_supported_for_key_encoding() { // Construct a row type with a LIST/Array field, which should be rejected let row_type = RowType::with_data_types_and_field_names( vec![DataTypes::list(DataTypes::int())], vec!["arr"], ); let result = IcebergKeyEncoder::create_key_encoder(&row_type, &["arr".to_string()]); let err = result.expect_err("expected Array/List type to be rejected"); match err { crate::error::Error::IllegalArgument { message } => { // The message should clearly indicate that Array/List types are not supported assert!( message.contains("Array") || message.contains("LIST") || message.contains("list"), "unexpected error message for Array/List type: {}", message ); } other => panic!("expected IllegalArgument error, got {:?}", other), } } #[test] fn test_map_type_not_supported_for_key_encoding() { // Construct a row type with a MAP field, which should be rejected let row_type = RowType::with_data_types_and_field_names( vec![DataTypes::map(DataTypes::string(), DataTypes::int())], vec!["map_field"], ); let result = IcebergKeyEncoder::create_key_encoder(&row_type, &["map_field".to_string()]); let err = result.expect_err("expected Map type to be rejected"); match err { crate::error::Error::IllegalArgument { message } => { // The message should clearly indicate that Map types are not supported assert!( message.contains("Map") || message.contains("MAP") || message.contains("map"), "unexpected error message for Map type: {}", message ); } other => panic!("expected IllegalArgument error, got {:?}", other), } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
