This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 60e4c09  feat: Introduce python bindings row-based append API (#142)
60e4c09 is described below

commit 60e4c09bb9bc7441c7a41ac43d11b8ae6f5ec5c1
Author: Anton Borisov <[email protected]>
AuthorDate: Sat Jan 17 01:53:30 2026 +0000

    feat: Introduce python bindings row-based append API (#142)
---
 .gitignore                         |  11 +-
 bindings/python/example/example.py |  14 ++-
 bindings/python/fluss/__init__.pyi |  26 ++++
 bindings/python/src/table.rs       | 250 ++++++++++++++++++++++++++++++++++---
 4 files changed, 283 insertions(+), 18 deletions(-)

diff --git a/.gitignore b/.gitignore
index c6edfb7..8202bbc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,4 +17,13 @@ Cargo.lock
 #  and can be added to the global gitignore or merged into this file.  For a 
more nuclear
 #  option (not recommended) you can uncomment the following to ignore the 
entire idea folder.
 .idea/
-.vscode/
\ No newline at end of file
+.vscode/
+
+# Python
+__pycache__/
+*.py[cod]
+*$py.class
+*.so
+*.egg-info/
+dist/
+build/
\ No newline at end of file
diff --git a/bindings/python/example/example.py 
b/bindings/python/example/example.py
index 0b1e67d..f1f20d1 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -118,11 +118,21 @@ async def main():
         append_writer.write_arrow_batch(pa_record_batch)
         print("Successfully wrote PyArrow RecordBatch")
 
-        # Test 3: Write Pandas DataFrame
+        # Test 3: Append single rows
+        print("\n--- Testing single row append ---")
+        # Dict input
+        await append_writer.append({"id": 8, "name": "Helen", "score": 93.5, 
"age": 26})
+        print("Successfully appended row (dict)")
+
+        # List input
+        await append_writer.append([9, "Ivan", 90.0, 31])
+        print("Successfully appended row (list)")
+
+        # Test 4: Write Pandas DataFrame
         print("\n--- Testing Pandas DataFrame write ---")
         df = pd.DataFrame(
             {
-                "id": [6, 7],
+                "id": [10, 11],
                 "name": ["Frank", "Grace"],
                 "score": [89.3, 94.7],
                 "age": [29, 27],
diff --git a/bindings/python/fluss/__init__.pyi 
b/bindings/python/fluss/__init__.pyi
index 4565242..6073070 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -68,6 +68,32 @@ class FlussTable:
     def __repr__(self) -> str: ...
 
 class AppendWriter:
+    async def append(self, row: dict | list | tuple) -> None:
+        """Append a single row to the table.
+
+        Args:
+            row: Dictionary mapping field names to values, or
+                 list/tuple of values in schema order
+
+        Supported Types:
+            Currently supports primitive types only:
+            - Boolean, TinyInt, SmallInt, Int, BigInt (integers)
+            - Float, Double (floating point)
+            - String, Char (text)
+            - Bytes, Binary (binary data)
+            - Null values
+
+            Temporal types (Date, Timestamp, Decimal) are not yet supported.
+
+        Example:
+            await writer.append({'id': 1, 'name': 'Alice', 'score': 95.5})
+            await writer.append([1, 'Alice', 95.5])
+
+        Note:
+            For high-throughput bulk loading, prefer write_arrow_batch().
+            Use flush() to ensure all queued records are sent and acknowledged.
+        """
+        ...
     def write_arrow(self, table: pa.Table) -> None: ...
     def write_arrow_batch(self, batch: pa.RecordBatch) -> None: ...
     def write_pandas(self, df: pd.DataFrame) -> None: ...
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 6cd13c4..db85c51 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -49,7 +49,7 @@ impl FlussTable {
         let table_info = self.table_info.clone();
 
         future_into_py(py, async move {
-            let fluss_table = fcore::client::FlussTable::new(&conn, metadata, 
table_info);
+            let fluss_table = fcore::client::FlussTable::new(&conn, metadata, 
table_info.clone());
 
             let table_append = fluss_table
                 .new_append()
@@ -57,7 +57,7 @@ impl FlussTable {
 
             let rust_writer = table_append.create_writer();
 
-            let py_writer = AppendWriter::from_core(rust_writer);
+            let py_writer = AppendWriter::from_core(rust_writer, table_info);
 
             Python::attach(|py| Py::new(py, py_writer))
         })
@@ -193,13 +193,14 @@ impl FlussTable {
 /// Writer for appending data to a Fluss table
 #[pyclass]
 pub struct AppendWriter {
-    inner: fcore::client::AppendWriter,
+    inner: Arc<fcore::client::AppendWriter>,
+    table_info: fcore::metadata::TableInfo,
 }
 
 #[pymethods]
 impl AppendWriter {
     /// Write Arrow table data
-    pub fn write_arrow(&mut self, py: Python, table: Py<PyAny>) -> 
PyResult<()> {
+    pub fn write_arrow(&self, py: Python, table: Py<PyAny>) -> PyResult<()> {
         // Convert Arrow Table to batches and write each batch
         let batches = table.call_method0(py, "to_batches")?;
         let batch_list: Vec<Py<PyAny>> = batches.extract(py)?;
@@ -211,22 +212,40 @@ impl AppendWriter {
     }
 
     /// Write Arrow batch data
-    pub fn write_arrow_batch(&mut self, py: Python, batch: Py<PyAny>) -> 
PyResult<()> {
+    pub fn write_arrow_batch(&self, py: Python, batch: Py<PyAny>) -> 
PyResult<()> {
         // This shares the underlying Arrow buffers without copying data
         let batch_bound = batch.bind(py);
         let rust_batch: RecordBatch = 
FromPyArrow::from_pyarrow_bound(batch_bound)
             .map_err(|e| FlussError::new_err(format!("Failed to convert 
RecordBatch: {e}")))?;
 
+        let inner = self.inner.clone();
         // Release the GIL before blocking on async operation
         let result = py.detach(|| {
-            TOKIO_RUNTIME.block_on(async { 
self.inner.append_arrow_batch(rust_batch).await })
+            TOKIO_RUNTIME.block_on(async { 
inner.append_arrow_batch(rust_batch).await })
         });
 
         result.map_err(|e| FlussError::new_err(e.to_string()))
     }
 
+    /// Append a single row to the table
+    pub fn append<'py>(
+        &self,
+        py: Python<'py>,
+        row: &Bound<'py, PyAny>,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        let generic_row = python_to_generic_row(row, &self.table_info)?;
+        let inner = self.inner.clone();
+
+        future_into_py(py, async move {
+            inner
+                .append(generic_row)
+                .await
+                .map_err(|e| FlussError::new_err(e.to_string()))
+        })
+    }
+
     /// Write Pandas DataFrame data
-    pub fn write_pandas(&mut self, py: Python, df: Py<PyAny>) -> PyResult<()> {
+    pub fn write_pandas(&self, py: Python, df: Py<PyAny>) -> PyResult<()> {
         // Import pyarrow module
         let pyarrow = py.import("pyarrow")?;
 
@@ -241,12 +260,16 @@ impl AppendWriter {
     }
 
     /// Flush any pending data
-    pub fn flush(&mut self) -> PyResult<()> {
-        TOKIO_RUNTIME.block_on(async {
-            self.inner
-                .flush()
-                .await
-                .map_err(|e| FlussError::new_err(e.to_string()))
+    pub fn flush(&self, py: Python) -> PyResult<()> {
+        let inner = self.inner.clone();
+        // Release the GIL before blocking on I/O
+        py.detach(|| {
+            TOKIO_RUNTIME.block_on(async {
+                inner
+                    .flush()
+                    .await
+                    .map_err(|e| FlussError::new_err(e.to_string()))
+            })
         })
     }
 
@@ -257,8 +280,205 @@ impl AppendWriter {
 
 impl AppendWriter {
     /// Create a AppendWriter from a core append writer
-    pub fn from_core(append: fcore::client::AppendWriter) -> Self {
-        Self { inner: append }
+    pub fn from_core(
+        append: fcore::client::AppendWriter,
+        table_info: fcore::metadata::TableInfo,
+    ) -> Self {
+        Self {
+            inner: Arc::new(append),
+            table_info,
+        }
+    }
+}
+
+/// Represents different input shapes for a row
+#[derive(FromPyObject)]
+enum RowInput<'py> {
+    Dict(Bound<'py, pyo3::types::PyDict>),
+    Tuple(Bound<'py, pyo3::types::PyTuple>),
+    List(Bound<'py, pyo3::types::PyList>),
+}
+
+/// Helper function to process sequence types (list/tuple) into datums
+fn process_sequence_to_datums<'a, I>(
+    values: I,
+    len: usize,
+    fields: &[fcore::metadata::DataField],
+) -> PyResult<Vec<fcore::row::Datum<'static>>>
+where
+    I: Iterator<Item = Bound<'a, PyAny>>,
+{
+    if len != fields.len() {
+        return Err(FlussError::new_err(format!(
+            "Expected {} values, got {}",
+            fields.len(),
+            len
+        )));
+    }
+
+    let mut datums = Vec::with_capacity(fields.len());
+    for (i, (field, value)) in fields.iter().zip(values).enumerate() {
+        datums.push(
+            python_value_to_datum(&value, field.data_type()).map_err(|e| {
+                FlussError::new_err(format!("Field '{}' (index {}): {}", 
field.name(), i, e))
+            })?,
+        );
+    }
+    Ok(datums)
+}
+
+/// Convert Python row (dict/list/tuple) to GenericRow based on schema
+fn python_to_generic_row(
+    row: &Bound<PyAny>,
+    table_info: &fcore::metadata::TableInfo,
+) -> PyResult<fcore::row::GenericRow<'static>> {
+    // Extract with user-friendly error message
+    let row_input: RowInput = row.extract().map_err(|_| {
+        let type_name = row
+            .get_type()
+            .name()
+            .map(|n| n.to_string())
+            .unwrap_or_else(|_| "unknown".to_string());
+        FlussError::new_err(format!(
+            "Row must be a dict, list, or tuple; got {}",
+            type_name
+        ))
+    })?;
+    let schema = table_info.row_type();
+    let fields = schema.fields();
+
+    let datums = match row_input {
+        RowInput::Dict(dict) => {
+            // Strict: reject unknown keys (and also reject non-str keys 
nicely)
+            for (k, _) in dict.iter() {
+                let key_str = k.extract::<&str>().map_err(|_| {
+                    let key_type = k
+                        .get_type()
+                        .name()
+                        .map(|n| n.to_string())
+                        .unwrap_or_else(|_| "unknown".to_string());
+                    FlussError::new_err(format!("Row dict keys must be 
strings; got {}", key_type))
+                })?;
+
+                if fields.iter().all(|f| f.name() != key_str) {
+                    let expected = fields
+                        .iter()
+                        .map(|f| f.name())
+                        .collect::<Vec<_>>()
+                        .join(", ");
+                    return Err(FlussError::new_err(format!(
+                        "Unknown field '{}'. Expected fields: {}",
+                        key_str, expected
+                    )));
+                }
+            }
+
+            let mut datums = Vec::with_capacity(fields.len());
+            for field in fields {
+                let value = dict.get_item(field.name())?.ok_or_else(|| {
+                    FlussError::new_err(format!("Missing field: {}", 
field.name()))
+                })?;
+                datums.push(
+                    python_value_to_datum(&value, 
field.data_type()).map_err(|e| {
+                        FlussError::new_err(format!("Field '{}': {}", 
field.name(), e))
+                    })?,
+                );
+            }
+            datums
+        }
+
+        RowInput::List(list) => process_sequence_to_datums(list.iter(), 
list.len(), fields)?,
+
+        RowInput::Tuple(tuple) => process_sequence_to_datums(tuple.iter(), 
tuple.len(), fields)?,
+    };
+
+    Ok(fcore::row::GenericRow { values: datums })
+}
+
+/// Convert Python value to Datum based on data type
+fn python_value_to_datum(
+    value: &Bound<PyAny>,
+    data_type: &fcore::metadata::DataType,
+) -> PyResult<fcore::row::Datum<'static>> {
+    use fcore::row::{Datum, F32, F64};
+
+    if value.is_none() {
+        return Ok(Datum::Null);
+    }
+
+    match data_type {
+        fcore::metadata::DataType::Boolean(_) => {
+            let v: bool = value.extract()?;
+            Ok(Datum::Bool(v))
+        }
+        fcore::metadata::DataType::TinyInt(_) => {
+            // Strict type checking: reject bool for int columns
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for TinyInt column, got bool. Use 0 or 1 
explicitly.".to_string(),
+                ));
+            }
+            let v: i8 = value.extract()?;
+            Ok(Datum::Int8(v))
+        }
+        fcore::metadata::DataType::SmallInt(_) => {
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for SmallInt column, got bool. Use 0 or 1 
explicitly."
+                        .to_string(),
+                ));
+            }
+            let v: i16 = value.extract()?;
+            Ok(Datum::Int16(v))
+        }
+        fcore::metadata::DataType::Int(_) => {
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for Int column, got bool. Use 0 or 1 
explicitly.".to_string(),
+                ));
+            }
+            let v: i32 = value.extract()?;
+            Ok(Datum::Int32(v))
+        }
+        fcore::metadata::DataType::BigInt(_) => {
+            if value.is_instance_of::<pyo3::types::PyBool>() {
+                return Err(FlussError::new_err(
+                    "Expected int for BigInt column, got bool. Use 0 or 1 
explicitly.".to_string(),
+                ));
+            }
+            let v: i64 = value.extract()?;
+            Ok(Datum::Int64(v))
+        }
+        fcore::metadata::DataType::Float(_) => {
+            let v: f32 = value.extract()?;
+            Ok(Datum::Float32(F32::from(v)))
+        }
+        fcore::metadata::DataType::Double(_) => {
+            let v: f64 = value.extract()?;
+            Ok(Datum::Float64(F64::from(v)))
+        }
+        fcore::metadata::DataType::String(_) | 
fcore::metadata::DataType::Char(_) => {
+            let v: String = value.extract()?;
+            Ok(v.into())
+        }
+        fcore::metadata::DataType::Bytes(_) | 
fcore::metadata::DataType::Binary(_) => {
+            // Efficient extraction: downcast to specific type and use bulk 
copy.
+            // PyBytes::as_bytes() and PyByteArray::to_vec() are O(n) bulk 
copies of the underlying data.
+            if let Ok(bytes) = value.downcast::<pyo3::types::PyBytes>() {
+                Ok(bytes.as_bytes().to_vec().into())
+            } else if let Ok(bytearray) = 
value.downcast::<pyo3::types::PyByteArray>() {
+                Ok(bytearray.to_vec().into())
+            } else {
+                Err(FlussError::new_err(format!(
+                    "Expected bytes or bytearray, got {}",
+                    value.get_type().name()?
+                )))
+            }
+        }
+        _ => Err(FlussError::new_err(format!(
+            "Unsupported data type for row-level operations: {:?}",
+            data_type
+        ))),
     }
 }
 

Reply via email to