This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 9b0e38c  chore: Fix issue where FixedSizeBinary type cannot be built in arrow (#304)
9b0e38c is described below

commit 9b0e38c91aa79c3f5ccb50632e8ce73b4badc4cf
Author: Keith Lee <[email protected]>
AuthorDate: Fri Feb 13 14:46:58 2026 +0000

    chore: Fix issue where FixedSizeBinary type cannot be built in arrow (#304)
---
 crates/fluss/src/record/arrow.rs            | 13 +++++---
 crates/fluss/src/row/datum.rs               | 48 ++++++++++++++++++++++++-----
 crates/fluss/tests/integration/log_table.rs | 25 ++++++++++-----
 3 files changed, 66 insertions(+), 20 deletions(-)

diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index fe2f2f4..7fb9d34 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -24,11 +24,11 @@ use crate::row::field_getter::FieldGetter;
 use crate::row::{ColumnarRow, InternalRow};
 use arrow::array::{
     ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, 
Decimal128Builder,
-    Float32Builder, Float64Builder, Int8Builder, Int16Builder, Int32Builder, 
Int64Builder,
-    StringBuilder, Time32MillisecondBuilder, Time32SecondBuilder, 
Time64MicrosecondBuilder,
-    Time64NanosecondBuilder, TimestampMicrosecondBuilder, 
TimestampMillisecondBuilder,
-    TimestampNanosecondBuilder, TimestampSecondBuilder, UInt8Builder, 
UInt16Builder, UInt32Builder,
-    UInt64Builder,
+    FixedSizeBinaryBuilder, Float32Builder, Float64Builder, Int8Builder, 
Int16Builder,
+    Int32Builder, Int64Builder, StringBuilder, Time32MillisecondBuilder, 
Time32SecondBuilder,
+    Time64MicrosecondBuilder, Time64NanosecondBuilder, 
TimestampMicrosecondBuilder,
+    TimestampMillisecondBuilder, TimestampNanosecondBuilder, 
TimestampSecondBuilder, UInt8Builder,
+    UInt16Builder, UInt32Builder, UInt64Builder,
 };
 use arrow::{
     array::RecordBatch,
@@ -266,6 +266,9 @@ impl RowAppendRecordBatchBuilder {
             arrow_schema::DataType::Boolean => 
Ok(Box::new(BooleanBuilder::new())),
             arrow_schema::DataType::Utf8 => Ok(Box::new(StringBuilder::new())),
             arrow_schema::DataType::Binary => 
Ok(Box::new(BinaryBuilder::new())),
+            arrow_schema::DataType::FixedSizeBinary(size) => {
+                Ok(Box::new(FixedSizeBinaryBuilder::new(*size)))
+            }
             arrow_schema::DataType::Decimal128(precision, scale) => {
                 let builder = Decimal128Builder::new()
                     .with_precision_and_scale(*precision, *scale)
diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs
index e1b70ad..b370fb1 100644
--- a/crates/fluss/src/row/datum.rs
+++ b/crates/fluss/src/row/datum.rs
@@ -19,13 +19,14 @@ use crate::error::Error::RowConvertError;
 use crate::error::Result;
 use crate::row::Decimal;
 use arrow::array::{
-    ArrayBuilder, BinaryBuilder, BooleanBuilder, Date32Builder, 
Decimal128Builder, Float32Builder,
-    Float64Builder, Int8Builder, Int16Builder, Int32Builder, Int64Builder, 
StringBuilder,
-    Time32MillisecondBuilder, Time32SecondBuilder, Time64MicrosecondBuilder,
-    Time64NanosecondBuilder, TimestampMicrosecondBuilder, 
TimestampMillisecondBuilder,
-    TimestampNanosecondBuilder, TimestampSecondBuilder,
+    ArrayBuilder, BinaryBuilder, BooleanBuilder, Date32Builder, 
Decimal128Builder,
+    FixedSizeBinaryBuilder, Float32Builder, Float64Builder, Int8Builder, 
Int16Builder,
+    Int32Builder, Int64Builder, StringBuilder, Time32MillisecondBuilder, 
Time32SecondBuilder,
+    Time64MicrosecondBuilder, Time64NanosecondBuilder, 
TimestampMicrosecondBuilder,
+    TimestampMillisecondBuilder, TimestampNanosecondBuilder, 
TimestampSecondBuilder,
 };
 use arrow::datatypes as arrow_schema;
+use arrow::error::ArrowError;
 use jiff::ToSpan;
 use ordered_float::OrderedFloat;
 use parse_display::Display;
@@ -439,6 +440,24 @@ fn millis_nanos_to_nanos(millis: i64, nanos: i32) -> 
Result<i64> {
         })
 }
 
+trait AppendResult {
+    fn into_append_result(self) -> Result<()>;
+}
+
+impl AppendResult for () {
+    fn into_append_result(self) -> Result<()> {
+        Ok(())
+    }
+}
+
+impl AppendResult for std::result::Result<(), ArrowError> {
+    fn into_append_result(self) -> Result<()> {
+        self.map_err(|e| RowConvertError {
+            message: format!("Failed to append value: {e}"),
+        })
+    }
+}
+
 impl Datum<'_> {
     pub fn append_to(
         &self,
@@ -457,7 +476,7 @@ impl Datum<'_> {
         macro_rules! append_value_to_arrow {
             ($builder_type:ty, $value:expr) => {
                 if let Some(b) = 
builder.as_any_mut().downcast_mut::<$builder_type>() {
-                    b.append_value($value);
+                    b.append_value($value).into_append_result()?;
                     return Ok(());
                 }
             };
@@ -474,6 +493,7 @@ impl Datum<'_> {
                 append_null_to_arrow!(Float64Builder);
                 append_null_to_arrow!(StringBuilder);
                 append_null_to_arrow!(BinaryBuilder);
+                append_null_to_arrow!(FixedSizeBinaryBuilder);
                 append_null_to_arrow!(Decimal128Builder);
                 append_null_to_arrow!(Date32Builder);
                 append_null_to_arrow!(Time32SecondBuilder);
@@ -493,7 +513,21 @@ impl Datum<'_> {
             Datum::Float32(v) => append_value_to_arrow!(Float32Builder, 
v.into_inner()),
             Datum::Float64(v) => append_value_to_arrow!(Float64Builder, 
v.into_inner()),
             Datum::String(v) => append_value_to_arrow!(StringBuilder, 
v.as_ref()),
-            Datum::Blob(v) => append_value_to_arrow!(BinaryBuilder, 
v.as_ref()),
+            Datum::Blob(v) => match data_type {
+                arrow_schema::DataType::Binary => {
+                    append_value_to_arrow!(BinaryBuilder, v.as_ref());
+                }
+                arrow_schema::DataType::FixedSizeBinary(_) => {
+                    append_value_to_arrow!(FixedSizeBinaryBuilder, v.as_ref());
+                }
+                _ => {
+                    return Err(RowConvertError {
+                        message: format!(
+                            "Expected Binary or FixedSizeBinary Arrow type, 
got: {data_type:?}"
+                        ),
+                    });
+                }
+            },
             Datum::Decimal(decimal) => {
                 // Extract target precision and scale from Arrow schema
                 let (p, s) = match data_type {
diff --git a/crates/fluss/tests/integration/log_table.rs 
b/crates/fluss/tests/integration/log_table.rs
index 22e893c..efb445f 100644
--- a/crates/fluss/tests/integration/log_table.rs
+++ b/crates/fluss/tests/integration/log_table.rs
@@ -646,6 +646,8 @@ mod table_test {
                     )
                     // Bytes type
                     .column("col_bytes", DataTypes::bytes())
+                    // Fixed-size binary type
+                    .column("col_binary", DataTypes::binary(4))
                     // Timestamp types with negative values (before Unix epoch)
                     .column(
                         "col_timestamp_us_neg",
@@ -713,6 +715,7 @@ mod table_test {
         let col_timestamp_ltz_us = 
TimestampLtz::from_millis_nanos(1769163227123, 456000).unwrap();
         let col_timestamp_ltz_ns = 
TimestampLtz::from_millis_nanos(1769163227123, 999_999).unwrap();
         let col_bytes: Vec<u8> = b"binary data".to_vec();
+        let col_binary: Vec<u8> = vec![0xDE, 0xAD, 0xBE, 0xEF];
 
         // 1960-06-15 08:30:45.123456 UTC (before 1970)
         let col_timestamp_us_neg = 
TimestampNtz::from_millis_nanos(-301234154877, 456000).unwrap();
@@ -749,10 +752,11 @@ mod table_test {
         row.set_field(21, col_timestamp_ltz_us.clone());
         row.set_field(22, col_timestamp_ltz_ns.clone());
         row.set_field(23, col_bytes.as_slice());
-        row.set_field(24, col_timestamp_us_neg.clone());
-        row.set_field(25, col_timestamp_ns_neg.clone());
-        row.set_field(26, col_timestamp_ltz_us_neg.clone());
-        row.set_field(27, col_timestamp_ltz_ns_neg.clone());
+        row.set_field(24, col_binary.as_slice());
+        row.set_field(25, col_timestamp_us_neg.clone());
+        row.set_field(26, col_timestamp_ns_neg.clone());
+        row.set_field(27, col_timestamp_ltz_us_neg.clone());
+        row.set_field(28, col_timestamp_ltz_ns_neg.clone());
 
         append_writer
             .append(&row)
@@ -910,9 +914,14 @@ mod table_test {
             "col_timestamp_ltz_ns nanos mismatch"
         );
         assert_eq!(found_row.get_bytes(23), col_bytes, "col_bytes mismatch");
+        assert_eq!(
+            found_row.get_binary(24, 4),
+            col_binary,
+            "col_binary mismatch"
+        );
 
         // Verify timestamps before Unix epoch (negative timestamps)
-        let read_ts_us_neg = found_row.get_timestamp_ntz(24, 6);
+        let read_ts_us_neg = found_row.get_timestamp_ntz(25, 6);
         assert_eq!(
             read_ts_us_neg.get_millisecond(),
             col_timestamp_us_neg.get_millisecond(),
@@ -924,7 +933,7 @@ mod table_test {
             "col_timestamp_us_neg nanos mismatch"
         );
 
-        let read_ts_ns_neg = found_row.get_timestamp_ntz(25, 9);
+        let read_ts_ns_neg = found_row.get_timestamp_ntz(26, 9);
         assert_eq!(
             read_ts_ns_neg.get_millisecond(),
             col_timestamp_ns_neg.get_millisecond(),
@@ -936,7 +945,7 @@ mod table_test {
             "col_timestamp_ns_neg nanos mismatch"
         );
 
-        let read_ts_ltz_us_neg = found_row.get_timestamp_ltz(26, 6);
+        let read_ts_ltz_us_neg = found_row.get_timestamp_ltz(27, 6);
         assert_eq!(
             read_ts_ltz_us_neg.get_epoch_millisecond(),
             col_timestamp_ltz_us_neg.get_epoch_millisecond(),
@@ -948,7 +957,7 @@ mod table_test {
             "col_timestamp_ltz_us_neg nanos mismatch"
         );
 
-        let read_ts_ltz_ns_neg = found_row.get_timestamp_ltz(27, 9);
+        let read_ts_ltz_ns_neg = found_row.get_timestamp_ltz(28, 9);
         assert_eq!(
             read_ts_ltz_ns_neg.get_epoch_millisecond(),
             col_timestamp_ltz_ns_neg.get_epoch_millisecond(),

Reply via email to