fresh-borzoni commented on code in PR #206:
URL: https://github.com/apache/fluss-rust/pull/206#discussion_r2723349673
##########
bindings/python/src/table.rs:
##########
@@ -473,12 +498,381 @@ fn python_value_to_datum(
)))
}
}
+ fcore::metadata::DataType::Decimal(decimal_type) => {
+ python_decimal_to_datum(value, decimal_type.precision(),
decimal_type.scale())
+ }
+ fcore::metadata::DataType::Date(_) => python_date_to_datum(value),
+ fcore::metadata::DataType::Time(_) => python_time_to_datum(value),
+ fcore::metadata::DataType::Timestamp(_) =>
python_datetime_to_timestamp_ntz(value),
+ fcore::metadata::DataType::TimestampLTz(_) =>
python_datetime_to_timestamp_ltz(value),
_ => Err(FlussError::new_err(format!(
"Unsupported data type for row-level operations: {data_type}"
))),
}
}
+/// Cached `decimal.Decimal` type object, filled in lazily by `get_decimal_type` on first use.
+/// Uses PyOnceLock for thread-safety and subinterpreter compatibility.
+static DECIMAL_TYPE: pyo3::sync::PyOnceLock<Py<pyo3::types::PyType>> =
+ pyo3::sync::PyOnceLock::new();
+
+/// Cached UTC epoch datetime object (an instance, not a type), built lazily by `get_utc_epoch` as `datetime(1970, 1, 1, 0, 0, 0, 0, timezone.utc)`.
+static UTC_EPOCH: pyo3::sync::PyOnceLock<Py<PyAny>> =
pyo3::sync::PyOnceLock::new();
+
+/// Return the cached `decimal.Decimal` type, importing `decimal` and storing the type in `DECIMAL_TYPE` on first call; import, attribute-lookup, and downcast errors are propagated.
+fn get_decimal_type<'py>(py: Python<'py>) -> PyResult<Bound<'py,
pyo3::types::PyType>> {
+ let ty = DECIMAL_TYPE.get_or_try_init(py, || -> PyResult<_> {
+ let decimal_mod = py.import("decimal")?;
+ let decimal_ty = decimal_mod
+ .getattr("Decimal")?
+ .downcast_into::<pyo3::types::PyType>()?;
+ Ok(decimal_ty.unbind())
+ })?;
+ Ok(ty.bind(py).clone())
+}
+
+/// Return the cached epoch value `datetime.datetime(1970, 1, 1, 0, 0, 0, 0, timezone.utc)`, constructing it and storing it in `UTC_EPOCH` on first call; import, lookup, and call errors are propagated.
+fn get_utc_epoch<'py>(py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
+ let epoch = UTC_EPOCH.get_or_try_init(py, || -> PyResult<_> {
+ let datetime_mod = py.import("datetime")?;
+ let timezone = datetime_mod.getattr("timezone")?;
+ let utc = timezone.getattr("utc")?;
+ let epoch = datetime_mod
+ .getattr("datetime")?
+ .call1((1970, 1, 1, 0, 0, 0, 0, &utc))?;
+ Ok(epoch.unbind())
+ })?;
+ Ok(epoch.bind(py).clone())
+}
+
+/// Validate that value is a decimal.Decimal instance (checked with `is_instance`, so subclasses pass); otherwise return a FlussError whose message includes `get_type_name(value)`.
+fn ensure_is_decimal(value: &Bound<PyAny>) -> PyResult<()> {
+ let decimal_ty = get_decimal_type(value.py())?;
+ if !value.is_instance(&decimal_ty.into_any())? {
+ return Err(FlussError::new_err(format!(
+ "Expected decimal.Decimal, got {}",
+ get_type_name(value)
+ )));
+ }
+ Ok(())
+}
+
+/// Convert Python decimal.Decimal to Datum::Decimal by parsing its `str()` form as a BigDecimal and fitting it to DECIMAL(precision, scale).
+/// Only accepts decimal.Decimal; a failed type check, parse, or precision/scale conversion is returned as a FlussError.
+fn python_decimal_to_datum(
+ value: &Bound<PyAny>,
+ precision: u32,
+ scale: u32,
+) -> PyResult<fcore::row::Datum<'static>> {
+ use std::str::FromStr;
+
+ ensure_is_decimal(value)?;
+
+ let decimal_str: String = value.str()?.extract()?;
+ let bd = bigdecimal::BigDecimal::from_str(&decimal_str).map_err(|e| {
+ FlussError::new_err(format!("Failed to parse decimal '{}': {}",
decimal_str, e))
+ })?;
+
+ let decimal = fcore::row::Decimal::from_big_decimal(bd, precision,
scale).map_err(|e| {
+ FlussError::new_err(format!(
+ "Failed to convert decimal '{}' to DECIMAL({}, {}): {}",
+ decimal_str, precision, scale, e
+ ))
+ })?;
+
+ Ok(fcore::row::Datum::Decimal(decimal))
+}
+
+/// Convert Python datetime.date to Datum::Date, stored as the number of days since 1970-01-01; datetime.datetime and non-date values are rejected with a FlussError.
+fn python_date_to_datum(value: &Bound<PyAny>) ->
PyResult<fcore::row::Datum<'static>> {
+ use pyo3::types::{PyDate, PyDateAccess, PyDateTime};
+
+ // Reject datetime.datetime (subclass of date) - use timestamp columns for
those
+ if value.downcast::<PyDateTime>().is_ok() {
+ return Err(FlussError::new_err(
+ "Expected datetime.date, got datetime.datetime. Use a TIMESTAMP
column for datetime values.",
+ ));
+ }
+
+ let date = value.downcast::<PyDate>().map_err(|_| {
+ FlussError::new_err(format!(
+ "Expected datetime.date, got {}",
+ get_type_name(value)
+ ))
+ })?;
+
+ let year = date.get_year();
+ let month = date.get_month();
+ let day = date.get_day();
+
+ // Calculate days since Unix epoch (1970-01-01); the `as` casts narrow the extracted components to the i16/i8 arguments passed to jiff::civil::date
+ let civil_date = jiff::civil::date(year as i16, month as i8, day as i8);
+ let epoch = jiff::civil::date(1970, 1, 1);
+ let days_since_epoch = (civil_date - epoch).get_days();
+
+ Ok(fcore::row::Datum::Date(fcore::row::Date::new(
+ days_since_epoch,
+ )))
+}
+
+/// Convert Python datetime.time to Datum::Time (milliseconds since midnight); only hour, minute, second and microsecond are read, so any tzinfo on the value is not consulted.
+/// Uses PyO3's native PyTime type for efficient access.
+///
+/// Note: Fluss TIME is always stored as milliseconds since midnight (i32)
regardless
+/// of the schema's precision setting. This matches the Java Fluss wire
protocol.
+/// Sub-millisecond precision from Python's microseconds is truncated (not
rounded).
+fn python_time_to_datum(value: &Bound<PyAny>) ->
PyResult<fcore::row::Datum<'static>> {
+ use pyo3::types::{PyTime, PyTimeAccess};
+
+ let time = value.downcast::<PyTime>().map_err(|_| {
+ FlussError::new_err(format!(
+ "Expected datetime.time, got {}",
+ get_type_name(value)
+ ))
+ })?;
+
+ let hour = time.get_hour() as i32;
+ let minute = time.get_minute() as i32;
+ let second = time.get_second() as i32;
+ let microsecond = time.get_microsecond() as i32;
+
+ // Convert to milliseconds since midnight (truncates sub-millisecond
precision)
+ let millis = hour * MILLIS_PER_HOUR as i32
+ + minute * MILLIS_PER_MINUTE as i32
+ + second * MILLIS_PER_SECOND as i32
+ + microsecond / MICROS_PER_MILLI as i32;
+
+ Ok(fcore::row::Datum::Time(fcore::row::Time::new(millis)))
+}
+
+/// Convert Python datetime-like object to Datum::TimestampNtz.
+/// Supports: naive datetime.datetime, naive pd.Timestamp, and objects exposing to_pydatetime(); timezone-aware values are rejected by extract_datetime_components_ntz. NOTE(review): np.datetime64 has no dedicated branch in that extractor - confirm before advertising it.
+fn python_datetime_to_timestamp_ntz(value: &Bound<PyAny>) ->
PyResult<fcore::row::Datum<'static>> {
+ let (epoch_millis, nano_of_milli) =
extract_datetime_components_ntz(value)?;
+
+ let ts = fcore::row::TimestampNtz::from_millis_nanos(epoch_millis,
nano_of_milli)
+ .map_err(|e| FlussError::new_err(format!("Failed to create
TimestampNtz: {}", e)))?;
+
+ Ok(fcore::row::Datum::TimestampNtz(ts))
+}
+
+/// Convert Python datetime-like object to Datum::TimestampLtz via extract_datetime_components_ltz and TimestampLtz::from_millis_nanos; a failure from the latter is wrapped in a FlussError.
+/// For naive datetimes, assumes UTC. For aware datetimes, converts to UTC.
+/// Supports: datetime.datetime, pd.Timestamp, np.datetime64
+fn python_datetime_to_timestamp_ltz(value: &Bound<PyAny>) ->
PyResult<fcore::row::Datum<'static>> {
+ let (epoch_millis, nano_of_milli) =
extract_datetime_components_ltz(value)?;
+
+ let ts = fcore::row::TimestampLtz::from_millis_nanos(epoch_millis,
nano_of_milli)
+ .map_err(|e| FlussError::new_err(format!("Failed to create
TimestampLtz: {}", e)))?;
+
+ Ok(fcore::row::Datum::TimestampLtz(ts))
+}
+
+/// Extract epoch milliseconds for TimestampNtz (wall-clock time, no timezone
conversion).
+/// Uses integer arithmetic to avoid float precision issues.
+/// For clarity, tz-aware datetimes are rejected - use TimestampLtz for those. Returns the pair that callers bind as (epoch_millis, nano_of_milli); branches are tried in order: datetime.datetime, pandas Timestamp, then to_pydatetime().
+fn extract_datetime_components_ntz(value: &Bound<PyAny>) -> PyResult<(i64,
i32)> {
+ use pyo3::types::PyDateTime;
+
+ // Try PyDateTime first
+ if let Ok(dt) = value.downcast::<PyDateTime>() {
+ // Reject tz-aware datetime for NTZ - it's ambiguous what the user
wants
+ let tzinfo = dt.getattr("tzinfo")?;
+ if !tzinfo.is_none() {
+ return Err(FlussError::new_err(
+ "TIMESTAMP (without timezone) requires a naive datetime. \
+ Got timezone-aware datetime. Either remove tzinfo or use
TIMESTAMP_LTZ column.",
+ ));
+ }
+ return datetime_to_epoch_millis_as_utc(dt);
+ }
+
+ // Check for pandas Timestamp by verifying module name
+ if is_pandas_timestamp(value) {
+ // For NTZ, reject tz-aware pandas Timestamps for consistency with
datetime behavior
+ if let Ok(tz) = value.getattr("tz") {
+ if !tz.is_none() {
+ return Err(FlussError::new_err(
+ "TIMESTAMP (without timezone) requires a naive
pd.Timestamp. \
+ Got timezone-aware Timestamp. Either use
tz_localize(None) or use TIMESTAMP_LTZ column.",
+ ));
+ }
+ }
+ // Naive pandas Timestamp: .value is nanoseconds since epoch
(wall-clock as UTC)
+ let nanos: i64 = value.getattr("value")?.extract()?;
+ return Ok(nanos_to_millis_and_submillis(nanos));
+ }
+
+ // Try to_pydatetime() for objects that support it; a failed call or a non-datetime result falls through to the final error below
+ if let Ok(py_dt) = value.call_method0("to_pydatetime") {
+ if let Ok(dt) = py_dt.downcast::<PyDateTime>() {
+ let tzinfo = dt.getattr("tzinfo")?;
+ if !tzinfo.is_none() {
+ return Err(FlussError::new_err(
+ "TIMESTAMP (without timezone) requires a naive datetime. \
+ Got timezone-aware value. Use TIMESTAMP_LTZ column
instead.",
+ ));
+ }
+ return datetime_to_epoch_millis_as_utc(dt);
+ }
+ }
+
+ Err(FlussError::new_err(format!(
+ "Expected naive datetime.datetime or pd.Timestamp, got {}",
+ get_type_name(value)
+ )))
+}
+
+/// Extract epoch milliseconds for TimestampLtz (instant in time, UTC-based).
+/// For naive datetimes, assumes UTC. For aware datetimes, converts to UTC.
+fn extract_datetime_components_ltz(value: &Bound<PyAny>) -> PyResult<(i64,
i32)> {
+ use pyo3::types::PyDateTime;
+
+ // Try PyDateTime first
+ if let Ok(dt) = value.downcast::<PyDateTime>() {
+ // Check if timezone-aware
+ let tzinfo = dt.getattr("tzinfo")?;
+ if tzinfo.is_none() {
Review Comment:
We don't have a negation here.
I used a `timedelta` from the epoch to get the correct UTC instant, so it handles all
timezones (including explicit UTC/GMT).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]