Copilot commented on code in PR #206:
URL: https://github.com/apache/fluss-rust/pull/206#discussion_r2723457241


##########
bindings/python/src/table.rs:
##########
@@ -473,12 +498,393 @@ fn python_value_to_datum(
                 )))
             }
         }
+        fcore::metadata::DataType::Decimal(decimal_type) => {
+            python_decimal_to_datum(value, decimal_type.precision(), 
decimal_type.scale())
+        }
+        fcore::metadata::DataType::Date(_) => python_date_to_datum(value),
+        fcore::metadata::DataType::Time(_) => python_time_to_datum(value),
+        fcore::metadata::DataType::Timestamp(_) => 
python_datetime_to_timestamp_ntz(value),
+        fcore::metadata::DataType::TimestampLTz(_) => 
python_datetime_to_timestamp_ltz(value),
         _ => Err(FlussError::new_err(format!(
             "Unsupported data type for row-level operations: {data_type}"
         ))),
     }
 }
 
+/// Cached decimal.Decimal type
+/// Uses PyOnceLock for thread-safety and subinterpreter compatibility.
+static DECIMAL_TYPE: pyo3::sync::PyOnceLock<Py<pyo3::types::PyType>> =
+    pyo3::sync::PyOnceLock::new();
+
+/// Cached UTC epoch type
+static UTC_EPOCH: pyo3::sync::PyOnceLock<Py<PyAny>> = 
pyo3::sync::PyOnceLock::new();
+
+/// Get the cached decimal.Decimal type, importing it once per interpreter.
+fn get_decimal_type<'py>(py: Python<'py>) -> PyResult<Bound<'py, 
pyo3::types::PyType>> {
+    let ty = DECIMAL_TYPE.get_or_try_init(py, || -> PyResult<_> {
+        let decimal_mod = py.import("decimal")?;
+        let decimal_ty = decimal_mod
+            .getattr("Decimal")?
+            .downcast_into::<pyo3::types::PyType>()?;
+        Ok(decimal_ty.unbind())
+    })?;
+    Ok(ty.bind(py).clone())
+}
+
+/// Get the cached UTC epoch datetime, creating it once per interpreter.
+fn get_utc_epoch<'py>(py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
+    let epoch = UTC_EPOCH.get_or_try_init(py, || -> PyResult<_> {
+        let datetime_mod = py.import("datetime")?;
+        let timezone = datetime_mod.getattr("timezone")?;
+        let utc = timezone.getattr("utc")?;
+        let epoch = datetime_mod
+            .getattr("datetime")?
+            .call1((1970, 1, 1, 0, 0, 0, 0, &utc))?;
+        Ok(epoch.unbind())
+    })?;
+    Ok(epoch.bind(py).clone())
+}
+
+/// Validate that value is a decimal.Decimal instance.
+fn ensure_is_decimal(value: &Bound<PyAny>) -> PyResult<()> {
+    let decimal_ty = get_decimal_type(value.py())?;
+    if !value.is_instance(&decimal_ty.into_any())? {
+        return Err(FlussError::new_err(format!(
+            "Expected decimal.Decimal, got {}",
+            get_type_name(value)
+        )));
+    }
+    Ok(())
+}
+
+/// Convert Python decimal.Decimal to Datum::Decimal.
+/// Only accepts decimal.Decimal
+fn python_decimal_to_datum(
+    value: &Bound<PyAny>,
+    precision: u32,
+    scale: u32,
+) -> PyResult<fcore::row::Datum<'static>> {
+    use std::str::FromStr;
+
+    ensure_is_decimal(value)?;
+
+    let decimal_str: String = value.str()?.extract()?;
+    let bd = bigdecimal::BigDecimal::from_str(&decimal_str).map_err(|e| {
+        FlussError::new_err(format!("Failed to parse decimal '{}': {}", 
decimal_str, e))
+    })?;
+
+    let decimal = fcore::row::Decimal::from_big_decimal(bd, precision, 
scale).map_err(|e| {
+        FlussError::new_err(format!(
+            "Failed to convert decimal '{}' to DECIMAL({}, {}): {}",
+            decimal_str, precision, scale, e
+        ))
+    })?;
+
+    Ok(fcore::row::Datum::Decimal(decimal))
+}
+
+/// Convert Python datetime.date to Datum::Date.
+fn python_date_to_datum(value: &Bound<PyAny>) -> 
PyResult<fcore::row::Datum<'static>> {
+    use pyo3::types::{PyDate, PyDateAccess, PyDateTime};
+
+    // Reject datetime.datetime (subclass of date) - use timestamp columns for 
those
+    if value.downcast::<PyDateTime>().is_ok() {
+        return Err(FlussError::new_err(
+            "Expected datetime.date, got datetime.datetime. Use a TIMESTAMP 
column for datetime values.",
+        ));
+    }
+
+    let date = value.downcast::<PyDate>().map_err(|_| {
+        FlussError::new_err(format!(
+            "Expected datetime.date, got {}",
+            get_type_name(value)
+        ))
+    })?;
+
+    let year = date.get_year();
+    let month = date.get_month();
+    let day = date.get_day();
+
+    // Calculate days since Unix epoch (1970-01-01)
+    let civil_date = jiff::civil::date(year as i16, month as i8, day as i8);
+    let epoch = jiff::civil::date(1970, 1, 1);
+    let days_since_epoch = (civil_date - epoch).get_days();
+
+    Ok(fcore::row::Datum::Date(fcore::row::Date::new(
+        days_since_epoch,
+    )))
+}
+
+/// Convert Python datetime.time to Datum::Time.
+/// Uses PyO3's native PyTime type for efficient access.
+///
+/// Note: Fluss TIME is always stored as milliseconds since midnight (i32) 
regardless
+/// of the schema's precision setting. This matches the Java Fluss wire 
protocol.
+/// Sub-millisecond precision (microseconds not divisible by 1000) will raise 
an error
+/// to prevent silent data loss and ensure fail-fast behavior.
+fn python_time_to_datum(value: &Bound<PyAny>) -> 
PyResult<fcore::row::Datum<'static>> {
+    use pyo3::types::{PyTime, PyTimeAccess};
+
+    let time = value.downcast::<PyTime>().map_err(|_| {
+        FlussError::new_err(format!(
+            "Expected datetime.time, got {}",
+            get_type_name(value)
+        ))
+    })?;
+
+    let hour = time.get_hour() as i32;
+    let minute = time.get_minute() as i32;
+    let second = time.get_second() as i32;
+    let microsecond = time.get_microsecond() as i32;
+
+    // Strict validation: reject sub-millisecond precision
+    if microsecond % MICROS_PER_MILLI as i32 != 0 {
+        return Err(FlussError::new_err(format!(
+            "TIME values with sub-millisecond precision are not supported. \
+             Got time with {} microseconds (not divisible by 1000). \
+             Fluss stores TIME as milliseconds since midnight. \
+             Please round to milliseconds before insertion.",
+            microsecond
+        )));
+    }
+
+    // Convert to milliseconds since midnight
+    let millis = hour * MILLIS_PER_HOUR as i32
+        + minute * MILLIS_PER_MINUTE as i32
+        + second * MILLIS_PER_SECOND as i32
+        + microsecond / MICROS_PER_MILLI as i32;
+
+    Ok(fcore::row::Datum::Time(fcore::row::Time::new(millis)))
+}
+
+/// Convert Python datetime-like object to Datum::TimestampNtz.
+/// Supports: datetime.datetime (naive preferred), pd.Timestamp, np.datetime64
+fn python_datetime_to_timestamp_ntz(value: &Bound<PyAny>) -> 
PyResult<fcore::row::Datum<'static>> {
+    let (epoch_millis, nano_of_milli) = 
extract_datetime_components_ntz(value)?;
+
+    let ts = fcore::row::TimestampNtz::from_millis_nanos(epoch_millis, 
nano_of_milli)
+        .map_err(|e| FlussError::new_err(format!("Failed to create 
TimestampNtz: {}", e)))?;
+
+    Ok(fcore::row::Datum::TimestampNtz(ts))
+}
+
+/// Convert Python datetime-like object to Datum::TimestampLtz.
+/// For naive datetimes, assumes UTC. For aware datetimes, converts to UTC.
+/// Supports: datetime.datetime, pd.Timestamp, np.datetime64
+fn python_datetime_to_timestamp_ltz(value: &Bound<PyAny>) -> 
PyResult<fcore::row::Datum<'static>> {
+    let (epoch_millis, nano_of_milli) = 
extract_datetime_components_ltz(value)?;
+
+    let ts = fcore::row::TimestampLtz::from_millis_nanos(epoch_millis, 
nano_of_milli)
+        .map_err(|e| FlussError::new_err(format!("Failed to create 
TimestampLtz: {}", e)))?;
+
+    Ok(fcore::row::Datum::TimestampLtz(ts))
+}
+
+/// Extract epoch milliseconds for TimestampNtz (wall-clock time, no timezone 
conversion).
+/// Uses integer arithmetic to avoid float precision issues.
+/// For clarity, tz-aware datetimes are rejected - use TimestampLtz for those.
+fn extract_datetime_components_ntz(value: &Bound<PyAny>) -> PyResult<(i64, 
i32)> {
+    use pyo3::types::PyDateTime;
+
+    // Try PyDateTime first
+    if let Ok(dt) = value.downcast::<PyDateTime>() {
+        // Reject tz-aware datetime for NTZ - it's ambiguous what the user 
wants
+        let tzinfo = dt.getattr("tzinfo")?;
+        if !tzinfo.is_none() {
+            return Err(FlussError::new_err(
+                "TIMESTAMP (without timezone) requires a naive datetime. \
+                 Got timezone-aware datetime. Either remove tzinfo or use 
TIMESTAMP_LTZ column.",
+            ));
+        }
+        return datetime_to_epoch_millis_as_utc(dt);
+    }
+
+    // Check for pandas Timestamp by verifying module name
+    if is_pandas_timestamp(value) {
+        // For NTZ, reject tz-aware pandas Timestamps for consistency with 
datetime behavior
+        if let Ok(tz) = value.getattr("tz") {
+            if !tz.is_none() {
+                return Err(FlussError::new_err(
+                    "TIMESTAMP (without timezone) requires a naive 
pd.Timestamp. \
+                     Got timezone-aware Timestamp. Either use 
tz_localize(None) or use TIMESTAMP_LTZ column.",
+                ));
+            }
+        }
+        // Naive pandas Timestamp: .value is nanoseconds since epoch 
(wall-clock as UTC)
+        let nanos: i64 = value.getattr("value")?.extract()?;
+        return Ok(nanos_to_millis_and_submillis(nanos));
+    }
+
+    // Try to_pydatetime() for objects that support it
+    if let Ok(py_dt) = value.call_method0("to_pydatetime") {
+        if let Ok(dt) = py_dt.downcast::<PyDateTime>() {
+            let tzinfo = dt.getattr("tzinfo")?;
+            if !tzinfo.is_none() {
+                return Err(FlussError::new_err(
+                    "TIMESTAMP (without timezone) requires a naive datetime. \
+                     Got timezone-aware value. Use TIMESTAMP_LTZ column 
instead.",
+                ));
+            }
+            return datetime_to_epoch_millis_as_utc(dt);
+        }
+    }
+
+    Err(FlussError::new_err(format!(
+        "Expected naive datetime.datetime or pd.Timestamp, got {}",
+        get_type_name(value)
+    )))
+}
+
+/// Extract epoch milliseconds for TimestampLtz (instant in time, UTC-based).
+/// For naive datetimes, assumes UTC. For aware datetimes, converts to UTC.
+fn extract_datetime_components_ltz(value: &Bound<PyAny>) -> PyResult<(i64, 
i32)> {
+    use pyo3::types::PyDateTime;
+
+    // Try PyDateTime first
+    if let Ok(dt) = value.downcast::<PyDateTime>() {
+        // Check if timezone-aware
+        let tzinfo = dt.getattr("tzinfo")?;
+        if tzinfo.is_none() {
+            // Naive datetime: assume UTC (treat components as UTC time)
+            return datetime_to_epoch_millis_as_utc(dt);
+        } else {
+            // Aware datetime: use timedelta from epoch to get correct UTC 
instant
+            return datetime_to_epoch_millis_utc_aware(dt);
+        }
+    }
+
+    // Check for pandas Timestamp
+    if is_pandas_timestamp(value) {
+        // pandas Timestamp.value is always nanoseconds since UTC epoch
+        let nanos: i64 = value.getattr("value")?.extract()?;
+        return Ok(nanos_to_millis_and_submillis(nanos));
+    }
+
+    // Try to_pydatetime()
+    if let Ok(py_dt) = value.call_method0("to_pydatetime") {
+        if let Ok(dt) = py_dt.downcast::<PyDateTime>() {
+            let tzinfo = dt.getattr("tzinfo")?;
+            if tzinfo.is_none() {
+                return datetime_to_epoch_millis_as_utc(dt);
+            } else {
+                return datetime_to_epoch_millis_utc_aware(dt);
+            }
+        }
+    }
+
+    Err(FlussError::new_err(format!(
+        "Expected datetime.datetime or pd.Timestamp, got {}",
+        get_type_name(value)
+    )))
+}
+
+/// Convert datetime components to epoch milliseconds treating them as UTC
+fn datetime_to_epoch_millis_as_utc(
+    dt: &pyo3::Bound<'_, pyo3::types::PyDateTime>,
+) -> PyResult<(i64, i32)> {
+    use pyo3::types::{PyDateAccess, PyTimeAccess};
+
+    let year = dt.get_year();
+    let month = dt.get_month();
+    let day = dt.get_day();
+    let hour = dt.get_hour();
+    let minute = dt.get_minute();
+    let second = dt.get_second();
+    let microsecond = dt.get_microsecond();
+
+    // Create jiff civil datetime and convert to UTC timestamp
+    // Safe casts: hour (0-23), minute (0-59), second (0-59) all fit in i8
+    let civil_dt = jiff::civil::date(year as i16, month as i8, day as i8).at(
+        hour as i8,
+        minute as i8,
+        second as i8,
+        microsecond as i32 * 1000,
+    );
+
+    let timestamp = jiff::tz::Offset::UTC
+        .to_timestamp(civil_dt)
+        .map_err(|e| FlussError::new_err(format!("Invalid datetime: {}", e)))?;
+
+    let millis = timestamp.as_millisecond();
+    let nano_of_milli = (timestamp.subsec_nanosecond() % NANOS_PER_MILLI as 
i32) as i32;
+
+    Ok((millis, nano_of_milli))
+}
+
+/// Convert timezone-aware datetime to epoch milliseconds using Python's 
timedelta.
+/// This correctly handles timezone conversions by computing (dt - UTC_EPOCH).
+/// The UTC epoch is cached for performance.
+fn datetime_to_epoch_millis_utc_aware(
+    dt: &pyo3::Bound<'_, pyo3::types::PyDateTime>,
+) -> PyResult<(i64, i32)> {
+    use pyo3::types::{PyDelta, PyDeltaAccess};
+
+    let py = dt.py();
+    let epoch = get_utc_epoch(py)?;
+
+    // Compute delta = dt - epoch (this handles timezone conversion correctly)
+    let delta = dt.call_method1("__sub__", (epoch,))?;
+    let delta = delta.downcast::<PyDelta>()?;
+
+    // Extract components using integer arithmetic
+    let days = delta.get_days() as i64;
+    let seconds = delta.get_seconds() as i64;
+    let microseconds = delta.get_microseconds() as i64;
+
+    // Total milliseconds (note: days can be negative for dates before epoch)
+    let total_micros = days * MICROS_PER_DAY + seconds * MICROS_PER_SECOND + 
microseconds;
+    let millis = total_micros / MICROS_PER_MILLI;
+    let nano_of_milli = ((total_micros % MICROS_PER_MILLI) * MICROS_PER_MILLI) 
as i32;

Review Comment:
   The constant MICROS_PER_MILLI is used here both as the modulo divisor and 
as the conversion factor from microseconds to nanoseconds, in the expression 
`(total_micros % MICROS_PER_MILLI) * MICROS_PER_MILLI`. The result is 
numerically correct (both factors are 1000), but the second use is semantically 
misleading. Consider a more appropriately named constant for the conversion 
factor, such as NANOS_PER_MICRO, or a literal 1000, so that the intent is clear.



##########
bindings/python/src/table.rs:
##########
@@ -473,12 +498,393 @@ fn python_value_to_datum(
                 )))
             }
         }
