fresh-borzoni commented on code in PR #142:
URL: https://github.com/apache/fluss-rust/pull/142#discussion_r2698234147


##########
bindings/python/src/table.rs:
##########
@@ -195,8 +206,205 @@ impl AppendWriter {
 
 impl AppendWriter {
     /// Create a AppendWriter from a core append writer
-    pub fn from_core(append: fcore::client::AppendWriter) -> Self {
-        Self { inner: append }
+    pub fn from_core(
+        append: fcore::client::AppendWriter,
+        table_info: fcore::metadata::TableInfo,
+    ) -> Self {
+        Self {
+            inner: append,
+            table_info,
+        }
+    }
+}
+
+/// Represents different input shapes for a row
+#[derive(FromPyObject)]
+enum RowInput<'py> {
+    Dict(Bound<'py, pyo3::types::PyDict>),
+    Tuple(Bound<'py, pyo3::types::PyTuple>),
+    List(Bound<'py, pyo3::types::PyList>),
+}
+
+/// Helper function to process sequence types (list/tuple) into datums
+fn process_sequence_to_datums<'a, I>(
+    values: I,
+    len: usize,
+    fields: &[fcore::metadata::DataField],
+) -> PyResult<Vec<fcore::row::Datum<'static>>>
+where
+    I: Iterator<Item = Bound<'a, PyAny>>,
+{
+    if len != fields.len() {
+        return Err(FlussError::new_err(format!(
+            "Expected {} values, got {}",
+            fields.len(),
+            len
+        )));
+    }
+
+    let mut datums = Vec::with_capacity(fields.len());
+    for (i, (field, value)) in fields.iter().zip(values).enumerate() {
+        datums.push(
+            python_value_to_datum(&value, field.data_type()).map_err(|e| {
+                FlussError::new_err(format!("Field '{}' (index {}): {}", 
field.name(), i, e))
+            })?,
+        );
+    }
+    Ok(datums)
+}
+
+/// Convert Python row (dict/list/tuple) to GenericRow based on schema
+fn python_to_generic_row(
+    row: &Bound<PyAny>,
+    table_info: &fcore::metadata::TableInfo,
+) -> PyResult<fcore::row::GenericRow<'static>> {
+    // Extract with user-friendly error message
+    let row_input: RowInput = row.extract().map_err(|_| {
+        let type_name = row
+            .get_type()
+            .name()
+            .map(|n| n.to_string())
+            .unwrap_or_else(|_| "unknown".to_string());
+        FlussError::new_err(format!(
+            "Row must be a dict, list, or tuple; got {}",
+            type_name
+        ))
+    })?;
+    let schema = table_info.row_type();
+    let fields = schema.fields();
+
+    let datums = match row_input {
+        RowInput::Dict(dict) => {
+            // Strict: reject unknown keys (and also reject non-str keys 
nicely)
+            for (k, _) in dict.iter() {
+                let key_str = k.extract::<&str>().map_err(|_| {
+                    let key_type = k
+                        .get_type()
+                        .name()
+                        .map(|n| n.to_string())
+                        .unwrap_or_else(|_| "unknown".to_string());
+                    FlussError::new_err(format!("Row dict keys must be 
strings; got {}", key_type))
+                })?;
+
+                if fields.iter().all(|f| f.name() != key_str) {
+                    let expected = fields
+                        .iter()
+                        .map(|f| f.name())
+                        .collect::<Vec<_>>()
+                        .join(", ");
+                    return Err(FlussError::new_err(format!(
+                        "Unknown field '{}'. Expected fields: {}",
+                        key_str, expected
+                    )));
+                }
+            }
+
+            let mut datums = Vec::with_capacity(fields.len());
+            for field in fields {
+                let value = dict.get_item(field.name())?.ok_or_else(|| {
+                    FlussError::new_err(format!("Missing field: {}", 
field.name()))
+                })?;
+                datums.push(
+                    python_value_to_datum(&value, 
field.data_type()).map_err(|e| {
+                        FlussError::new_err(format!("Field '{}': {}", 
field.name(), e))
+                    })?,
+                );
+            }
+            datums
+        }
+
+        RowInput::List(list) => process_sequence_to_datums(list.iter(), 
list.len(), fields)?,
+
+        RowInput::Tuple(tuple) => process_sequence_to_datums(tuple.iter(), 
tuple.len(), fields)?,
+    };
+
+    Ok(fcore::row::GenericRow { values: datums })
+}
+
+/// Convert Python value to Datum based on data type
+fn python_value_to_datum(
+    value: &Bound<PyAny>,
+    data_type: &fcore::metadata::DataType,
+) -> PyResult<fcore::row::Datum<'static>> {
+    use fcore::row::{Datum, F32, F64};
+
+    if value.is_none() {
+        return Ok(Datum::Null);
+    }
+
+    match data_type {
+        fcore::metadata::DataType::Boolean(_) => {
+            let v: bool = value.extract()?;
+            Ok(Datum::Bool(v))
+        }
+        fcore::metadata::DataType::TinyInt(_) => {
+            // Strict type checking: reject bool for int columns
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for TinyInt column, got bool. Use 0 or 1 
explicitly.".to_string(),
+                ));
+            }
+            let v: i8 = value.extract()?;
+            Ok(Datum::Int8(v))
+        }
+        fcore::metadata::DataType::SmallInt(_) => {
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for SmallInt column, got bool. Use 0 or 1 
explicitly."
+                        .to_string(),
+                ));
+            }
+            let v: i16 = value.extract()?;
+            Ok(Datum::Int16(v))
+        }
+        fcore::metadata::DataType::Int(_) => {
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for Int column, got bool. Use 0 or 1 
explicitly.".to_string(),
+                ));
+            }
+            let v: i32 = value.extract()?;
+            Ok(Datum::Int32(v))
+        }
+        fcore::metadata::DataType::BigInt(_) => {
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for BigInt column, got bool. Use 0 or 1 
explicitly.".to_string(),
+                ));
+            }
+            let v: i64 = value.extract()?;
+            Ok(Datum::Int64(v))
+        }
+        fcore::metadata::DataType::Float(_) => {
+            let v: f32 = value.extract()?;
+            Ok(Datum::Float32(F32::from(v)))
+        }
+        fcore::metadata::DataType::Double(_) => {
+            let v: f64 = value.extract()?;
+            Ok(Datum::Float64(F64::from(v)))
+        }
+        fcore::metadata::DataType::String(_) | 
fcore::metadata::DataType::Char(_) => {
+            let v: String = value.extract()?;
+            Ok(v.into())
+        }
+        fcore::metadata::DataType::Bytes(_) | 
fcore::metadata::DataType::Binary(_) => {
+            // Efficient extraction: downcast to specific type and use bulk 
copy.
+            // PyBytes::as_bytes() and PyByteArray::to_vec() are O(n) bulk 
copies of the underlying data.
+            if let Ok(bytes) = value.downcast::<pyo3::types::PyBytes>() {
+                Ok(bytes.as_bytes().to_vec().into())
+            } else if let Ok(bytearray) = 
value.downcast::<pyo3::types::PyByteArray>() {
+                Ok(bytearray.to_vec().into())
+            } else {
+                Err(FlussError::new_err(format!(
+                    "Expected bytes or bytearray, got {}",
+                    value.get_type().name()?
+                )))
+            }
+        }
+        _ => Err(FlussError::new_err(format!(

Review Comment:
   Good question! 👍 
   When we add timestamp support to `append`, users will be able to pass:
     - `datetime.datetime` / `datetime.date` (Python stdlib)
     - `pd.Timestamp` (Pandas)
     - `np.datetime64` (NumPy)
     - Raw `int` (for epoch milliseconds/microseconds)
     
   Temporal and decimal types are intentionally excluded from this PR to keep
   its scope small.
   To support them, we still need:
     - Python -> Datum converters
     - Datum -> Arrow builders
     
   I created an issue to track this work: https://github.com/apache/fluss-rust/issues/167



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to