This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new ed646e2 feat: Arrow serialization for decimal and temporal types
(#196)
ed646e2 is described below
commit ed646e28881cd4734f4dab2e2ba79206ec79d7dc
Author: Anton Borisov <[email protected]>
AuthorDate: Fri Jan 23 03:34:58 2026 +0000
feat: Arrow serialization for decimal and temporal types (#196)
---
crates/fluss/src/client/table/log_fetch_buffer.rs | 12 +-
crates/fluss/src/client/table/scanner.rs | 8 +-
crates/fluss/src/client/write/accumulator.rs | 2 +-
crates/fluss/src/client/write/batch.rs | 8 +-
crates/fluss/src/record/arrow.rs | 569 +++++++++++++++++-----
crates/fluss/src/row/datum.rs | 468 +++++++++++++++++-
6 files changed, 916 insertions(+), 151 deletions(-)
diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs
b/crates/fluss/src/client/table/log_fetch_buffer.rs
index ca0a253..214a79c 100644
--- a/crates/fluss/src/client/table/log_fetch_buffer.rs
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -657,13 +657,13 @@ mod tests {
use std::sync::Arc;
use std::time::Duration;
- fn test_read_context() -> ReadContext {
+ fn test_read_context() -> Result<ReadContext> {
let row_type = RowType::new(vec![DataField::new(
"id".to_string(),
DataTypes::int(),
None,
)]);
- ReadContext::new(to_arrow_schema(&row_type), false)
+ Ok(ReadContext::new(to_arrow_schema(&row_type)?, false))
}
struct ErrorPendingFetch {
@@ -689,7 +689,7 @@ mod tests {
#[tokio::test]
async fn await_not_empty_returns_wakeup_error() {
- let buffer = LogFetchBuffer::new(test_read_context());
+ let buffer = LogFetchBuffer::new(test_read_context().unwrap());
buffer.wakeup();
let result = buffer.await_not_empty(Duration::from_millis(10)).await;
@@ -698,7 +698,7 @@ mod tests {
#[tokio::test]
async fn await_not_empty_returns_pending_error() {
- let buffer = LogFetchBuffer::new(test_read_context());
+ let buffer = LogFetchBuffer::new(test_read_context().unwrap());
let table_bucket = TableBucket::new(1, 0);
buffer.pend(Box::new(ErrorPendingFetch {
table_bucket: table_bucket.clone(),
@@ -728,7 +728,7 @@ mod tests {
compression_type: ArrowCompressionType::None,
compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
},
- );
+ )?;
let mut row = GenericRow::new();
row.set_field(0, 1_i32);
@@ -738,7 +738,7 @@ mod tests {
let data = builder.build()?;
let log_records = LogRecordsBatches::new(data.clone());
- let read_context = ReadContext::new(to_arrow_schema(&row_type), false);
+ let read_context = ReadContext::new(to_arrow_schema(&row_type)?,
false);
let mut fetch = DefaultCompletedFetch::new(
TableBucket::new(1, 0),
log_records,
diff --git a/crates/fluss/src/client/table/scanner.rs
b/crates/fluss/src/client/table/scanner.rs
index e9b2ce1..cf0b257 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -470,7 +470,7 @@ impl LogFetcher {
log_scanner_status: Arc<LogScannerStatus>,
projected_fields: Option<Vec<usize>>,
) -> Result<Self> {
- let full_arrow_schema = to_arrow_schema(table_info.get_row_type());
+ let full_arrow_schema = to_arrow_schema(table_info.get_row_type())?;
let read_context =
Self::create_read_context(full_arrow_schema.clone(),
projected_fields.clone(), false)?;
let remote_read_context =
@@ -1445,7 +1445,7 @@ mod tests {
compression_type: ArrowCompressionType::None,
compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
},
- );
+ )?;
let record = WriteRecord::for_append(
table_path,
1,
@@ -1477,7 +1477,7 @@ mod tests {
let data = build_records(&table_info, Arc::new(table_path))?;
let log_records = LogRecordsBatches::new(data.clone());
- let read_context =
ReadContext::new(to_arrow_schema(table_info.get_row_type()), false);
+ let read_context =
ReadContext::new(to_arrow_schema(table_info.get_row_type())?, false);
let completed =
DefaultCompletedFetch::new(bucket.clone(), log_records,
data.len(), read_context, 0, 0);
fetcher.log_fetch_buffer.add(Box::new(completed));
@@ -1506,7 +1506,7 @@ mod tests {
let bucket = TableBucket::new(1, 0);
let data = build_records(&table_info, Arc::new(table_path))?;
let log_records = LogRecordsBatches::new(data.clone());
- let read_context =
ReadContext::new(to_arrow_schema(table_info.get_row_type()), false);
+ let read_context =
ReadContext::new(to_arrow_schema(table_info.get_row_type())?, false);
let mut completed: Box<dyn CompletedFetch> =
Box::new(DefaultCompletedFetch::new(
bucket,
log_records,
diff --git a/crates/fluss/src/client/write/accumulator.rs
b/crates/fluss/src/client/write/accumulator.rs
index fb7b544..46c822c 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -112,7 +112,7 @@ impl RecordAccumulator {
bucket_id,
current_time_ms(),
matches!(&record.record,
Record::Log(LogWriteRecord::RecordBatch(_))),
- )),
+ )?),
Record::Kv(kv_record) => Kv(KvWriteBatch::new(
self.batch_id.fetch_add(1, Ordering::Relaxed),
table_path.as_ref().clone(),
diff --git a/crates/fluss/src/client/write/batch.rs
b/crates/fluss/src/client/write/batch.rs
index 159e313..78381c6 100644
--- a/crates/fluss/src/client/write/batch.rs
+++ b/crates/fluss/src/client/write/batch.rs
@@ -197,18 +197,18 @@ impl ArrowLogWriteBatch {
bucket_id: BucketId,
create_ms: i64,
to_append_record_batch: bool,
- ) -> Self {
+ ) -> Result<Self> {
let base = InnerWriteBatch::new(batch_id, table_path, create_ms,
bucket_id);
- Self {
+ Ok(Self {
write_batch: base,
arrow_builder: MemoryLogRecordsArrowBuilder::new(
schema_id,
row_type,
to_append_record_batch,
arrow_compression_info,
- ),
+ )?,
built_records: None,
- }
+ })
}
pub fn batch_id(&self) -> i64 {
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 3c94b72..39114d3 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -22,9 +22,12 @@ use crate::metadata::{DataType, RowType};
use crate::record::{ChangeType, ScanRecord};
use crate::row::{ColumnarRow, GenericRow};
use arrow::array::{
- ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Float32Builder,
Float64Builder,
- Int8Builder, Int16Builder, Int32Builder, Int64Builder, StringBuilder,
UInt8Builder,
- UInt16Builder, UInt32Builder, UInt64Builder,
+ ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder,
Decimal128Builder,
+ Float32Builder, Float64Builder, Int8Builder, Int16Builder, Int32Builder,
Int64Builder,
+ StringBuilder, Time32MillisecondBuilder, Time32SecondBuilder,
Time64MicrosecondBuilder,
+ Time64NanosecondBuilder, TimestampMicrosecondBuilder,
TimestampMillisecondBuilder,
+ TimestampNanosecondBuilder, TimestampSecondBuilder, UInt8Builder,
UInt16Builder, UInt32Builder,
+ UInt64Builder,
};
use arrow::{
array::RecordBatch,
@@ -42,7 +45,6 @@ use byteorder::WriteBytesExt;
use byteorder::{ByteOrder, LittleEndian};
use bytes::Bytes;
use crc32c::crc32c;
-use parking_lot::Mutex;
use std::{
io::{Cursor, Write},
sync::Arc,
@@ -113,7 +115,7 @@ pub struct MemoryLogRecordsArrowBuilder {
}
pub trait ArrowRecordBatchInnerBuilder: Send + Sync {
- fn build_arrow_record_batch(&self) -> Result<Arc<RecordBatch>>;
+ fn build_arrow_record_batch(&mut self) -> Result<Arc<RecordBatch>>;
fn append(&mut self, row: &GenericRow) -> Result<bool>;
@@ -133,7 +135,7 @@ pub struct PrebuiltRecordBatchBuilder {
}
impl ArrowRecordBatchInnerBuilder for PrebuiltRecordBatchBuilder {
- fn build_arrow_record_batch(&self) -> Result<Arc<RecordBatch>> {
+ fn build_arrow_record_batch(&mut self) -> Result<Arc<RecordBatch>> {
Ok(self.arrow_record_batch.as_ref().unwrap().clone())
}
@@ -167,66 +169,132 @@ impl ArrowRecordBatchInnerBuilder for
PrebuiltRecordBatchBuilder {
pub struct RowAppendRecordBatchBuilder {
table_schema: SchemaRef,
- arrow_column_builders: Mutex<Vec<Box<dyn ArrayBuilder>>>,
+ arrow_column_builders: Vec<Box<dyn ArrayBuilder>>,
records_count: i32,
}
impl RowAppendRecordBatchBuilder {
- pub fn new(row_type: &RowType) -> Self {
- let schema_ref = to_arrow_schema(row_type);
- let builders = Mutex::new(
- schema_ref
- .fields()
- .iter()
- .map(|field| Self::create_builder(field.data_type()))
- .collect(),
- );
- Self {
+ pub fn new(row_type: &RowType) -> Result<Self> {
+ let schema_ref = to_arrow_schema(row_type)?;
+ let builders: Result<Vec<_>> = schema_ref
+ .fields()
+ .iter()
+ .map(|field| Self::create_builder(field.data_type()))
+ .collect();
+ Ok(Self {
table_schema: schema_ref.clone(),
- arrow_column_builders: builders,
+ arrow_column_builders: builders?,
records_count: 0,
- }
+ })
}
- fn create_builder(data_type: &arrow_schema::DataType) -> Box<dyn
ArrayBuilder> {
+ fn create_builder(data_type: &arrow_schema::DataType) -> Result<Box<dyn
ArrayBuilder>> {
match data_type {
- arrow_schema::DataType::Int8 => Box::new(Int8Builder::new()),
- arrow_schema::DataType::Int16 => Box::new(Int16Builder::new()),
- arrow_schema::DataType::Int32 => Box::new(Int32Builder::new()),
- arrow_schema::DataType::Int64 => Box::new(Int64Builder::new()),
- arrow_schema::DataType::UInt8 => Box::new(UInt8Builder::new()),
- arrow_schema::DataType::UInt16 => Box::new(UInt16Builder::new()),
- arrow_schema::DataType::UInt32 => Box::new(UInt32Builder::new()),
- arrow_schema::DataType::UInt64 => Box::new(UInt64Builder::new()),
- arrow_schema::DataType::Float32 => Box::new(Float32Builder::new()),
- arrow_schema::DataType::Float64 => Box::new(Float64Builder::new()),
- arrow_schema::DataType::Boolean => Box::new(BooleanBuilder::new()),
- arrow_schema::DataType::Utf8 => Box::new(StringBuilder::new()),
- arrow_schema::DataType::Binary => Box::new(BinaryBuilder::new()),
- dt => panic!("Unsupported data type: {dt:?}"),
+ arrow_schema::DataType::Int8 => Ok(Box::new(Int8Builder::new())),
+ arrow_schema::DataType::Int16 => Ok(Box::new(Int16Builder::new())),
+ arrow_schema::DataType::Int32 => Ok(Box::new(Int32Builder::new())),
+ arrow_schema::DataType::Int64 => Ok(Box::new(Int64Builder::new())),
+ arrow_schema::DataType::UInt8 => Ok(Box::new(UInt8Builder::new())),
+ arrow_schema::DataType::UInt16 =>
Ok(Box::new(UInt16Builder::new())),
+ arrow_schema::DataType::UInt32 =>
Ok(Box::new(UInt32Builder::new())),
+ arrow_schema::DataType::UInt64 =>
Ok(Box::new(UInt64Builder::new())),
+ arrow_schema::DataType::Float32 =>
Ok(Box::new(Float32Builder::new())),
+ arrow_schema::DataType::Float64 =>
Ok(Box::new(Float64Builder::new())),
+ arrow_schema::DataType::Boolean =>
Ok(Box::new(BooleanBuilder::new())),
+ arrow_schema::DataType::Utf8 => Ok(Box::new(StringBuilder::new())),
+ arrow_schema::DataType::Binary =>
Ok(Box::new(BinaryBuilder::new())),
+ arrow_schema::DataType::Decimal128(precision, scale) => {
+ let builder = Decimal128Builder::new()
+ .with_precision_and_scale(*precision, *scale)
+ .map_err(|e| Error::IllegalArgument {
+ message: format!(
+ "Invalid decimal precision {} or scale {}: {}",
+ precision, scale, e
+ ),
+ })?;
+ Ok(Box::new(builder))
+ }
+ arrow_schema::DataType::Date32 =>
Ok(Box::new(Date32Builder::new())),
+ arrow_schema::DataType::Time32(unit) => match unit {
+ arrow_schema::TimeUnit::Second =>
Ok(Box::new(Time32SecondBuilder::new())),
+ arrow_schema::TimeUnit::Millisecond => {
+ Ok(Box::new(Time32MillisecondBuilder::new()))
+ }
+ _ => Err(Error::IllegalArgument {
+ message: format!(
+ "Time32 only supports Second and Millisecond units,
got: {:?}",
+ unit
+ ),
+ }),
+ },
+ arrow_schema::DataType::Time64(unit) => match unit {
+ arrow_schema::TimeUnit::Microsecond => {
+ Ok(Box::new(Time64MicrosecondBuilder::new()))
+ }
+ arrow_schema::TimeUnit::Nanosecond =>
Ok(Box::new(Time64NanosecondBuilder::new())),
+ _ => Err(Error::IllegalArgument {
+ message: format!(
+ "Time64 only supports Microsecond and Nanosecond
units, got: {:?}",
+ unit
+ ),
+ }),
+ },
+ arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Second,
_) => {
+ Ok(Box::new(TimestampSecondBuilder::new()))
+ }
+
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => {
+ Ok(Box::new(TimestampMillisecondBuilder::new()))
+ }
+
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => {
+ Ok(Box::new(TimestampMicrosecondBuilder::new()))
+ }
+
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => {
+ Ok(Box::new(TimestampNanosecondBuilder::new()))
+ }
+ dt => Err(Error::IllegalArgument {
+ message: format!("Unsupported data type: {dt:?}"),
+ }),
}
}
}
impl ArrowRecordBatchInnerBuilder for RowAppendRecordBatchBuilder {
- fn build_arrow_record_batch(&self) -> Result<Arc<RecordBatch>> {
- let arrays = self
+ fn build_arrow_record_batch(&mut self) -> Result<Arc<RecordBatch>> {
+ let arrays: Result<Vec<ArrayRef>> = self
.arrow_column_builders
- .lock()
.iter_mut()
- .map(|b| b.finish())
- .collect::<Vec<ArrayRef>>();
+ .enumerate()
+ .map(|(idx, b)| {
+ let array = b.finish();
+ let expected_type = self.table_schema.field(idx).data_type();
+
+ // Validate array type matches schema
+ if array.data_type() != expected_type {
+ return Err(Error::IllegalArgument {
+ message: format!(
+ "Builder type mismatch at column {}: expected
{:?}, got {:?}",
+ idx,
+ expected_type,
+ array.data_type()
+ ),
+ });
+ }
+
+ Ok(array)
+ })
+ .collect();
+
Ok(Arc::new(RecordBatch::try_new(
self.table_schema.clone(),
- arrays,
+ arrays?,
)?))
}
fn append(&mut self, row: &GenericRow) -> Result<bool> {
for (idx, value) in row.values.iter().enumerate() {
- let mut builder_binding = self.arrow_column_builders.lock();
- let builder = builder_binding.get_mut(idx).unwrap();
- value.append_to(builder.as_mut())?;
+ let field_type = self.table_schema.field(idx).data_type();
+ let builder = self.arrow_column_builders.get_mut(idx).unwrap();
+ value.append_to(builder.as_mut(), field_type)?;
}
self.records_count += 1;
Ok(true)
@@ -255,15 +323,15 @@ impl MemoryLogRecordsArrowBuilder {
row_type: &RowType,
to_append_record_batch: bool,
arrow_compression_info: ArrowCompressionInfo,
- ) -> Self {
+ ) -> Result<Self> {
let arrow_batch_builder: Box<dyn ArrowRecordBatchInnerBuilder> = {
if to_append_record_batch {
Box::new(PrebuiltRecordBatchBuilder::default())
} else {
- Box::new(RowAppendRecordBatchBuilder::new(row_type))
+ Box::new(RowAppendRecordBatchBuilder::new(row_type)?)
}
};
- MemoryLogRecordsArrowBuilder {
+ Ok(MemoryLogRecordsArrowBuilder {
base_log_offset: BUILDER_DEFAULT_OFFSET,
schema_id,
magic: CURRENT_LOG_MAGIC_VALUE,
@@ -272,7 +340,7 @@ impl MemoryLogRecordsArrowBuilder {
is_closed: false,
arrow_record_batch_builder: arrow_batch_builder,
arrow_compression_info,
- }
+ })
}
pub fn append(&mut self, record: &WriteRecord) -> Result<bool> {
@@ -302,7 +370,7 @@ impl MemoryLogRecordsArrowBuilder {
self.is_closed = true;
}
- pub fn build(&self) -> Result<Vec<u8>> {
+ pub fn build(&mut self) -> Result<Vec<u8>> {
// serialize arrow batch
let mut arrow_batch_bytes = vec![];
let table_schema = self.arrow_record_batch_builder.schema();
@@ -641,24 +709,24 @@ fn parse_ipc_message(
Ok((batch_metadata, body_buffer, message.version()))
}
-pub fn to_arrow_schema(fluss_schema: &RowType) -> SchemaRef {
- let fields: Vec<Field> = fluss_schema
+pub fn to_arrow_schema(fluss_schema: &RowType) -> Result<SchemaRef> {
+ let fields: Result<Vec<Field>> = fluss_schema
.fields()
.iter()
.map(|f| {
- Field::new(
+ Ok(Field::new(
f.name(),
- to_arrow_type(f.data_type()),
+ to_arrow_type(f.data_type())?,
f.data_type().is_nullable(),
- )
+ ))
})
.collect();
- SchemaRef::new(arrow_schema::Schema::new(fields))
+ Ok(SchemaRef::new(arrow_schema::Schema::new(fields?)))
}
-pub fn to_arrow_type(fluss_type: &DataType) -> ArrowDataType {
- match fluss_type {
+pub fn to_arrow_type(fluss_type: &DataType) -> Result<ArrowDataType> {
+ Ok(match fluss_type {
DataType::Boolean(_) => ArrowDataType::Boolean,
DataType::TinyInt(_) => ArrowDataType::Int8,
DataType::SmallInt(_) => ArrowDataType::Int16,
@@ -668,58 +736,91 @@ pub fn to_arrow_type(fluss_type: &DataType) ->
ArrowDataType {
DataType::Double(_) => ArrowDataType::Float64,
DataType::Char(_) => ArrowDataType::Utf8,
DataType::String(_) => ArrowDataType::Utf8,
- DataType::Decimal(decimal_type) => ArrowDataType::Decimal128(
- decimal_type
- .precision()
- .try_into()
- .expect("precision exceeds u8::MAX"),
- decimal_type
+ DataType::Decimal(decimal_type) => {
+ let precision =
+ decimal_type
+ .precision()
+ .try_into()
+ .map_err(|_| Error::IllegalArgument {
+ message: format!(
+ "Decimal precision {} exceeds Arrow's maximum
(u8::MAX)",
+ decimal_type.precision()
+ ),
+ })?;
+ let scale = decimal_type
.scale()
.try_into()
- .expect("scale exceeds i8::MAX"),
- ),
+ .map_err(|_| Error::IllegalArgument {
+ message: format!(
+ "Decimal scale {} exceeds Arrow's maximum (i8::MAX)",
+ decimal_type.scale()
+ ),
+ })?;
+ ArrowDataType::Decimal128(precision, scale)
+ }
DataType::Date(_) => ArrowDataType::Date32,
DataType::Time(time_type) => match time_type.precision() {
0 => ArrowDataType::Time32(arrow_schema::TimeUnit::Second),
1..=3 =>
ArrowDataType::Time32(arrow_schema::TimeUnit::Millisecond),
4..=6 =>
ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond),
7..=9 => ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond),
- // This arm should never be reached due to validation in TimeType.
- invalid => panic!("Invalid precision value for TimeType:
{invalid}"),
+ invalid => {
+ return Err(Error::IllegalArgument {
+ message: format!("Invalid precision {} for TimeType (must
be 0-9)", invalid),
+ });
+ }
},
DataType::Timestamp(timestamp_type) => match
timestamp_type.precision() {
0 => ArrowDataType::Timestamp(arrow_schema::TimeUnit::Second,
None),
1..=3 =>
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
4..=6 =>
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None),
7..=9 =>
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None),
- // This arm should never be reached due to validation in Timestamp.
- invalid => panic!("Invalid precision value for TimestampType:
{invalid}"),
+ invalid => {
+ return Err(Error::IllegalArgument {
+ message: format!(
+ "Invalid precision {} for TimestampType (must be 0-9)",
+ invalid
+ ),
+ });
+ }
},
DataType::TimestampLTz(timestamp_ltz_type) => match
timestamp_ltz_type.precision() {
0 => ArrowDataType::Timestamp(arrow_schema::TimeUnit::Second,
None),
1..=3 =>
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
4..=6 =>
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None),
7..=9 =>
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None),
- // This arm should never be reached due to validation in
TimestampLTz.
- invalid => panic!("Invalid precision value for TimestampLTzType:
{invalid}"),
+ invalid => {
+ return Err(Error::IllegalArgument {
+ message: format!(
+ "Invalid precision {} for TimestampLTzType (must be
0-9)",
+ invalid
+ ),
+ });
+ }
},
DataType::Bytes(_) => ArrowDataType::Binary,
- DataType::Binary(binary_type) => ArrowDataType::FixedSizeBinary(
- binary_type
+ DataType::Binary(binary_type) => {
+ let length = binary_type
.length()
.try_into()
- .expect("length exceeds i32::MAX"),
- ),
+ .map_err(|_| Error::IllegalArgument {
+ message: format!(
+ "Binary length {} exceeds Arrow's maximum (i32::MAX)",
+ binary_type.length()
+ ),
+ })?;
+ ArrowDataType::FixedSizeBinary(length)
+ }
DataType::Array(array_type) => ArrowDataType::List(
Field::new_list_field(
- to_arrow_type(array_type.get_element_type()),
+ to_arrow_type(array_type.get_element_type())?,
fluss_type.is_nullable(),
)
.into(),
),
DataType::Map(map_type) => {
- let key_type = to_arrow_type(map_type.key_type());
- let value_type = to_arrow_type(map_type.value_type());
+ let key_type = to_arrow_type(map_type.key_type())?;
+ let value_type = to_arrow_type(map_type.value_type())?;
let entry_fields = vec![
Field::new("key", key_type, map_type.key_type().is_nullable()),
Field::new("value", value_type,
map_type.value_type().is_nullable()),
@@ -733,20 +834,21 @@ pub fn to_arrow_type(fluss_type: &DataType) ->
ArrowDataType {
false,
)
}
- DataType::Row(row_type) =>
ArrowDataType::Struct(arrow_schema::Fields::from(
- row_type
+ DataType::Row(row_type) => {
+ let fields: Result<Vec<Field>> = row_type
.fields()
.iter()
.map(|f| {
- Field::new(
+ Ok(Field::new(
f.name(),
- to_arrow_type(f.data_type()),
+ to_arrow_type(f.data_type())?,
f.data_type().is_nullable(),
- )
+ ))
})
- .collect::<Vec<Field>>(),
- )),
- }
+ .collect();
+ ArrowDataType::Struct(arrow_schema::Fields::from(fields?))
+ }
+ })
}
#[derive(Clone)]
@@ -1059,81 +1161,114 @@ mod tests {
#[test]
fn test_to_array_type() {
- assert_eq!(to_arrow_type(&DataTypes::boolean()),
ArrowDataType::Boolean);
- assert_eq!(to_arrow_type(&DataTypes::tinyint()), ArrowDataType::Int8);
- assert_eq!(to_arrow_type(&DataTypes::smallint()),
ArrowDataType::Int16);
- assert_eq!(to_arrow_type(&DataTypes::bigint()), ArrowDataType::Int64);
- assert_eq!(to_arrow_type(&DataTypes::int()), ArrowDataType::Int32);
- assert_eq!(to_arrow_type(&DataTypes::float()), ArrowDataType::Float32);
- assert_eq!(to_arrow_type(&DataTypes::double()),
ArrowDataType::Float64);
- assert_eq!(to_arrow_type(&DataTypes::char(16)), ArrowDataType::Utf8);
- assert_eq!(to_arrow_type(&DataTypes::string()), ArrowDataType::Utf8);
assert_eq!(
- to_arrow_type(&DataTypes::decimal(10, 2)),
+ to_arrow_type(&DataTypes::boolean()).unwrap(),
+ ArrowDataType::Boolean
+ );
+ assert_eq!(
+ to_arrow_type(&DataTypes::tinyint()).unwrap(),
+ ArrowDataType::Int8
+ );
+ assert_eq!(
+ to_arrow_type(&DataTypes::smallint()).unwrap(),
+ ArrowDataType::Int16
+ );
+ assert_eq!(
+ to_arrow_type(&DataTypes::bigint()).unwrap(),
+ ArrowDataType::Int64
+ );
+ assert_eq!(
+ to_arrow_type(&DataTypes::int()).unwrap(),
+ ArrowDataType::Int32
+ );
+ assert_eq!(
+ to_arrow_type(&DataTypes::float()).unwrap(),
+ ArrowDataType::Float32
+ );
+ assert_eq!(
+ to_arrow_type(&DataTypes::double()).unwrap(),
+ ArrowDataType::Float64
+ );
+ assert_eq!(
+ to_arrow_type(&DataTypes::char(16)).unwrap(),
+ ArrowDataType::Utf8
+ );
+ assert_eq!(
+ to_arrow_type(&DataTypes::string()).unwrap(),
+ ArrowDataType::Utf8
+ );
+ assert_eq!(
+ to_arrow_type(&DataTypes::decimal(10, 2)).unwrap(),
ArrowDataType::Decimal128(10, 2)
);
- assert_eq!(to_arrow_type(&DataTypes::date()), ArrowDataType::Date32);
assert_eq!(
- to_arrow_type(&DataTypes::time()),
+ to_arrow_type(&DataTypes::date()).unwrap(),
+ ArrowDataType::Date32
+ );
+ assert_eq!(
+ to_arrow_type(&DataTypes::time()).unwrap(),
ArrowDataType::Time32(arrow_schema::TimeUnit::Second)
);
assert_eq!(
- to_arrow_type(&DataTypes::time_with_precision(3)),
+ to_arrow_type(&DataTypes::time_with_precision(3)).unwrap(),
ArrowDataType::Time32(arrow_schema::TimeUnit::Millisecond)
);
assert_eq!(
- to_arrow_type(&DataTypes::time_with_precision(6)),
+ to_arrow_type(&DataTypes::time_with_precision(6)).unwrap(),
ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond)
);
assert_eq!(
- to_arrow_type(&DataTypes::time_with_precision(9)),
+ to_arrow_type(&DataTypes::time_with_precision(9)).unwrap(),
ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond)
);
assert_eq!(
- to_arrow_type(&DataTypes::timestamp_with_precision(0)),
+ to_arrow_type(&DataTypes::timestamp_with_precision(0)).unwrap(),
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Second, None)
);
assert_eq!(
- to_arrow_type(&DataTypes::timestamp_with_precision(3)),
+ to_arrow_type(&DataTypes::timestamp_with_precision(3)).unwrap(),
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None)
);
assert_eq!(
- to_arrow_type(&DataTypes::timestamp_with_precision(6)),
+ to_arrow_type(&DataTypes::timestamp_with_precision(6)).unwrap(),
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None)
);
assert_eq!(
- to_arrow_type(&DataTypes::timestamp_with_precision(9)),
+ to_arrow_type(&DataTypes::timestamp_with_precision(9)).unwrap(),
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None)
);
assert_eq!(
- to_arrow_type(&DataTypes::timestamp_ltz_with_precision(0)),
+
to_arrow_type(&DataTypes::timestamp_ltz_with_precision(0)).unwrap(),
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Second, None)
);
assert_eq!(
- to_arrow_type(&DataTypes::timestamp_ltz_with_precision(3)),
+
to_arrow_type(&DataTypes::timestamp_ltz_with_precision(3)).unwrap(),
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None)
);
assert_eq!(
- to_arrow_type(&DataTypes::timestamp_ltz_with_precision(6)),
+
to_arrow_type(&DataTypes::timestamp_ltz_with_precision(6)).unwrap(),
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None)
);
assert_eq!(
- to_arrow_type(&DataTypes::timestamp_ltz_with_precision(9)),
+
to_arrow_type(&DataTypes::timestamp_ltz_with_precision(9)).unwrap(),
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None)
);
- assert_eq!(to_arrow_type(&DataTypes::bytes()), ArrowDataType::Binary);
assert_eq!(
- to_arrow_type(&DataTypes::binary(16)),
+ to_arrow_type(&DataTypes::bytes()).unwrap(),
+ ArrowDataType::Binary
+ );
+ assert_eq!(
+ to_arrow_type(&DataTypes::binary(16)).unwrap(),
ArrowDataType::FixedSizeBinary(16)
);
assert_eq!(
- to_arrow_type(&DataTypes::array(DataTypes::int())),
+ to_arrow_type(&DataTypes::array(DataTypes::int())).unwrap(),
ArrowDataType::List(Field::new_list_field(ArrowDataType::Int32,
true).into())
);
assert_eq!(
- to_arrow_type(&DataTypes::map(DataTypes::string(),
DataTypes::int())),
+ to_arrow_type(&DataTypes::map(DataTypes::string(),
DataTypes::int())).unwrap(),
ArrowDataType::Map(
Arc::new(Field::new(
"entries",
@@ -1151,7 +1286,8 @@ mod tests {
to_arrow_type(&DataTypes::row(vec![
DataTypes::field("f1".to_string(), DataTypes::int()),
DataTypes::field("f2".to_string(), DataTypes::string()),
- ])),
+ ]))
+ .unwrap(),
ArrowDataType::Struct(arrow_schema::Fields::from(vec![
Field::new("f1", ArrowDataType::Int32, true),
Field::new("f2", ArrowDataType::Utf8, true),
@@ -1215,7 +1351,7 @@ mod tests {
DataField::new("id".to_string(), DataTypes::int(), None),
DataField::new("name".to_string(), DataTypes::string(), None),
]);
- let schema = to_arrow_schema(&row_type);
+ let schema = to_arrow_schema(&row_type).unwrap();
let result = ReadContext::with_projection_pushdown(schema, vec![0, 2],
false);
assert!(matches!(result, Err(IllegalArgument { .. })));
@@ -1249,4 +1385,209 @@ mod tests {
}
out
}
+
+ #[test]
+ fn test_temporal_and_decimal_builder_validation() {
+ use arrow::array::Array;
+
+ // Test valid builder creation with precision=10, scale=2
+ let mut builder =
+
RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(10,
2)).unwrap();
+ let decimal_builder = builder
+ .as_any_mut()
+ .downcast_mut::<Decimal128Builder>()
+ .expect("Expected Decimal128Builder");
+ // Verify precision and scale
+ let array = decimal_builder.finish();
+ assert_eq!(array.data_type(), &ArrowDataType::Decimal128(10, 2));
+
+ // Test error case: invalid precision/scale
+ let result =
+
RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(100,
50));
+ assert!(result.is_err());
+ }
+
+ #[test]
+ fn test_decimal_rescaling_and_validation() -> Result<()> {
+ use crate::row::{Datum, Decimal, GenericRow};
+ use arrow::array::Decimal128Array;
+ use bigdecimal::BigDecimal;
+ use std::str::FromStr;
+
+ // Test 1: Rescaling from scale 3 to scale 2
+ let row_type = RowType::new(vec![DataField::new(
+ "amount".to_string(),
+ DataTypes::decimal(10, 2),
+ None,
+ )]);
+ let mut builder = RowAppendRecordBatchBuilder::new(&row_type)?;
+ let decimal =
Decimal::from_big_decimal(BigDecimal::from_str("123.456").unwrap(), 10, 3)?;
+ builder.append(&GenericRow {
+ values: vec![Datum::Decimal(decimal)],
+ })?;
+ let batch = builder.build_arrow_record_batch()?;
+ let array = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<Decimal128Array>()
+ .unwrap();
+ assert_eq!(array.value(0), 12346); // 123.456 rounded to 2 decimal
places
+ assert_eq!(array.scale(), 2);
+
+ // Test 2: Precision overflow (should error)
+ let row_type = RowType::new(vec![DataField::new(
+ "amount".to_string(),
+ DataTypes::decimal(5, 2),
+ None,
+ )]);
+ let mut builder = RowAppendRecordBatchBuilder::new(&row_type)?;
+ let decimal =
Decimal::from_big_decimal(BigDecimal::from_str("123456.78").unwrap(), 10, 2)?;
+ let result = builder.append(&GenericRow {
+ values: vec![Datum::Decimal(decimal)],
+ });
+ assert!(result.is_err());
+ assert!(
+ result
+ .unwrap_err()
+ .to_string()
+ .contains("precision overflow")
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_all_types_end_to_end() -> Result<()> {
+ use crate::row::{Date, Datum, Decimal, GenericRow, Time, TimestampLtz,
TimestampNtz};
+ use arrow::array::{
+ Date32Array, Decimal128Array, Int32Array, Time32MillisecondArray,
+ Time64NanosecondArray, TimestampMicrosecondArray,
TimestampNanosecondArray,
+ };
+ use bigdecimal::BigDecimal;
+ use std::str::FromStr;
+
+ // Schema with int, decimal, date, time (ms + ns), timestamps (μs + ns)
+ let row_type = RowType::new(vec![
+ DataField::new("id".to_string(), DataTypes::int(), None),
+ DataField::new("amount".to_string(), DataTypes::decimal(10, 2),
None),
+ DataField::new("date".to_string(), DataTypes::date(), None),
+ DataField::new(
+ "time_ms".to_string(),
+ DataTypes::time_with_precision(3),
+ None,
+ ),
+ DataField::new(
+ "time_ns".to_string(),
+ DataTypes::time_with_precision(9),
+ None,
+ ),
+ DataField::new(
+ "ts_us".to_string(),
+ DataTypes::timestamp_with_precision(6),
+ None,
+ ),
+ DataField::new(
+ "ts_ltz_ns".to_string(),
+ DataTypes::timestamp_ltz_with_precision(9),
+ None,
+ ),
+ ]);
+
+ let mut builder = RowAppendRecordBatchBuilder::new(&row_type)?;
+
+ // Append rows with various data types
+ builder.append(&GenericRow {
+ values: vec![
+ Datum::Int32(1),
+ Datum::Decimal(Decimal::from_big_decimal(
+ BigDecimal::from_str("123.456").unwrap(),
+ 10,
+ 3,
+ )?),
+ // 18000 days since epoch = 2019-04-14
+ Datum::Date(Date::new(18000)),
+ // 43200000 ms = 12:00:00.000 (noon)
+ Datum::Time(Time::new(43200000)),
+ // 12345 ms = 00:00:12.345
+ Datum::Time(Time::new(12345)),
+ // 1609459200000 ms = 2021-01-01 00:00:00 UTC, with 123456
additional nanoseconds
+
Datum::TimestampNtz(TimestampNtz::from_millis_nanos(1609459200000, 123456)?),
+ // 1609459200000 ms = 2021-01-01 00:00:00 UTC, with 987654
additional nanoseconds
+
Datum::TimestampLtz(TimestampLtz::from_millis_nanos(1609459200000, 987654)?),
+ ],
+ })?;
+
+ let batch = builder.build_arrow_record_batch()?;
+
+ // Verify all conversions
+ assert_eq!(
+ batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap()
+ .value(0),
+ 1
+ );
+
+ let dec = batch
+ .column(1)
+ .as_any()
+ .downcast_ref::<Decimal128Array>()
+ .unwrap();
+ assert_eq!(dec.value(0), 12346); // 123.456 rounded to 2 decimal places
+
+ assert_eq!(
+ batch
+ .column(2)
+ .as_any()
+ .downcast_ref::<Date32Array>()
+ .unwrap()
+ .value(0),
+ 18000
+ );
+
+ assert_eq!(
+ batch
+ .column(3)
+ .as_any()
+ .downcast_ref::<Time32MillisecondArray>()
+ .unwrap()
+ .value(0),
+ 43200000
+ );
+
+ assert_eq!(
+ batch
+ .column(4)
+ .as_any()
+ .downcast_ref::<Time64NanosecondArray>()
+ .unwrap()
+ .value(0),
+ 12345000000
+ );
+
+ // Timestamp with sub-millisecond nanos preserved
+ assert_eq!(
+ batch
+ .column(5)
+ .as_any()
+ .downcast_ref::<TimestampMicrosecondArray>()
+ .unwrap()
+ .value(0),
+ 1609459200000123
+ );
+
+ assert_eq!(
+ batch
+ .column(6)
+ .as_any()
+ .downcast_ref::<TimestampNanosecondArray>()
+ .unwrap()
+ .value(0),
+ 1609459200000987654
+ );
+
+ Ok(())
+ }
}
diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs
index 5b21b38..7b3850f 100644
--- a/crates/fluss/src/row/datum.rs
+++ b/crates/fluss/src/row/datum.rs
@@ -19,9 +19,13 @@ use crate::error::Error::RowConvertError;
use crate::error::Result;
use crate::row::Decimal;
use arrow::array::{
- ArrayBuilder, BinaryBuilder, BooleanBuilder, Float32Builder, Float64Builder, Int8Builder,
- Int16Builder, Int32Builder, Int64Builder, StringBuilder,
+ ArrayBuilder, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder, Float32Builder,
+ Float64Builder, Int8Builder, Int16Builder, Int32Builder, Int64Builder, StringBuilder,
+ Time32MillisecondBuilder, Time32SecondBuilder, Time64MicrosecondBuilder,
+ Time64NanosecondBuilder, TimestampMicrosecondBuilder, TimestampMillisecondBuilder,
+ TimestampNanosecondBuilder, TimestampSecondBuilder,
};
+use arrow::datatypes as arrow_schema;
use jiff::ToSpan;
use ordered_float::OrderedFloat;
use parse_display::Display;
@@ -83,6 +87,41 @@ impl Datum<'_> {
_ => panic!("not a blob: {self:?}"),
}
}
+
+ pub fn as_decimal(&self) -> &Decimal {
+ match self {
+ Self::Decimal(d) => d,
+ _ => panic!("not a decimal: {self:?}"),
+ }
+ }
+
+ pub fn as_date(&self) -> Date {
+ match self {
+ Self::Date(d) => *d,
+ _ => panic!("not a date: {self:?}"),
+ }
+ }
+
+ pub fn as_time(&self) -> Time {
+ match self {
+ Self::Time(t) => *t,
+ _ => panic!("not a time: {self:?}"),
+ }
+ }
+
+ pub fn as_timestamp_ntz(&self) -> TimestampNtz {
+ match self {
+ Self::TimestampNtz(ts) => *ts,
+ _ => panic!("not a timestamp ntz: {self:?}"),
+ }
+ }
+
+ pub fn as_timestamp_ltz(&self) -> TimestampLtz {
+ match self {
+ Self::TimestampLtz(ts) => *ts,
+ _ => panic!("not a timestamp ltz: {self:?}"),
+ }
+ }
}
// ----------- implement from
@@ -246,6 +285,66 @@ impl TryFrom<&Datum<'_>> for i8 {
}
}
+impl TryFrom<&Datum<'_>> for Decimal {
+ type Error = ();
+
+ #[inline]
+ fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
+ match from {
+ Datum::Decimal(d) => Ok(d.clone()),
+ _ => Err(()),
+ }
+ }
+}
+
+impl TryFrom<&Datum<'_>> for Date {
+ type Error = ();
+
+ #[inline]
+ fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
+ match from {
+ Datum::Date(d) => Ok(*d),
+ _ => Err(()),
+ }
+ }
+}
+
+impl TryFrom<&Datum<'_>> for Time {
+ type Error = ();
+
+ #[inline]
+ fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
+ match from {
+ Datum::Time(t) => Ok(*t),
+ _ => Err(()),
+ }
+ }
+}
+
+impl TryFrom<&Datum<'_>> for TimestampNtz {
+ type Error = ();
+
+ #[inline]
+ fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
+ match from {
+ Datum::TimestampNtz(ts) => Ok(*ts),
+ _ => Err(()),
+ }
+ }
+}
+
+impl TryFrom<&Datum<'_>> for TimestampLtz {
+ type Error = ();
+
+ #[inline]
+ fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
+ match from {
+ Datum::TimestampLtz(ts) => Ok(*ts),
+ _ => Err(()),
+ }
+ }
+}
+
impl<'a> From<bool> for Datum<'a> {
#[inline]
fn from(b: bool) -> Datum<'a> {
@@ -253,12 +352,103 @@ impl<'a> From<bool> for Datum<'a> {
}
}
+impl<'a> From<Decimal> for Datum<'a> {
+ #[inline]
+ fn from(d: Decimal) -> Datum<'a> {
+ Datum::Decimal(d)
+ }
+}
+
+impl<'a> From<Date> for Datum<'a> {
+ #[inline]
+ fn from(d: Date) -> Datum<'a> {
+ Datum::Date(d)
+ }
+}
+
+impl<'a> From<Time> for Datum<'a> {
+ #[inline]
+ fn from(t: Time) -> Datum<'a> {
+ Datum::Time(t)
+ }
+}
+
+impl<'a> From<TimestampNtz> for Datum<'a> {
+ #[inline]
+ fn from(ts: TimestampNtz) -> Datum<'a> {
+ Datum::TimestampNtz(ts)
+ }
+}
+
+impl<'a> From<TimestampLtz> for Datum<'a> {
+ #[inline]
+ fn from(ts: TimestampLtz) -> Datum<'a> {
+ Datum::TimestampLtz(ts)
+ }
+}
+
pub trait ToArrow {
- fn append_to(&self, builder: &mut dyn ArrayBuilder) -> Result<()>;
+ fn append_to(
+ &self,
+ builder: &mut dyn ArrayBuilder,
+ data_type: &arrow_schema::DataType,
+ ) -> Result<()>;
+}
+
+// Time unit conversion constants
+const MILLIS_PER_SECOND: i64 = 1_000;
+const MICROS_PER_MILLI: i64 = 1_000;
+const NANOS_PER_MILLI: i64 = 1_000_000;
+
+/// Converts milliseconds and nanoseconds-within-millisecond to total microseconds.
+/// Returns an error if the conversion would overflow.
+fn millis_nanos_to_micros(millis: i64, nanos: i32) -> Result<i64> {
+ let millis_micros = millis
+ .checked_mul(MICROS_PER_MILLI)
+ .ok_or_else(|| RowConvertError {
+ message: format!(
+ "Timestamp milliseconds {} overflows when converting to microseconds",
+ millis
+ ),
+ })?;
+ let nanos_micros = (nanos as i64) / MICROS_PER_MILLI;
+ millis_micros
+ .checked_add(nanos_micros)
+ .ok_or_else(|| RowConvertError {
+ message: format!(
+ "Timestamp overflow when adding microseconds: {} + {}",
+ millis_micros, nanos_micros
+ ),
+ })
+}
+
+/// Converts milliseconds and nanoseconds-within-millisecond to total nanoseconds.
+/// Returns an error if the conversion would overflow.
+fn millis_nanos_to_nanos(millis: i64, nanos: i32) -> Result<i64> {
+ let millis_nanos = millis
+ .checked_mul(NANOS_PER_MILLI)
+ .ok_or_else(|| RowConvertError {
+ message: format!(
+ "Timestamp milliseconds {} overflows when converting to nanoseconds",
+ millis
+ ),
+ })?;
+ millis_nanos
+ .checked_add(nanos as i64)
+ .ok_or_else(|| RowConvertError {
+ message: format!(
+ "Timestamp overflow when adding nanoseconds: {} + {}",
+ millis_nanos, nanos
+ ),
+ })
}
impl Datum<'_> {
- pub fn append_to(&self, builder: &mut dyn ArrayBuilder) -> Result<()> {
+ pub fn append_to(
+ &self,
+ builder: &mut dyn ArrayBuilder,
+ data_type: &arrow_schema::DataType,
+ ) -> Result<()> {
macro_rules! append_null_to_arrow {
($builder_type:ty) => {
if let Some(b) = builder.as_any_mut().downcast_mut::<$builder_type>() {
@@ -288,6 +478,16 @@ impl Datum<'_> {
append_null_to_arrow!(Float64Builder);
append_null_to_arrow!(StringBuilder);
append_null_to_arrow!(BinaryBuilder);
+ append_null_to_arrow!(Decimal128Builder);
+ append_null_to_arrow!(Date32Builder);
+ append_null_to_arrow!(Time32SecondBuilder);
+ append_null_to_arrow!(Time32MillisecondBuilder);
+ append_null_to_arrow!(Time64MicrosecondBuilder);
+ append_null_to_arrow!(Time64NanosecondBuilder);
+ append_null_to_arrow!(TimestampSecondBuilder);
+ append_null_to_arrow!(TimestampMillisecondBuilder);
+ append_null_to_arrow!(TimestampMicrosecondBuilder);
+ append_null_to_arrow!(TimestampNanosecondBuilder);
}
Datum::Bool(v) => append_value_to_arrow!(BooleanBuilder, *v),
Datum::Int8(v) => append_value_to_arrow!(Int8Builder, *v),
@@ -298,16 +498,221 @@ impl Datum<'_> {
Datum::Float64(v) => append_value_to_arrow!(Float64Builder, v.into_inner()),
Datum::String(v) => append_value_to_arrow!(StringBuilder, v.as_ref()),
Datum::Blob(v) => append_value_to_arrow!(BinaryBuilder, v.as_ref()),
- Datum::Decimal(_)
- | Datum::Date(_)
- | Datum::Time(_)
- | Datum::TimestampNtz(_)
- | Datum::TimestampLtz(_) => {
+ Datum::Decimal(decimal) => {
+ // Extract target precision and scale from Arrow schema
+ let (p, s) = match data_type {
+ arrow_schema::DataType::Decimal128(p, s) => (*p, *s),
+ _ => {
+ return Err(RowConvertError {
+ message: format!(
+ "Expected Decimal128 Arrow type, got: {:?}",
+ data_type
+ ),
+ });
+ }
+ };
+
+ // Validate scale is non-negative (Fluss doesn't support negative scales)
+ if s < 0 {
+ return Err(RowConvertError {
+ message: format!("Negative decimal scale {} is not supported", s),
+ });
+ }
+
+ let target_precision = p as u32;
+ let target_scale = s as i64; // Safe now: 0..127 → 0i64..127i64
+
+ if let Some(b) = builder.as_any_mut().downcast_mut::<Decimal128Builder>() {
+ use bigdecimal::RoundingMode;
+
+ // Rescale the decimal to match Arrow's target scale
+ let bd = decimal.to_big_decimal();
+ let rescaled = bd.with_scale_round(target_scale, RoundingMode::HalfUp);
+ let (unscaled, _) = rescaled.as_bigint_and_exponent();
+
+ // Validate precision
+ let actual_precision = Decimal::compute_precision(&unscaled);
+ if actual_precision > target_precision as usize {
+ return Err(RowConvertError {
+ message: format!(
+ "Decimal precision overflow: value has {} digits but Arrow expects {} (value: {})",
+ actual_precision, target_precision, rescaled
+ ),
+ });
+ }
+
+ // Convert to i128 for Arrow
+ let i128_val: i128 = match unscaled.try_into() {
+ Ok(v) => v,
+ Err(_) => {
+ return Err(RowConvertError {
+ message: format!("Decimal value exceeds i128 range: {}", rescaled),
+ });
+ }
+ };
+
+ b.append_value(i128_val);
+ return Ok(());
+ }
+
return Err(RowConvertError {
- message: format!(
- "Type {:?} is not yet supported for Arrow conversion",
- std::mem::discriminant(self)
- ),
+ message: "Builder type mismatch for Decimal128".to_string(),
+ });
+ }
+ Datum::Date(date) => {
+ append_value_to_arrow!(Date32Builder, date.get_inner());
+ }
+ Datum::Time(time) => {
+ // Time is stored as milliseconds since midnight in Fluss
+ // Convert to Arrow's time unit based on schema
+ let millis = time.get_inner();
+
+ match data_type {
+ arrow_schema::DataType::Time32(arrow_schema::TimeUnit::Second) => {
+ if let Some(b) = builder.as_any_mut().downcast_mut::<Time32SecondBuilder>()
+ {
+ // Validate no sub-second precision is lost
+ if millis % MILLIS_PER_SECOND as i32 != 0 {
+ return Err(RowConvertError {
+ message: format!(
+ "Time value {} ms has sub-second precision but schema expects seconds only",
+ millis
+ ),
+ });
+ }
+ b.append_value(millis / MILLIS_PER_SECOND as i32);
+ return Ok(());
+ }
+ }
+ arrow_schema::DataType::Time32(arrow_schema::TimeUnit::Millisecond) => {
+ if let Some(b) = builder
+ .as_any_mut()
+ .downcast_mut::<Time32MillisecondBuilder>()
+ {
+ b.append_value(millis);
+ return Ok(());
+ }
+ }
+ arrow_schema::DataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
+ if let Some(b) = builder
+ .as_any_mut()
+ .downcast_mut::<Time64MicrosecondBuilder>()
+ {
+ let micros = (millis as i64)
+ .checked_mul(MICROS_PER_MILLI)
+ .ok_or_else(|| RowConvertError {
+ message: format!(
+ "Time value {} ms overflows when converting to microseconds",
+ millis
+ ),
+ })?;
+ b.append_value(micros);
+ return Ok(());
+ }
+ }
+ arrow_schema::DataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
+ if let Some(b) = builder
+ .as_any_mut()
+ .downcast_mut::<Time64NanosecondBuilder>()
+ {
+ let nanos = (millis as i64).checked_mul(NANOS_PER_MILLI).ok_or_else(
+ || RowConvertError {
+ message: format!(
+ "Time value {} ms overflows when converting to nanoseconds",
+ millis
+ ),
+ },
+ )?;
+ b.append_value(nanos);
+ return Ok(());
+ }
+ }
+ _ => {
+ return Err(RowConvertError {
+ message: format!(
+ "Expected Time32/Time64 Arrow type, got: {:?}",
+ data_type
+ ),
+ });
+ }
+ }
+
+ return Err(RowConvertError {
+ message: "Builder type mismatch for Time".to_string(),
+ });
+ }
+ Datum::TimestampNtz(ts) => {
+ let millis = ts.get_millisecond();
+ let nanos = ts.get_nano_of_millisecond();
+
+ if let Some(b) = builder
+ .as_any_mut()
+ .downcast_mut::<TimestampSecondBuilder>()
+ {
+ b.append_value(millis / MILLIS_PER_SECOND);
+ return Ok(());
+ }
+ if let Some(b) = builder
+ .as_any_mut()
+ .downcast_mut::<TimestampMillisecondBuilder>()
+ {
+ b.append_value(millis);
+ return Ok(());
+ }
+ if let Some(b) = builder
+ .as_any_mut()
+ .downcast_mut::<TimestampMicrosecondBuilder>()
+ {
+ b.append_value(millis_nanos_to_micros(millis, nanos)?);
+ return Ok(());
+ }
+ if let Some(b) = builder
+ .as_any_mut()
+ .downcast_mut::<TimestampNanosecondBuilder>()
+ {
+ b.append_value(millis_nanos_to_nanos(millis, nanos)?);
+ return Ok(());
+ }
+
+ return Err(RowConvertError {
+ message: "Builder type mismatch for TimestampNtz".to_string(),
+ });
+ }
+ Datum::TimestampLtz(ts) => {
+ let millis = ts.get_epoch_millisecond();
+ let nanos = ts.get_nano_of_millisecond();
+
+ if let Some(b) = builder
+ .as_any_mut()
+ .downcast_mut::<TimestampSecondBuilder>()
+ {
+ b.append_value(millis / MILLIS_PER_SECOND);
+ return Ok(());
+ }
+ if let Some(b) = builder
+ .as_any_mut()
+ .downcast_mut::<TimestampMillisecondBuilder>()
+ {
+ b.append_value(millis);
+ return Ok(());
+ }
+ if let Some(b) = builder
+ .as_any_mut()
+ .downcast_mut::<TimestampMicrosecondBuilder>()
+ {
+ b.append_value(millis_nanos_to_micros(millis, nanos)?);
+ return Ok(());
+ }
+ if let Some(b) = builder
+ .as_any_mut()
+ .downcast_mut::<TimestampNanosecondBuilder>()
+ {
+ b.append_value(millis_nanos_to_nanos(millis, nanos)?);
+ return Ok(());
+ }
+
+ return Err(RowConvertError {
+ message: "Builder type mismatch for TimestampLtz".to_string(),
});
}
}
@@ -325,7 +730,11 @@ impl Datum<'_> {
macro_rules! impl_to_arrow {
($ty:ty, $variant:ident) => {
impl ToArrow for $ty {
- fn append_to(&self, builder: &mut dyn ArrayBuilder) -> Result<()> {
+ fn append_to(
+ &self,
+ builder: &mut dyn ArrayBuilder,
+ _data_type: &arrow_schema::DataType,
+ ) -> Result<()> {
if let Some(b) =
builder.as_any_mut().downcast_mut::<$variant>() {
b.append_value(*self);
Ok(())
@@ -536,24 +945,39 @@ mod tests {
assert_eq!(value, 42);
let value: std::result::Result<i16, _> = (&datum).try_into();
assert!(value.is_err());
+
+ // Test decimal and temporal types
+ let decimal = Decimal::from_unscaled_long(12345, 10, 2).unwrap();
+ let datum: Datum = decimal.clone().into();
+ assert_eq!(datum.as_decimal(), &decimal);
+ let extracted: Decimal = (&datum).try_into().unwrap();
+ assert_eq!(extracted, decimal);
+
+ let date = Date::new(19000);
+ let datum: Datum = date.into();
+ assert_eq!(datum.as_date(), date);
+
+ let ts_ltz = TimestampLtz::new(1672531200000);
+ let datum: Datum = ts_ltz.into();
+ assert_eq!(datum.as_timestamp_ltz(), ts_ltz);
}
#[test]
fn datum_append_to_builder() {
let mut builder = Int32Builder::new();
- Datum::Null.append_to(&mut builder).unwrap();
- Datum::Int32(5).append_to(&mut builder).unwrap();
+ Datum::Null
+ .append_to(&mut builder, &arrow_schema::DataType::Int32)
+ .unwrap();
+ Datum::Int32(5)
+ .append_to(&mut builder, &arrow_schema::DataType::Int32)
+ .unwrap();
let array = builder.finish();
assert!(array.is_null(0));
assert_eq!(array.value(1), 5);
let mut builder = StringBuilder::new();
- let err = Datum::Int32(1).append_to(&mut builder).unwrap_err();
- assert!(matches!(err, crate::error::Error::RowConvertError { .. }));
-
- let mut builder = Int32Builder::new();
- let err = Datum::Date(Date::new(0))
- .append_to(&mut builder)
+ let err = Datum::Int32(1)
+ .append_to(&mut builder, &arrow_schema::DataType::Utf8)
.unwrap_err();
assert!(matches!(err, crate::error::Error::RowConvertError { .. }));
}