This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 60e4c09 feat: Introduce python bindings row-based append API (#142)
60e4c09 is described below
commit 60e4c09bb9bc7441c7a41ac43d11b8ae6f5ec5c1
Author: Anton Borisov <[email protected]>
AuthorDate: Sat Jan 17 01:53:30 2026 +0000
feat: Introduce python bindings row-based append API (#142)
---
.gitignore | 11 +-
bindings/python/example/example.py | 14 ++-
bindings/python/fluss/__init__.pyi | 26 ++++
bindings/python/src/table.rs | 250 ++++++++++++++++++++++++++++++++++---
4 files changed, 283 insertions(+), 18 deletions(-)
diff --git a/.gitignore b/.gitignore
index c6edfb7..8202bbc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,4 +17,13 @@ Cargo.lock
# and can be added to the global gitignore or merged into this file. For a
more nuclear
# option (not recommended) you can uncomment the following to ignore the
entire idea folder.
.idea/
-.vscode/
\ No newline at end of file
+.vscode/
+
+# Python
+__pycache__/
+*.py[cod]
+*$py.class
+*.so
+*.egg-info/
+dist/
+build/
\ No newline at end of file
diff --git a/bindings/python/example/example.py
b/bindings/python/example/example.py
index 0b1e67d..f1f20d1 100644
--- a/bindings/python/example/example.py
+++ b/bindings/python/example/example.py
@@ -118,11 +118,21 @@ async def main():
append_writer.write_arrow_batch(pa_record_batch)
print("Successfully wrote PyArrow RecordBatch")
- # Test 3: Write Pandas DataFrame
+ # Test 3: Append single rows
+ print("\n--- Testing single row append ---")
+ # Dict input
+ await append_writer.append({"id": 8, "name": "Helen", "score": 93.5,
"age": 26})
+ print("Successfully appended row (dict)")
+
+ # List input
+ await append_writer.append([9, "Ivan", 90.0, 31])
+ print("Successfully appended row (list)")
+
+ # Test 4: Write Pandas DataFrame
print("\n--- Testing Pandas DataFrame write ---")
df = pd.DataFrame(
{
- "id": [6, 7],
+ "id": [10, 11],
"name": ["Frank", "Grace"],
"score": [89.3, 94.7],
"age": [29, 27],
diff --git a/bindings/python/fluss/__init__.pyi
b/bindings/python/fluss/__init__.pyi
index 4565242..6073070 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -68,6 +68,32 @@ class FlussTable:
def __repr__(self) -> str: ...
class AppendWriter:
+ async def append(self, row: dict | list | tuple) -> None:
+ """Append a single row to the table.
+
+ Args:
+ row: Dictionary mapping field names to values, or
+ list/tuple of values in schema order
+
+ Supported Types:
+ Currently supports primitive types only:
+ - Boolean, TinyInt, SmallInt, Int, BigInt (integers)
+ - Float, Double (floating point)
+ - String, Char (text)
+ - Bytes, Binary (binary data)
+ - Null values
+
+ Temporal and decimal types (Date, Timestamp, Decimal) are not yet supported.
+
+ Example:
+ await writer.append({'id': 1, 'name': 'Alice', 'score': 95.5})
+ await writer.append([1, 'Alice', 95.5])
+
+ Note:
+ For high-throughput bulk loading, prefer write_arrow_batch().
+ Use flush() to ensure all queued records are sent and acknowledged.
+ """
+ ...
def write_arrow(self, table: pa.Table) -> None: ...
def write_arrow_batch(self, batch: pa.RecordBatch) -> None: ...
def write_pandas(self, df: pd.DataFrame) -> None: ...
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 6cd13c4..db85c51 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -49,7 +49,7 @@ impl FlussTable {
let table_info = self.table_info.clone();
future_into_py(py, async move {
- let fluss_table = fcore::client::FlussTable::new(&conn, metadata,
table_info);
+ let fluss_table = fcore::client::FlussTable::new(&conn, metadata,
table_info.clone());
let table_append = fluss_table
.new_append()
@@ -57,7 +57,7 @@ impl FlussTable {
let rust_writer = table_append.create_writer();
- let py_writer = AppendWriter::from_core(rust_writer);
+ let py_writer = AppendWriter::from_core(rust_writer, table_info);
Python::attach(|py| Py::new(py, py_writer))
})
@@ -193,13 +193,14 @@ impl FlussTable {
/// Writer for appending data to a Fluss table
#[pyclass]
pub struct AppendWriter {
- inner: fcore::client::AppendWriter,
+ inner: Arc<fcore::client::AppendWriter>,
+ table_info: fcore::metadata::TableInfo,
}
#[pymethods]
impl AppendWriter {
/// Write Arrow table data
- pub fn write_arrow(&mut self, py: Python, table: Py<PyAny>) ->
PyResult<()> {
+ pub fn write_arrow(&self, py: Python, table: Py<PyAny>) -> PyResult<()> {
// Convert Arrow Table to batches and write each batch
let batches = table.call_method0(py, "to_batches")?;
let batch_list: Vec<Py<PyAny>> = batches.extract(py)?;
@@ -211,22 +212,40 @@ impl AppendWriter {
}
/// Write Arrow batch data
- pub fn write_arrow_batch(&mut self, py: Python, batch: Py<PyAny>) ->
PyResult<()> {
+ pub fn write_arrow_batch(&self, py: Python, batch: Py<PyAny>) ->
PyResult<()> {
// This shares the underlying Arrow buffers without copying data
let batch_bound = batch.bind(py);
let rust_batch: RecordBatch =
FromPyArrow::from_pyarrow_bound(batch_bound)
.map_err(|e| FlussError::new_err(format!("Failed to convert
RecordBatch: {e}")))?;
+ let inner = self.inner.clone();
// Release the GIL before blocking on async operation
let result = py.detach(|| {
- TOKIO_RUNTIME.block_on(async {
self.inner.append_arrow_batch(rust_batch).await })
+ TOKIO_RUNTIME.block_on(async {
inner.append_arrow_batch(rust_batch).await })
});
result.map_err(|e| FlussError::new_err(e.to_string()))
}
+ /// Append a single row to the table
+ pub fn append<'py>(
+ &self,
+ py: Python<'py>,
+ row: &Bound<'py, PyAny>,
+ ) -> PyResult<Bound<'py, PyAny>> {
+ let generic_row = python_to_generic_row(row, &self.table_info)?;
+ let inner = self.inner.clone();
+
+ future_into_py(py, async move {
+ inner
+ .append(generic_row)
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))
+ })
+ }
+
/// Write Pandas DataFrame data
- pub fn write_pandas(&mut self, py: Python, df: Py<PyAny>) -> PyResult<()> {
+ pub fn write_pandas(&self, py: Python, df: Py<PyAny>) -> PyResult<()> {
// Import pyarrow module
let pyarrow = py.import("pyarrow")?;
@@ -241,12 +260,16 @@ impl AppendWriter {
}
/// Flush any pending data
- pub fn flush(&mut self) -> PyResult<()> {
- TOKIO_RUNTIME.block_on(async {
- self.inner
- .flush()
- .await
- .map_err(|e| FlussError::new_err(e.to_string()))
+ pub fn flush(&self, py: Python) -> PyResult<()> {
+ let inner = self.inner.clone();
+ // Release the GIL before blocking on I/O
+ py.detach(|| {
+ TOKIO_RUNTIME.block_on(async {
+ inner
+ .flush()
+ .await
+ .map_err(|e| FlussError::new_err(e.to_string()))
+ })
})
}
@@ -257,8 +280,205 @@ impl AppendWriter {
impl AppendWriter {
/// Create an AppendWriter from a core append writer
- pub fn from_core(append: fcore::client::AppendWriter) -> Self {
- Self { inner: append }
+ pub fn from_core(
+ append: fcore::client::AppendWriter,
+ table_info: fcore::metadata::TableInfo,
+ ) -> Self {
+ Self {
+ inner: Arc::new(append),
+ table_info,
+ }
+ }
+}
+
+/// Represents different input shapes for a row
+#[derive(FromPyObject)]
+enum RowInput<'py> {
+ Dict(Bound<'py, pyo3::types::PyDict>),
+ Tuple(Bound<'py, pyo3::types::PyTuple>),
+ List(Bound<'py, pyo3::types::PyList>),
+}
+
+/// Helper function to process sequence types (list/tuple) into datums
+fn process_sequence_to_datums<'a, I>(
+ values: I,
+ len: usize,
+ fields: &[fcore::metadata::DataField],
+) -> PyResult<Vec<fcore::row::Datum<'static>>>
+where
+ I: Iterator<Item = Bound<'a, PyAny>>,
+{
+ if len != fields.len() {
+ return Err(FlussError::new_err(format!(
+ "Expected {} values, got {}",
+ fields.len(),
+ len
+ )));
+ }
+
+ let mut datums = Vec::with_capacity(fields.len());
+ for (i, (field, value)) in fields.iter().zip(values).enumerate() {
+ datums.push(
+ python_value_to_datum(&value, field.data_type()).map_err(|e| {
+ FlussError::new_err(format!("Field '{}' (index {}): {}",
field.name(), i, e))
+ })?,
+ );
+ }
+ Ok(datums)
+}
+
+/// Convert Python row (dict/list/tuple) to GenericRow based on schema
+fn python_to_generic_row(
+ row: &Bound<PyAny>,
+ table_info: &fcore::metadata::TableInfo,
+) -> PyResult<fcore::row::GenericRow<'static>> {
+ // Extract with user-friendly error message
+ let row_input: RowInput = row.extract().map_err(|_| {
+ let type_name = row
+ .get_type()
+ .name()
+ .map(|n| n.to_string())
+ .unwrap_or_else(|_| "unknown".to_string());
+ FlussError::new_err(format!(
+ "Row must be a dict, list, or tuple; got {}",
+ type_name
+ ))
+ })?;
+ let schema = table_info.row_type();
+ let fields = schema.fields();
+
+ let datums = match row_input {
+ RowInput::Dict(dict) => {
+ // Strict: reject unknown keys (and also reject non-str keys
nicely)
+ for (k, _) in dict.iter() {
+ let key_str = k.extract::<&str>().map_err(|_| {
+ let key_type = k
+ .get_type()
+ .name()
+ .map(|n| n.to_string())
+ .unwrap_or_else(|_| "unknown".to_string());
+ FlussError::new_err(format!("Row dict keys must be
strings; got {}", key_type))
+ })?;
+
+ if fields.iter().all(|f| f.name() != key_str) {
+ let expected = fields
+ .iter()
+ .map(|f| f.name())
+ .collect::<Vec<_>>()
+ .join(", ");
+ return Err(FlussError::new_err(format!(
+ "Unknown field '{}'. Expected fields: {}",
+ key_str, expected
+ )));
+ }
+ }
+
+ let mut datums = Vec::with_capacity(fields.len());
+ for field in fields {
+ let value = dict.get_item(field.name())?.ok_or_else(|| {
+ FlussError::new_err(format!("Missing field: {}",
field.name()))
+ })?;
+ datums.push(
+ python_value_to_datum(&value,
field.data_type()).map_err(|e| {
+ FlussError::new_err(format!("Field '{}': {}",
field.name(), e))
+ })?,
+ );
+ }
+ datums
+ }
+
+ RowInput::List(list) => process_sequence_to_datums(list.iter(),
list.len(), fields)?,
+
+ RowInput::Tuple(tuple) => process_sequence_to_datums(tuple.iter(),
tuple.len(), fields)?,
+ };
+
+ Ok(fcore::row::GenericRow { values: datums })
+}
+
+/// Convert Python value to Datum based on data type
+fn python_value_to_datum(
+ value: &Bound<PyAny>,
+ data_type: &fcore::metadata::DataType,
+) -> PyResult<fcore::row::Datum<'static>> {
+ use fcore::row::{Datum, F32, F64};
+
+ if value.is_none() {
+ return Ok(Datum::Null);
+ }
+
+ match data_type {
+ fcore::metadata::DataType::Boolean(_) => {
+ let v: bool = value.extract()?;
+ Ok(Datum::Bool(v))
+ }
+ fcore::metadata::DataType::TinyInt(_) => {
+ // Strict type checking: reject bool for int columns
+ if value.is_instance_of::<pyo3::types::PyBool>() {
+ return Err(FlussError::new_err(
+ "Expected int for TinyInt column, got bool. Use 0 or 1
explicitly.".to_string(),
+ ));
+ }
+ let v: i8 = value.extract()?;
+ Ok(Datum::Int8(v))
+ }
+ fcore::metadata::DataType::SmallInt(_) => {
+ if value.is_instance_of::<pyo3::types::PyBool>() {
+ return Err(FlussError::new_err(
+ "Expected int for SmallInt column, got bool. Use 0 or 1
explicitly."
+ .to_string(),
+ ));
+ }
+ let v: i16 = value.extract()?;
+ Ok(Datum::Int16(v))
+ }
+ fcore::metadata::DataType::Int(_) => {
+ if value.is_instance_of::<pyo3::types::PyBool>() {
+ return Err(FlussError::new_err(
+ "Expected int for Int column, got bool. Use 0 or 1
explicitly.".to_string(),
+ ));
+ }
+ let v: i32 = value.extract()?;
+ Ok(Datum::Int32(v))
+ }
+ fcore::metadata::DataType::BigInt(_) => {
+ if value.is_instance_of::<pyo3::types::PyBool>() {
+ return Err(FlussError::new_err(
+ "Expected int for BigInt column, got bool. Use 0 or 1
explicitly.".to_string(),
+ ));
+ }
+ let v: i64 = value.extract()?;
+ Ok(Datum::Int64(v))
+ }
+ fcore::metadata::DataType::Float(_) => {
+ let v: f32 = value.extract()?;
+ Ok(Datum::Float32(F32::from(v)))
+ }
+ fcore::metadata::DataType::Double(_) => {
+ let v: f64 = value.extract()?;
+ Ok(Datum::Float64(F64::from(v)))
+ }
+ fcore::metadata::DataType::String(_) |
fcore::metadata::DataType::Char(_) => {
+ let v: String = value.extract()?;
+ Ok(v.into())
+ }
+ fcore::metadata::DataType::Bytes(_) |
fcore::metadata::DataType::Binary(_) => {
+ // Efficient extraction: downcast to specific type and use bulk
copy.
+ // PyBytes::as_bytes() and PyByteArray::to_vec() are O(n) bulk
copies of the underlying data.
+ if let Ok(bytes) = value.downcast::<pyo3::types::PyBytes>() {
+ Ok(bytes.as_bytes().to_vec().into())
+ } else if let Ok(bytearray) =
value.downcast::<pyo3::types::PyByteArray>() {
+ Ok(bytearray.to_vec().into())
+ } else {
+ Err(FlussError::new_err(format!(
+ "Expected bytes or bytearray, got {}",
+ value.get_type().name()?
+ )))
+ }
+ }
+ _ => Err(FlussError::new_err(format!(
+ "Unsupported data type for row-level operations: {:?}",
+ data_type
+ ))),
}
}