Copilot commented on code in PR #142:
URL: https://github.com/apache/fluss-rust/pull/142#discussion_r2695075404


##########
bindings/python/src/table.rs:
##########
@@ -195,8 +206,218 @@ impl AppendWriter {
 
 impl AppendWriter {
     /// Create a AppendWriter from a core append writer
-    pub fn from_core(append: fcore::client::AppendWriter) -> Self {
-        Self { inner: append }
+    pub fn from_core(
+        append: fcore::client::AppendWriter,
+        table_info: fcore::metadata::TableInfo,
+    ) -> Self {
+        Self {
+            inner: append,
+            table_info,
+        }
+    }
+}
+
+/// Represents different input shapes for a row
+#[derive(FromPyObject)]
+enum RowInput<'py> {
+    Dict(Bound<'py, pyo3::types::PyDict>),
+    Tuple(Bound<'py, pyo3::types::PyTuple>),
+    List(Bound<'py, pyo3::types::PyList>),
+}
+
+/// Convert Python row (dict/list/tuple) to GenericRow based on schema
+fn python_to_generic_row(
+    row: &Bound<PyAny>,
+    table_info: &fcore::metadata::TableInfo,
+) -> PyResult<fcore::row::GenericRow<'static>> {
+    // Extract with user-friendly error message
+    let row_input: RowInput = row.extract().map_err(|_| {
+        let type_name = row
+            .get_type()
+            .name()
+            .map(|n| n.to_string())
+            .unwrap_or_else(|_| "unknown".to_string());
+        FlussError::new_err(format!(
+            "Row must be a dict, list, or tuple; got {}",
+            type_name
+        ))
+    })?;
+    let schema = table_info.row_type();
+    let fields = schema.fields();
+    let mut datums = Vec::with_capacity(fields.len());
+
+    match row_input {
+        RowInput::Dict(dict) => {
+            // Strict: reject unknown keys (and also reject non-str keys 
nicely)
+            for (k, _) in dict.iter() {
+                let key_str = k.extract::<&str>().map_err(|_| {
+                    let key_type = k
+                        .get_type()
+                        .name()
+                        .map(|n| n.to_string())
+                        .unwrap_or_else(|_| "unknown".to_string());
+                    FlussError::new_err(format!("Row dict keys must be 
strings; got {}", key_type))
+                })?;
+
+                if fields.iter().all(|f| f.name() != key_str) {
+                    let expected = fields
+                        .iter()
+                        .map(|f| f.name())
+                        .collect::<Vec<_>>()
+                        .join(", ");
+                    return Err(FlussError::new_err(format!(
+                        "Unknown field '{}'. Expected fields: {}",
+                        key_str, expected
+                    )));
+                }
+            }
+
+            for field in fields {
+                let value = dict.get_item(field.name())?.ok_or_else(|| {
+                    FlussError::new_err(format!("Missing field: {}", 
field.name()))
+                })?;
+                datums.push(
+                    python_value_to_datum(&value, 
field.data_type()).map_err(|e| {
+                        FlussError::new_err(format!("Field '{}': {}", 
field.name(), e))
+                    })?,
+                );
+            }
+        }
+
+        RowInput::List(list) => {
+            if list.len() != fields.len() {
+                return Err(FlussError::new_err(format!(
+                    "Expected {} values, got {}",
+                    fields.len(),
+                    list.len()
+                )));
+            }
+
+            for (i, (field, value)) in 
fields.iter().zip(list.iter()).enumerate() {
+                datums.push(
+                    python_value_to_datum(&value, 
field.data_type()).map_err(|e| {
+                        FlussError::new_err(format!(
+                            "Field '{}' (index {}): {}",
+                            field.name(),
+                            i,
+                            e
+                        ))
+                    })?,
+                );
+            }
+        }
+
+        RowInput::Tuple(tuple) => {
+            if tuple.len() != fields.len() {
+                return Err(FlussError::new_err(format!(
+                    "Expected {} values, got {}",
+                    fields.len(),
+                    tuple.len()
+                )));
+            }
+
+            for (i, (field, value)) in 
fields.iter().zip(tuple.iter()).enumerate() {
+                datums.push(
+                    python_value_to_datum(&value, 
field.data_type()).map_err(|e| {
+                        FlussError::new_err(format!(
+                            "Field '{}' (index {}): {}",
+                            field.name(),
+                            i,
+                            e
+                        ))
+                    })?,
+                );
+            }
+        }
+    }
+
+    Ok(fcore::row::GenericRow { values: datums })
+}
+
+/// Convert Python value to Datum based on data type
+fn python_value_to_datum(
+    value: &Bound<PyAny>,
+    data_type: &fcore::metadata::DataType,
+) -> PyResult<fcore::row::Datum<'static>> {
+    use fcore::row::{Datum, F32, F64};
+
+    if value.is_none() {
+        return Ok(Datum::Null);
+    }
+
+    match data_type {
+        fcore::metadata::DataType::Boolean(_) => {
+            let v: bool = value.extract()?;
+            Ok(Datum::Bool(v))
+        }
+        fcore::metadata::DataType::TinyInt(_) => {
+            // Strict type checking: reject bool for int columns
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for TinyInt column, got bool. Use 0 or 1 
explicitly.".to_string(),
+                ));
+            }
+            let v: i8 = value.extract()?;
+            Ok(Datum::Int8(v))
+        }
+        fcore::metadata::DataType::SmallInt(_) => {
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for SmallInt column, got bool. Use 0 or 1 
explicitly."
+                        .to_string(),
+                ));
+            }
+            let v: i16 = value.extract()?;
+            Ok(Datum::Int16(v))
+        }
+        fcore::metadata::DataType::Int(_) => {
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for Int column, got bool. Use 0 or 1 
explicitly.".to_string(),
+                ));
+            }
+            let v: i32 = value.extract()?;
+            Ok(Datum::Int32(v))
+        }
+        fcore::metadata::DataType::BigInt(_) => {
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for BigInt column, got bool. Use 0 or 1 
explicitly.".to_string(),
+                ));
+            }
+            let v: i64 = value.extract()?;
+            Ok(Datum::Int64(v))
+        }
+        fcore::metadata::DataType::Float(_) => {
+            let v: f32 = value.extract()?;
+            Ok(Datum::Float32(F32::from(v)))
+        }
+        fcore::metadata::DataType::Double(_) => {
+            let v: f64 = value.extract()?;
+            Ok(Datum::Float64(F64::from(v)))
+        }
+        fcore::metadata::DataType::String(_) | 
fcore::metadata::DataType::Char(_) => {
+            let v: String = value.extract()?;
+            Ok(v.into())
+        }
+        fcore::metadata::DataType::Bytes(_) | 
fcore::metadata::DataType::Binary(_) => {
+            // Efficient extraction: downcast to specific type and use bulk 
copy
+            // PyBytes::as_bytes() and PyByteArray::to_vec() are O(n) bulk 
copies,

Review Comment:
   The comment on line 405 is incomplete: it ends with a trailing comma and 
has no continuation. Complete the sentence or remove the trailing comma.
   ```suggestion
               // PyBytes::as_bytes() and PyByteArray::to_vec() are O(n) bulk 
copies of the underlying data.
   ```



##########
bindings/python/src/table.rs:
##########
@@ -195,8 +206,218 @@ impl AppendWriter {
 
 impl AppendWriter {
     /// Create a AppendWriter from a core append writer
-    pub fn from_core(append: fcore::client::AppendWriter) -> Self {
-        Self { inner: append }
+    pub fn from_core(
+        append: fcore::client::AppendWriter,
+        table_info: fcore::metadata::TableInfo,
+    ) -> Self {
+        Self {
+            inner: append,
+            table_info,
+        }
+    }
+}
+
+/// Represents different input shapes for a row
+#[derive(FromPyObject)]
+enum RowInput<'py> {
+    Dict(Bound<'py, pyo3::types::PyDict>),
+    Tuple(Bound<'py, pyo3::types::PyTuple>),
+    List(Bound<'py, pyo3::types::PyList>),
+}
+
+/// Convert Python row (dict/list/tuple) to GenericRow based on schema
+fn python_to_generic_row(
+    row: &Bound<PyAny>,
+    table_info: &fcore::metadata::TableInfo,
+) -> PyResult<fcore::row::GenericRow<'static>> {
+    // Extract with user-friendly error message
+    let row_input: RowInput = row.extract().map_err(|_| {
+        let type_name = row
+            .get_type()
+            .name()
+            .map(|n| n.to_string())
+            .unwrap_or_else(|_| "unknown".to_string());
+        FlussError::new_err(format!(
+            "Row must be a dict, list, or tuple; got {}",
+            type_name
+        ))
+    })?;
+    let schema = table_info.row_type();
+    let fields = schema.fields();
+    let mut datums = Vec::with_capacity(fields.len());
+
+    match row_input {
+        RowInput::Dict(dict) => {
+            // Strict: reject unknown keys (and also reject non-str keys 
nicely)
+            for (k, _) in dict.iter() {
+                let key_str = k.extract::<&str>().map_err(|_| {
+                    let key_type = k
+                        .get_type()
+                        .name()
+                        .map(|n| n.to_string())
+                        .unwrap_or_else(|_| "unknown".to_string());
+                    FlussError::new_err(format!("Row dict keys must be 
strings; got {}", key_type))
+                })?;
+
+                if fields.iter().all(|f| f.name() != key_str) {
+                    let expected = fields
+                        .iter()
+                        .map(|f| f.name())
+                        .collect::<Vec<_>>()
+                        .join(", ");
+                    return Err(FlussError::new_err(format!(
+                        "Unknown field '{}'. Expected fields: {}",
+                        key_str, expected
+                    )));
+                }
+            }
+
+            for field in fields {
+                let value = dict.get_item(field.name())?.ok_or_else(|| {
+                    FlussError::new_err(format!("Missing field: {}", 
field.name()))
+                })?;
+                datums.push(
+                    python_value_to_datum(&value, 
field.data_type()).map_err(|e| {
+                        FlussError::new_err(format!("Field '{}': {}", 
field.name(), e))
+                    })?,
+                );
+            }
+        }
+
+        RowInput::List(list) => {
+            if list.len() != fields.len() {
+                return Err(FlussError::new_err(format!(
+                    "Expected {} values, got {}",
+                    fields.len(),
+                    list.len()
+                )));
+            }
+
+            for (i, (field, value)) in 
fields.iter().zip(list.iter()).enumerate() {
+                datums.push(
+                    python_value_to_datum(&value, 
field.data_type()).map_err(|e| {
+                        FlussError::new_err(format!(
+                            "Field '{}' (index {}): {}",
+                            field.name(),
+                            i,
+                            e
+                        ))
+                    })?,
+                );
+            }
+        }
+
+        RowInput::Tuple(tuple) => {
+            if tuple.len() != fields.len() {
+                return Err(FlussError::new_err(format!(
+                    "Expected {} values, got {}",
+                    fields.len(),
+                    tuple.len()
+                )));
+            }
+
+            for (i, (field, value)) in 
fields.iter().zip(tuple.iter()).enumerate() {
+                datums.push(
+                    python_value_to_datum(&value, 
field.data_type()).map_err(|e| {
+                        FlussError::new_err(format!(
+                            "Field '{}' (index {}): {}",
+                            field.name(),
+                            i,
+                            e
+                        ))
+                    })?,
+                );
+            }
+        }

Review Comment:
   The logic for handling List and Tuple inputs is duplicated (lines 287-308 
and 310-331): both branches perform the same length check and the same 
per-field conversion, with identical error messages. Consider extracting the 
common logic into a helper function that accepts an iterator, which would 
reduce code duplication and make the code more maintainable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to