This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new ed646e2  feat: Arrow serialization for decimal and temporal types (#196)
ed646e2 is described below

commit ed646e28881cd4734f4dab2e2ba79206ec79d7dc
Author: Anton Borisov <[email protected]>
AuthorDate: Fri Jan 23 03:34:58 2026 +0000

    feat: Arrow serialization for decimal and temporal types (#196)
---
 crates/fluss/src/client/table/log_fetch_buffer.rs |  12 +-
 crates/fluss/src/client/table/scanner.rs          |   8 +-
 crates/fluss/src/client/write/accumulator.rs      |   2 +-
 crates/fluss/src/client/write/batch.rs            |   8 +-
 crates/fluss/src/record/arrow.rs                  | 569 +++++++++++++++++-----
 crates/fluss/src/row/datum.rs                     | 468 +++++++++++++++++-
 6 files changed, 916 insertions(+), 151 deletions(-)

diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs
index ca0a253..214a79c 100644
--- a/crates/fluss/src/client/table/log_fetch_buffer.rs
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -657,13 +657,13 @@ mod tests {
     use std::sync::Arc;
     use std::time::Duration;
 
-    fn test_read_context() -> ReadContext {
+    fn test_read_context() -> Result<ReadContext> {
         let row_type = RowType::new(vec![DataField::new(
             "id".to_string(),
             DataTypes::int(),
             None,
         )]);
-        ReadContext::new(to_arrow_schema(&row_type), false)
+        Ok(ReadContext::new(to_arrow_schema(&row_type)?, false))
     }
 
     struct ErrorPendingFetch {
@@ -689,7 +689,7 @@ mod tests {
 
     #[tokio::test]
     async fn await_not_empty_returns_wakeup_error() {
-        let buffer = LogFetchBuffer::new(test_read_context());
+        let buffer = LogFetchBuffer::new(test_read_context().unwrap());
         buffer.wakeup();
 
         let result = buffer.await_not_empty(Duration::from_millis(10)).await;
@@ -698,7 +698,7 @@ mod tests {
 
     #[tokio::test]
     async fn await_not_empty_returns_pending_error() {
-        let buffer = LogFetchBuffer::new(test_read_context());
+        let buffer = LogFetchBuffer::new(test_read_context().unwrap());
         let table_bucket = TableBucket::new(1, 0);
         buffer.pend(Box::new(ErrorPendingFetch {
             table_bucket: table_bucket.clone(),
@@ -728,7 +728,7 @@ mod tests {
                 compression_type: ArrowCompressionType::None,
                 compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
             },
-        );
+        )?;
 
         let mut row = GenericRow::new();
         row.set_field(0, 1_i32);
@@ -738,7 +738,7 @@ mod tests {
 
         let data = builder.build()?;
         let log_records = LogRecordsBatches::new(data.clone());
-        let read_context = ReadContext::new(to_arrow_schema(&row_type), false);
+        let read_context = ReadContext::new(to_arrow_schema(&row_type)?, 
false);
         let mut fetch = DefaultCompletedFetch::new(
             TableBucket::new(1, 0),
             log_records,
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index e9b2ce1..cf0b257 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -470,7 +470,7 @@ impl LogFetcher {
         log_scanner_status: Arc<LogScannerStatus>,
         projected_fields: Option<Vec<usize>>,
     ) -> Result<Self> {
-        let full_arrow_schema = to_arrow_schema(table_info.get_row_type());
+        let full_arrow_schema = to_arrow_schema(table_info.get_row_type())?;
         let read_context =
             Self::create_read_context(full_arrow_schema.clone(), 
projected_fields.clone(), false)?;
         let remote_read_context =
@@ -1445,7 +1445,7 @@ mod tests {
                 compression_type: ArrowCompressionType::None,
                 compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
             },
-        );
+        )?;
         let record = WriteRecord::for_append(
             table_path,
             1,
@@ -1477,7 +1477,7 @@ mod tests {
 
         let data = build_records(&table_info, Arc::new(table_path))?;
         let log_records = LogRecordsBatches::new(data.clone());
-        let read_context = 
ReadContext::new(to_arrow_schema(table_info.get_row_type()), false);
+        let read_context = 
ReadContext::new(to_arrow_schema(table_info.get_row_type())?, false);
         let completed =
             DefaultCompletedFetch::new(bucket.clone(), log_records, 
data.len(), read_context, 0, 0);
         fetcher.log_fetch_buffer.add(Box::new(completed));
@@ -1506,7 +1506,7 @@ mod tests {
         let bucket = TableBucket::new(1, 0);
         let data = build_records(&table_info, Arc::new(table_path))?;
         let log_records = LogRecordsBatches::new(data.clone());
-        let read_context = 
ReadContext::new(to_arrow_schema(table_info.get_row_type()), false);
+        let read_context = 
ReadContext::new(to_arrow_schema(table_info.get_row_type())?, false);
         let mut completed: Box<dyn CompletedFetch> = 
Box::new(DefaultCompletedFetch::new(
             bucket,
             log_records,
diff --git a/crates/fluss/src/client/write/accumulator.rs b/crates/fluss/src/client/write/accumulator.rs
index fb7b544..46c822c 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -112,7 +112,7 @@ impl RecordAccumulator {
                 bucket_id,
                 current_time_ms(),
                 matches!(&record.record, 
Record::Log(LogWriteRecord::RecordBatch(_))),
-            )),
+            )?),
             Record::Kv(kv_record) => Kv(KvWriteBatch::new(
                 self.batch_id.fetch_add(1, Ordering::Relaxed),
                 table_path.as_ref().clone(),
diff --git a/crates/fluss/src/client/write/batch.rs b/crates/fluss/src/client/write/batch.rs
index 159e313..78381c6 100644
--- a/crates/fluss/src/client/write/batch.rs
+++ b/crates/fluss/src/client/write/batch.rs
@@ -197,18 +197,18 @@ impl ArrowLogWriteBatch {
         bucket_id: BucketId,
         create_ms: i64,
         to_append_record_batch: bool,
-    ) -> Self {
+    ) -> Result<Self> {
         let base = InnerWriteBatch::new(batch_id, table_path, create_ms, 
bucket_id);
-        Self {
+        Ok(Self {
             write_batch: base,
             arrow_builder: MemoryLogRecordsArrowBuilder::new(
                 schema_id,
                 row_type,
                 to_append_record_batch,
                 arrow_compression_info,
-            ),
+            )?,
             built_records: None,
-        }
+        })
     }
 
     pub fn batch_id(&self) -> i64 {
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 3c94b72..39114d3 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -22,9 +22,12 @@ use crate::metadata::{DataType, RowType};
 use crate::record::{ChangeType, ScanRecord};
 use crate::row::{ColumnarRow, GenericRow};
 use arrow::array::{
-    ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Float32Builder, 
Float64Builder,
-    Int8Builder, Int16Builder, Int32Builder, Int64Builder, StringBuilder, 
UInt8Builder,
-    UInt16Builder, UInt32Builder, UInt64Builder,
+    ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, 
Decimal128Builder,
+    Float32Builder, Float64Builder, Int8Builder, Int16Builder, Int32Builder, 
Int64Builder,
+    StringBuilder, Time32MillisecondBuilder, Time32SecondBuilder, 
Time64MicrosecondBuilder,
+    Time64NanosecondBuilder, TimestampMicrosecondBuilder, 
TimestampMillisecondBuilder,
+    TimestampNanosecondBuilder, TimestampSecondBuilder, UInt8Builder, 
UInt16Builder, UInt32Builder,
+    UInt64Builder,
 };
 use arrow::{
     array::RecordBatch,
@@ -42,7 +45,6 @@ use byteorder::WriteBytesExt;
 use byteorder::{ByteOrder, LittleEndian};
 use bytes::Bytes;
 use crc32c::crc32c;
-use parking_lot::Mutex;
 use std::{
     io::{Cursor, Write},
     sync::Arc,
@@ -113,7 +115,7 @@ pub struct MemoryLogRecordsArrowBuilder {
 }
 
 pub trait ArrowRecordBatchInnerBuilder: Send + Sync {
-    fn build_arrow_record_batch(&self) -> Result<Arc<RecordBatch>>;
+    fn build_arrow_record_batch(&mut self) -> Result<Arc<RecordBatch>>;
 
     fn append(&mut self, row: &GenericRow) -> Result<bool>;
 
@@ -133,7 +135,7 @@ pub struct PrebuiltRecordBatchBuilder {
 }
 
 impl ArrowRecordBatchInnerBuilder for PrebuiltRecordBatchBuilder {
-    fn build_arrow_record_batch(&self) -> Result<Arc<RecordBatch>> {
+    fn build_arrow_record_batch(&mut self) -> Result<Arc<RecordBatch>> {
         Ok(self.arrow_record_batch.as_ref().unwrap().clone())
     }
 
@@ -167,66 +169,132 @@ impl ArrowRecordBatchInnerBuilder for PrebuiltRecordBatchBuilder {
 
 pub struct RowAppendRecordBatchBuilder {
     table_schema: SchemaRef,
-    arrow_column_builders: Mutex<Vec<Box<dyn ArrayBuilder>>>,
+    arrow_column_builders: Vec<Box<dyn ArrayBuilder>>,
     records_count: i32,
 }
 
 impl RowAppendRecordBatchBuilder {
-    pub fn new(row_type: &RowType) -> Self {
-        let schema_ref = to_arrow_schema(row_type);
-        let builders = Mutex::new(
-            schema_ref
-                .fields()
-                .iter()
-                .map(|field| Self::create_builder(field.data_type()))
-                .collect(),
-        );
-        Self {
+    pub fn new(row_type: &RowType) -> Result<Self> {
+        let schema_ref = to_arrow_schema(row_type)?;
+        let builders: Result<Vec<_>> = schema_ref
+            .fields()
+            .iter()
+            .map(|field| Self::create_builder(field.data_type()))
+            .collect();
+        Ok(Self {
             table_schema: schema_ref.clone(),
-            arrow_column_builders: builders,
+            arrow_column_builders: builders?,
             records_count: 0,
-        }
+        })
     }
 
-    fn create_builder(data_type: &arrow_schema::DataType) -> Box<dyn 
ArrayBuilder> {
+    fn create_builder(data_type: &arrow_schema::DataType) -> Result<Box<dyn 
ArrayBuilder>> {
         match data_type {
-            arrow_schema::DataType::Int8 => Box::new(Int8Builder::new()),
-            arrow_schema::DataType::Int16 => Box::new(Int16Builder::new()),
-            arrow_schema::DataType::Int32 => Box::new(Int32Builder::new()),
-            arrow_schema::DataType::Int64 => Box::new(Int64Builder::new()),
-            arrow_schema::DataType::UInt8 => Box::new(UInt8Builder::new()),
-            arrow_schema::DataType::UInt16 => Box::new(UInt16Builder::new()),
-            arrow_schema::DataType::UInt32 => Box::new(UInt32Builder::new()),
-            arrow_schema::DataType::UInt64 => Box::new(UInt64Builder::new()),
-            arrow_schema::DataType::Float32 => Box::new(Float32Builder::new()),
-            arrow_schema::DataType::Float64 => Box::new(Float64Builder::new()),
-            arrow_schema::DataType::Boolean => Box::new(BooleanBuilder::new()),
-            arrow_schema::DataType::Utf8 => Box::new(StringBuilder::new()),
-            arrow_schema::DataType::Binary => Box::new(BinaryBuilder::new()),
-            dt => panic!("Unsupported data type: {dt:?}"),
+            arrow_schema::DataType::Int8 => Ok(Box::new(Int8Builder::new())),
+            arrow_schema::DataType::Int16 => Ok(Box::new(Int16Builder::new())),
+            arrow_schema::DataType::Int32 => Ok(Box::new(Int32Builder::new())),
+            arrow_schema::DataType::Int64 => Ok(Box::new(Int64Builder::new())),
+            arrow_schema::DataType::UInt8 => Ok(Box::new(UInt8Builder::new())),
+            arrow_schema::DataType::UInt16 => 
Ok(Box::new(UInt16Builder::new())),
+            arrow_schema::DataType::UInt32 => 
Ok(Box::new(UInt32Builder::new())),
+            arrow_schema::DataType::UInt64 => 
Ok(Box::new(UInt64Builder::new())),
+            arrow_schema::DataType::Float32 => 
Ok(Box::new(Float32Builder::new())),
+            arrow_schema::DataType::Float64 => 
Ok(Box::new(Float64Builder::new())),
+            arrow_schema::DataType::Boolean => 
Ok(Box::new(BooleanBuilder::new())),
+            arrow_schema::DataType::Utf8 => Ok(Box::new(StringBuilder::new())),
+            arrow_schema::DataType::Binary => 
Ok(Box::new(BinaryBuilder::new())),
+            arrow_schema::DataType::Decimal128(precision, scale) => {
+                let builder = Decimal128Builder::new()
+                    .with_precision_and_scale(*precision, *scale)
+                    .map_err(|e| Error::IllegalArgument {
+                        message: format!(
+                            "Invalid decimal precision {} or scale {}: {}",
+                            precision, scale, e
+                        ),
+                    })?;
+                Ok(Box::new(builder))
+            }
+            arrow_schema::DataType::Date32 => 
Ok(Box::new(Date32Builder::new())),
+            arrow_schema::DataType::Time32(unit) => match unit {
+                arrow_schema::TimeUnit::Second => 
Ok(Box::new(Time32SecondBuilder::new())),
+                arrow_schema::TimeUnit::Millisecond => {
+                    Ok(Box::new(Time32MillisecondBuilder::new()))
+                }
+                _ => Err(Error::IllegalArgument {
+                    message: format!(
+                        "Time32 only supports Second and Millisecond units, 
got: {:?}",
+                        unit
+                    ),
+                }),
+            },
+            arrow_schema::DataType::Time64(unit) => match unit {
+                arrow_schema::TimeUnit::Microsecond => {
+                    Ok(Box::new(Time64MicrosecondBuilder::new()))
+                }
+                arrow_schema::TimeUnit::Nanosecond => 
Ok(Box::new(Time64NanosecondBuilder::new())),
+                _ => Err(Error::IllegalArgument {
+                    message: format!(
+                        "Time64 only supports Microsecond and Nanosecond 
units, got: {:?}",
+                        unit
+                    ),
+                }),
+            },
+            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Second, 
_) => {
+                Ok(Box::new(TimestampSecondBuilder::new()))
+            }
+            
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => {
+                Ok(Box::new(TimestampMillisecondBuilder::new()))
+            }
+            
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => {
+                Ok(Box::new(TimestampMicrosecondBuilder::new()))
+            }
+            
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => {
+                Ok(Box::new(TimestampNanosecondBuilder::new()))
+            }
+            dt => Err(Error::IllegalArgument {
+                message: format!("Unsupported data type: {dt:?}"),
+            }),
         }
     }
 }
 
 impl ArrowRecordBatchInnerBuilder for RowAppendRecordBatchBuilder {
-    fn build_arrow_record_batch(&self) -> Result<Arc<RecordBatch>> {
-        let arrays = self
+    fn build_arrow_record_batch(&mut self) -> Result<Arc<RecordBatch>> {
+        let arrays: Result<Vec<ArrayRef>> = self
             .arrow_column_builders
-            .lock()
             .iter_mut()
-            .map(|b| b.finish())
-            .collect::<Vec<ArrayRef>>();
+            .enumerate()
+            .map(|(idx, b)| {
+                let array = b.finish();
+                let expected_type = self.table_schema.field(idx).data_type();
+
+                // Validate array type matches schema
+                if array.data_type() != expected_type {
+                    return Err(Error::IllegalArgument {
+                        message: format!(
+                            "Builder type mismatch at column {}: expected 
{:?}, got {:?}",
+                            idx,
+                            expected_type,
+                            array.data_type()
+                        ),
+                    });
+                }
+
+                Ok(array)
+            })
+            .collect();
+
         Ok(Arc::new(RecordBatch::try_new(
             self.table_schema.clone(),
-            arrays,
+            arrays?,
         )?))
     }
 
     fn append(&mut self, row: &GenericRow) -> Result<bool> {
         for (idx, value) in row.values.iter().enumerate() {
-            let mut builder_binding = self.arrow_column_builders.lock();
-            let builder = builder_binding.get_mut(idx).unwrap();
-            value.append_to(builder.as_mut())?;
+            let field_type = self.table_schema.field(idx).data_type();
+            let builder = self.arrow_column_builders.get_mut(idx).unwrap();
+            value.append_to(builder.as_mut(), field_type)?;
         }
         self.records_count += 1;
         Ok(true)
@@ -255,15 +323,15 @@ impl MemoryLogRecordsArrowBuilder {
         row_type: &RowType,
         to_append_record_batch: bool,
         arrow_compression_info: ArrowCompressionInfo,
-    ) -> Self {
+    ) -> Result<Self> {
         let arrow_batch_builder: Box<dyn ArrowRecordBatchInnerBuilder> = {
             if to_append_record_batch {
                 Box::new(PrebuiltRecordBatchBuilder::default())
             } else {
-                Box::new(RowAppendRecordBatchBuilder::new(row_type))
+                Box::new(RowAppendRecordBatchBuilder::new(row_type)?)
             }
         };
-        MemoryLogRecordsArrowBuilder {
+        Ok(MemoryLogRecordsArrowBuilder {
             base_log_offset: BUILDER_DEFAULT_OFFSET,
             schema_id,
             magic: CURRENT_LOG_MAGIC_VALUE,
@@ -272,7 +340,7 @@ impl MemoryLogRecordsArrowBuilder {
             is_closed: false,
             arrow_record_batch_builder: arrow_batch_builder,
             arrow_compression_info,
-        }
+        })
     }
 
     pub fn append(&mut self, record: &WriteRecord) -> Result<bool> {
@@ -302,7 +370,7 @@ impl MemoryLogRecordsArrowBuilder {
         self.is_closed = true;
     }
 
-    pub fn build(&self) -> Result<Vec<u8>> {
+    pub fn build(&mut self) -> Result<Vec<u8>> {
         // serialize arrow batch
         let mut arrow_batch_bytes = vec![];
         let table_schema = self.arrow_record_batch_builder.schema();
@@ -641,24 +709,24 @@ fn parse_ipc_message(
     Ok((batch_metadata, body_buffer, message.version()))
 }
 
-pub fn to_arrow_schema(fluss_schema: &RowType) -> SchemaRef {
-    let fields: Vec<Field> = fluss_schema
+pub fn to_arrow_schema(fluss_schema: &RowType) -> Result<SchemaRef> {
+    let fields: Result<Vec<Field>> = fluss_schema
         .fields()
         .iter()
         .map(|f| {
-            Field::new(
+            Ok(Field::new(
                 f.name(),
-                to_arrow_type(f.data_type()),
+                to_arrow_type(f.data_type())?,
                 f.data_type().is_nullable(),
-            )
+            ))
         })
         .collect();
 
-    SchemaRef::new(arrow_schema::Schema::new(fields))
+    Ok(SchemaRef::new(arrow_schema::Schema::new(fields?)))
 }
 
-pub fn to_arrow_type(fluss_type: &DataType) -> ArrowDataType {
-    match fluss_type {
+pub fn to_arrow_type(fluss_type: &DataType) -> Result<ArrowDataType> {
+    Ok(match fluss_type {
         DataType::Boolean(_) => ArrowDataType::Boolean,
         DataType::TinyInt(_) => ArrowDataType::Int8,
         DataType::SmallInt(_) => ArrowDataType::Int16,
@@ -668,58 +736,91 @@ pub fn to_arrow_type(fluss_type: &DataType) -> ArrowDataType {
         DataType::Double(_) => ArrowDataType::Float64,
         DataType::Char(_) => ArrowDataType::Utf8,
         DataType::String(_) => ArrowDataType::Utf8,
-        DataType::Decimal(decimal_type) => ArrowDataType::Decimal128(
-            decimal_type
-                .precision()
-                .try_into()
-                .expect("precision exceeds u8::MAX"),
-            decimal_type
+        DataType::Decimal(decimal_type) => {
+            let precision =
+                decimal_type
+                    .precision()
+                    .try_into()
+                    .map_err(|_| Error::IllegalArgument {
+                        message: format!(
+                            "Decimal precision {} exceeds Arrow's maximum 
(u8::MAX)",
+                            decimal_type.precision()
+                        ),
+                    })?;
+            let scale = decimal_type
                 .scale()
                 .try_into()
-                .expect("scale exceeds i8::MAX"),
-        ),
+                .map_err(|_| Error::IllegalArgument {
+                    message: format!(
+                        "Decimal scale {} exceeds Arrow's maximum (i8::MAX)",
+                        decimal_type.scale()
+                    ),
+                })?;
+            ArrowDataType::Decimal128(precision, scale)
+        }
         DataType::Date(_) => ArrowDataType::Date32,
         DataType::Time(time_type) => match time_type.precision() {
             0 => ArrowDataType::Time32(arrow_schema::TimeUnit::Second),
             1..=3 => 
ArrowDataType::Time32(arrow_schema::TimeUnit::Millisecond),
             4..=6 => 
ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond),
             7..=9 => ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond),
-            // This arm should never be reached due to validation in TimeType.
-            invalid => panic!("Invalid precision value for TimeType: 
{invalid}"),
+            invalid => {
+                return Err(Error::IllegalArgument {
+                    message: format!("Invalid precision {} for TimeType (must 
be 0-9)", invalid),
+                });
+            }
         },
         DataType::Timestamp(timestamp_type) => match 
timestamp_type.precision() {
             0 => ArrowDataType::Timestamp(arrow_schema::TimeUnit::Second, 
None),
             1..=3 => 
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
             4..=6 => 
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None),
             7..=9 => 
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None),
-            // This arm should never be reached due to validation in Timestamp.
-            invalid => panic!("Invalid precision value for TimestampType: 
{invalid}"),
+            invalid => {
+                return Err(Error::IllegalArgument {
+                    message: format!(
+                        "Invalid precision {} for TimestampType (must be 0-9)",
+                        invalid
+                    ),
+                });
+            }
         },
         DataType::TimestampLTz(timestamp_ltz_type) => match 
timestamp_ltz_type.precision() {
             0 => ArrowDataType::Timestamp(arrow_schema::TimeUnit::Second, 
None),
             1..=3 => 
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
             4..=6 => 
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None),
             7..=9 => 
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None),
-            // This arm should never be reached due to validation in 
TimestampLTz.
-            invalid => panic!("Invalid precision value for TimestampLTzType: 
{invalid}"),
+            invalid => {
+                return Err(Error::IllegalArgument {
+                    message: format!(
+                        "Invalid precision {} for TimestampLTzType (must be 
0-9)",
+                        invalid
+                    ),
+                });
+            }
         },
         DataType::Bytes(_) => ArrowDataType::Binary,
-        DataType::Binary(binary_type) => ArrowDataType::FixedSizeBinary(
-            binary_type
+        DataType::Binary(binary_type) => {
+            let length = binary_type
                 .length()
                 .try_into()
-                .expect("length exceeds i32::MAX"),
-        ),
+                .map_err(|_| Error::IllegalArgument {
+                    message: format!(
+                        "Binary length {} exceeds Arrow's maximum (i32::MAX)",
+                        binary_type.length()
+                    ),
+                })?;
+            ArrowDataType::FixedSizeBinary(length)
+        }
         DataType::Array(array_type) => ArrowDataType::List(
             Field::new_list_field(
-                to_arrow_type(array_type.get_element_type()),
+                to_arrow_type(array_type.get_element_type())?,
                 fluss_type.is_nullable(),
             )
             .into(),
         ),
         DataType::Map(map_type) => {
-            let key_type = to_arrow_type(map_type.key_type());
-            let value_type = to_arrow_type(map_type.value_type());
+            let key_type = to_arrow_type(map_type.key_type())?;
+            let value_type = to_arrow_type(map_type.value_type())?;
             let entry_fields = vec![
                 Field::new("key", key_type, map_type.key_type().is_nullable()),
                 Field::new("value", value_type, 
map_type.value_type().is_nullable()),
@@ -733,20 +834,21 @@ pub fn to_arrow_type(fluss_type: &DataType) -> ArrowDataType {
                 false,
             )
         }
-        DataType::Row(row_type) => 
ArrowDataType::Struct(arrow_schema::Fields::from(
-            row_type
+        DataType::Row(row_type) => {
+            let fields: Result<Vec<Field>> = row_type
                 .fields()
                 .iter()
                 .map(|f| {
-                    Field::new(
+                    Ok(Field::new(
                         f.name(),
-                        to_arrow_type(f.data_type()),
+                        to_arrow_type(f.data_type())?,
                         f.data_type().is_nullable(),
-                    )
+                    ))
                 })
-                .collect::<Vec<Field>>(),
-        )),
-    }
+                .collect();
+            ArrowDataType::Struct(arrow_schema::Fields::from(fields?))
+        }
+    })
 }
 
 #[derive(Clone)]
@@ -1059,81 +1161,114 @@ mod tests {
 
     #[test]
     fn test_to_array_type() {
-        assert_eq!(to_arrow_type(&DataTypes::boolean()), 
ArrowDataType::Boolean);
-        assert_eq!(to_arrow_type(&DataTypes::tinyint()), ArrowDataType::Int8);
-        assert_eq!(to_arrow_type(&DataTypes::smallint()), 
ArrowDataType::Int16);
-        assert_eq!(to_arrow_type(&DataTypes::bigint()), ArrowDataType::Int64);
-        assert_eq!(to_arrow_type(&DataTypes::int()), ArrowDataType::Int32);
-        assert_eq!(to_arrow_type(&DataTypes::float()), ArrowDataType::Float32);
-        assert_eq!(to_arrow_type(&DataTypes::double()), 
ArrowDataType::Float64);
-        assert_eq!(to_arrow_type(&DataTypes::char(16)), ArrowDataType::Utf8);
-        assert_eq!(to_arrow_type(&DataTypes::string()), ArrowDataType::Utf8);
         assert_eq!(
-            to_arrow_type(&DataTypes::decimal(10, 2)),
+            to_arrow_type(&DataTypes::boolean()).unwrap(),
+            ArrowDataType::Boolean
+        );
+        assert_eq!(
+            to_arrow_type(&DataTypes::tinyint()).unwrap(),
+            ArrowDataType::Int8
+        );
+        assert_eq!(
+            to_arrow_type(&DataTypes::smallint()).unwrap(),
+            ArrowDataType::Int16
+        );
+        assert_eq!(
+            to_arrow_type(&DataTypes::bigint()).unwrap(),
+            ArrowDataType::Int64
+        );
+        assert_eq!(
+            to_arrow_type(&DataTypes::int()).unwrap(),
+            ArrowDataType::Int32
+        );
+        assert_eq!(
+            to_arrow_type(&DataTypes::float()).unwrap(),
+            ArrowDataType::Float32
+        );
+        assert_eq!(
+            to_arrow_type(&DataTypes::double()).unwrap(),
+            ArrowDataType::Float64
+        );
+        assert_eq!(
+            to_arrow_type(&DataTypes::char(16)).unwrap(),
+            ArrowDataType::Utf8
+        );
+        assert_eq!(
+            to_arrow_type(&DataTypes::string()).unwrap(),
+            ArrowDataType::Utf8
+        );
+        assert_eq!(
+            to_arrow_type(&DataTypes::decimal(10, 2)).unwrap(),
             ArrowDataType::Decimal128(10, 2)
         );
-        assert_eq!(to_arrow_type(&DataTypes::date()), ArrowDataType::Date32);
         assert_eq!(
-            to_arrow_type(&DataTypes::time()),
+            to_arrow_type(&DataTypes::date()).unwrap(),
+            ArrowDataType::Date32
+        );
+        assert_eq!(
+            to_arrow_type(&DataTypes::time()).unwrap(),
             ArrowDataType::Time32(arrow_schema::TimeUnit::Second)
         );
         assert_eq!(
-            to_arrow_type(&DataTypes::time_with_precision(3)),
+            to_arrow_type(&DataTypes::time_with_precision(3)).unwrap(),
             ArrowDataType::Time32(arrow_schema::TimeUnit::Millisecond)
         );
         assert_eq!(
-            to_arrow_type(&DataTypes::time_with_precision(6)),
+            to_arrow_type(&DataTypes::time_with_precision(6)).unwrap(),
             ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond)
         );
         assert_eq!(
-            to_arrow_type(&DataTypes::time_with_precision(9)),
+            to_arrow_type(&DataTypes::time_with_precision(9)).unwrap(),
             ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond)
         );
         assert_eq!(
-            to_arrow_type(&DataTypes::timestamp_with_precision(0)),
+            to_arrow_type(&DataTypes::timestamp_with_precision(0)).unwrap(),
             ArrowDataType::Timestamp(arrow_schema::TimeUnit::Second, None)
         );
         assert_eq!(
-            to_arrow_type(&DataTypes::timestamp_with_precision(3)),
+            to_arrow_type(&DataTypes::timestamp_with_precision(3)).unwrap(),
             ArrowDataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None)
         );
         assert_eq!(
-            to_arrow_type(&DataTypes::timestamp_with_precision(6)),
+            to_arrow_type(&DataTypes::timestamp_with_precision(6)).unwrap(),
             ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None)
         );
         assert_eq!(
-            to_arrow_type(&DataTypes::timestamp_with_precision(9)),
+            to_arrow_type(&DataTypes::timestamp_with_precision(9)).unwrap(),
             ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None)
         );
         assert_eq!(
-            to_arrow_type(&DataTypes::timestamp_ltz_with_precision(0)),
+            
to_arrow_type(&DataTypes::timestamp_ltz_with_precision(0)).unwrap(),
             ArrowDataType::Timestamp(arrow_schema::TimeUnit::Second, None)
         );
         assert_eq!(
-            to_arrow_type(&DataTypes::timestamp_ltz_with_precision(3)),
+            
to_arrow_type(&DataTypes::timestamp_ltz_with_precision(3)).unwrap(),
             ArrowDataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None)
         );
         assert_eq!(
-            to_arrow_type(&DataTypes::timestamp_ltz_with_precision(6)),
+            
to_arrow_type(&DataTypes::timestamp_ltz_with_precision(6)).unwrap(),
             ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None)
         );
         assert_eq!(
-            to_arrow_type(&DataTypes::timestamp_ltz_with_precision(9)),
+            
to_arrow_type(&DataTypes::timestamp_ltz_with_precision(9)).unwrap(),
             ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None)
         );
-        assert_eq!(to_arrow_type(&DataTypes::bytes()), ArrowDataType::Binary);
         assert_eq!(
-            to_arrow_type(&DataTypes::binary(16)),
+            to_arrow_type(&DataTypes::bytes()).unwrap(),
+            ArrowDataType::Binary
+        );
+        assert_eq!(
+            to_arrow_type(&DataTypes::binary(16)).unwrap(),
             ArrowDataType::FixedSizeBinary(16)
         );
 
         assert_eq!(
-            to_arrow_type(&DataTypes::array(DataTypes::int())),
+            to_arrow_type(&DataTypes::array(DataTypes::int())).unwrap(),
             ArrowDataType::List(Field::new_list_field(ArrowDataType::Int32, 
true).into())
         );
 
         assert_eq!(
-            to_arrow_type(&DataTypes::map(DataTypes::string(), 
DataTypes::int())),
+            to_arrow_type(&DataTypes::map(DataTypes::string(), 
DataTypes::int())).unwrap(),
             ArrowDataType::Map(
                 Arc::new(Field::new(
                     "entries",
@@ -1151,7 +1286,8 @@ mod tests {
             to_arrow_type(&DataTypes::row(vec![
                 DataTypes::field("f1".to_string(), DataTypes::int()),
                 DataTypes::field("f2".to_string(), DataTypes::string()),
-            ])),
+            ]))
+            .unwrap(),
             ArrowDataType::Struct(arrow_schema::Fields::from(vec![
                 Field::new("f1", ArrowDataType::Int32, true),
                 Field::new("f2", ArrowDataType::Utf8, true),
@@ -1215,7 +1351,7 @@ mod tests {
             DataField::new("id".to_string(), DataTypes::int(), None),
             DataField::new("name".to_string(), DataTypes::string(), None),
         ]);
-        let schema = to_arrow_schema(&row_type);
+        let schema = to_arrow_schema(&row_type).unwrap();
         let result = ReadContext::with_projection_pushdown(schema, vec![0, 2], 
false);
 
         assert!(matches!(result, Err(IllegalArgument { .. })));
@@ -1249,4 +1385,209 @@ mod tests {
         }
         out
     }
+
+    #[test]
+    fn test_temporal_and_decimal_builder_validation() {
+        use arrow::array::Array;
+
+        // Test valid builder creation with precision=10, scale=2
+        let mut builder =
+            
RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(10, 
2)).unwrap();
+        let decimal_builder = builder
+            .as_any_mut()
+            .downcast_mut::<Decimal128Builder>()
+            .expect("Expected Decimal128Builder");
+        // Verify precision and scale
+        let array = decimal_builder.finish();
+        assert_eq!(array.data_type(), &ArrowDataType::Decimal128(10, 2));
+
+        // Test error case: invalid precision/scale
+        let result =
+            
RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(100, 
50));
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn test_decimal_rescaling_and_validation() -> Result<()> {
+        use crate::row::{Datum, Decimal, GenericRow};
+        use arrow::array::Decimal128Array;
+        use bigdecimal::BigDecimal;
+        use std::str::FromStr;
+
+        // Test 1: Rescaling from scale 3 to scale 2
+        let row_type = RowType::new(vec![DataField::new(
+            "amount".to_string(),
+            DataTypes::decimal(10, 2),
+            None,
+        )]);
+        let mut builder = RowAppendRecordBatchBuilder::new(&row_type)?;
+        let decimal = 
Decimal::from_big_decimal(BigDecimal::from_str("123.456").unwrap(), 10, 3)?;
+        builder.append(&GenericRow {
+            values: vec![Datum::Decimal(decimal)],
+        })?;
+        let batch = builder.build_arrow_record_batch()?;
+        let array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Decimal128Array>()
+            .unwrap();
+        assert_eq!(array.value(0), 12346); // 123.456 rounded to 2 decimal places
+        assert_eq!(array.scale(), 2);
+
+        // Test 2: Precision overflow (should error)
+        let row_type = RowType::new(vec![DataField::new(
+            "amount".to_string(),
+            DataTypes::decimal(5, 2),
+            None,
+        )]);
+        let mut builder = RowAppendRecordBatchBuilder::new(&row_type)?;
+        let decimal = 
Decimal::from_big_decimal(BigDecimal::from_str("123456.78").unwrap(), 10, 2)?;
+        let result = builder.append(&GenericRow {
+            values: vec![Datum::Decimal(decimal)],
+        });
+        assert!(result.is_err());
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("precision overflow")
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_all_types_end_to_end() -> Result<()> {
+        use crate::row::{Date, Datum, Decimal, GenericRow, Time, TimestampLtz, 
TimestampNtz};
+        use arrow::array::{
+            Date32Array, Decimal128Array, Int32Array, Time32MillisecondArray,
+            Time64NanosecondArray, TimestampMicrosecondArray, 
TimestampNanosecondArray,
+        };
+        use bigdecimal::BigDecimal;
+        use std::str::FromStr;
+
+        // Schema with int, decimal, date, time (ms + ns), timestamps (μs + ns)
+        let row_type = RowType::new(vec![
+            DataField::new("id".to_string(), DataTypes::int(), None),
+            DataField::new("amount".to_string(), DataTypes::decimal(10, 2), 
None),
+            DataField::new("date".to_string(), DataTypes::date(), None),
+            DataField::new(
+                "time_ms".to_string(),
+                DataTypes::time_with_precision(3),
+                None,
+            ),
+            DataField::new(
+                "time_ns".to_string(),
+                DataTypes::time_with_precision(9),
+                None,
+            ),
+            DataField::new(
+                "ts_us".to_string(),
+                DataTypes::timestamp_with_precision(6),
+                None,
+            ),
+            DataField::new(
+                "ts_ltz_ns".to_string(),
+                DataTypes::timestamp_ltz_with_precision(9),
+                None,
+            ),
+        ]);
+
+        let mut builder = RowAppendRecordBatchBuilder::new(&row_type)?;
+
+        // Append rows with various data types
+        builder.append(&GenericRow {
+            values: vec![
+                Datum::Int32(1),
+                Datum::Decimal(Decimal::from_big_decimal(
+                    BigDecimal::from_str("123.456").unwrap(),
+                    10,
+                    3,
+                )?),
+                // 18000 days since epoch = 2019-04-14
+                Datum::Date(Date::new(18000)),
+                // 43200000 ms = 12:00:00.000 (noon)
+                Datum::Time(Time::new(43200000)),
+                // 12345 ms = 00:00:12.345
+                Datum::Time(Time::new(12345)),
+                // 1609459200000 ms = 2021-01-01 00:00:00 UTC, with 123456 additional nanoseconds
+                
Datum::TimestampNtz(TimestampNtz::from_millis_nanos(1609459200000, 123456)?),
+                // 1609459200000 ms = 2021-01-01 00:00:00 UTC, with 987654 additional nanoseconds
+                
Datum::TimestampLtz(TimestampLtz::from_millis_nanos(1609459200000, 987654)?),
+            ],
+        })?;
+
+        let batch = builder.build_arrow_record_batch()?;
+
+        // Verify all conversions
+        assert_eq!(
+            batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap()
+                .value(0),
+            1
+        );
+
+        let dec = batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<Decimal128Array>()
+            .unwrap();
+        assert_eq!(dec.value(0), 12346); // 123.456 rounded to 2 decimal places
+
+        assert_eq!(
+            batch
+                .column(2)
+                .as_any()
+                .downcast_ref::<Date32Array>()
+                .unwrap()
+                .value(0),
+            18000
+        );
+
+        assert_eq!(
+            batch
+                .column(3)
+                .as_any()
+                .downcast_ref::<Time32MillisecondArray>()
+                .unwrap()
+                .value(0),
+            43200000
+        );
+
+        assert_eq!(
+            batch
+                .column(4)
+                .as_any()
+                .downcast_ref::<Time64NanosecondArray>()
+                .unwrap()
+                .value(0),
+            12345000000
+        );
+
+        // Timestamp with sub-millisecond nanos preserved
+        assert_eq!(
+            batch
+                .column(5)
+                .as_any()
+                .downcast_ref::<TimestampMicrosecondArray>()
+                .unwrap()
+                .value(0),
+            1609459200000123
+        );
+
+        assert_eq!(
+            batch
+                .column(6)
+                .as_any()
+                .downcast_ref::<TimestampNanosecondArray>()
+                .unwrap()
+                .value(0),
+            1609459200000987654
+        );
+
+        Ok(())
+    }
 }
diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs
index 5b21b38..7b3850f 100644
--- a/crates/fluss/src/row/datum.rs
+++ b/crates/fluss/src/row/datum.rs
@@ -19,9 +19,13 @@ use crate::error::Error::RowConvertError;
 use crate::error::Result;
 use crate::row::Decimal;
 use arrow::array::{
-    ArrayBuilder, BinaryBuilder, BooleanBuilder, Float32Builder, 
Float64Builder, Int8Builder,
-    Int16Builder, Int32Builder, Int64Builder, StringBuilder,
+    ArrayBuilder, BinaryBuilder, BooleanBuilder, Date32Builder, 
Decimal128Builder, Float32Builder,
+    Float64Builder, Int8Builder, Int16Builder, Int32Builder, Int64Builder, 
StringBuilder,
+    Time32MillisecondBuilder, Time32SecondBuilder, Time64MicrosecondBuilder,
+    Time64NanosecondBuilder, TimestampMicrosecondBuilder, 
TimestampMillisecondBuilder,
+    TimestampNanosecondBuilder, TimestampSecondBuilder,
 };
+use arrow::datatypes as arrow_schema;
 use jiff::ToSpan;
 use ordered_float::OrderedFloat;
 use parse_display::Display;
@@ -83,6 +87,41 @@ impl Datum<'_> {
             _ => panic!("not a blob: {self:?}"),
         }
     }
+
+    pub fn as_decimal(&self) -> &Decimal {
+        match self {
+            Self::Decimal(d) => d,
+            _ => panic!("not a decimal: {self:?}"),
+        }
+    }
+
+    pub fn as_date(&self) -> Date {
+        match self {
+            Self::Date(d) => *d,
+            _ => panic!("not a date: {self:?}"),
+        }
+    }
+
+    pub fn as_time(&self) -> Time {
+        match self {
+            Self::Time(t) => *t,
+            _ => panic!("not a time: {self:?}"),
+        }
+    }
+
+    pub fn as_timestamp_ntz(&self) -> TimestampNtz {
+        match self {
+            Self::TimestampNtz(ts) => *ts,
+            _ => panic!("not a timestamp ntz: {self:?}"),
+        }
+    }
+
+    pub fn as_timestamp_ltz(&self) -> TimestampLtz {
+        match self {
+            Self::TimestampLtz(ts) => *ts,
+            _ => panic!("not a timestamp ltz: {self:?}"),
+        }
+    }
 }
 
 // ----------- implement from
@@ -246,6 +285,66 @@ impl TryFrom<&Datum<'_>> for i8 {
     }
 }
 
+impl TryFrom<&Datum<'_>> for Decimal {
+    type Error = ();
+
+    #[inline]
+    fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
+        match from {
+            Datum::Decimal(d) => Ok(d.clone()),
+            _ => Err(()),
+        }
+    }
+}
+
+impl TryFrom<&Datum<'_>> for Date {
+    type Error = ();
+
+    #[inline]
+    fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
+        match from {
+            Datum::Date(d) => Ok(*d),
+            _ => Err(()),
+        }
+    }
+}
+
+impl TryFrom<&Datum<'_>> for Time {
+    type Error = ();
+
+    #[inline]
+    fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
+        match from {
+            Datum::Time(t) => Ok(*t),
+            _ => Err(()),
+        }
+    }
+}
+
+impl TryFrom<&Datum<'_>> for TimestampNtz {
+    type Error = ();
+
+    #[inline]
+    fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
+        match from {
+            Datum::TimestampNtz(ts) => Ok(*ts),
+            _ => Err(()),
+        }
+    }
+}
+
+impl TryFrom<&Datum<'_>> for TimestampLtz {
+    type Error = ();
+
+    #[inline]
+    fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
+        match from {
+            Datum::TimestampLtz(ts) => Ok(*ts),
+            _ => Err(()),
+        }
+    }
+}
+
 impl<'a> From<bool> for Datum<'a> {
     #[inline]
     fn from(b: bool) -> Datum<'a> {
@@ -253,12 +352,103 @@ impl<'a> From<bool> for Datum<'a> {
     }
 }
 
+impl<'a> From<Decimal> for Datum<'a> {
+    #[inline]
+    fn from(d: Decimal) -> Datum<'a> {
+        Datum::Decimal(d)
+    }
+}
+
+impl<'a> From<Date> for Datum<'a> {
+    #[inline]
+    fn from(d: Date) -> Datum<'a> {
+        Datum::Date(d)
+    }
+}
+
+impl<'a> From<Time> for Datum<'a> {
+    #[inline]
+    fn from(t: Time) -> Datum<'a> {
+        Datum::Time(t)
+    }
+}
+
+impl<'a> From<TimestampNtz> for Datum<'a> {
+    #[inline]
+    fn from(ts: TimestampNtz) -> Datum<'a> {
+        Datum::TimestampNtz(ts)
+    }
+}
+
+impl<'a> From<TimestampLtz> for Datum<'a> {
+    #[inline]
+    fn from(ts: TimestampLtz) -> Datum<'a> {
+        Datum::TimestampLtz(ts)
+    }
+}
+
 pub trait ToArrow {
-    fn append_to(&self, builder: &mut dyn ArrayBuilder) -> Result<()>;
+    fn append_to(
+        &self,
+        builder: &mut dyn ArrayBuilder,
+        data_type: &arrow_schema::DataType,
+    ) -> Result<()>;
+}
+
+// Time unit conversion constants
+const MILLIS_PER_SECOND: i64 = 1_000;
+const MICROS_PER_MILLI: i64 = 1_000;
+const NANOS_PER_MILLI: i64 = 1_000_000;
+
+/// Converts milliseconds and nanoseconds-within-millisecond to total microseconds.
+/// Returns an error if the conversion would overflow.
+fn millis_nanos_to_micros(millis: i64, nanos: i32) -> Result<i64> {
+    let millis_micros = millis
+        .checked_mul(MICROS_PER_MILLI)
+        .ok_or_else(|| RowConvertError {
+            message: format!(
+                "Timestamp milliseconds {} overflows when converting to 
microseconds",
+                millis
+            ),
+        })?;
+    let nanos_micros = (nanos as i64) / MICROS_PER_MILLI;
+    millis_micros
+        .checked_add(nanos_micros)
+        .ok_or_else(|| RowConvertError {
+            message: format!(
+                "Timestamp overflow when adding microseconds: {} + {}",
+                millis_micros, nanos_micros
+            ),
+        })
+}
+
+/// Converts milliseconds and nanoseconds-within-millisecond to total nanoseconds.
+/// Returns an error if the conversion would overflow.
+fn millis_nanos_to_nanos(millis: i64, nanos: i32) -> Result<i64> {
+    let millis_nanos = millis
+        .checked_mul(NANOS_PER_MILLI)
+        .ok_or_else(|| RowConvertError {
+            message: format!(
+                "Timestamp milliseconds {} overflows when converting to 
nanoseconds",
+                millis
+            ),
+        })?;
+    millis_nanos
+        .checked_add(nanos as i64)
+        .ok_or_else(|| RowConvertError {
+            message: format!(
+                "Timestamp overflow when adding nanoseconds: {} + {}",
+                millis_nanos, nanos
+            ),
+        })
 }
 
 impl Datum<'_> {
-    pub fn append_to(&self, builder: &mut dyn ArrayBuilder) -> Result<()> {
+    pub fn append_to(
+        &self,
+        builder: &mut dyn ArrayBuilder,
+        data_type: &arrow_schema::DataType,
+    ) -> Result<()> {
         macro_rules! append_null_to_arrow {
             ($builder_type:ty) => {
                 if let Some(b) = 
builder.as_any_mut().downcast_mut::<$builder_type>() {
@@ -288,6 +478,16 @@ impl Datum<'_> {
                 append_null_to_arrow!(Float64Builder);
                 append_null_to_arrow!(StringBuilder);
                 append_null_to_arrow!(BinaryBuilder);
+                append_null_to_arrow!(Decimal128Builder);
+                append_null_to_arrow!(Date32Builder);
+                append_null_to_arrow!(Time32SecondBuilder);
+                append_null_to_arrow!(Time32MillisecondBuilder);
+                append_null_to_arrow!(Time64MicrosecondBuilder);
+                append_null_to_arrow!(Time64NanosecondBuilder);
+                append_null_to_arrow!(TimestampSecondBuilder);
+                append_null_to_arrow!(TimestampMillisecondBuilder);
+                append_null_to_arrow!(TimestampMicrosecondBuilder);
+                append_null_to_arrow!(TimestampNanosecondBuilder);
             }
             Datum::Bool(v) => append_value_to_arrow!(BooleanBuilder, *v),
             Datum::Int8(v) => append_value_to_arrow!(Int8Builder, *v),
@@ -298,16 +498,221 @@ impl Datum<'_> {
             Datum::Float64(v) => append_value_to_arrow!(Float64Builder, 
v.into_inner()),
             Datum::String(v) => append_value_to_arrow!(StringBuilder, 
v.as_ref()),
             Datum::Blob(v) => append_value_to_arrow!(BinaryBuilder, 
v.as_ref()),
-            Datum::Decimal(_)
-            | Datum::Date(_)
-            | Datum::Time(_)
-            | Datum::TimestampNtz(_)
-            | Datum::TimestampLtz(_) => {
+            Datum::Decimal(decimal) => {
+                // Extract target precision and scale from Arrow schema
+                let (p, s) = match data_type {
+                    arrow_schema::DataType::Decimal128(p, s) => (*p, *s),
+                    _ => {
+                        return Err(RowConvertError {
+                            message: format!(
+                                "Expected Decimal128 Arrow type, got: {:?}",
+                                data_type
+                            ),
+                        });
+                    }
+                };
+
+                // Validate scale is non-negative (Fluss doesn't support negative scales)
+                if s < 0 {
+                    return Err(RowConvertError {
+                        message: format!("Negative decimal scale {} is not 
supported", s),
+                    });
+                }
+
+                let target_precision = p as u32;
+                let target_scale = s as i64; // Safe now: 0..127 → 0i64..127i64
+
+                if let Some(b) = 
builder.as_any_mut().downcast_mut::<Decimal128Builder>() {
+                    use bigdecimal::RoundingMode;
+
+                    // Rescale the decimal to match Arrow's target scale
+                    let bd = decimal.to_big_decimal();
+                    let rescaled = bd.with_scale_round(target_scale, 
RoundingMode::HalfUp);
+                    let (unscaled, _) = rescaled.as_bigint_and_exponent();
+
+                    // Validate precision
+                    let actual_precision = 
Decimal::compute_precision(&unscaled);
+                    if actual_precision > target_precision as usize {
+                        return Err(RowConvertError {
+                            message: format!(
+                                "Decimal precision overflow: value has {} 
digits but Arrow expects {} (value: {})",
+                                actual_precision, target_precision, rescaled
+                            ),
+                        });
+                    }
+
+                    // Convert to i128 for Arrow
+                    let i128_val: i128 = match unscaled.try_into() {
+                        Ok(v) => v,
+                        Err(_) => {
+                            return Err(RowConvertError {
+                                message: format!("Decimal value exceeds i128 
range: {}", rescaled),
+                            });
+                        }
+                    };
+
+                    b.append_value(i128_val);
+                    return Ok(());
+                }
+
                 return Err(RowConvertError {
-                    message: format!(
-                        "Type {:?} is not yet supported for Arrow conversion",
-                        std::mem::discriminant(self)
-                    ),
+                    message: "Builder type mismatch for 
Decimal128".to_string(),
+                });
+            }
+            Datum::Date(date) => {
+                append_value_to_arrow!(Date32Builder, date.get_inner());
+            }
+            Datum::Time(time) => {
+                // Time is stored as milliseconds since midnight in Fluss
+                // Convert to Arrow's time unit based on schema
+                let millis = time.get_inner();
+
+                match data_type {
+                    
arrow_schema::DataType::Time32(arrow_schema::TimeUnit::Second) => {
+                        if let Some(b) = 
builder.as_any_mut().downcast_mut::<Time32SecondBuilder>()
+                        {
+                            // Validate no sub-second precision is lost
+                            if millis % MILLIS_PER_SECOND as i32 != 0 {
+                                return Err(RowConvertError {
+                                    message: format!(
+                                        "Time value {} ms has sub-second 
precision but schema expects seconds only",
+                                        millis
+                                    ),
+                                });
+                            }
+                            b.append_value(millis / MILLIS_PER_SECOND as i32);
+                            return Ok(());
+                        }
+                    }
+                    
arrow_schema::DataType::Time32(arrow_schema::TimeUnit::Millisecond) => {
+                        if let Some(b) = builder
+                            .as_any_mut()
+                            .downcast_mut::<Time32MillisecondBuilder>()
+                        {
+                            b.append_value(millis);
+                            return Ok(());
+                        }
+                    }
+                    
arrow_schema::DataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
+                        if let Some(b) = builder
+                            .as_any_mut()
+                            .downcast_mut::<Time64MicrosecondBuilder>()
+                        {
+                            let micros = (millis as i64)
+                                .checked_mul(MICROS_PER_MILLI)
+                                .ok_or_else(|| RowConvertError {
+                                    message: format!(
+                                        "Time value {} ms overflows when 
converting to microseconds",
+                                        millis
+                                    ),
+                                })?;
+                            b.append_value(micros);
+                            return Ok(());
+                        }
+                    }
+                    
arrow_schema::DataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
+                        if let Some(b) = builder
+                            .as_any_mut()
+                            .downcast_mut::<Time64NanosecondBuilder>()
+                        {
+                            let nanos = (millis as 
i64).checked_mul(NANOS_PER_MILLI).ok_or_else(
+                                || RowConvertError {
+                                    message: format!(
+                                        "Time value {} ms overflows when 
converting to nanoseconds",
+                                        millis
+                                    ),
+                                },
+                            )?;
+                            b.append_value(nanos);
+                            return Ok(());
+                        }
+                    }
+                    _ => {
+                        return Err(RowConvertError {
+                            message: format!(
+                                "Expected Time32/Time64 Arrow type, got: {:?}",
+                                data_type
+                            ),
+                        });
+                    }
+                }
+
+                return Err(RowConvertError {
+                    message: "Builder type mismatch for Time".to_string(),
+                });
+            }
+            Datum::TimestampNtz(ts) => {
+                let millis = ts.get_millisecond();
+                let nanos = ts.get_nano_of_millisecond();
+
+                if let Some(b) = builder
+                    .as_any_mut()
+                    .downcast_mut::<TimestampSecondBuilder>()
+                {
+                    b.append_value(millis / MILLIS_PER_SECOND);
+                    return Ok(());
+                }
+                if let Some(b) = builder
+                    .as_any_mut()
+                    .downcast_mut::<TimestampMillisecondBuilder>()
+                {
+                    b.append_value(millis);
+                    return Ok(());
+                }
+                if let Some(b) = builder
+                    .as_any_mut()
+                    .downcast_mut::<TimestampMicrosecondBuilder>()
+                {
+                    b.append_value(millis_nanos_to_micros(millis, nanos)?);
+                    return Ok(());
+                }
+                if let Some(b) = builder
+                    .as_any_mut()
+                    .downcast_mut::<TimestampNanosecondBuilder>()
+                {
+                    b.append_value(millis_nanos_to_nanos(millis, nanos)?);
+                    return Ok(());
+                }
+
+                return Err(RowConvertError {
+                    message: "Builder type mismatch for 
TimestampNtz".to_string(),
+                });
+            }
+            Datum::TimestampLtz(ts) => {
+                let millis = ts.get_epoch_millisecond();
+                let nanos = ts.get_nano_of_millisecond();
+
+                if let Some(b) = builder
+                    .as_any_mut()
+                    .downcast_mut::<TimestampSecondBuilder>()
+                {
+                    b.append_value(millis / MILLIS_PER_SECOND);
+                    return Ok(());
+                }
+                if let Some(b) = builder
+                    .as_any_mut()
+                    .downcast_mut::<TimestampMillisecondBuilder>()
+                {
+                    b.append_value(millis);
+                    return Ok(());
+                }
+                if let Some(b) = builder
+                    .as_any_mut()
+                    .downcast_mut::<TimestampMicrosecondBuilder>()
+                {
+                    b.append_value(millis_nanos_to_micros(millis, nanos)?);
+                    return Ok(());
+                }
+                if let Some(b) = builder
+                    .as_any_mut()
+                    .downcast_mut::<TimestampNanosecondBuilder>()
+                {
+                    b.append_value(millis_nanos_to_nanos(millis, nanos)?);
+                    return Ok(());
+                }
+
+                return Err(RowConvertError {
+                    message: "Builder type mismatch for 
TimestampLtz".to_string(),
                 });
             }
         }
@@ -325,7 +730,11 @@ impl Datum<'_> {
 macro_rules! impl_to_arrow {
     ($ty:ty, $variant:ident) => {
         impl ToArrow for $ty {
-            fn append_to(&self, builder: &mut dyn ArrayBuilder) -> Result<()> {
+            fn append_to(
+                &self,
+                builder: &mut dyn ArrayBuilder,
+                _data_type: &arrow_schema::DataType,
+            ) -> Result<()> {
                 if let Some(b) = 
builder.as_any_mut().downcast_mut::<$variant>() {
                     b.append_value(*self);
                     Ok(())
@@ -536,24 +945,39 @@ mod tests {
         assert_eq!(value, 42);
         let value: std::result::Result<i16, _> = (&datum).try_into();
         assert!(value.is_err());
+
+        // Test decimal and temporal types
+        let decimal = Decimal::from_unscaled_long(12345, 10, 2).unwrap();
+        let datum: Datum = decimal.clone().into();
+        assert_eq!(datum.as_decimal(), &decimal);
+        let extracted: Decimal = (&datum).try_into().unwrap();
+        assert_eq!(extracted, decimal);
+
+        let date = Date::new(19000);
+        let datum: Datum = date.into();
+        assert_eq!(datum.as_date(), date);
+
+        let ts_ltz = TimestampLtz::new(1672531200000);
+        let datum: Datum = ts_ltz.into();
+        assert_eq!(datum.as_timestamp_ltz(), ts_ltz);
     }
 
     #[test]
     fn datum_append_to_builder() {
         let mut builder = Int32Builder::new();
-        Datum::Null.append_to(&mut builder).unwrap();
-        Datum::Int32(5).append_to(&mut builder).unwrap();
+        Datum::Null
+            .append_to(&mut builder, &arrow_schema::DataType::Int32)
+            .unwrap();
+        Datum::Int32(5)
+            .append_to(&mut builder, &arrow_schema::DataType::Int32)
+            .unwrap();
         let array = builder.finish();
         assert!(array.is_null(0));
         assert_eq!(array.value(1), 5);
 
         let mut builder = StringBuilder::new();
-        let err = Datum::Int32(1).append_to(&mut builder).unwrap_err();
-        assert!(matches!(err, crate::error::Error::RowConvertError { .. }));
-
-        let mut builder = Int32Builder::new();
-        let err = Datum::Date(Date::new(0))
-            .append_to(&mut builder)
+        let err = Datum::Int32(1)
+            .append_to(&mut builder, &arrow_schema::DataType::Utf8)
             .unwrap_err();
         assert!(matches!(err, crate::error::Error::RowConvertError { .. }));
     }

Reply via email to