+        fcore::metadata::DataType::Decimal(decimal_type) => {
+            python_decimal_to_datum(value, decimal_type.precision(), 
decimal_type.scale())
+        }
+        fcore::metadata::DataType::Date(_) => python_date_to_datum(value),
+        fcore::metadata::DataType::Time(_) => python_time_to_datum(value),
+        fcore::metadata::DataType::Timestamp(_) => 
python_datetime_to_timestamp_ntz(value),
+        fcore::metadata::DataType::TimestampLTz(_) => 
python_datetime_to_timestamp_ltz(value),
         _ => Err(FlussError::new_err(format!(
             "Unsupported data type for row-level operations: {data_type}"
         ))),
     }
 }
 
+/// Cached decimal.Decimal type
+/// Uses PyOnceLock for thread-safety and subinterpreter compatibility.
+static DECIMAL_TYPE: pyo3::sync::PyOnceLock<Py<pyo3::types::PyType>> =
+    pyo3::sync::PyOnceLock::new();
+
+/// Cached UTC epoch type
+static UTC_EPOCH: pyo3::sync::PyOnceLock<Py<PyAny>> = 
pyo3::sync::PyOnceLock::new();
+
+/// Get the cached decimal.Decimal type, importing it once per interpreter.
+fn get_decimal_type<'py>(py: Python<'py>) -> PyResult<Bound<'py, 
pyo3::types::PyType>> {
+    let ty = DECIMAL_TYPE.get_or_try_init(py, || -> PyResult<_> {
+        let decimal_mod = py.import("decimal")?;
+        let decimal_ty = decimal_mod
+            .getattr("Decimal")?
+            .downcast_into::<pyo3::types::PyType>()?;
+        Ok(decimal_ty.unbind())
+    })?;
+    Ok(ty.bind(py).clone())
+}
+
+/// Get the cached UTC epoch datetime, creating it once per interpreter.
+fn get_utc_epoch<'py>(py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
+    let epoch = UTC_EPOCH.get_or_try_init(py, || -> PyResult<_> {
+        let datetime_mod = py.import("datetime")?;
+        let timezone = datetime_mod.getattr("timezone")?;
+        let utc = timezone.getattr("utc")?;
+        let epoch = datetime_mod
+            .getattr("datetime")?
+            .call1((1970, 1, 1, 0, 0, 0, 0, &utc))?;
+        Ok(epoch.unbind())
+    })?;
+    Ok(epoch.bind(py).clone())
+}
+
+/// Validate that value is a decimal.Decimal instance.
+fn ensure_is_decimal(value: &Bound<PyAny>) -> PyResult<()> {
+    let decimal_ty = get_decimal_type(value.py())?;
+    if !value.is_instance(&decimal_ty.into_any())? {
+        return Err(FlussError::new_err(format!(
+            "Expected decimal.Decimal, got {}",
+            get_type_name(value)
+        )));
+    }
+    Ok(())
+}
+
+/// Convert Python decimal.Decimal to Datum::Decimal.
+/// Only accepts decimal.Decimal
+fn python_decimal_to_datum(
+    value: &Bound<PyAny>,
+    precision: u32,
+    scale: u32,
+) -> PyResult<fcore::row::Datum<'static>> {
+    use std::str::FromStr;
+
+    ensure_is_decimal(value)?;
+
+    let decimal_str: String = value.str()?.extract()?;
+    let bd = bigdecimal::BigDecimal::from_str(&decimal_str).map_err(|e| {
+        FlussError::new_err(format!("Failed to parse decimal '{}': {}", 
decimal_str, e))
+    })?;
+
+    let decimal = fcore::row::Decimal::from_big_decimal(bd, precision, 
scale).map_err(|e| {
+        FlussError::new_err(format!(
+            "Failed to convert decimal '{}' to DECIMAL({}, {}): {}",
+            decimal_str, precision, scale, e
+        ))
+    })?;
+
+    Ok(fcore::row::Datum::Decimal(decimal))
+}
+
+/// Convert Python datetime.date to Datum::Date.
+fn python_date_to_datum(value: &Bound<PyAny>) -> 
PyResult<fcore::row::Datum<'static>> {
+    use pyo3::types::{PyDate, PyDateAccess, PyDateTime};
+
+    // Reject datetime.datetime (subclass of date) - use timestamp columns for 
those
+    if value.downcast::<PyDateTime>().is_ok() {
+        return Err(FlussError::new_err(
+            "Expected datetime.date, got datetime.datetime. Use a TIMESTAMP 
column for datetime values.",
+        ));
+    }
+
+    let date = value.downcast::<PyDate>().map_err(|_| {
+        FlussError::new_err(format!(
+            "Expected datetime.date, got {}",
+            get_type_name(value)
+        ))
+    })?;
+
+    let year = date.get_year();
+    let month = date.get_month();
+    let day = date.get_day();
+
+    // Calculate days since Unix epoch (1970-01-01)
+    let civil_date = jiff::civil::date(year as i16, month as i8, day as i8);
+    let epoch = jiff::civil::date(1970, 1, 1);
+    let days_since_epoch = (civil_date - epoch).get_days();
+
+    Ok(fcore::row::Datum::Date(fcore::row::Date::new(
+        days_since_epoch,
+    )))
+}
+
+/// Convert Python datetime.time to Datum::Time.
+/// Uses PyO3's native PyTime type for efficient access.
+///
+/// Note: Fluss TIME is always stored as milliseconds since midnight (i32) 
regardless
+/// of the schema's precision setting. This matches the Java Fluss wire 
protocol.
+/// Sub-millisecond precision (microseconds not divisible by 1000) will raise 
an error
+/// to prevent silent data loss and ensure fail-fast behavior.
+fn python_time_to_datum(value: &Bound<PyAny>) -> 
PyResult<fcore::row::Datum<'static>> {
+    use pyo3::types::{PyTime, PyTimeAccess};
+
+    let time = value.downcast::<PyTime>().map_err(|_| {
+        FlussError::new_err(format!(
+            "Expected datetime.time, got {}",
+            get_type_name(value)
+        ))
+    })?;
+
+    let hour = time.get_hour() as i32;
+    let minute = time.get_minute() as i32;
+    let second = time.get_second() as i32;
+    let microsecond = time.get_microsecond() as i32;
+
+    // Strict validation: reject sub-millisecond precision
+    if microsecond % MICROS_PER_MILLI as i32 != 0 {
+        return Err(FlussError::new_err(format!(
+            "TIME values with sub-millisecond precision are not supported. \
+             Got time with {} microseconds (not divisible by 1000). \
+             Fluss stores TIME as milliseconds since midnight. \
+             Please round to milliseconds before insertion.",
+            microsecond
+        )));
+    }
+
+    // Convert to milliseconds since midnight
+    let millis = hour * MILLIS_PER_HOUR as i32
+        + minute * MILLIS_PER_MINUTE as i32
+        + second * MILLIS_PER_SECOND as i32
+        + microsecond / MICROS_PER_MILLI as i32;
+
+    Ok(fcore::row::Datum::Time(fcore::row::Time::new(millis)))
+}
+
+/// Convert Python datetime-like object to Datum::TimestampNtz.
+/// Supports: datetime.datetime (naive preferred), pd.Timestamp, np.datetime64
+fn python_datetime_to_timestamp_ntz(value: &Bound<PyAny>) -> 
PyResult<fcore::row::Datum<'static>> {
+    let (epoch_millis, nano_of_milli) = 
extract_datetime_components_ntz(value)?;
+
+    let ts = fcore::row::TimestampNtz::from_millis_nanos(epoch_millis, 
nano_of_milli)
+        .map_err(|e| FlussError::new_err(format!("Failed to create 
TimestampNtz: {}", e)))?;
+
+    Ok(fcore::row::Datum::TimestampNtz(ts))
+}
+
+/// Convert Python datetime-like object to Datum::TimestampLtz.
+/// For naive datetimes, assumes UTC. For aware datetimes, converts to UTC.
+/// Supports: datetime.datetime, pd.Timestamp, np.datetime64
+fn python_datetime_to_timestamp_ltz(value: &Bound<PyAny>) -> 
PyResult<fcore::row::Datum<'static>> {
+    let (epoch_millis, nano_of_milli) = 
extract_datetime_components_ltz(value)?;
+
+    let ts = fcore::row::TimestampLtz::from_millis_nanos(epoch_millis, 
nano_of_milli)
+        .map_err(|e| FlussError::new_err(format!("Failed to create 
TimestampLtz: {}", e)))?;
+
+    Ok(fcore::row::Datum::TimestampLtz(ts))
+}
+
+/// Extract epoch milliseconds for TimestampNtz (wall-clock time, no timezone 
conversion).
+/// Uses integer arithmetic to avoid float precision issues.
+/// For clarity, tz-aware datetimes are rejected - use TimestampLtz for those.
+fn extract_datetime_components_ntz(value: &Bound<PyAny>) -> PyResult<(i64, 
i32)> {
+    use pyo3::types::PyDateTime;
+
+    // Try PyDateTime first
+    if let Ok(dt) = value.downcast::<PyDateTime>() {
+        // Reject tz-aware datetime for NTZ - it's ambiguous what the user 
wants
+        let tzinfo = dt.getattr("tzinfo")?;
+        if !tzinfo.is_none() {
+            return Err(FlussError::new_err(
+                "TIMESTAMP (without timezone) requires a naive datetime. \
+                 Got timezone-aware datetime. Either remove tzinfo or use 
TIMESTAMP_LTZ column.",
+            ));
+        }
+        return datetime_to_epoch_millis_as_utc(dt);
+    }
+
+    // Check for pandas Timestamp by verifying module name
+    if is_pandas_timestamp(value) {
+        // For NTZ, reject tz-aware pandas Timestamps for consistency with 
datetime behavior
+        if let Ok(tz) = value.getattr("tz") {
+            if !tz.is_none() {
+                return Err(FlussError::new_err(
+                    "TIMESTAMP (without timezone) requires a naive 
pd.Timestamp. \
+                     Got timezone-aware Timestamp. Either use 
tz_localize(None) or use TIMESTAMP_LTZ column.",
+                ));
+            }
+        }
+        // Naive pandas Timestamp: .value is nanoseconds since epoch 
(wall-clock as UTC)
+        let nanos: i64 = value.getattr("value")?.extract()?;
+        return Ok(nanos_to_millis_and_submillis(nanos));
+    }
+
+    // Try to_pydatetime() for objects that support it
+    if let Ok(py_dt) = value.call_method0("to_pydatetime") {
+        if let Ok(dt) = py_dt.downcast::<PyDateTime>() {
+            let tzinfo = dt.getattr("tzinfo")?;
+            if !tzinfo.is_none() {
+                return Err(FlussError::new_err(
+                    "TIMESTAMP (without timezone) requires a naive datetime. \
+                     Got timezone-aware value. Use TIMESTAMP_LTZ column 
instead.",
+                ));
+            }
+            return datetime_to_epoch_millis_as_utc(dt);
+        }
+    }
+
+    Err(FlussError::new_err(format!(
+        "Expected naive datetime.datetime or pd.Timestamp, got {}",
+        get_type_name(value)
+    )))
+}
+
+/// Extract epoch milliseconds for TimestampLtz (instant in time, UTC-based).
+/// For naive datetimes, assumes UTC. For aware datetimes, converts to UTC.
+fn extract_datetime_components_ltz(value: &Bound<PyAny>) -> PyResult<(i64, 
i32)> {
+    use pyo3::types::PyDateTime;
+
+    // Try PyDateTime first
+    if let Ok(dt) = value.downcast::<PyDateTime>() {
+        // Check if timezone-aware
+        let tzinfo = dt.getattr("tzinfo")?;
+        if tzinfo.is_none() {
+            // Naive datetime: assume UTC (treat components as UTC time)
+            return datetime_to_epoch_millis_as_utc(dt);
+        } else {
+            // Aware datetime: use timedelta from epoch to get correct UTC 
instant
+            return datetime_to_epoch_millis_utc_aware(dt);
+        }
+    }
+
+    // Check for pandas Timestamp
+    if is_pandas_timestamp(value) {
+        // pandas Timestamp.value is always nanoseconds since UTC epoch
+        let nanos: i64 = value.getattr("value")?.extract()?;
+        return Ok(nanos_to_millis_and_submillis(nanos));
+    }
+
+    // Try to_pydatetime()
+    if let Ok(py_dt) = value.call_method0("to_pydatetime") {
+        if let Ok(dt) = py_dt.downcast::<PyDateTime>() {
+            let tzinfo = dt.getattr("tzinfo")?;
+            if tzinfo.is_none() {
+                return datetime_to_epoch_millis_as_utc(dt);
+            } else {
+                return datetime_to_epoch_millis_utc_aware(dt);
+            }
+        }
+    }
+
+    Err(FlussError::new_err(format!(
+        "Expected datetime.datetime or pd.Timestamp, got {}",
+        get_type_name(value)
+    )))
+}
+
+/// Convert datetime components to epoch milliseconds treating them as UTC
+fn datetime_to_epoch_millis_as_utc(
+    dt: &pyo3::Bound<'_, pyo3::types::PyDateTime>,
+) -> PyResult<(i64, i32)> {
+    use pyo3::types::{PyDateAccess, PyTimeAccess};
+
+    let year = dt.get_year();
+    let month = dt.get_month();
+    let day = dt.get_day();
+    let hour = dt.get_hour();
+    let minute = dt.get_minute();
+    let second = dt.get_second();
+    let microsecond = dt.get_microsecond();
+
+    // Create jiff civil datetime and convert to UTC timestamp
+    // Safe casts: hour (0-23), minute (0-59), second (0-59) all fit in i8
+    let civil_dt = jiff::civil::date(year as i16, month as i8, day as i8).at(
+        hour as i8,
+        minute as i8,
+        second as i8,
+        microsecond as i32 * 1000,

Review Comment:
   For consistency with the rest of the code, which uses named constants for 
time conversions (lines 29-35), the magic number 1000 in 
`microsecond as i32 * 1000` (a microseconds-to-nanoseconds conversion) should 
be replaced with a named constant. Consider using the existing NANOS_PER_MILLI 
constant divided by MICROS_PER_MILLI, or defining a new constant 
NANOS_PER_MICRO = 1000.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to