This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new f7f3abf  feat: Support Arrow append operations for decimals, temporal types in Python (#206)
f7f3abf is described below

commit f7f3abf7fbf501022f052a966effc2e2bca3d87e
Author: Anton Borisov <[email protected]>
AuthorDate: Sun Jan 25 08:31:55 2026 +0000

    feat: Support Arrow append operations for decimals, temporal types in Python (#206)
---
 bindings/python/Cargo.toml         |   1 +
 bindings/python/example/example.py |  59 ++++-
 bindings/python/src/table.rs       | 448 ++++++++++++++++++++++++++++++++++++-
 bindings/python/src/utils.rs       |  35 ++-
 4 files changed, 528 insertions(+), 15 deletions(-)

diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml
index 4da8bf8..ff4d687 100644
--- a/bindings/python/Cargo.toml
+++ b/bindings/python/Cargo.toml
@@ -36,3 +36,4 @@ arrow-schema = "57.0.0"
 arrow-array = "57.0.0"
 pyo3-async-runtimes = { version = "0.26.0", features = ["tokio-runtime"] }
 jiff = { workspace = true }
+bigdecimal = "0.4"
diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py
index f1f20d1..730416b 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -17,6 +17,8 @@
 
 import asyncio
 import time
+from datetime import date, time as dt_time, datetime
+from decimal import Decimal
 
 import pandas as pd
 import pyarrow as pa
@@ -45,6 +47,11 @@ async def main():
         pa.field("name", pa.string()),
         pa.field("score", pa.float32()),
         pa.field("age", pa.int32()),
+        pa.field("birth_date", pa.date32()),
+        pa.field("check_in_time", pa.time32("ms")),
+        pa.field("created_at", pa.timestamp("us")),  # TIMESTAMP (NTZ)
+        pa.field("updated_at", pa.timestamp("us", tz="UTC")),  # TIMESTAMP_LTZ
+        pa.field("salary", pa.decimal128(10, 2)),
     ]
 
     # Create a PyArrow schema
@@ -60,7 +67,7 @@ async def main():
     admin = await conn.get_admin()
 
     # Create a Fluss table
-    table_path = fluss.TablePath("fluss", "sample_table")
+    table_path = fluss.TablePath("fluss", "sample_table_types")
 
     try:
         await admin.create_table(table_path, table_descriptor, True)
@@ -96,6 +103,11 @@ async def main():
                 pa.array(["Alice", "Bob", "Charlie"], type=pa.string()),
                 pa.array([95.2, 87.2, 92.1], type=pa.float32()),
                 pa.array([25, 30, 35], type=pa.int32()),
+                pa.array([date(1999, 5, 15), date(1994, 3, 20), date(1989, 11, 
8)], type=pa.date32()),
+                pa.array([dt_time(9, 0, 0), dt_time(9, 30, 0), dt_time(10, 0, 
0)], type=pa.time32("ms")),
+                pa.array([datetime(2024, 1, 15, 10, 30), datetime(2024, 1, 15, 
11, 0), datetime(2024, 1, 15, 11, 30)], type=pa.timestamp("us")),
+                pa.array([datetime(2024, 1, 15, 10, 30), datetime(2024, 1, 15, 
11, 0), datetime(2024, 1, 15, 11, 30)], type=pa.timestamp("us", tz="UTC")),
+                pa.array([Decimal("75000.00"), Decimal("82000.50"), 
Decimal("95000.75")], type=pa.decimal128(10, 2)),
             ],
             schema=schema,
         )
@@ -111,6 +123,11 @@ async def main():
                 pa.array(["David", "Eve"], type=pa.string()),
                 pa.array([88.5, 91.0], type=pa.float32()),
                 pa.array([28, 32], type=pa.int32()),
+                pa.array([date(1996, 7, 22), date(1992, 12, 1)], 
type=pa.date32()),
+                pa.array([dt_time(14, 15, 0), dt_time(8, 45, 0)], 
type=pa.time32("ms")),
+                pa.array([datetime(2024, 1, 16, 9, 0), datetime(2024, 1, 16, 
9, 30)], type=pa.timestamp("us")),
+                pa.array([datetime(2024, 1, 16, 9, 0), datetime(2024, 1, 16, 
9, 30)], type=pa.timestamp("us", tz="UTC")),
+                pa.array([Decimal("68000.00"), Decimal("72500.25")], 
type=pa.decimal128(10, 2)),
             ],
             schema=schema,
         )
@@ -118,15 +135,32 @@ async def main():
         append_writer.write_arrow_batch(pa_record_batch)
         print("Successfully wrote PyArrow RecordBatch")
 
