This is an automated email from the ASF dual-hosted git repository.
leekei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new fc231d3 feat(iceberg): add IcebergBinaryRowWriter (#366)
fc231d3 is described below
commit fc231d3418b1e87c80188c4b8482c3e034633bc8
Author: Kaiqi Dong <[email protected]>
AuthorDate: Sun Apr 5 21:13:44 2026 +0200
feat(iceberg): add IcebergBinaryRowWriter (#366)
* feat(iceberg): add IcebergBinaryRowWriter
* feat(iceberg): add IcebergBinaryRowWriter
* explicit panicking and align with Java for micros conversions
* address the reviews
* rebase and align with the current Java-side behaviour
* address comments
---
.../src/row/binary/iceberg_binary_row_writer.rs | 555 +++++++++++++++++++++
crates/fluss/src/row/binary/mod.rs | 2 +
2 files changed, 557 insertions(+)
diff --git a/crates/fluss/src/row/binary/iceberg_binary_row_writer.rs b/crates/fluss/src/row/binary/iceberg_binary_row_writer.rs
new file mode 100644
index 0000000..c87ae10
--- /dev/null
+++ b/crates/fluss/src/row/binary/iceberg_binary_row_writer.rs
@@ -0,0 +1,555 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::{Bytes, BytesMut};
+
+use crate::error::{Error, Result};
+use crate::metadata::DataType;
+use crate::row::Decimal;
+use crate::row::binary::{BinaryWriter, ValueWriter};
+
+/// Microseconds per millisecond, used when converting time and timestamp values.
+const MICROS_PER_MILLI: i64 = 1_000;
+
+/// Iceberg-specific binary writer for encoding key columns.
+///
+/// Unlike [`CompactedRowWriter`] which uses varint encoding and length-prefixed
+/// variable-length fields, this writer follows Iceberg's encoding conventions:
+/// - Integers (int, date) are written as i64 (8 bytes, little-endian)
+/// - Time values (milliseconds) and timestamps are converted to microseconds
+/// - Floats/doubles use fixed-width little-endian encoding
+/// - Variable-length types (string, binary) are written without length prefixes
+/// - Decimals are written as unscaled big-endian bytes without length prefixes
+///
+/// The encoded bytes feed directly into [`IcebergBucketingFunction`]'s MurmurHash
+/// for bucket assignment and must match the Java Fluss server's encoding exactly.
+///
+/// [`CompactedRowWriter`]: crate::row::compacted::CompactedRowWriter
+/// [`IcebergBucketingFunction`]: crate::bucketing::IcebergBucketingFunction
+pub struct IcebergBinaryRowWriter {
+    position: usize,
+    buffer: BytesMut,
+}
+
+impl Default for IcebergBinaryRowWriter {
+    fn default() -> Self {
+        IcebergBinaryRowWriter::new()
+    }
+}
+
+impl IcebergBinaryRowWriter {
+    /// Creates an empty writer backed by a zero-filled 64-byte buffer.
+    pub fn new() -> Self {
+        Self {
+            position: 0,
+            buffer: BytesMut::zeroed(64),
+        }
+    }
+
+    /// Returns a [`ValueWriter`] for `field_type`, accepting exactly the key types supported by
+    /// Java's `IcebergBinaryRowWriter.createFieldWriter()`; any other type (including ARRAY and
+    /// MAP) yields [`Error::UnsupportedOperation`].
+    // TODO(#308): wire this writer through IcebergKeyEncoder and add end-to-end key-encoding
+    // tests (similar to CompactedKeyEncoder tests for CompactedKeyWriter).
+    pub fn create_value_writer(field_type: &DataType) -> Result<ValueWriter> {
+        match field_type {
+            // Match Java IcebergBinaryRowWriter.createFieldWriter() supported types exactly.
+            DataType::Int(_)
+            | DataType::Date(_)
+            | DataType::Time(_)
+            | DataType::BigInt(_)
+            | DataType::Float(_)
+            | DataType::Double(_)
+            | DataType::Timestamp(_)
+            | DataType::Decimal(_)
+            | DataType::String(_)
+            | DataType::Char(_)
+            | DataType::Binary(_)
+            | DataType::Bytes(_) => ValueWriter::create_value_writer(field_type, None),
+
+            // Keep Java's explicit scalar-only rejection messaging for ARRAY/MAP.
+            DataType::Array(_) => Err(Error::UnsupportedOperation {
+                message:
+                    "Array types cannot be used as bucket keys. Bucket keys must be scalar types."
+                        .to_string(),
+            }),
+            DataType::Map(_) => Err(Error::UnsupportedOperation {
+                message:
+                    "Map types cannot be used as bucket keys. Bucket keys must be scalar types."
+                        .to_string(),
+            }),
+
+            // BOOLEAN, TINYINT, SMALLINT, TIMESTAMP_LTZ, ROW and any future types.
+            _ => Err(Error::UnsupportedOperation {
+                message: format!(
+                    "Unsupported type for Iceberg binary row writer: {field_type:?}"
+                ),
+            }),
+        }
+    }
+
+    #[allow(dead_code)]
+    pub fn position(&self) -> usize {
+        self.position
+    }
+
+    #[allow(dead_code)]
+    pub fn buffer(&self) -> &[u8] {
+        &self.buffer[..self.position]
+    }
+
+    /// Returns an owned copy of the bytes written so far.
+    pub fn to_bytes(&self) -> Bytes {
+        Bytes::copy_from_slice(&self.buffer[..self.position])
+    }
+
+    fn ensure_capacity(&mut self, need_len: usize) {
+        if (self.buffer.len() - self.position) < need_len {
+            let new_len = std::cmp::max(self.buffer.len() * 2, self.buffer.len() + need_len);
+            self.buffer.resize(new_len, 0);
+        }
+    }
+
+    fn write_raw(&mut self, src: &[u8]) {
+        let end = self.position + src.len();
+        self.ensure_capacity(src.len());
+        self.buffer[self.position..end].copy_from_slice(src);
+        self.position = end;
+    }
+}
+
+impl BinaryWriter for IcebergBinaryRowWriter {
+    fn reset(&mut self) {
+        if self.position > 0 {
+            self.buffer[..self.position].fill(0);
+        }
+        self.position = 0;
+    }
+
+    fn set_null_at(&mut self, _pos: usize) {
+        panic!("Iceberg key columns do not support null values");
+    }
+
+    fn write_boolean(&mut self, value: bool) {
+        self.write_raw(&[u8::from(value)]);
+    }
+
+    fn write_byte(&mut self, value: u8) {
+        self.write_raw(&[value]);
+    }
+
+    fn write_bytes(&mut self, value: &[u8]) {
+        // Iceberg: raw bytes, no length prefix
+        self.write_raw(value);
+    }
+
+    fn write_char(&mut self, value: &str, _length: usize) {
+        // Iceberg: same as string — raw UTF-8, no length prefix
+        self.write_string(value);
+    }
+
+    fn write_string(&mut self, value: &str) {
+        // Iceberg: raw UTF-8 bytes, no length prefix
+        self.write_raw(value.as_bytes());
+    }
+
+    fn write_short(&mut self, value: i16) {
+        self.write_raw(&value.to_le_bytes());
+    }
+
+    fn write_int(&mut self, value: i32) {
+        // Iceberg: promote i32 to i64, write as 8 bytes little-endian
+        self.write_raw(&i64::from(value).to_le_bytes());
+    }
+
+    fn write_long(&mut self, value: i64) {
+        self.write_raw(&value.to_le_bytes());
+    }
+
+    fn write_float(&mut self, value: f32) {
+        self.write_raw(&value.to_le_bytes());
+    }
+
+    fn write_double(&mut self, value: f64) {
+        self.write_raw(&value.to_le_bytes());
+    }
+
+    fn write_binary(&mut self, bytes: &[u8], length: usize) {
+        // Iceberg: raw bytes, no length prefix
+        self.write_raw(&bytes[..length.min(bytes.len())]);
+    }
+
+    fn write_decimal(&mut self, value: &Decimal, _precision: u32) {
+        // Iceberg: unscaled big-endian bytes, no length prefix
+        let unscaled_bytes = value.to_unscaled_bytes();
+        self.write_raw(&unscaled_bytes);
+    }
+
+    fn write_time(&mut self, value: i32, _precision: u32) {
+        // Java parity: millis * 1000 in wrapping (Java long) arithmetic.
+        let micros = i64::from(value).wrapping_mul(MICROS_PER_MILLI);
+        self.write_raw(&micros.to_le_bytes());
+    }
+
+    fn write_timestamp_ntz(&mut self, value: &crate::row::datum::TimestampNtz, _precision: u32) {
+        // Java parity: millis * 1000 + nanos / 1000 in wrapping (Java long) arithmetic.
+        let millis = value.get_millisecond();
+        let nanos = value.get_nano_of_millisecond();
+        let micros = millis
+            .wrapping_mul(MICROS_PER_MILLI)
+            .wrapping_add((nanos as i64) / MICROS_PER_MILLI);
+        self.write_raw(&micros.to_le_bytes());
+    }
+
+    fn write_timestamp_ltz(&mut self, value: &crate::row::datum::TimestampLtz, _precision: u32) {
+        // Java parity: millis * 1000 + nanos / 1000 in wrapping (Java long) arithmetic.
+        let millis = value.get_epoch_millisecond();
+        let nanos = value.get_nano_of_millisecond();
+        let micros = millis
+            .wrapping_mul(MICROS_PER_MILLI)
+            .wrapping_add((nanos as i64) / MICROS_PER_MILLI);
+        self.write_raw(&micros.to_le_bytes());
+    }
+
+    fn complete(&mut self) {
+        // No finalization needed for Iceberg key encoding
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::metadata::{DataTypes, SmallIntType, TinyIntType};
+    use crate::row::datum::{TimestampLtz, TimestampNtz};
+    use bigdecimal::{BigDecimal, num_bigint::BigInt};
+
+    fn assert_unsupported_type(dt: DataType, expected_fragment: &str) {
+        match IcebergBinaryRowWriter::create_value_writer(&dt) {
+            Err(e) => assert!(
+                e.to_string().contains(expected_fragment),
+                "unexpected error for {dt:?}: {e}"
+            ),
+            Ok(_) => panic!("expected error for unsupported type {dt:?}, got Ok"),
+        }
+    }
+
+    #[test]
+    fn test_write_int_as_i64_le() {
+        let mut w = IcebergBinaryRowWriter::new();
+        w.write_int(42);
+        assert_eq!(w.buffer(), &42i64.to_le_bytes());
+    }
+
+    #[test]
+    fn test_write_int_negative() {
+        let mut w = IcebergBinaryRowWriter::new();
+        w.write_int(-1);
+        assert_eq!(w.buffer(), &(-1i64).to_le_bytes());
+    }
+
+    #[test]
+    fn test_write_long() {
+        let mut w = IcebergBinaryRowWriter::new();
+        w.write_long(123456789012345i64);
+        assert_eq!(w.buffer(), &123456789012345i64.to_le_bytes());
+    }
+
+    #[test]
+    fn test_write_float() {
+        let mut w = IcebergBinaryRowWriter::new();
+        let val = 1.23f32;
+        w.write_float(val);
+        assert_eq!(w.buffer(), &val.to_le_bytes());
+    }
+
+    #[test]
+    fn test_write_double() {
+        let mut w = IcebergBinaryRowWriter::new();
+        let val = 9.876543210f64;
+        w.write_double(val);
+        assert_eq!(w.buffer(), &val.to_le_bytes());
+    }
+
+    #[test]
+    fn test_write_string_no_length_prefix() {
+        let mut w = IcebergBinaryRowWriter::new();
+        w.write_string("hello");
+        assert_eq!(w.buffer(), b"hello");
+    }
+
+    #[test]
+    fn test_write_bytes_no_length_prefix() {
+        let mut w = IcebergBinaryRowWriter::new();
+        let data = &[0xDE, 0xAD, 0xBE, 0xEF];
+        w.write_bytes(data);
+        assert_eq!(w.buffer(), data);
+    }
+
+    #[test]
+    fn test_write_binary_no_length_prefix() {
+        let mut w = IcebergBinaryRowWriter::new();
+        let data = &[1, 2, 3, 4, 5];
+        w.write_binary(data, 3);
+        assert_eq!(w.buffer(), &[1, 2, 3]);
+    }
+
+    #[test]
+    fn test_write_time_millis_to_micros() {
+        let mut w = IcebergBinaryRowWriter::new();
+        // 1000 ms = 1_000_000 µs
+        w.write_time(1000, 0);
+        assert_eq!(w.buffer(), &1_000_000i64.to_le_bytes());
+    }
+
+    #[test]
+    fn test_write_timestamp_ntz_compact() {
+        let mut w = IcebergBinaryRowWriter::new();
+        let ts = TimestampNtz::new(1672531200000); // 2023-01-01 00:00:00 UTC
+        w.write_timestamp_ntz(&ts, 3);
+        let expected_micros = 1672531200000i64 * 1000;
+        assert_eq!(w.buffer(), &expected_micros.to_le_bytes());
+    }
+
+    #[test]
+    fn test_write_timestamp_ntz_with_nanos() {
+        let mut w = IcebergBinaryRowWriter::new();
+        let ts = TimestampNtz::from_millis_nanos(1000, 500_000).unwrap();
+        w.write_timestamp_ntz(&ts, 6);
+        // 1000ms * 1000 + 500_000ns / 1000 = 1_000_000 + 500 = 1_000_500 µs
+        assert_eq!(w.buffer(), &1_000_500i64.to_le_bytes());
+    }
+
+    #[test]
+    fn test_write_timestamp_ltz() {
+        let mut w = IcebergBinaryRowWriter::new();
+        let ts = TimestampLtz::from_millis_nanos(2000, 300_000).unwrap();
+        w.write_timestamp_ltz(&ts, 6);
+        // 2000ms * 1000 + 300_000ns / 1000 = 2_000_000 + 300 = 2_000_300 µs
+        assert_eq!(w.buffer(), &2_000_300i64.to_le_bytes());
+    }
+
+    #[test]
+    fn test_write_timestamp_ntz_overflow_wraps_like_java() {
+        let mut w = IcebergBinaryRowWriter::new();
+        let ts = TimestampNtz::from_millis_nanos(i64::MAX, 999_999).unwrap();
+        w.write_timestamp_ntz(&ts, 9);
+
+        let expected = i64::MAX.wrapping_mul(MICROS_PER_MILLI).wrapping_add(999);
+        assert_eq!(w.buffer(), &expected.to_le_bytes());
+    }
+
+    #[test]
+    fn test_write_timestamp_ltz_overflow_wraps_like_java() {
+        let mut w = IcebergBinaryRowWriter::new();
+        let ts = TimestampLtz::from_millis_nanos(i64::MIN, 999_999).unwrap();
+        w.write_timestamp_ltz(&ts, 9);
+
+        let expected = i64::MIN.wrapping_mul(MICROS_PER_MILLI).wrapping_add(999);
+        assert_eq!(w.buffer(), &expected.to_le_bytes());
+    }
+
+    #[test]
+    fn test_write_decimal_compact() {
+        let mut w = IcebergBinaryRowWriter::new();
+        let bd = BigDecimal::new(BigInt::from(12345), 2); // 123.45
+        let decimal = Decimal::from_big_decimal(bd, 10, 2).unwrap();
+        w.write_decimal(&decimal, 10);
+
+        let expected = BigInt::from(12345).to_signed_bytes_be();
+        assert_eq!(w.buffer(), expected.as_slice());
+    }
+
+    #[test]
+    fn test_write_decimal_non_compact() {
+        let mut w = IcebergBinaryRowWriter::new();
+        let bd = BigDecimal::new(BigInt::from(12345), 0);
+        let decimal = Decimal::from_big_decimal(bd, 28, 0).unwrap();
+        w.write_decimal(&decimal, 28);
+
+        let expected = BigInt::from(12345).to_signed_bytes_be();
+        assert_eq!(w.buffer(), expected.as_slice());
+    }
+
+    #[test]
+    fn test_write_boolean() {
+        let mut w = IcebergBinaryRowWriter::new();
+        w.write_boolean(true);
+        assert_eq!(w.buffer(), &[1u8]);
+
+        w.reset();
+        w.write_boolean(false);
+        assert_eq!(w.buffer(), &[0u8]);
+    }
+
+    #[test]
+    #[should_panic(expected = "Iceberg key columns do not support null values")]
+    fn test_set_null_panics() {
+        let mut w = IcebergBinaryRowWriter::new();
+        w.set_null_at(0);
+    }
+
+    #[test]
+    fn test_reset_clears_position() {
+        let mut w = IcebergBinaryRowWriter::new();
+        w.write_int(42);
+        assert_eq!(w.position(), 8);
+        w.reset();
+        assert_eq!(w.position(), 0);
+        assert_eq!(w.buffer().len(), 0);
+    }
+
+    #[test]
+    fn test_to_bytes() {
+        let mut w = IcebergBinaryRowWriter::new();
+        w.write_string("test");
+        let bytes = w.to_bytes();
+        assert_eq!(bytes.as_ref(), b"test");
+    }
+
+    #[test]
+    fn test_multiple_writes() {
+        let mut w = IcebergBinaryRowWriter::new();
+        w.write_int(1);
+        w.write_string("ab");
+        let buf = w.buffer().to_vec();
+        // 8 bytes for int-as-i64 + 2 bytes for "ab"
+        assert_eq!(buf.len(), 10);
+        assert_eq!(&buf[..8], &1i64.to_le_bytes());
+        assert_eq!(&buf[8..], b"ab");
+    }
+
+    #[test]
+    fn test_buffer_growth() {
+        let mut w = IcebergBinaryRowWriter::new();
+        // Write more than 64 bytes to trigger buffer growth
+        let large = vec![0xAAu8; 128];
+        w.write_bytes(&large);
+        assert_eq!(w.buffer(), large.as_slice());
+    }
+
+    #[test]
+    fn test_create_value_writer_rejects_tinyint() {
+        let dt = DataType::TinyInt(TinyIntType::new());
+        match IcebergBinaryRowWriter::create_value_writer(&dt) {
+            Err(e) => assert!(
+                e.to_string()
+                    .contains("Unsupported type for Iceberg binary row writer"),
+                "unexpected error: {e}",
+            ),
+            Ok(_) => panic!("expected error for TinyInt, got Ok"),
+        }
+    }
+
+    #[test]
+    fn test_create_value_writer_rejects_smallint() {
+        let dt = DataType::SmallInt(SmallIntType::new());
+        match IcebergBinaryRowWriter::create_value_writer(&dt) {
+            Err(e) => assert!(
+                e.to_string()
+                    .contains("Unsupported type for Iceberg binary row writer"),
+                "unexpected error: {e}",
+            ),
+            Ok(_) => panic!("expected error for SmallInt, got Ok"),
+        }
+    }
+
+    #[test]
+    fn test_create_value_writer_rejects_boolean() {
+        assert_unsupported_type(
+            DataTypes::boolean(),
+            "Unsupported type for Iceberg binary row writer",
+        );
+    }
+
+    #[test]
+    fn test_create_value_writer_rejects_timestamp_ltz() {
+        assert_unsupported_type(
+            DataTypes::timestamp_ltz(),
+            "Unsupported type for Iceberg binary row writer",
+        );
+    }
+
+    #[test]
+    fn test_create_value_writer_rejects_array() {
+        assert_unsupported_type(
+            DataTypes::array(DataTypes::int()),
+            "Array types cannot be used as bucket keys",
+        );
+    }
+
+    #[test]
+    fn test_create_value_writer_rejects_map() {
+        assert_unsupported_type(
+            DataTypes::map(DataTypes::string(), DataTypes::int()),
+            "Map types cannot be used as bucket keys",
+        );
+    }
+
+    #[test]
+    fn test_create_value_writer_rejects_row() {
+        assert_unsupported_type(
+            DataTypes::row(vec![DataTypes::field("f0", DataTypes::int())]),
+            "Unsupported type for Iceberg binary row writer",
+        );
+    }
+
+    #[test]
+    fn test_create_value_writer_accepts_java_supported_scalar_types() {
+        let supported_types = [
+            ("int", DataTypes::int()),
+            ("date", DataTypes::date()),
+            ("time", DataTypes::time()),
+            ("bigint", DataTypes::bigint()),
+            ("float", DataTypes::float()),
+            ("double", DataTypes::double()),
+            ("timestamp_ntz", DataTypes::timestamp()),
+            ("decimal", DataTypes::decimal(10, 2)),
+            ("string", DataTypes::string()),
+            ("char", DataTypes::char(16)),
+            ("binary", DataTypes::binary(8)),
+            ("bytes", DataTypes::bytes()),
+        ];
+
+        for (name, data_type) in supported_types {
+            let res = IcebergBinaryRowWriter::create_value_writer(&data_type);
+            if let Err(e) = res {
+                panic!("expected {name} to be supported, got error: {e}");
+            }
+        }
+    }
+
+    #[test]
+    fn test_write_char_same_as_string() {
+        let mut w1 = IcebergBinaryRowWriter::new();
+        w1.write_char("hello", 10);
+
+        let mut w2 = IcebergBinaryRowWriter::new();
+        w2.write_string("hello");
+
+        assert_eq!(w1.buffer(), w2.buffer());
+    }
+
+    #[test]
+    fn test_write_date_as_int() {
+        // Date encoding goes through write_int (via InnerValueWriter::Date)
+        // which writes as i64 LE in Iceberg encoding
+        let mut w = IcebergBinaryRowWriter::new();
+        let days_since_epoch = 19000i32; // 2022-01-08
+        w.write_int(days_since_epoch);
+        assert_eq!(w.buffer(), &(days_since_epoch as i64).to_le_bytes());
+    }
+}
diff --git a/crates/fluss/src/row/binary/mod.rs b/crates/fluss/src/row/binary/mod.rs
index 2a88ee1..d6248dc 100644
--- a/crates/fluss/src/row/binary/mod.rs
+++ b/crates/fluss/src/row/binary/mod.rs
@@ -16,8 +16,10 @@
// under the License.
mod binary_writer;
+mod iceberg_binary_row_writer;
pub use binary_writer::*;
+pub use iceberg_binary_row_writer::IcebergBinaryRowWriter;
/// The binary row format types, it indicates the generated row type by the
[`BinaryWriter`]
#[allow(dead_code)]