This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b8c289  chore: fix Box leaking and batch API refactor (#136)
4b8c289 is described below

commit 4b8c289a3ad0cb330c8b5969ed3ab4f851451e52
Author: Anton Borisov <[email protected]>
AuthorDate: Sat Jan 10 16:04:31 2026 +0000

    chore: fix Box leaking and batch API refactor (#136)
---
 bindings/python/src/table.rs | 100 +++++--------------------------------------
 1 file changed, 11 insertions(+), 89 deletions(-)

diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 71759d7..8a11648 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -17,6 +17,8 @@
 
 use crate::TOKIO_RUNTIME;
 use crate::*;
+use arrow::array::RecordBatch;
+use arrow_pyarrow::FromPyArrow;
 use fluss::client::EARLIEST_OFFSET;
 use fluss::rpc::message::OffsetSpec;
 use pyo3_async_runtimes::tokio::future_into_py;
@@ -148,34 +150,17 @@ impl AppendWriter {
 
     /// Write Arrow batch data
     pub fn write_arrow_batch(&mut self, py: Python, batch: Py<PyAny>) -> PyResult<()> {
-        // Extract number of rows and columns from the Arrow batch
-        let num_rows: usize = batch.getattr(py, "num_rows")?.extract(py)?;
-        let num_columns: usize = batch.getattr(py, "num_columns")?.extract(py)?;
-
-        // Process each row in the batch
-        for row_idx in 0..num_rows {
-            let mut generic_row = fcore::row::GenericRow::new();
-
-            // Extract values for each column in this row
-            for col_idx in 0..num_columns {
-                let column = batch.call_method1(py, "column", (col_idx,))?;
-                let value = column.call_method1(py, "__getitem__", (row_idx,))?;
-
-                // Convert the Python value to a Datum and add to the row
-                let datum = self.convert_python_value_to_datum(py, value)?;
-                generic_row.set_field(col_idx, datum);
-            }
+        // This shares the underlying Arrow buffers without copying data
+        let batch_bound = batch.bind(py);
+        let rust_batch: RecordBatch = FromPyArrow::from_pyarrow_bound(batch_bound)
+            .map_err(|e| FlussError::new_err(format!("Failed to convert RecordBatch: {e}")))?;
 
-            // Append this row using the async append method
-            TOKIO_RUNTIME.block_on(async {
-                self.inner
-                    .append(generic_row)
-                    .await
-                    .map_err(|e| FlussError::new_err(e.to_string()))
-            })?;
-        }
+        // Release the GIL before blocking on async operation
+        let result = py.detach(|| {
+            TOKIO_RUNTIME.block_on(async { self.inner.append_arrow_batch(rust_batch).await })
+        });
 
-        Ok(())
+        result.map_err(|e| FlussError::new_err(e.to_string()))
     }
 
     /// Write Pandas DataFrame data
@@ -213,69 +198,6 @@ impl AppendWriter {
     pub fn from_core(append: fcore::client::AppendWriter) -> Self {
         Self { inner: append }
     }
-
-    fn convert_python_value_to_datum(
-        &self,
-        py: Python,
-        value: Py<PyAny>,
-    ) -> PyResult<fcore::row::Datum<'static>> {
-        use fcore::row::{Blob, Datum, F32, F64};
-
-        // Check for None (null)
-        if value.is_none(py) {
-            return Ok(Datum::Null);
-        }
-
-        // Try to extract different types
-        if let Ok(type_name) = value.bind(py).get_type().name() {
-            if type_name == "StringScalar" {
-                if let Ok(py_value) = value.call_method0(py, "as_py") {
-                    if let Ok(str_val) = py_value.extract::<String>(py) {
-                        let leaked_str: &'static str = Box::leak(str_val.into_boxed_str());
-                        return Ok(Datum::String(leaked_str));
-                    }
-                }
-            }
-        }
-
-        if let Ok(bool_val) = value.extract::<bool>(py) {
-            return Ok(Datum::Bool(bool_val));
-        }
-
-        if let Ok(int_val) = value.extract::<i32>(py) {
-            return Ok(Datum::Int32(int_val));
-        }
-
-        if let Ok(int_val) = value.extract::<i64>(py) {
-            return Ok(Datum::Int64(int_val));
-        }
-
-        if let Ok(float_val) = value.extract::<f32>(py) {
-            return Ok(Datum::Float32(F32::from(float_val)));
-        }
-
-        if let Ok(float_val) = value.extract::<f64>(py) {
-            return Ok(Datum::Float64(F64::from(float_val)));
-        }
-
-        if let Ok(str_val) = value.extract::<String>(py) {
-            // Convert String to &'static str by leaking memory
-            // This is a simplified approach - in production, you might want better lifetime management
-            let leaked_str: &'static str = Box::leak(str_val.into_boxed_str());
-            return Ok(Datum::String(leaked_str));
-        }
-
-        if let Ok(bytes_val) = value.extract::<Vec<u8>>(py) {
-            let blob = Blob::from(bytes_val);
-            return Ok(Datum::Blob(blob));
-        }
-
-        // If we can't convert, return an error
-        let type_name = value.bind(py).get_type().name()?;
-        Err(FlussError::new_err(format!(
-            "Cannot convert Python value to Datum: {type_name:?}"
-        )))
-    }
 }
 
 /// Scanner for reading log data from a Fluss table

Reply via email to