-        # Test 3: Append single rows
-        print("\n--- Testing single row append ---")
-        # Dict input
-        await append_writer.append({"id": 8, "name": "Helen", "score": 93.5, 
"age": 26})
-        print("Successfully appended row (dict)")
-
-        # List input
-        await append_writer.append([9, "Ivan", 90.0, 31])
-        print("Successfully appended row (list)")
+        # Test 3: Append single rows with Date, Time, Timestamp, Decimal
+        print("\n--- Testing single row append with temporal/decimal types 
---")
+        # Dict input with all types including Date, Time, Timestamp, Decimal
+        await append_writer.append({
+            "id": 8,
+            "name": "Helen",
+            "score": 93.5,
+            "age": 26,
+            "birth_date": date(1998, 4, 10),
+            "check_in_time": dt_time(11, 30, 45),
+            "created_at": datetime(2024, 1, 17, 14, 0, 0),
+            "updated_at": datetime(2024, 1, 17, 14, 0, 0),
+            "salary": Decimal("88000.00"),
+        })
+        print("Successfully appended row (dict with Date, Time, Timestamp, 
Decimal)")
+
+        # List input with all types
+        await append_writer.append([
+            9, "Ivan", 90.0, 31,
+            date(1993, 8, 25),
+            dt_time(16, 45, 0),
+            datetime(2024, 1, 17, 15, 30, 0),
+            datetime(2024, 1, 17, 15, 30, 0),
+            Decimal("91500.50"),
+        ])
+        print("Successfully appended row (list with Date, Time, Timestamp, 
Decimal)")
 
         # Test 4: Write Pandas DataFrame
         print("\n--- Testing Pandas DataFrame write ---")
@@ -136,6 +170,11 @@ async def main():
                 "name": ["Frank", "Grace"],
                 "score": [89.3, 94.7],
                 "age": [29, 27],
+                "birth_date": [date(1995, 2, 14), date(1997, 9, 30)],
+                "check_in_time": [dt_time(10, 0, 0), dt_time(10, 30, 0)],
+                "created_at": [datetime(2024, 1, 18, 8, 0), datetime(2024, 1, 
18, 8, 30)],
+                "updated_at": [datetime(2024, 1, 18, 8, 0), datetime(2024, 1, 
18, 8, 30)],
+                "salary": [Decimal("79000.00"), Decimal("85500.75")],
             }
         )
 
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 773354e..b56a29d 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -18,12 +18,22 @@
 use crate::TOKIO_RUNTIME;
 use crate::*;
 use arrow::array::RecordBatch;
-use arrow_pyarrow::FromPyArrow;
+use arrow_pyarrow::{FromPyArrow, ToPyArrow};
 use fluss::client::EARLIEST_OFFSET;
 use fluss::rpc::message::OffsetSpec;
+use pyo3::types::IntoPyDict;
 use pyo3_async_runtimes::tokio::future_into_py;
 use std::sync::Arc;
 
