This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new f7f3abf feat: Support Arrow append operations for decimals, temporal
types in Python (#206)
f7f3abf is described below
commit f7f3abf7fbf501022f052a966effc2e2bca3d87e
Author: Anton Borisov <[email protected]>
AuthorDate: Sun Jan 25 08:31:55 2026 +0000
feat: Support Arrow append operations for decimals, temporal types in
Python (#206)
---
bindings/python/Cargo.toml | 1 +
bindings/python/example/example.py | 59 ++++-
bindings/python/src/table.rs | 448 ++++++++++++++++++++++++++++++++++++-
bindings/python/src/utils.rs | 35 ++-
4 files changed, 528 insertions(+), 15 deletions(-)
diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml
index 4da8bf8..ff4d687 100644
--- a/bindings/python/Cargo.toml
+++ b/bindings/python/Cargo.toml
@@ -36,3 +36,4 @@ arrow-schema = "57.0.0"
arrow-array = "57.0.0"
pyo3-async-runtimes = { version = "0.26.0", features = ["tokio-runtime"] }
jiff = { workspace = true }
+bigdecimal = "0.4"
diff --git a/bindings/python/example/example.py
b/bindings/python/example/example.py
index f1f20d1..730416b 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -17,6 +17,8 @@
import asyncio
import time
+from datetime import date, time as dt_time, datetime
+from decimal import Decimal
import pandas as pd
import pyarrow as pa
@@ -45,6 +47,11 @@ async def main():
pa.field("name", pa.string()),
pa.field("score", pa.float32()),
pa.field("age", pa.int32()),
+ pa.field("birth_date", pa.date32()),
+ pa.field("check_in_time", pa.time32("ms")),
+ pa.field("created_at", pa.timestamp("us")), # TIMESTAMP (NTZ)
+ pa.field("updated_at", pa.timestamp("us", tz="UTC")), # TIMESTAMP_LTZ
+ pa.field("salary", pa.decimal128(10, 2)),
]
# Create a PyArrow schema
@@ -60,7 +67,7 @@ async def main():
admin = await conn.get_admin()
# Create a Fluss table
- table_path = fluss.TablePath("fluss", "sample_table")
+ table_path = fluss.TablePath("fluss", "sample_table_types")
try:
await admin.create_table(table_path, table_descriptor, True)
@@ -96,6 +103,11 @@ async def main():
pa.array(["Alice", "Bob", "Charlie"], type=pa.string()),
pa.array([95.2, 87.2, 92.1], type=pa.float32()),
pa.array([25, 30, 35], type=pa.int32()),
+ pa.array([date(1999, 5, 15), date(1994, 3, 20), date(1989, 11,
8)], type=pa.date32()),
+ pa.array([dt_time(9, 0, 0), dt_time(9, 30, 0), dt_time(10, 0,
0)], type=pa.time32("ms")),
+ pa.array([datetime(2024, 1, 15, 10, 30), datetime(2024, 1, 15,
11, 0), datetime(2024, 1, 15, 11, 30)], type=pa.timestamp("us")),
+ pa.array([datetime(2024, 1, 15, 10, 30), datetime(2024, 1, 15,
11, 0), datetime(2024, 1, 15, 11, 30)], type=pa.timestamp("us", tz="UTC")),
+ pa.array([Decimal("75000.00"), Decimal("82000.50"),
Decimal("95000.75")], type=pa.decimal128(10, 2)),
],
schema=schema,
)
@@ -111,6 +123,11 @@ async def main():
pa.array(["David", "Eve"], type=pa.string()),
pa.array([88.5, 91.0], type=pa.float32()),
pa.array([28, 32], type=pa.int32()),
+ pa.array([date(1996, 7, 22), date(1992, 12, 1)],
type=pa.date32()),
+ pa.array([dt_time(14, 15, 0), dt_time(8, 45, 0)],
type=pa.time32("ms")),
+ pa.array([datetime(2024, 1, 16, 9, 0), datetime(2024, 1, 16,
9, 30)], type=pa.timestamp("us")),
+ pa.array([datetime(2024, 1, 16, 9, 0), datetime(2024, 1, 16,
9, 30)], type=pa.timestamp("us", tz="UTC")),
+ pa.array([Decimal("68000.00"), Decimal("72500.25")],
type=pa.decimal128(10, 2)),
],
schema=schema,
)
@@ -118,15 +135,32 @@ async def main():
append_writer.write_arrow_batch(pa_record_batch)
print("Successfully wrote PyArrow RecordBatch")
- # Test 3: Append single rows
- print("\n--- Testing single row append ---")
- # Dict input
- await append_writer.append({"id": 8, "name": "Helen", "score": 93.5,
"age": 26})
- print("Successfully appended row (dict)")
-
- # List input
- await append_writer.append([9, "Ivan", 90.0, 31])
- print("Successfully appended row (list)")
+ # Test 3: Append single rows with Date, Time, Timestamp, Decimal
+ print("\n--- Testing single row append with temporal/decimal types
---")
+ # Dict input with all types including Date, Time, Timestamp, Decimal
+ await append_writer.append({
+ "id": 8,
+ "name": "Helen",
+ "score": 93.5,
+ "age": 26,
+ "birth_date": date(1998, 4, 10),
+ "check_in_time": dt_time(11, 30, 45),
+ "created_at": datetime(2024, 1, 17, 14, 0, 0),
+ "updated_at": datetime(2024, 1, 17, 14, 0, 0),
+ "salary": Decimal("88000.00"),
+ })
+ print("Successfully appended row (dict with Date, Time, Timestamp,
Decimal)")
+
+ # List input with all types
+ await append_writer.append([
+ 9, "Ivan", 90.0, 31,
+ date(1993, 8, 25),
+ dt_time(16, 45, 0),
+ datetime(2024, 1, 17, 15, 30, 0),
+ datetime(2024, 1, 17, 15, 30, 0),
+ Decimal("91500.50"),
+ ])
+ print("Successfully appended row (list with Date, Time, Timestamp,
Decimal)")
# Test 4: Write Pandas DataFrame
print("\n--- Testing Pandas DataFrame write ---")
@@ -136,6 +170,11 @@ async def main():
"name": ["Frank", "Grace"],
"score": [89.3, 94.7],
"age": [29, 27],
+ "birth_date": [date(1995, 2, 14), date(1997, 9, 30)],
+ "check_in_time": [dt_time(10, 0, 0), dt_time(10, 30, 0)],
+ "created_at": [datetime(2024, 1, 18, 8, 0), datetime(2024, 1,
18, 8, 30)],
+ "updated_at": [datetime(2024, 1, 18, 8, 0), datetime(2024, 1,
18, 8, 30)],
+ "salary": [Decimal("79000.00"), Decimal("85500.75")],
}
)
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 773354e..b56a29d 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -18,12 +18,22 @@
use crate::TOKIO_RUNTIME;
use crate::*;
use arrow::array::RecordBatch;
-use arrow_pyarrow::FromPyArrow;
+use arrow_pyarrow::{FromPyArrow, ToPyArrow};
use fluss::client::EARLIEST_OFFSET;
use fluss::rpc::message::OffsetSpec;
+use pyo3::types::IntoPyDict;
use pyo3_async_runtimes::tokio::future_into_py;
use std::sync::Arc;
+// Time conversion constants. Larger units are expressed in terms of smaller
+// ones so the relationships between them are explicit.
+const MILLIS_PER_SECOND: i64 = 1_000;
+const MILLIS_PER_MINUTE: i64 = 60 * MILLIS_PER_SECOND;
+const MILLIS_PER_HOUR: i64 = 60 * MILLIS_PER_MINUTE;
+const MICROS_PER_MILLI: i64 = 1_000;
+const MICROS_PER_SECOND: i64 = MILLIS_PER_SECOND * MICROS_PER_MILLI;
+const MICROS_PER_DAY: i64 = 24 * 60 * 60 * MICROS_PER_SECOND;
+const NANOS_PER_MILLI: i64 = 1_000 * MICROS_PER_MILLI;
+
/// Represents a Fluss table for data operations
#[pyclass]
pub struct FlussTable {
@@ -246,14 +256,29 @@ impl AppendWriter {
/// Write Pandas DataFrame data
pub fn write_pandas(&self, py: Python, df: Py<PyAny>) -> PyResult<()> {
+ // Get the expected Arrow schema from the Fluss table
+ let row_type = self.table_info.get_row_type();
+ let expected_schema = fcore::record::to_arrow_schema(row_type)
+ .map_err(|e| FlussError::new_err(format!("Failed to get table
schema: {}", e)))?;
+
+ // Convert Arrow schema to PyArrow schema
+ let py_schema = expected_schema
+ .as_ref()
+ .to_pyarrow(py)
+ .map_err(|e| FlussError::new_err(format!("Failed to convert
schema: {}", e)))?;
+
// Import pyarrow module
let pyarrow = py.import("pyarrow")?;
// Get the Table class from pyarrow module
let table_class = pyarrow.getattr("Table")?;
- // Call Table.from_pandas(df) - from_pandas is a class method
- let pa_table = table_class.call_method1("from_pandas", (df,))?;
+ // Call Table.from_pandas(df, schema=expected_schema) to ensure proper
type casting
+ let pa_table = table_class.call_method(
+ "from_pandas",
+ (df,),
+ Some(&[("schema", py_schema)].into_py_dict(py)?),
+ )?;
// Then call write_arrow with the converted table
self.write_arrow(py, pa_table.into())
@@ -473,12 +498,393 @@ fn python_value_to_datum(
)))
}
}
+ fcore::metadata::DataType::Decimal(decimal_type) => {
+ python_decimal_to_datum(value, decimal_type.precision(),
decimal_type.scale())
+ }
+ fcore::metadata::DataType::Date(_) => python_date_to_datum(value),
+ fcore::metadata::DataType::Time(_) => python_time_to_datum(value),
+ fcore::metadata::DataType::Timestamp(_) =>
python_datetime_to_timestamp_ntz(value),
+ fcore::metadata::DataType::TimestampLTz(_) =>
python_datetime_to_timestamp_ltz(value),
_ => Err(FlussError::new_err(format!(
"Unsupported data type for row-level operations: {data_type}"
))),
}
}
+/// Per-interpreter cache of the `decimal.Decimal` type object.
+/// `PyOnceLock` keeps one-time initialisation thread-safe and subinterpreter-aware.
+static DECIMAL_TYPE: pyo3::sync::PyOnceLock<Py<pyo3::types::PyType>> =
+    pyo3::sync::PyOnceLock::new();
+
+/// Per-interpreter cache of `datetime.datetime(1970, 1, 1, tzinfo=timezone.utc)`.
+static UTC_EPOCH: pyo3::sync::PyOnceLock<Py<PyAny>> = pyo3::sync::PyOnceLock::new();
+
+/// Return `decimal.Decimal`, importing the `decimal` module only on first use
+/// in this interpreter.
+fn get_decimal_type(py: Python) -> PyResult<Bound<pyo3::types::PyType>> {
+    let cached = DECIMAL_TYPE.get_or_try_init(py, || -> PyResult<_> {
+        let decimal_cls = py.import("decimal")?.getattr("Decimal")?;
+        Ok(decimal_cls.downcast_into::<pyo3::types::PyType>()?.unbind())
+    })?;
+    Ok(cached.bind(py).clone())
+}
+
+/// Return the Unix epoch as a UTC-aware `datetime`, building it only on first
+/// use in this interpreter.
+fn get_utc_epoch(py: Python) -> PyResult<Bound<PyAny>> {
+    let cached = UTC_EPOCH.get_or_try_init(py, || -> PyResult<_> {
+        let datetime_mod = py.import("datetime")?;
+        let utc = datetime_mod.getattr("timezone")?.getattr("utc")?;
+        let datetime_cls = datetime_mod.getattr("datetime")?;
+        Ok(datetime_cls.call1((1970, 1, 1, 0, 0, 0, 0, &utc))?.unbind())
+    })?;
+    Ok(cached.bind(py).clone())
+}
+
+/// Return `Ok(())` when `value` is a `decimal.Decimal` (or subclass) instance,
+/// otherwise a `FlussError` naming the actual type.
+fn ensure_is_decimal(value: &Bound<PyAny>) -> PyResult<()> {
+    let decimal_ty = get_decimal_type(value.py())?;
+    if value.is_instance(&decimal_ty.into_any())? {
+        Ok(())
+    } else {
+        Err(FlussError::new_err(format!(
+            "Expected decimal.Decimal, got {}",
+            get_type_name(value)
+        )))
+    }
+}
+
+/// Convert a Python `decimal.Decimal` into `Datum::Decimal` for a column of the
+/// given precision and scale. Any other Python type is rejected.
+///
+/// The value is converted via its `str()` form and parsed with `bigdecimal`,
+/// so it never passes through a binary floating-point representation.
+fn python_decimal_to_datum(
+    value: &Bound<PyAny>,
+    precision: u32,
+    scale: u32,
+) -> PyResult<fcore::row::Datum<'static>> {
+    use std::str::FromStr;
+
+    ensure_is_decimal(value)?;
+
+    let text: String = value.str()?.extract()?;
+    let parsed = match bigdecimal::BigDecimal::from_str(&text) {
+        Ok(bd) => bd,
+        Err(e) => {
+            return Err(FlussError::new_err(format!(
+                "Failed to parse decimal '{}': {}",
+                text, e
+            )));
+        }
+    };
+
+    fcore::row::Decimal::from_big_decimal(parsed, precision, scale)
+        .map(fcore::row::Datum::Decimal)
+        .map_err(|e| {
+            FlussError::new_err(format!(
+                "Failed to convert decimal '{}' to DECIMAL({}, {}): {}",
+                text, precision, scale, e
+            ))
+        })
+}
+
+/// Convert a Python `datetime.date` into `Datum::Date`, counted in days since
+/// 1970-01-01.
+///
+/// `datetime.datetime` is a subclass of `date`, but it is rejected here so a
+/// timestamp is never silently truncated to its calendar day.
+fn python_date_to_datum(value: &Bound<PyAny>) -> PyResult<fcore::row::Datum<'static>> {
+    use pyo3::types::{PyDate, PyDateAccess, PyDateTime};
+
+    if value.downcast::<PyDateTime>().is_ok() {
+        return Err(FlussError::new_err(
+            "Expected datetime.date, got datetime.datetime. Use a TIMESTAMP column for datetime values.",
+        ));
+    }
+
+    let py_date = match value.downcast::<PyDate>() {
+        Ok(d) => d,
+        Err(_) => {
+            return Err(FlussError::new_err(format!(
+                "Expected datetime.date, got {}",
+                get_type_name(value)
+            )));
+        }
+    };
+
+    // Python dates are always valid calendar dates within 1..=9999, so these
+    // casts cannot truncate.
+    let unix_epoch = jiff::civil::date(1970, 1, 1);
+    let civil = jiff::civil::date(
+        py_date.get_year() as i16,
+        py_date.get_month() as i8,
+        py_date.get_day() as i8,
+    );
+    let days = (civil - unix_epoch).get_days();
+
+    Ok(fcore::row::Datum::Date(fcore::row::Date::new(days)))
+}
+
+/// Convert a Python `datetime.time` into `Datum::Time` (milliseconds since
+/// midnight), using PyO3's native `PyTime` accessors.
+///
+/// Fluss TIME is stored as milliseconds since midnight (i32) regardless of the
+/// schema's precision setting, matching the Java Fluss wire protocol. A value
+/// whose microsecond field is not a whole number of milliseconds is rejected
+/// with an error rather than being silently truncated.
+fn python_time_to_datum(value: &Bound<PyAny>) -> PyResult<fcore::row::Datum<'static>> {
+    use pyo3::types::{PyTime, PyTimeAccess};
+
+    let Ok(py_time) = value.downcast::<PyTime>() else {
+        return Err(FlussError::new_err(format!(
+            "Expected datetime.time, got {}",
+            get_type_name(value)
+        )));
+    };
+
+    let micros = py_time.get_microsecond() as i32;
+    if micros % MICROS_PER_MILLI as i32 != 0 {
+        return Err(FlussError::new_err(format!(
+            "TIME values with sub-millisecond precision are not supported. \
+            Got time with {} microseconds (not divisible by 1000). \
+            Fluss stores TIME as milliseconds since midnight. \
+            Please round to milliseconds before insertion.",
+            micros
+        )));
+    }
+
+    // hour/minute/second each scaled by its length in milliseconds.
+    let whole_units: i32 = [
+        (py_time.get_hour() as i32, MILLIS_PER_HOUR),
+        (py_time.get_minute() as i32, MILLIS_PER_MINUTE),
+        (py_time.get_second() as i32, MILLIS_PER_SECOND),
+    ]
+    .iter()
+    .map(|&(count, unit_ms)| count * unit_ms as i32)
+    .sum();
+    let millis_since_midnight = whole_units + micros / MICROS_PER_MILLI as i32;
+
+    Ok(fcore::row::Datum::Time(fcore::row::Time::new(
+        millis_since_midnight,
+    )))
+}
+
+/// Convert a Python datetime-like value into `Datum::TimestampNtz`.
+/// Accepted inputs are a naive `datetime.datetime`, a naive `pd.Timestamp`,
+/// or any object whose `to_pydatetime()` returns a naive datetime.
+fn python_datetime_to_timestamp_ntz(value: &Bound<PyAny>) -> PyResult<fcore::row::Datum<'static>> {
+    let (millis, nanos) = extract_datetime_components_ntz(value)?;
+    fcore::row::TimestampNtz::from_millis_nanos(millis, nanos)
+        .map(fcore::row::Datum::TimestampNtz)
+        .map_err(|e| FlussError::new_err(format!("Failed to create TimestampNtz: {}", e)))
+}
+
+/// Convert a Python datetime-like value into `Datum::TimestampLtz`.
+/// Naive values are read as UTC; timezone-aware values are converted to UTC.
+/// Accepted inputs are `datetime.datetime`, `pd.Timestamp`, or any object
+/// whose `to_pydatetime()` returns a datetime.
+fn python_datetime_to_timestamp_ltz(value: &Bound<PyAny>) -> PyResult<fcore::row::Datum<'static>> {
+    let (millis, nanos) = extract_datetime_components_ltz(value)?;
+    fcore::row::TimestampLtz::from_millis_nanos(millis, nanos)
+        .map(fcore::row::Datum::TimestampLtz)
+        .map_err(|e| FlussError::new_err(format!("Failed to create TimestampLtz: {}", e)))
+}
+
+/// Extract epoch milliseconds for TimestampNtz (wall-clock time, no timezone
conversion).
+/// Uses integer arithmetic to avoid float precision issues.
+/// For clarity, tz-aware datetimes are rejected - use TimestampLtz for those.
+fn extract_datetime_components_ntz(value: &Bound<PyAny>) -> PyResult<(i64,
i32)> {
+ use pyo3::types::PyDateTime;
+
+ // Try PyDateTime first
+ if let Ok(dt) = value.downcast::<PyDateTime>() {
+ // Reject tz-aware datetime for NTZ - it's ambiguous what the user
wants
+ let tzinfo = dt.getattr("tzinfo")?;
+ if !tzinfo.is_none() {
+ return Err(FlussError::new_err(
+ "TIMESTAMP (without timezone) requires a naive datetime. \
+ Got timezone-aware datetime. Either remove tzinfo or use
TIMESTAMP_LTZ column.",
+ ));
+ }
+ return datetime_to_epoch_millis_as_utc(dt);
+ }
+
+ // Check for pandas Timestamp by verifying module name
+ if is_pandas_timestamp(value) {
+ // For NTZ, reject tz-aware pandas Timestamps for consistency with
datetime behavior
+ if let Ok(tz) = value.getattr("tz") {
+ if !tz.is_none() {
+ return Err(FlussError::new_err(
+ "TIMESTAMP (without timezone) requires a naive
pd.Timestamp. \
+ Got timezone-aware Timestamp. Either use
tz_localize(None) or use TIMESTAMP_LTZ column.",
+ ));
+ }
+ }
+ // Naive pandas Timestamp: .value is nanoseconds since epoch
(wall-clock as UTC)
+ let nanos: i64 = value.getattr("value")?.extract()?;
+ return Ok(nanos_to_millis_and_submillis(nanos));
+ }
+
+ // Try to_pydatetime() for objects that support it
+ if let Ok(py_dt) = value.call_method0("to_pydatetime") {
+ if let Ok(dt) = py_dt.downcast::<PyDateTime>() {
+ let tzinfo = dt.getattr("tzinfo")?;
+ if !tzinfo.is_none() {
+ return Err(FlussError::new_err(
+ "TIMESTAMP (without timezone) requires a naive datetime. \
+ Got timezone-aware value. Use TIMESTAMP_LTZ column
instead.",
+ ));
+ }
+ return datetime_to_epoch_millis_as_utc(dt);
+ }
+ }
+
+ Err(FlussError::new_err(format!(
+ "Expected naive datetime.datetime or pd.Timestamp, got {}",
+ get_type_name(value)
+ )))
+}
+
+/// Extract `(epoch_millis, nano_of_milli)` for TimestampLtz (an instant, UTC-based).
+/// Naive values are interpreted as UTC; aware values are converted to UTC.
+///
+/// `pd.Timestamp` is tested before `datetime.datetime` because it is a
+/// subclass of `datetime.datetime`. If the datetime branch ran first, it would
+/// capture every pandas Timestamp and drop its nanosecond component.
+fn extract_datetime_components_ltz(value: &Bound<PyAny>) -> PyResult<(i64, i32)> {
+    use pyo3::types::PyDateTime;
+
+    // pandas Timestamp (checked first, see doc comment above): .value is
+    // nanoseconds since the UTC epoch. For naive Timestamps the wall-clock
+    // reading is taken as UTC, matching the naive-datetime rule below.
+    if is_pandas_timestamp(value) {
+        let nanos: i64 = value.getattr("value")?.extract()?;
+        return Ok(nanos_to_millis_and_submillis(nanos));
+    }
+
+    // Plain datetime.datetime.
+    if let Ok(dt) = value.downcast::<PyDateTime>() {
+        return if dt.getattr("tzinfo")?.is_none() {
+            // Naive datetime: treat components as UTC wall-clock time.
+            datetime_to_epoch_millis_as_utc(dt)
+        } else {
+            // Aware datetime: subtract the UTC epoch to get the correct instant.
+            datetime_to_epoch_millis_utc_aware(dt)
+        };
+    }
+
+    // Fall back to to_pydatetime() for other datetime-like objects.
+    if let Ok(py_dt) = value.call_method0("to_pydatetime") {
+        if let Ok(dt) = py_dt.downcast::<PyDateTime>() {
+            return if dt.getattr("tzinfo")?.is_none() {
+                datetime_to_epoch_millis_as_utc(dt)
+            } else {
+                datetime_to_epoch_millis_utc_aware(dt)
+            };
+        }
+    }
+
+    Err(FlussError::new_err(format!(
+        "Expected datetime.datetime or pd.Timestamp, got {}",
+        get_type_name(value)
+    )))
+}
+
+/// Convert a datetime's wall-clock fields into `(epoch_millis, nano_of_milli)`,
+/// interpreting them as UTC. This function reads only the date/time fields,
+/// never `tzinfo`.
+///
+/// `epoch_millis` is floored, so `nano_of_milli` is always in `0..1_000_000`,
+/// the same normalisation as `nanos_to_millis_and_submillis`. jiff's
+/// `as_millisecond()` / `subsec_nanosecond()` round toward zero. For instants
+/// before 1970-01-01 with a sub-millisecond part (e.g. 1969-12-31T23:59:59.999500),
+/// using them directly would produce a negative `nano_of_milli`.
+fn datetime_to_epoch_millis_as_utc(
+    dt: &pyo3::Bound<'_, pyo3::types::PyDateTime>,
+) -> PyResult<(i64, i32)> {
+    use pyo3::types::{PyDateAccess, PyTimeAccess};
+
+    let year = dt.get_year();
+    let month = dt.get_month();
+    let day = dt.get_day();
+    let hour = dt.get_hour();
+    let minute = dt.get_minute();
+    let second = dt.get_second();
+    let microsecond = dt.get_microsecond();
+
+    // Safe casts: Python datetimes have year 1..=9999, month 1-12, day 1-31,
+    // hour 0-23, minute 0-59, second 0-59, microsecond 0..1_000_000.
+    let civil_dt = jiff::civil::date(year as i16, month as i8, day as i8).at(
+        hour as i8,
+        minute as i8,
+        second as i8,
+        microsecond as i32 * 1000,
+    );
+
+    let timestamp = jiff::tz::Offset::UTC
+        .to_timestamp(civil_dt)
+        .map_err(|e| FlussError::new_err(format!("Invalid datetime: {}", e)))?;
+
+    // Python datetimes carry microsecond precision, so this count is exact.
+    let micros = timestamp.as_microsecond();
+    // Floor division keeps the remainder non-negative for pre-epoch instants.
+    let millis = micros.div_euclid(MICROS_PER_MILLI);
+    // Leftover microseconds (0..1000) scaled to nanoseconds.
+    let nano_of_milli = (micros.rem_euclid(MICROS_PER_MILLI) * 1_000) as i32;
+
+    Ok((millis, nano_of_milli))
+}
+
+/// Convert a timezone-aware datetime into `(epoch_millis, nano_of_milli)`.
+///
+/// Python evaluates `dt - UTC_EPOCH`, which applies `dt`'s UTC offset. The
+/// resulting timedelta is then folded into integers; no floats are involved.
+/// The epoch object is cached via `get_utc_epoch`.
+fn datetime_to_epoch_millis_utc_aware(
+    dt: &pyo3::Bound<'_, pyo3::types::PyDateTime>,
+) -> PyResult<(i64, i32)> {
+    use pyo3::types::{PyDelta, PyDeltaAccess};
+
+    let epoch = get_utc_epoch(dt.py())?;
+    let elapsed = dt.call_method1("__sub__", (epoch,))?;
+    let elapsed = elapsed.downcast::<PyDelta>()?;
+
+    // Total microseconds since the epoch (negative before 1970).
+    let total_micros = i64::from(elapsed.get_days()) * MICROS_PER_DAY
+        + i64::from(elapsed.get_seconds()) * MICROS_PER_SECOND
+        + i64::from(elapsed.get_microseconds());
+
+    // Floor division keeps the sub-millisecond remainder in 0..1000 micros,
+    // which is then scaled to nanoseconds.
+    let millis = total_micros.div_euclid(MICROS_PER_MILLI);
+    let nano_of_milli = (total_micros.rem_euclid(MICROS_PER_MILLI) * 1_000) as i32;
+
+    Ok((millis, nano_of_milli))
+}
+
+/// Split a signed nanosecond count into `(millis, nano_of_milli)`.
+/// `millis` is floored, so `nano_of_milli` is always in `0..1_000_000`,
+/// including for negative inputs (e.g. -1 ns -> (-1, 999_999)).
+fn nanos_to_millis_and_submillis(nanos: i64) -> (i64, i32) {
+    (
+        nanos.div_euclid(NANOS_PER_MILLI),
+        nanos.rem_euclid(NANOS_PER_MILLI) as i32,
+    )
+}
+
+/// Return true when `value`'s type is named `Timestamp` and lives in a module
+/// whose name starts with `pandas`. The check only inspects the type's
+/// `__module__` and `__name__`, so pandas is never imported here. Any
+/// attribute lookup or string-extraction failure yields `false`.
+fn is_pandas_timestamp(value: &Bound<PyAny>) -> bool {
+    let ty = value.get_type();
+
+    let Ok(module_obj) = ty.getattr("__module__") else {
+        return false;
+    };
+    let Ok(module) = module_obj.extract::<&str>() else {
+        return false;
+    };
+    if !module.starts_with("pandas") {
+        return false;
+    }
+
+    let Ok(name_obj) = ty.getattr("__name__") else {
+        return false;
+    };
+    matches!(name_obj.extract::<&str>(), Ok("Timestamp"))
+}
+
+/// Get type name
+fn get_type_name(value: &Bound<PyAny>) -> String {
+ value
+ .get_type()
+ .name()
+ .map(|s| s.to_string())
+ .unwrap_or_else(|_| "unknown".to_string())
+}
+
/// Scanner for reading log data from a Fluss table
#[pyclass]
pub struct LogScanner {
@@ -621,3 +1027,39 @@ impl LogScanner {
}
}
}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_nanos_to_millis_and_submillis() {
+        // (input nanos, expected (millis, nano_of_milli))
+        let cases: [(i64, (i64, i32)); 8] = [
+            (1_500_000, (1, 500_000)),             // simple positive case
+            (2_000_000, (2, 0)),                   // exact millisecond boundary
+            (0, (0, 0)),                           // zero
+            (86_400_000_000_000, (86_400_000, 0)), // one day in nanos
+            (-1_500_000, (-2, 500_000)),           // -1.5 ms = -2 ms + 500_000 ns
+            (-2_000_000, (-2, 0)),                 // negative exact boundary
+            (-1, (-1, 999_999)),                   // smallest negative value
+            (-500_000, (-1, 500_000)),             // negative, sub-millisecond only
+        ];
+
+        for (nanos, expected) in cases {
+            assert_eq!(
+                nanos_to_millis_and_submillis(nanos),
+                expected,
+                "nanos = {nanos}"
+            );
+        }
+    }
+}
diff --git a/bindings/python/src/utils.rs b/bindings/python/src/utils.rs
index 09e6b5f..ee32c9c 100644
--- a/bindings/python/src/utils.rs
+++ b/bindings/python/src/utils.rs
@@ -59,8 +59,39 @@ impl Utils {
ArrowDataType::Binary | ArrowDataType::LargeBinary =>
DataTypes::bytes(),
ArrowDataType::Date32 => DataTypes::date(),
ArrowDataType::Date64 => DataTypes::date(),
- ArrowDataType::Time32(_) | ArrowDataType::Time64(_) =>
DataTypes::time(),
- ArrowDataType::Timestamp(_, _) => DataTypes::timestamp(),
+ ArrowDataType::Time32(unit) => match unit {
+ arrow_schema::TimeUnit::Second =>
DataTypes::time_with_precision(0),
+ arrow_schema::TimeUnit::Millisecond =>
DataTypes::time_with_precision(3),
+ _ => {
+ return Err(FlussError::new_err(format!(
+ "Unsupported Time32 unit: {unit:?}"
+ )));
+ }
+ },
+ ArrowDataType::Time64(unit) => match unit {
+ arrow_schema::TimeUnit::Microsecond =>
DataTypes::time_with_precision(6),
+ arrow_schema::TimeUnit::Nanosecond =>
DataTypes::time_with_precision(9),
+ _ => {
+ return Err(FlussError::new_err(format!(
+ "Unsupported Time64 unit: {unit:?}"
+ )));
+ }
+ },
+ ArrowDataType::Timestamp(unit, tz) => {
+ let precision = match unit {
+ arrow_schema::TimeUnit::Second => 0,
+ arrow_schema::TimeUnit::Millisecond => 3,
+ arrow_schema::TimeUnit::Microsecond => 6,
+ arrow_schema::TimeUnit::Nanosecond => 9,
+ };
+ // Arrow Timestamp with timezone -> Fluss TimestampLtz
+ // Arrow Timestamp without timezone -> Fluss Timestamp (NTZ)
+ if tz.is_some() {
+ DataTypes::timestamp_ltz_with_precision(precision)
+ } else {
+ DataTypes::timestamp_with_precision(precision)
+ }
+ }
ArrowDataType::Decimal128(precision, scale) => {
DataTypes::decimal(*precision as u32, *scale as u32)
}