fresh-borzoni commented on code in PR #206:
URL: https://github.com/apache/fluss-rust/pull/206#discussion_r2723338784
##########
bindings/python/src/table.rs:
##########
@@ -473,12 +498,381 @@ fn python_value_to_datum(
)))
}
}
+ fcore::metadata::DataType::Decimal(decimal_type) => {
+ python_decimal_to_datum(value, decimal_type.precision(),
decimal_type.scale())
+ }
+ fcore::metadata::DataType::Date(_) => python_date_to_datum(value),
+ fcore::metadata::DataType::Time(_) => python_time_to_datum(value),
+ fcore::metadata::DataType::Timestamp(_) =>
python_datetime_to_timestamp_ntz(value),
+ fcore::metadata::DataType::TimestampLTz(_) =>
python_datetime_to_timestamp_ltz(value),
_ => Err(FlussError::new_err(format!(
"Unsupported data type for row-level operations: {data_type}"
))),
}
}
+/// Lazily initialised cache of Python's `decimal.Decimal` type object; filled by `get_decimal_type`.
+/// Uses PyOnceLock for thread-safety and subinterpreter compatibility.
+static DECIMAL_TYPE: pyo3::sync::PyOnceLock<Py<pyo3::types::PyType>> =
+ pyo3::sync::PyOnceLock::new();
+
+/// Lazily initialised cache of the UTC epoch `datetime.datetime` object (1970-01-01 00:00:00, tz=UTC) - a value, not a type; filled by `get_utc_epoch`.
+static UTC_EPOCH: pyo3::sync::PyOnceLock<Py<PyAny>> =
pyo3::sync::PyOnceLock::new();
+
+/// Return the cached `decimal.Decimal` type object; the first successful call imports `decimal` and stores the type in `DECIMAL_TYPE`. Import, attribute-lookup, and downcast errors are propagated.
+fn get_decimal_type<'py>(py: Python<'py>) -> PyResult<Bound<'py,
pyo3::types::PyType>> {
+ let ty = DECIMAL_TYPE.get_or_try_init(py, || -> PyResult<_> {
+ let decimal_mod = py.import("decimal")?;
+ let decimal_ty = decimal_mod
+ .getattr("Decimal")?
+ .downcast_into::<pyo3::types::PyType>()?; // errors if `decimal.Decimal` is not a type object
+ Ok(decimal_ty.unbind()) // detach from `py` so the handle can live in the static
+ })?;
+ Ok(ty.bind(py).clone()) // re-bind to the caller's `py` and hand back an owned handle
+}
+
+/// Return the cached `datetime.datetime(1970, 1, 1, 0, 0, 0, 0, timezone.utc)` object; the first successful call builds it and stores it in `UTC_EPOCH`. Python errors raised while building it are propagated.
+fn get_utc_epoch<'py>(py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
+ let epoch = UTC_EPOCH.get_or_try_init(py, || -> PyResult<_> {
+ let datetime_mod = py.import("datetime")?;
+ let timezone = datetime_mod.getattr("timezone")?;
+ let utc = timezone.getattr("utc")?;
+ let epoch = datetime_mod
+ .getattr("datetime")?
+ .call1((1970, 1, 1, 0, 0, 0, 0, &utc))?; // positional: year, month, day, hour, minute, second, microsecond, tzinfo
+ Ok(epoch.unbind()) // detach from `py` so the handle can live in the static
+ })?;
+ Ok(epoch.bind(py).clone()) // re-bind to the caller's `py` and hand back an owned handle
+}
+
+/// Return `Ok(())` when `value` passes an `is_instance` check against `decimal.Decimal`; otherwise return a `FlussError` naming the type that was received.
+fn ensure_is_decimal(value: &Bound<PyAny>) -> PyResult<()> {
+ let decimal_ty = get_decimal_type(value.py())?;
+ if !value.is_instance(&decimal_ty.into_any())? { // a failure of the instance check itself is propagated via `?`
+ return Err(FlussError::new_err(format!(
+ "Expected decimal.Decimal, got {}",
+ get_type_name(value)
+ )));
+ }
+ Ok(())
+}
+
+/// Convert Python decimal.Decimal to Datum::Decimal for a DECIMAL(`precision`, `scale`) column, going through the value's `str()` text and `bigdecimal::BigDecimal`.
+/// Only accepts decimal.Decimal; a wrong type, an unparsable string, or a failed precision/scale conversion is returned as a `FlussError`.
+fn python_decimal_to_datum(
+ value: &Bound<PyAny>,
+ precision: u32,
+ scale: u32,
+) -> PyResult<fcore::row::Datum<'static>> {
+ use std::str::FromStr;
+
+ ensure_is_decimal(value)?; // type check first so non-Decimal inputs never reach the string parse
+
+ let decimal_str: String = value.str()?.extract()?; // Python `str(value)`; also reused in the error messages below
+ let bd = bigdecimal::BigDecimal::from_str(&decimal_str).map_err(|e| {
+ FlussError::new_err(format!("Failed to parse decimal '{}': {}",
decimal_str, e))
+ })?;
+
+ let decimal = fcore::row::Decimal::from_big_decimal(bd, precision,
scale).map_err(|e| {
+ FlussError::new_err(format!(
+ "Failed to convert decimal '{}' to DECIMAL({}, {}): {}",
+ decimal_str, precision, scale, e
+ ))
+ })?;
+
+ Ok(fcore::row::Datum::Decimal(decimal))
+}
+
+/// Convert Python datetime.date to Datum::Date holding the number of days since 1970-01-01; `datetime.datetime` values and non-date values are rejected with a `FlussError`.
+fn python_date_to_datum(value: &Bound<PyAny>) ->
PyResult<fcore::row::Datum<'static>> {
+ use pyo3::types::{PyDate, PyDateAccess, PyDateTime};
+
+ // Reject datetime.datetime (subclass of date) - use timestamp columns for
those
+ if value.downcast::<PyDateTime>().is_ok() {
+ return Err(FlussError::new_err(
+ "Expected datetime.date, got datetime.datetime. Use a TIMESTAMP
column for datetime values.",
+ ));
+ }
+
+ let date = value.downcast::<PyDate>().map_err(|_| {
+ FlussError::new_err(format!(
+ "Expected datetime.date, got {}",
+ get_type_name(value)
+ ))
+ })?;
+
+ let year = date.get_year();
+ let month = date.get_month();
+ let day = date.get_day();
+
+ // Calculate days since Unix epoch (1970-01-01) as the difference of two jiff civil dates
+ let civil_date = jiff::civil::date(year as i16, month as i8, day as i8); // NOTE(review): casts assume Python's date ranges (year 1..=9999, month 1..=12, day 1..=31) - confirm
+ let epoch = jiff::civil::date(1970, 1, 1);
+ let days_since_epoch = (civil_date - epoch).get_days(); // presumably negative for dates before the epoch - verify against jiff's Span docs
+
+ Ok(fcore::row::Datum::Date(fcore::row::Date::new(
+ days_since_epoch,
+ )))
+}
+
+/// Convert Python datetime.time to Datum::Time.
+/// Uses PyO3's native PyTime type for efficient access.
+///
+/// Note: Fluss TIME is always stored as milliseconds since midnight (i32)
regardless
+/// of the schema's precision setting. This matches the Java Fluss wire
protocol.
+/// Sub-millisecond precision from Python's microseconds is truncated (not
rounded).
Review Comment:
Truncating sub-millisecond precision matches the Java client's behaviour, but I think we are fine to be strict here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]