luoyuxia commented on code in PR #142:
URL: https://github.com/apache/fluss-rust/pull/142#discussion_r2697366278


##########
bindings/python/fluss/__init__.pyi:
##########
@@ -68,6 +68,35 @@ class FlussTable:
     def __repr__(self) -> str: ...
 
 class AppendWriter:
+    def append(self, row: dict | list | tuple) -> None:

Review Comment:
   can we make it as aysnc api? Otherwise, if just append a row, it has to wait 
about 500ms(rust core will  accumlate records into a record barch and send to 
server).



##########
bindings/python/src/table.rs:
##########
@@ -195,8 +206,205 @@ impl AppendWriter {
 
 impl AppendWriter {
     /// Create a AppendWriter from a core append writer
-    pub fn from_core(append: fcore::client::AppendWriter) -> Self {
-        Self { inner: append }
+    pub fn from_core(
+        append: fcore::client::AppendWriter,
+        table_info: fcore::metadata::TableInfo,
+    ) -> Self {
+        Self {
+            inner: append,
+            table_info,
+        }
+    }
+}
+
+/// Represents different input shapes for a row
+#[derive(FromPyObject)]
+enum RowInput<'py> {
+    Dict(Bound<'py, pyo3::types::PyDict>),
+    Tuple(Bound<'py, pyo3::types::PyTuple>),
+    List(Bound<'py, pyo3::types::PyList>),
+}
+
+/// Helper function to process sequence types (list/tuple) into datums
+fn process_sequence_to_datums<'a, I>(
+    values: I,
+    len: usize,
+    fields: &[fcore::metadata::DataField],
+) -> PyResult<Vec<fcore::row::Datum<'static>>>
+where
+    I: Iterator<Item = Bound<'a, PyAny>>,
+{
+    if len != fields.len() {
+        return Err(FlussError::new_err(format!(
+            "Expected {} values, got {}",
+            fields.len(),
+            len
+        )));
+    }
+
+    let mut datums = Vec::with_capacity(fields.len());
+    for (i, (field, value)) in fields.iter().zip(values).enumerate() {
+        datums.push(
+            python_value_to_datum(&value, field.data_type()).map_err(|e| {
+                FlussError::new_err(format!("Field '{}' (index {}): {}", 
field.name(), i, e))
+            })?,
+        );
+    }
+    Ok(datums)
+}
+
+/// Convert Python row (dict/list/tuple) to GenericRow based on schema
+fn python_to_generic_row(
+    row: &Bound<PyAny>,
+    table_info: &fcore::metadata::TableInfo,
+) -> PyResult<fcore::row::GenericRow<'static>> {
+    // Extract with user-friendly error message
+    let row_input: RowInput = row.extract().map_err(|_| {
+        let type_name = row
+            .get_type()
+            .name()
+            .map(|n| n.to_string())
+            .unwrap_or_else(|_| "unknown".to_string());
+        FlussError::new_err(format!(
+            "Row must be a dict, list, or tuple; got {}",
+            type_name
+        ))
+    })?;
+    let schema = table_info.row_type();
+    let fields = schema.fields();
+
+    let datums = match row_input {
+        RowInput::Dict(dict) => {
+            // Strict: reject unknown keys (and also reject non-str keys 
nicely)
+            for (k, _) in dict.iter() {
+                let key_str = k.extract::<&str>().map_err(|_| {
+                    let key_type = k
+                        .get_type()
+                        .name()
+                        .map(|n| n.to_string())
+                        .unwrap_or_else(|_| "unknown".to_string());
+                    FlussError::new_err(format!("Row dict keys must be 
strings; got {}", key_type))
+                })?;
+
+                if fields.iter().all(|f| f.name() != key_str) {
+                    let expected = fields
+                        .iter()
+                        .map(|f| f.name())
+                        .collect::<Vec<_>>()
+                        .join(", ");
+                    return Err(FlussError::new_err(format!(
+                        "Unknown field '{}'. Expected fields: {}",
+                        key_str, expected
+                    )));
+                }
+            }
+
+            let mut datums = Vec::with_capacity(fields.len());
+            for field in fields {
+                let value = dict.get_item(field.name())?.ok_or_else(|| {
+                    FlussError::new_err(format!("Missing field: {}", 
field.name()))
+                })?;
+                datums.push(
+                    python_value_to_datum(&value, 
field.data_type()).map_err(|e| {
+                        FlussError::new_err(format!("Field '{}': {}", 
field.name(), e))
+                    })?,
+                );
+            }
+            datums
+        }
+
+        RowInput::List(list) => process_sequence_to_datums(list.iter(), 
list.len(), fields)?,
+
+        RowInput::Tuple(tuple) => process_sequence_to_datums(tuple.iter(), 
tuple.len(), fields)?,
+    };
+
+    Ok(fcore::row::GenericRow { values: datums })
+}
+
+/// Convert Python value to Datum based on data type
+fn python_value_to_datum(
+    value: &Bound<PyAny>,
+    data_type: &fcore::metadata::DataType,
+) -> PyResult<fcore::row::Datum<'static>> {
+    use fcore::row::{Datum, F32, F64};
+
+    if value.is_none() {
+        return Ok(Datum::Null);
+    }
+
+    match data_type {
+        fcore::metadata::DataType::Boolean(_) => {
+            let v: bool = value.extract()?;
+            Ok(Datum::Bool(v))
+        }
+        fcore::metadata::DataType::TinyInt(_) => {
+            // Strict type checking: reject bool for int columns
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for TinyInt column, got bool. Use 0 or 1 
explicitly.".to_string(),
+                ));
+            }
+            let v: i8 = value.extract()?;
+            Ok(Datum::Int8(v))
+        }
+        fcore::metadata::DataType::SmallInt(_) => {
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for SmallInt column, got bool. Use 0 or 1 
explicitly."
+                        .to_string(),
+                ));
+            }
+            let v: i16 = value.extract()?;
+            Ok(Datum::Int16(v))
+        }
+        fcore::metadata::DataType::Int(_) => {
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for Int column, got bool. Use 0 or 1 
explicitly.".to_string(),
+                ));
+            }
+            let v: i32 = value.extract()?;
+            Ok(Datum::Int32(v))
+        }
+        fcore::metadata::DataType::BigInt(_) => {
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for BigInt column, got bool. Use 0 or 1 
explicitly.".to_string(),
+                ));
+            }
+            let v: i64 = value.extract()?;
+            Ok(Datum::Int64(v))
+        }
+        fcore::metadata::DataType::Float(_) => {
+            let v: f32 = value.extract()?;
+            Ok(Datum::Float32(F32::from(v)))
+        }
+        fcore::metadata::DataType::Double(_) => {
+            let v: f64 = value.extract()?;
+            Ok(Datum::Float64(F64::from(v)))
+        }
+        fcore::metadata::DataType::String(_) | 
fcore::metadata::DataType::Char(_) => {
+            let v: String = value.extract()?;
+            Ok(v.into())
+        }
+        fcore::metadata::DataType::Bytes(_) | 
fcore::metadata::DataType::Binary(_) => {
+            // Efficient extraction: downcast to specific type and use bulk 
copy.
+            // PyBytes::as_bytes() and PyByteArray::to_vec() are O(n) bulk 
copies of the underlying data.
+            if let Ok(bytes) = value.downcast::<pyo3::types::PyBytes>() {
+                Ok(bytes.as_bytes().to_vec().into())
+            } else if let Ok(bytearray) = 
value.downcast::<pyo3::types::PyByteArray>() {
+                Ok(bytearray.to_vec().into())
+            } else {
+                Err(FlussError::new_err(format!(
+                    "Expected bytes or bytearray, got {}",
+                    value.get_type().name()?
+                )))
+            }
+        }
+        _ => Err(FlussError::new_err(format!(

Review Comment:
   Not very familiar with python, but I'm curious about for `timestamp` type, 
the python user should pass what? `np.datetime64` or some other thing?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to