This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 4b8c289 chore: fix Box leaking and batch API refactor (#136)
4b8c289 is described below
commit 4b8c289a3ad0cb330c8b5969ed3ab4f851451e52
Author: Anton Borisov <[email protected]>
AuthorDate: Sat Jan 10 16:04:31 2026 +0000
chore: fix Box leaking and batch API refactor (#136)
---
bindings/python/src/table.rs | 100 +++++--------------------------------------
1 file changed, 11 insertions(+), 89 deletions(-)
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 71759d7..8a11648 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -17,6 +17,8 @@
use crate::TOKIO_RUNTIME;
use crate::*;
+use arrow::array::RecordBatch;
+use arrow_pyarrow::FromPyArrow;
use fluss::client::EARLIEST_OFFSET;
use fluss::rpc::message::OffsetSpec;
use pyo3_async_runtimes::tokio::future_into_py;
@@ -148,34 +150,17 @@ impl AppendWriter {
/// Write Arrow batch data
pub fn write_arrow_batch(&mut self, py: Python, batch: Py<PyAny>) ->
PyResult<()> {
- // Extract number of rows and columns from the Arrow batch
- let num_rows: usize = batch.getattr(py, "num_rows")?.extract(py)?;
- let num_columns: usize = batch.getattr(py,
"num_columns")?.extract(py)?;
-
- // Process each row in the batch
- for row_idx in 0..num_rows {
- let mut generic_row = fcore::row::GenericRow::new();
-
- // Extract values for each column in this row
- for col_idx in 0..num_columns {
- let column = batch.call_method1(py, "column", (col_idx,))?;
- let value = column.call_method1(py, "__getitem__",
(row_idx,))?;
-
- // Convert the Python value to a Datum and add to the row
- let datum = self.convert_python_value_to_datum(py, value)?;
- generic_row.set_field(col_idx, datum);
- }
+ // This shares the underlying Arrow buffers without copying data
+ let batch_bound = batch.bind(py);
+ let rust_batch: RecordBatch =
FromPyArrow::from_pyarrow_bound(batch_bound)
+ .map_err(|e| FlussError::new_err(format!("Failed to convert
RecordBatch: {e}")))?;
- // Append this row using the async append method
- TOKIO_RUNTIME.block_on(async {
- self.inner
- .append(generic_row)
- .await
- .map_err(|e| FlussError::new_err(e.to_string()))
- })?;
- }
+ // Release the GIL before blocking on async operation
+ let result = py.detach(|| {
+ TOKIO_RUNTIME.block_on(async {
self.inner.append_arrow_batch(rust_batch).await })
+ });
- Ok(())
+ result.map_err(|e| FlussError::new_err(e.to_string()))
}
/// Write Pandas DataFrame data
@@ -213,69 +198,6 @@ impl AppendWriter {
pub fn from_core(append: fcore::client::AppendWriter) -> Self {
Self { inner: append }
}
-
- fn convert_python_value_to_datum(
- &self,
- py: Python,
- value: Py<PyAny>,
- ) -> PyResult<fcore::row::Datum<'static>> {
- use fcore::row::{Blob, Datum, F32, F64};
-
- // Check for None (null)
- if value.is_none(py) {
- return Ok(Datum::Null);
- }
-
- // Try to extract different types
- if let Ok(type_name) = value.bind(py).get_type().name() {
- if type_name == "StringScalar" {
- if let Ok(py_value) = value.call_method0(py, "as_py") {
- if let Ok(str_val) = py_value.extract::<String>(py) {
- let leaked_str: &'static str =
Box::leak(str_val.into_boxed_str());
- return Ok(Datum::String(leaked_str));
- }
- }
- }
- }
-
- if let Ok(bool_val) = value.extract::<bool>(py) {
- return Ok(Datum::Bool(bool_val));
- }
-
- if let Ok(int_val) = value.extract::<i32>(py) {
- return Ok(Datum::Int32(int_val));
- }
-
- if let Ok(int_val) = value.extract::<i64>(py) {
- return Ok(Datum::Int64(int_val));
- }
-
- if let Ok(float_val) = value.extract::<f32>(py) {
- return Ok(Datum::Float32(F32::from(float_val)));
- }
-
- if let Ok(float_val) = value.extract::<f64>(py) {
- return Ok(Datum::Float64(F64::from(float_val)));
- }
-
- if let Ok(str_val) = value.extract::<String>(py) {
- // Convert String to &'static str by leaking memory
- // This is a simplified approach - in production, you might want
better lifetime management
- let leaked_str: &'static str = Box::leak(str_val.into_boxed_str());
- return Ok(Datum::String(leaked_str));
- }
-
- if let Ok(bytes_val) = value.extract::<Vec<u8>>(py) {
- let blob = Blob::from(bytes_val);
- return Ok(Datum::Blob(blob));
- }
-
- // If we can't convert, return an error
- let type_name = value.bind(py).get_type().name()?;
- Err(FlussError::new_err(format!(
- "Cannot convert Python value to Datum: {type_name:?}"
- )))
- }
}
/// Scanner for reading log data from a Fluss table