This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 9b0e38c chore: Fix issue where FixedSizeBinary type cannot be built
in arrow (#304)
9b0e38c is described below
commit 9b0e38c91aa79c3f5ccb50632e8ce73b4badc4cf
Author: Keith Lee <[email protected]>
AuthorDate: Fri Feb 13 14:46:58 2026 +0000
chore: Fix issue where FixedSizeBinary type cannot be built in arrow (#304)
---
crates/fluss/src/record/arrow.rs | 13 +++++---
crates/fluss/src/row/datum.rs | 48 ++++++++++++++++++++++++-----
crates/fluss/tests/integration/log_table.rs | 25 ++++++++++-----
3 files changed, 66 insertions(+), 20 deletions(-)
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index fe2f2f4..7fb9d34 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -24,11 +24,11 @@ use crate::row::field_getter::FieldGetter;
use crate::row::{ColumnarRow, InternalRow};
use arrow::array::{
ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder,
Decimal128Builder,
- Float32Builder, Float64Builder, Int8Builder, Int16Builder, Int32Builder,
Int64Builder,
- StringBuilder, Time32MillisecondBuilder, Time32SecondBuilder,
Time64MicrosecondBuilder,
- Time64NanosecondBuilder, TimestampMicrosecondBuilder,
TimestampMillisecondBuilder,
- TimestampNanosecondBuilder, TimestampSecondBuilder, UInt8Builder,
UInt16Builder, UInt32Builder,
- UInt64Builder,
+ FixedSizeBinaryBuilder, Float32Builder, Float64Builder, Int8Builder,
Int16Builder,
+ Int32Builder, Int64Builder, StringBuilder, Time32MillisecondBuilder,
Time32SecondBuilder,
+ Time64MicrosecondBuilder, Time64NanosecondBuilder,
TimestampMicrosecondBuilder,
+ TimestampMillisecondBuilder, TimestampNanosecondBuilder,
TimestampSecondBuilder, UInt8Builder,
+ UInt16Builder, UInt32Builder, UInt64Builder,
};
use arrow::{
array::RecordBatch,
@@ -266,6 +266,9 @@ impl RowAppendRecordBatchBuilder {
arrow_schema::DataType::Boolean =>
Ok(Box::new(BooleanBuilder::new())),
arrow_schema::DataType::Utf8 => Ok(Box::new(StringBuilder::new())),
arrow_schema::DataType::Binary =>
Ok(Box::new(BinaryBuilder::new())),
+ arrow_schema::DataType::FixedSizeBinary(size) => {
+ Ok(Box::new(FixedSizeBinaryBuilder::new(*size)))
+ }
arrow_schema::DataType::Decimal128(precision, scale) => {
let builder = Decimal128Builder::new()
.with_precision_and_scale(*precision, *scale)
diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs
index e1b70ad..b370fb1 100644
--- a/crates/fluss/src/row/datum.rs
+++ b/crates/fluss/src/row/datum.rs
@@ -19,13 +19,14 @@ use crate::error::Error::RowConvertError;
use crate::error::Result;
use crate::row::Decimal;
use arrow::array::{
- ArrayBuilder, BinaryBuilder, BooleanBuilder, Date32Builder,
Decimal128Builder, Float32Builder,
- Float64Builder, Int8Builder, Int16Builder, Int32Builder, Int64Builder,
StringBuilder,
- Time32MillisecondBuilder, Time32SecondBuilder, Time64MicrosecondBuilder,
- Time64NanosecondBuilder, TimestampMicrosecondBuilder,
TimestampMillisecondBuilder,
- TimestampNanosecondBuilder, TimestampSecondBuilder,
+ ArrayBuilder, BinaryBuilder, BooleanBuilder, Date32Builder,
Decimal128Builder,
+ FixedSizeBinaryBuilder, Float32Builder, Float64Builder, Int8Builder,
Int16Builder,
+ Int32Builder, Int64Builder, StringBuilder, Time32MillisecondBuilder,
Time32SecondBuilder,
+ Time64MicrosecondBuilder, Time64NanosecondBuilder,
TimestampMicrosecondBuilder,
+ TimestampMillisecondBuilder, TimestampNanosecondBuilder,
TimestampSecondBuilder,
};
use arrow::datatypes as arrow_schema;
+use arrow::error::ArrowError;
use jiff::ToSpan;
use ordered_float::OrderedFloat;
use parse_display::Display;
@@ -439,6 +440,24 @@ fn millis_nanos_to_nanos(millis: i64, nanos: i32) ->
Result<i64> {
})
}
+trait AppendResult {
+ fn into_append_result(self) -> Result<()>;
+}
+
+impl AppendResult for () {
+ fn into_append_result(self) -> Result<()> {
+ Ok(())
+ }
+}
+
+impl AppendResult for std::result::Result<(), ArrowError> {
+ fn into_append_result(self) -> Result<()> {
+ self.map_err(|e| RowConvertError {
+ message: format!("Failed to append value: {e}"),
+ })
+ }
+}
+
impl Datum<'_> {
pub fn append_to(
&self,
@@ -457,7 +476,7 @@ impl Datum<'_> {
macro_rules! append_value_to_arrow {
($builder_type:ty, $value:expr) => {
if let Some(b) =
builder.as_any_mut().downcast_mut::<$builder_type>() {
- b.append_value($value);
+ b.append_value($value).into_append_result()?;
return Ok(());
}
};
@@ -474,6 +493,7 @@ impl Datum<'_> {
append_null_to_arrow!(Float64Builder);
append_null_to_arrow!(StringBuilder);
append_null_to_arrow!(BinaryBuilder);
+ append_null_to_arrow!(FixedSizeBinaryBuilder);
append_null_to_arrow!(Decimal128Builder);
append_null_to_arrow!(Date32Builder);
append_null_to_arrow!(Time32SecondBuilder);
@@ -493,7 +513,21 @@ impl Datum<'_> {
Datum::Float32(v) => append_value_to_arrow!(Float32Builder,
v.into_inner()),
Datum::Float64(v) => append_value_to_arrow!(Float64Builder,
v.into_inner()),
Datum::String(v) => append_value_to_arrow!(StringBuilder,
v.as_ref()),
- Datum::Blob(v) => append_value_to_arrow!(BinaryBuilder,
v.as_ref()),
+ Datum::Blob(v) => match data_type {
+ arrow_schema::DataType::Binary => {
+ append_value_to_arrow!(BinaryBuilder, v.as_ref());
+ }
+ arrow_schema::DataType::FixedSizeBinary(_) => {
+ append_value_to_arrow!(FixedSizeBinaryBuilder, v.as_ref());
+ }
+ _ => {
+ return Err(RowConvertError {
+ message: format!(
+ "Expected Binary or FixedSizeBinary Arrow type,
got: {data_type:?}"
+ ),
+ });
+ }
+ },
Datum::Decimal(decimal) => {
// Extract target precision and scale from Arrow schema
let (p, s) = match data_type {
diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs
index 22e893c..efb445f 100644
--- a/crates/fluss/tests/integration/log_table.rs
+++ b/crates/fluss/tests/integration/log_table.rs
@@ -646,6 +646,8 @@ mod table_test {
)
// Bytes type
.column("col_bytes", DataTypes::bytes())
+ // Fixed-size binary type
+ .column("col_binary", DataTypes::binary(4))
// Timestamp types with negative values (before Unix epoch)
.column(
"col_timestamp_us_neg",
@@ -713,6 +715,7 @@ mod table_test {
let col_timestamp_ltz_us =
TimestampLtz::from_millis_nanos(1769163227123, 456000).unwrap();
let col_timestamp_ltz_ns =
TimestampLtz::from_millis_nanos(1769163227123, 999_999).unwrap();
let col_bytes: Vec<u8> = b"binary data".to_vec();
+ let col_binary: Vec<u8> = vec![0xDE, 0xAD, 0xBE, 0xEF];
// 1960-06-15 08:30:45.123456 UTC (before 1970)
let col_timestamp_us_neg =
TimestampNtz::from_millis_nanos(-301234154877, 456000).unwrap();
@@ -749,10 +752,11 @@ mod table_test {
row.set_field(21, col_timestamp_ltz_us.clone());
row.set_field(22, col_timestamp_ltz_ns.clone());
row.set_field(23, col_bytes.as_slice());
- row.set_field(24, col_timestamp_us_neg.clone());
- row.set_field(25, col_timestamp_ns_neg.clone());
- row.set_field(26, col_timestamp_ltz_us_neg.clone());
- row.set_field(27, col_timestamp_ltz_ns_neg.clone());
+ row.set_field(24, col_binary.as_slice());
+ row.set_field(25, col_timestamp_us_neg.clone());
+ row.set_field(26, col_timestamp_ns_neg.clone());
+ row.set_field(27, col_timestamp_ltz_us_neg.clone());
+ row.set_field(28, col_timestamp_ltz_ns_neg.clone());
append_writer
.append(&row)
@@ -910,9 +914,14 @@ mod table_test {
"col_timestamp_ltz_ns nanos mismatch"
);
assert_eq!(found_row.get_bytes(23), col_bytes, "col_bytes mismatch");
+ assert_eq!(
+ found_row.get_binary(24, 4),
+ col_binary,
+ "col_binary mismatch"
+ );
// Verify timestamps before Unix epoch (negative timestamps)
- let read_ts_us_neg = found_row.get_timestamp_ntz(24, 6);
+ let read_ts_us_neg = found_row.get_timestamp_ntz(25, 6);
assert_eq!(
read_ts_us_neg.get_millisecond(),
col_timestamp_us_neg.get_millisecond(),
@@ -924,7 +933,7 @@ mod table_test {
"col_timestamp_us_neg nanos mismatch"
);
- let read_ts_ns_neg = found_row.get_timestamp_ntz(25, 9);
+ let read_ts_ns_neg = found_row.get_timestamp_ntz(26, 9);
assert_eq!(
read_ts_ns_neg.get_millisecond(),
col_timestamp_ns_neg.get_millisecond(),
@@ -936,7 +945,7 @@ mod table_test {
"col_timestamp_ns_neg nanos mismatch"
);
- let read_ts_ltz_us_neg = found_row.get_timestamp_ltz(26, 6);
+ let read_ts_ltz_us_neg = found_row.get_timestamp_ltz(27, 6);
assert_eq!(
read_ts_ltz_us_neg.get_epoch_millisecond(),
col_timestamp_ltz_us_neg.get_epoch_millisecond(),
@@ -948,7 +957,7 @@ mod table_test {
"col_timestamp_ltz_us_neg nanos mismatch"
);
- let read_ts_ltz_ns_neg = found_row.get_timestamp_ltz(27, 9);
+ let read_ts_ltz_ns_neg = found_row.get_timestamp_ltz(28, 9);
assert_eq!(
read_ts_ltz_ns_neg.get_epoch_millisecond(),
col_timestamp_ltz_ns_neg.get_epoch_millisecond(),