+// Time conversion constants.
+// All factors are declared as i64; call sites that work in i32 (the TIME
+// conversion and the nano-of-millisecond adjustments) narrow them with
+// `as i32` at the point of use.
+const MILLIS_PER_SECOND: i64 = 1_000;
+const MILLIS_PER_MINUTE: i64 = 60_000;
+const MILLIS_PER_HOUR: i64 = 3_600_000;
+const MICROS_PER_MILLI: i64 = 1_000;
+const MICROS_PER_SECOND: i64 = 1_000_000;
+const MICROS_PER_DAY: i64 = 86_400_000_000;
+const NANOS_PER_MILLI: i64 = 1_000_000;
+
 /// Represents a Fluss table for data operations
 #[pyclass]
 pub struct FlussTable {
@@ -246,14 +256,29 @@ impl AppendWriter {
 
     /// Write Pandas DataFrame data
     pub fn write_pandas(&self, py: Python, df: Py<PyAny>) -> PyResult<()> {
+        // Get the expected Arrow schema from the Fluss table
+        let row_type = self.table_info.get_row_type();
+        let expected_schema = fcore::record::to_arrow_schema(row_type)
+            .map_err(|e| FlussError::new_err(format!("Failed to get table 
schema: {}", e)))?;
+
+        // Convert Arrow schema to PyArrow schema
+        let py_schema = expected_schema
+            .as_ref()
+            .to_pyarrow(py)
+            .map_err(|e| FlussError::new_err(format!("Failed to convert 
schema: {}", e)))?;
+
         // Import pyarrow module
         let pyarrow = py.import("pyarrow")?;
 
         // Get the Table class from pyarrow module
         let table_class = pyarrow.getattr("Table")?;
 
-        // Call Table.from_pandas(df) - from_pandas is a class method
-        let pa_table = table_class.call_method1("from_pandas", (df,))?;
+        // Call Table.from_pandas(df, schema=expected_schema) to ensure proper 
type casting
+        let pa_table = table_class.call_method(
+            "from_pandas",
+            (df,),
+            Some(&[("schema", py_schema)].into_py_dict(py)?),
+        )?;
 
         // Then call write_arrow with the converted table
         self.write_arrow(py, pa_table.into())
@@ -473,12 +498,393 @@ fn python_value_to_datum(
                 )))
             }
         }
+        fcore::metadata::DataType::Decimal(decimal_type) => {
+            python_decimal_to_datum(value, decimal_type.precision(), 
decimal_type.scale())
+        }
+        fcore::metadata::DataType::Date(_) => python_date_to_datum(value),
+        fcore::metadata::DataType::Time(_) => python_time_to_datum(value),
+        fcore::metadata::DataType::Timestamp(_) => 
python_datetime_to_timestamp_ntz(value),
+        fcore::metadata::DataType::TimestampLTz(_) => 
python_datetime_to_timestamp_ltz(value),
         _ => Err(FlussError::new_err(format!(
             "Unsupported data type for row-level operations: {data_type}"
         ))),
     }
 }
 
+/// Cached decimal.Decimal type
+/// Uses PyOnceLock for thread-safety and subinterpreter compatibility.
+static DECIMAL_TYPE: pyo3::sync::PyOnceLock<Py<pyo3::types::PyType>> =
+    pyo3::sync::PyOnceLock::new();
+
+/// Cached timezone-aware `datetime.datetime` object for the Unix epoch
+/// (1970-01-01 00:00:00 UTC). This is an instance, not a type; it is filled in
+/// lazily by `get_utc_epoch`.
+static UTC_EPOCH: pyo3::sync::PyOnceLock<Py<PyAny>> = 
pyo3::sync::PyOnceLock::new();
+
+/// Return Python's `decimal.Decimal` class, bound to the current GIL token.
+///
+/// The class is looked up through the `DECIMAL_TYPE` cache: the `decimal`
+/// module is imported and `Decimal` resolved only when the cache is empty.
+/// Import, attribute lookup and downcast failures are propagated as `PyErr`.
+fn get_decimal_type(py: Python) -> PyResult<Bound<pyo3::types::PyType>> {
+    let load = || -> PyResult<Py<pyo3::types::PyType>> {
+        let class = py.import("decimal")?.getattr("Decimal")?;
+        let class = class.downcast_into::<pyo3::types::PyType>()?;
+        Ok(class.unbind())
+    };
+
+    let cached = DECIMAL_TYPE.get_or_try_init(py, load)?;
+    Ok(cached.bind(py).clone())
+}
+
+/// Return the timezone-aware Python datetime for 1970-01-01 00:00:00 UTC.
+///
+/// The object is looked up through the `UTC_EPOCH` cache and is constructed as
+/// `datetime.datetime(1970, 1, 1, 0, 0, 0, 0, datetime.timezone.utc)` only
+/// when the cache is empty. Any Python error raised while building it is
+/// propagated.
+fn get_utc_epoch(py: Python) -> PyResult<Bound<PyAny>> {
+    let build = || -> PyResult<Py<PyAny>> {
+        let module = py.import("datetime")?;
+        let utc = module.getattr("timezone")?.getattr("utc")?;
+        let datetime_cls = module.getattr("datetime")?;
+        let origin = datetime_cls.call1((1970, 1, 1, 0, 0, 0, 0, &utc))?;
+        Ok(origin.unbind())
+    };
+
+    let cached = UTC_EPOCH.get_or_try_init(py, build)?;
+    Ok(cached.bind(py).clone())
+}
+
+/// Check that `value` is an instance of Python's `decimal.Decimal`.
+///
+/// Returns `Ok(())` for Decimal instances and a `FlussError` naming the
+/// actual type otherwise. Errors from resolving the Decimal class or from the
+/// instance check itself are propagated unchanged.
+fn ensure_is_decimal(value: &Bound<PyAny>) -> PyResult<()> {
+    let decimal_cls = get_decimal_type(value.py())?.into_any();
+
+    if value.is_instance(&decimal_cls)? {
+        Ok(())
+    } else {
+        let found = get_type_name(value);
+        Err(FlussError::new_err(format!(
+            "Expected decimal.Decimal, got {}",
+            found
+        )))
+    }
+}
+
+/// Convert a Python `decimal.Decimal` into `Datum::Decimal`.
+///
+/// The value must be a `decimal.Decimal` instance (checked by
+/// `ensure_is_decimal`). Its `str()` form is parsed into a `BigDecimal`, which
+/// is then handed to `Decimal::from_big_decimal` together with the column's
+/// `precision` and `scale`.
+///
+/// Returns a `FlussError` when the text cannot be parsed or when
+/// `from_big_decimal` rejects the value for the requested precision/scale.
+fn python_decimal_to_datum(
+    value: &Bound<PyAny>,
+    precision: u32,
+    scale: u32,
+) -> PyResult<fcore::row::Datum<'static>> {
+    ensure_is_decimal(value)?;
+
+    // Go through the textual form so no float conversion is involved.
+    let text: String = value.str()?.extract()?;
+
+    let parsed: bigdecimal::BigDecimal = match text.parse() {
+        Ok(number) => number,
+        Err(e) => {
+            return Err(FlussError::new_err(format!(
+                "Failed to parse decimal '{}': {}",
+                text, e
+            )));
+        }
+    };
+
+    match fcore::row::Decimal::from_big_decimal(parsed, precision, scale) {
+        Ok(decimal) => Ok(fcore::row::Datum::Decimal(decimal)),
+        Err(e) => Err(FlussError::new_err(format!(
+            "Failed to convert decimal '{}' to DECIMAL({}, {}): {}",
+            text, precision, scale, e
+        ))),
+    }
+}
+
+/// Convert Python datetime.date to Datum::Date.
+///
+/// The stored value is the number of days between the given date and
+/// 1970-01-01. `datetime.datetime` values are rejected explicitly even though
+/// they are also `date` instances, and any other non-date input produces a
+/// `FlussError` naming the actual type.
+fn python_date_to_datum(value: &Bound<PyAny>) -> 
PyResult<fcore::row::Datum<'static>> {
+    use pyo3::types::{PyDate, PyDateAccess, PyDateTime};
+
+    // Reject datetime.datetime (subclass of date) - use timestamp columns for those.
+    // This check must run before the PyDate downcast below, which would
+    // otherwise accept a datetime.
+    if value.downcast::<PyDateTime>().is_ok() {
+        return Err(FlussError::new_err(
+            "Expected datetime.date, got datetime.datetime. Use a TIMESTAMP 
column for datetime values.",
+        ));
+    }
+
+    let date = value.downcast::<PyDate>().map_err(|_| {
+        FlussError::new_err(format!(
+            "Expected datetime.date, got {}",
+            get_type_name(value)
+        ))
+    })?;
+
+    let year = date.get_year();
+    let month = date.get_month();
+    let day = date.get_day();
+
+    // Calculate days since Unix epoch (1970-01-01)
+    // NOTE(review): the `as i16` / `as i8` casts assume the components come
+    // from a valid Python date; confirm that `jiff::civil::date` cannot be
+    // reached with out-of-range values here.
+    let civil_date = jiff::civil::date(year as i16, month as i8, day as i8);
+    let epoch = jiff::civil::date(1970, 1, 1);
+    let days_since_epoch = (civil_date - epoch).get_days();
+
+    Ok(fcore::row::Datum::Date(fcore::row::Date::new(
+        days_since_epoch,
+    )))
+}
+
+/// Convert a Python `datetime.time` into `Datum::Time`.
+///
+/// The resulting value is the number of milliseconds since midnight, computed
+/// from the hour, minute, second and microsecond fields. Per the original
+/// author's note, Fluss stores TIME as i32 milliseconds regardless of the
+/// schema precision.
+///
+/// A microsecond field that is not a whole number of milliseconds is rejected
+/// with a `FlussError` instead of being rounded, so no precision is lost
+/// silently. Any input that is not a `datetime.time` is rejected as well.
+fn python_time_to_datum(value: &Bound<PyAny>) -> PyResult<fcore::row::Datum<'static>> {
+    use pyo3::types::{PyTime, PyTimeAccess};
+
+    let time = match value.downcast::<PyTime>() {
+        Ok(time) => time,
+        Err(_) => {
+            return Err(FlussError::new_err(format!(
+                "Expected datetime.time, got {}",
+                get_type_name(value)
+            )));
+        }
+    };
+
+    // The shared factors are i64; everything in this function fits in i32.
+    let micros_per_milli = MICROS_PER_MILLI as i32;
+    let millis_per_second = MILLIS_PER_SECOND as i32;
+    let millis_per_minute = MILLIS_PER_MINUTE as i32;
+    let millis_per_hour = MILLIS_PER_HOUR as i32;
+
+    let microsecond = time.get_microsecond() as i32;
+
+    // Fail fast on anything finer than a millisecond.
+    if microsecond % micros_per_milli != 0 {
+        return Err(FlussError::new_err(format!(
+            "TIME values with sub-millisecond precision are not supported. \
+             Got time with {} microseconds (not divisible by 1000). \
+             Fluss stores TIME as milliseconds since midnight. \
+             Please round to milliseconds before insertion.",
+            microsecond
+        )));
+    }
+
+    let from_hours = time.get_hour() as i32 * millis_per_hour;
+    let from_minutes = time.get_minute() as i32 * millis_per_minute;
+    let from_seconds = time.get_second() as i32 * millis_per_second;
+    let from_micros = microsecond / micros_per_milli;
+
+    let millis_since_midnight = from_hours + from_minutes + from_seconds + from_micros;
+
+    Ok(fcore::row::Datum::Time(fcore::row::Time::new(
+        millis_since_midnight,
+    )))
+}
+
+/// Convert Python datetime-like object to Datum::TimestampNtz.
+///
+/// The value is split into (epoch milliseconds, nanosecond-of-millisecond) by
+/// `extract_datetime_components_ntz`, which requires a naive value and rejects
+/// timezone-aware ones. Accepted inputs are datetime.datetime, pd.Timestamp,
+/// and objects exposing `to_pydatetime()`.
+/// NOTE(review): np.datetime64 was listed as supported here, but the helper
+/// has no explicit branch for it - confirm whether it is actually accepted.
+///
+/// Returns a `FlussError` if extraction fails or if
+/// `TimestampNtz::from_millis_nanos` rejects the components.
+fn python_datetime_to_timestamp_ntz(value: &Bound<PyAny>) -> 
PyResult<fcore::row::Datum<'static>> {
+    let (epoch_millis, nano_of_milli) = 
extract_datetime_components_ntz(value)?;
+
+    let ts = fcore::row::TimestampNtz::from_millis_nanos(epoch_millis, 
nano_of_milli)
+        .map_err(|e| FlussError::new_err(format!("Failed to create 
TimestampNtz: {}", e)))?;
+
+    Ok(fcore::row::Datum::TimestampNtz(ts))
+}
+
+/// Convert Python datetime-like object to Datum::TimestampLtz.
+/// For naive datetimes, assumes UTC. For aware datetimes, converts to UTC.
+///
+/// The value is split into (epoch milliseconds, nanosecond-of-millisecond) by
+/// `extract_datetime_components_ltz`. Accepted inputs are datetime.datetime,
+/// pd.Timestamp, and objects exposing `to_pydatetime()`.
+/// NOTE(review): np.datetime64 was listed as supported here, but the helper
+/// has no explicit branch for it - confirm whether it is actually accepted.
+///
+/// Returns a `FlussError` if extraction fails or if
+/// `TimestampLtz::from_millis_nanos` rejects the components.
+fn python_datetime_to_timestamp_ltz(value: &Bound<PyAny>) -> 
PyResult<fcore::row::Datum<'static>> {
+    let (epoch_millis, nano_of_milli) = 
extract_datetime_components_ltz(value)?;
+
+    let ts = fcore::row::TimestampLtz::from_millis_nanos(epoch_millis, 
nano_of_milli)
+        .map_err(|e| FlussError::new_err(format!("Failed to create 
TimestampLtz: {}", e)))?;
+
+    Ok(fcore::row::Datum::TimestampLtz(ts))
+}
+
+/// Extract epoch milliseconds for TimestampNtz (wall-clock time, no timezone 
conversion).
+/// Uses integer arithmetic to avoid float precision issues.
+/// For clarity, tz-aware datetimes are rejected - use TimestampLtz for those.
+fn extract_datetime_components_ntz(value: &Bound<PyAny>) -> PyResult<(i64, 
i32)> {
+    use pyo3::types::PyDateTime;
+
+    // Try PyDateTime first
+    if let Ok(dt) = value.downcast::<PyDateTime>() {
+        // Reject tz-aware datetime for NTZ - it's ambiguous what the user 
wants
+        let tzinfo = dt.getattr("tzinfo")?;
+        if !tzinfo.is_none() {
+            return Err(FlussError::new_err(
+                "TIMESTAMP (without timezone) requires a naive datetime. \
+                 Got timezone-aware datetime. Either remove tzinfo or use 
TIMESTAMP_LTZ column.",
+            ));
+        }
+        return datetime_to_epoch_millis_as_utc(dt);
+    }
+
+    // Check for pandas Timestamp by verifying module name
+    if is_pandas_timestamp(value) {
+        // For NTZ, reject tz-aware pandas Timestamps for consistency with 
datetime behavior
+        if let Ok(tz) = value.getattr("tz") {
+            if !tz.is_none() {
+                return Err(FlussError::new_err(
+                    "TIMESTAMP (without timezone) requires a naive 
pd.Timestamp. \
+                     Got timezone-aware Timestamp. Either use 
tz_localize(None) or use TIMESTAMP_LTZ column.",
+                ));
+            }
+        }
+        // Naive pandas Timestamp: .value is nanoseconds since epoch 
(wall-clock as UTC)
+        let nanos: i64 = value.getattr("value")?.extract()?;
+        return Ok(nanos_to_millis_and_submillis(nanos));
+    }
+
+    // Try to_pydatetime() for objects that support it
+    if let Ok(py_dt) = value.call_method0("to_pydatetime") {
+        if let Ok(dt) = py_dt.downcast::<PyDateTime>() {
+            let tzinfo = dt.getattr("tzinfo")?;
+            if !tzinfo.is_none() {
+                return Err(FlussError::new_err(
+                    "TIMESTAMP (without timezone) requires a naive datetime. \
+                     Got timezone-aware value. Use TIMESTAMP_LTZ column 
instead.",
+                ));
+            }
+            return datetime_to_epoch_millis_as_utc(dt);
+        }
+    }
+
+    Err(FlussError::new_err(format!(
+        "Expected naive datetime.datetime or pd.Timestamp, got {}",
+        get_type_name(value)
+    )))
+}
+
+/// Extract (epoch milliseconds, nanosecond-of-millisecond) for TimestampLtz.
+///
+/// The result identifies an instant relative to the UTC epoch. Naive values
+/// are assumed to already be in UTC; timezone-aware values are converted to
+/// UTC.
+///
+/// Inputs are tried in this order:
+/// 1. pandas `Timestamp` - read through `.value` (nanoseconds since the UTC
+///    epoch) so nanosecond precision is kept.
+/// 2. `datetime.datetime` - microsecond precision.
+/// 3. Any object whose `to_pydatetime()` returns a `datetime.datetime`.
+///
+/// Anything else yields a `FlussError` naming the actual type.
+fn extract_datetime_components_ltz(value: &Bound<PyAny>) -> PyResult<(i64, i32)> {
+    use pyo3::types::PyDateTime;
+
+    // Naive datetimes are read as UTC wall-clock; aware ones go through the
+    // timedelta-from-epoch path, which applies the UTC offset.
+    fn to_utc_components(
+        dt: &pyo3::Bound<'_, pyo3::types::PyDateTime>,
+    ) -> PyResult<(i64, i32)> {
+        if dt.getattr("tzinfo")?.is_none() {
+            datetime_to_epoch_millis_as_utc(dt)
+        } else {
+            datetime_to_epoch_millis_utc_aware(dt)
+        }
+    }
+
+    // pandas must be checked BEFORE the PyDateTime downcast: pd.Timestamp is a
+    // subclass of datetime.datetime, so the downcast would match first and the
+    // datetime accessors would silently drop the nanosecond component.
+    if is_pandas_timestamp(value) {
+        // pandas Timestamp.value is nanoseconds since the UTC epoch. If it
+        // cannot be read as an i64, fall through to the generic datetime
+        // handling so every input that was accepted before is still accepted.
+        if let Ok(nanos) = value.getattr("value").and_then(|v| v.extract::<i64>()) {
+            return Ok(nanos_to_millis_and_submillis(nanos));
+        }
+    }
+
+    if let Ok(dt) = value.downcast::<PyDateTime>() {
+        return to_utc_components(dt);
+    }
+
+    // Try to_pydatetime() for objects that support it. A failing call (for
+    // example a missing method) just means this path does not apply.
+    if let Ok(py_dt) = value.call_method0("to_pydatetime") {
+        if let Ok(dt) = py_dt.downcast::<PyDateTime>() {
+            return to_utc_components(dt);
+        }
+    }
+
+    Err(FlussError::new_err(format!(
+        "Expected datetime.datetime or pd.Timestamp, got {}",
+        get_type_name(value)
+    )))
+}
+
+/// Convert a datetime's calendar fields to (epoch milliseconds,
+/// nanosecond-of-millisecond), treating the fields as UTC.
+///
+/// Any `tzinfo` on `dt` is ignored; callers decide whether that is
+/// appropriate. The returned nanosecond part is always in `0..1_000_000` and
+/// the millisecond part is floored, so instants before 1970 are split the
+/// same way as in `nanos_to_millis_and_submillis`
+/// (e.g. -500 us -> (-1 ms, 500_000 ns)).
+///
+/// Returns a `FlussError` if jiff cannot represent the datetime as a
+/// timestamp.
+fn datetime_to_epoch_millis_as_utc(
+    dt: &pyo3::Bound<'_, pyo3::types::PyDateTime>,
+) -> PyResult<(i64, i32)> {
+    use pyo3::types::{PyDateAccess, PyTimeAccess};
+
+    // Local because only this conversion needs the us -> ns factor.
+    const NANOS_PER_MICRO: i64 = 1_000;
+
+    let year = dt.get_year();
+    let month = dt.get_month();
+    let day = dt.get_day();
+    let hour = dt.get_hour();
+    let minute = dt.get_minute();
+    let second = dt.get_second();
+    let microsecond = dt.get_microsecond();
+
+    // Create jiff civil datetime and convert to UTC timestamp
+    // Safe casts: hour (0-23), minute (0-59), second (0-59) all fit in i8
+    let civil_dt = jiff::civil::date(year as i16, month as i8, day as i8).at(
+        hour as i8,
+        minute as i8,
+        second as i8,
+        microsecond as i32 * 1000,
+    );
+
+    let timestamp = jiff::tz::Offset::UTC
+        .to_timestamp(civil_dt)
+        .map_err(|e| FlussError::new_err(format!("Invalid datetime: {}", e)))?;
+
+    // Do NOT combine `as_millisecond()` with `subsec_nanosecond()` here: for
+    // pre-epoch instants the former truncates toward zero and the latter is
+    // negative, which yields a negative nano-of-milli. The input has
+    // microsecond resolution, so the microsecond count is exact and can be
+    // split with floor division instead.
+    let micros = timestamp.as_microsecond();
+    let millis = micros.div_euclid(MICROS_PER_MILLI);
+    let nano_of_milli = (micros.rem_euclid(MICROS_PER_MILLI) * NANOS_PER_MICRO) as i32;
+
+    Ok((millis, nano_of_milli))
+}
+
+/// Convert a timezone-aware datetime to (epoch milliseconds,
+/// nanosecond-of-millisecond).
+///
+/// The instant is obtained on the Python side as `dt - UTC_EPOCH`, so the
+/// timezone offset is applied by Python's own datetime arithmetic. The
+/// resulting timedelta is flattened to total microseconds and split with
+/// floor division, so the nanosecond part is never negative, even for
+/// instants before 1970.
+///
+/// Errors from the subtraction or from the timedelta downcast are propagated.
+fn datetime_to_epoch_millis_utc_aware(
+    dt: &pyo3::Bound<'_, pyo3::types::PyDateTime>,
+) -> PyResult<(i64, i32)> {
+    use pyo3::types::{PyDelta, PyDeltaAccess};
+
+    let epoch = get_utc_epoch(dt.py())?;
+
+    // elapsed = dt - epoch, computed by Python.
+    let elapsed = dt.call_method1("__sub__", (epoch,))?;
+    let elapsed = elapsed.downcast::<PyDelta>()?;
+
+    // Integer arithmetic only; the day count may be negative.
+    let total_micros = elapsed.get_days() as i64 * MICROS_PER_DAY
+        + elapsed.get_seconds() as i64 * MICROS_PER_SECOND
+        + elapsed.get_microseconds() as i64;
+
+    // Floor division keeps the sub-millisecond remainder in 0..1000 us.
+    let millis = total_micros.div_euclid(MICROS_PER_MILLI);
+    let micro_of_milli = total_micros.rem_euclid(MICROS_PER_MILLI);
+
+    // 1_000 nanoseconds per microsecond.
+    let nano_of_milli = (micro_of_milli * 1_000) as i32;
+
+    Ok((millis, nano_of_milli))
+}
+
+/// Split a nanosecond count into (whole milliseconds, nanosecond remainder).
+///
+/// The remainder is always in `0..NANOS_PER_MILLI`; negative inputs round the
+/// millisecond part toward negative infinity, e.g. `-1` becomes
+/// `(-1, 999_999)`.
+fn nanos_to_millis_and_submillis(nanos: i64) -> (i64, i32) {
+    // Euclidean division gives the floored quotient and non-negative
+    // remainder directly.
+    let whole_millis = nanos.div_euclid(NANOS_PER_MILLI);
+    let leftover_nanos = nanos.rem_euclid(NANOS_PER_MILLI);
+    (whole_millis, leftover_nanos as i32)
+}
+
+/// Report whether `value`'s type looks like pandas' `Timestamp`.
+///
+/// The check is purely name-based, so pandas itself is never imported: the
+/// type's `__module__` must start with "pandas" and its `__name__` must be
+/// exactly "Timestamp". Any failure to read either attribute as a string
+/// counts as "not a pandas Timestamp".
+fn is_pandas_timestamp(value: &Bound<PyAny>) -> bool {
+    let ty = value.get_type();
+
+    let from_pandas = match ty.getattr("__module__") {
+        Ok(attr) => match attr.extract::<&str>() {
+            Ok(module) => module.starts_with("pandas"),
+            Err(_) => false,
+        },
+        Err(_) => false,
+    };
+    if !from_pandas {
+        return false;
+    }
+
+    match ty.getattr("__name__") {
+        Ok(attr) => matches!(attr.extract::<&str>(), Ok("Timestamp")),
+        Err(_) => false,
+    }
+}
+
+/// Get type name
+fn get_type_name(value: &Bound<PyAny>) -> String {
+    value
+        .get_type()
+        .name()
+        .map(|s| s.to_string())
+        .unwrap_or_else(|_| "unknown".to_string())
+}
+
 /// Scanner for reading log data from a Fluss table
 #[pyclass]
 pub struct LogScanner {
@@ -621,3 +1027,39 @@ impl LogScanner {
         }
     }
 }
+
+// Unit tests for the pure-Rust helpers in this file; no Python interpreter is
+// needed for these.
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    // Checks the millisecond / nanosecond split on positive, zero and
+    // negative inputs; the nanosecond part must never be negative.
+    #[test]
+    fn test_nanos_to_millis_and_submillis() {
+        // Simple positive case
+        assert_eq!(nanos_to_millis_and_submillis(1_500_000), (1, 500_000));
+
+        // Exact millisecond boundary
+        assert_eq!(nanos_to_millis_and_submillis(2_000_000), (2, 0));
+
+        // Zero
+        assert_eq!(nanos_to_millis_and_submillis(0), (0, 0));
+
+        // Large value
+        assert_eq!(
+            nanos_to_millis_and_submillis(86_400_000_000_000), // 1 day in nanos
+            (86_400_000, 0)
+        );
+
+        // Negative: -1.5 milliseconds should be (-2 millis, +500_000 nanos)
+        // Because -1_500_000 nanos = -2ms + 500_000ns
+        assert_eq!(nanos_to_millis_and_submillis(-1_500_000), (-2, 500_000));
+
+        // Negative exact boundary
+        assert_eq!(nanos_to_millis_and_submillis(-2_000_000), (-2, 0));
+
+        // Small negative
+        assert_eq!(nanos_to_millis_and_submillis(-1), (-1, 999_999));
+
+        // Negative with sub-millisecond part
+        assert_eq!(nanos_to_millis_and_submillis(-500_000), (-1, 500_000));
+    }
+}
diff --git a/bindings/python/src/utils.rs b/bindings/python/src/utils.rs
index 09e6b5f..ee32c9c 100644
--- a/bindings/python/src/utils.rs
+++ b/bindings/python/src/utils.rs
@@ -59,8 +59,39 @@ impl Utils {
             ArrowDataType::Binary | ArrowDataType::LargeBinary => 
DataTypes::bytes(),
             ArrowDataType::Date32 => DataTypes::date(),
             ArrowDataType::Date64 => DataTypes::date(),
-            ArrowDataType::Time32(_) | ArrowDataType::Time64(_) => 
DataTypes::time(),
-            ArrowDataType::Timestamp(_, _) => DataTypes::timestamp(),
+            ArrowDataType::Time32(unit) => match unit {
+                arrow_schema::TimeUnit::Second => 
DataTypes::time_with_precision(0),
+                arrow_schema::TimeUnit::Millisecond => 
DataTypes::time_with_precision(3),
+                _ => {
+                    return Err(FlussError::new_err(format!(
+                        "Unsupported Time32 unit: {unit:?}"
+                    )));
+                }
+            },
+            ArrowDataType::Time64(unit) => match unit {
+                arrow_schema::TimeUnit::Microsecond => 
DataTypes::time_with_precision(6),
+                arrow_schema::TimeUnit::Nanosecond => 
DataTypes::time_with_precision(9),
+                _ => {
+                    return Err(FlussError::new_err(format!(
+                        "Unsupported Time64 unit: {unit:?}"
+                    )));
+                }
+            },
+            ArrowDataType::Timestamp(unit, tz) => {
+                let precision = match unit {
+                    arrow_schema::TimeUnit::Second => 0,
+                    arrow_schema::TimeUnit::Millisecond => 3,
+                    arrow_schema::TimeUnit::Microsecond => 6,
+                    arrow_schema::TimeUnit::Nanosecond => 9,
+                };
+                // Arrow Timestamp with timezone -> Fluss TimestampLtz
+                // Arrow Timestamp without timezone -> Fluss Timestamp (NTZ)
+                if tz.is_some() {
+                    DataTypes::timestamp_ltz_with_precision(precision)
+                } else {
+                    DataTypes::timestamp_with_precision(precision)
+                }
+            }
             ArrowDataType::Decimal128(precision, scale) => {
                 DataTypes::decimal(*precision as u32, *scale as u32)
             }

Reply via email to