This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 57ac912  [chore] Bump arrow version to 57 (#58)
57ac912 is described below

commit 57ac91249475befa3faad54da6b3771fee66e459
Author: yuxia Luo <[email protected]>
AuthorDate: Mon Nov 24 10:40:02 2025 +0800

    [chore] Bump arrow version to 57 (#58)
---
 Cargo.toml                        |  2 +-
 bindings/python/Cargo.toml        |  8 +++++---
 bindings/python/src/admin.rs      |  6 +++---
 bindings/python/src/connection.rs | 12 ++++++------
 bindings/python/src/metadata.rs   |  6 +++---
 bindings/python/src/table.rs      | 18 +++++++++---------
 bindings/python/src/utils.rs      | 30 ++++++++++++++++++------------
 crates/fluss/Cargo.toml           |  2 +-
 8 files changed, 46 insertions(+), 38 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 54436ac..e745d95 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -34,5 +34,5 @@ members = ["crates/fluss", "crates/examples", "bindings/python"]
 fluss = { version = "0.1.0", path = "./crates/fluss" }
 tokio = { version = "1.44.2", features = ["full"] }
 clap = { version = "4.5.37", features = ["derive"] }
-arrow = "55.1.0"
+arrow = "57.0.0"
 chrono = { version = "0.4", features = ["clock", "std", "wasmbind"] }
diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml
index 04826fb..9ecc629 100644
--- a/bindings/python/Cargo.toml
+++ b/bindings/python/Cargo.toml
@@ -27,11 +27,13 @@ name = "fluss"
 crate-type = ["cdylib"]
 
 [dependencies]
-pyo3 = { version = "0.24", features = ["extension-module"] }
+pyo3 = { version = "0.26.0", features = ["extension-module"] }
 fluss = { path = "../../crates/fluss" }
 tokio = { workspace = true }
 arrow = { workspace = true }
-arrow-pyarrow = "55.1.0"
-pyo3-async-runtimes = { version = "0.24.0", features = ["tokio-runtime"] }
+arrow-pyarrow = "57.0.0"
+arrow-schema = "57.0.0"
+arrow-array = "57.0.0"
+pyo3-async-runtimes = { version = "0.26.0", features = ["tokio-runtime"] }
 chrono = { workspace = true }
 once_cell = "1.21.3"
diff --git a/bindings/python/src/admin.rs b/bindings/python/src/admin.rs
index 73b2dd3..fa189eb 100644
--- a/bindings/python/src/admin.rs
+++ b/bindings/python/src/admin.rs
@@ -48,7 +48,7 @@ impl FlussAdmin {
                 .await
                 .map_err(|e| FlussError::new_err(e.to_string()))?;
 
-            Python::with_gil(|py| Ok(py.None()))
+            Python::attach(|py| Ok(py.None()))
         })
     }
 
@@ -67,7 +67,7 @@ impl FlussAdmin {
                 .await
                 .map_err(|e| FlussError::new_err(format!("Failed to get table: {e}")))?;
 
-            Python::with_gil(|py| {
+            Python::attach(|py| {
                 let table_info = TableInfo::from_core(core_table_info);
                 Py::new(py, table_info)
             })
@@ -89,7 +89,7 @@ impl FlussAdmin {
                 .await
                 .map_err(|e| FlussError::new_err(format!("Failed to get lake snapshot: {e}")))?;
 
-            Python::with_gil(|py| {
+            Python::attach(|py| {
                 let lake_snapshot = LakeSnapshot::from_core(core_lake_snapshot);
                 Py::new(py, lake_snapshot)
             })
diff --git a/bindings/python/src/connection.rs b/bindings/python/src/connection.rs
index aeb8410..a7559ce 100644
--- a/bindings/python/src/connection.rs
+++ b/bindings/python/src/connection.rs
@@ -41,7 +41,7 @@ impl FlussConnection {
                 inner: Arc::new(connection),
             };
 
-            Python::with_gil(|py| Py::new(py, py_connection))
+            Python::attach(|py| Py::new(py, py_connection))
         })
     }
 
@@ -57,7 +57,7 @@ impl FlussConnection {
 
             let py_admin = FlussAdmin::from_core(admin);
 
-            Python::with_gil(|py| Py::new(py, py_admin))
+            Python::attach(|py| Py::new(py, py_admin))
         })
     }
 
@@ -84,7 +84,7 @@ impl FlussConnection {
                 core_table.has_primary_key(),
             );
 
-            Python::with_gil(|py| Py::new(py, py_table))
+            Python::attach(|py| Py::new(py, py_table))
         })
     }
 
@@ -102,9 +102,9 @@ impl FlussConnection {
     #[pyo3(signature = (_exc_type=None, _exc_value=None, _traceback=None))]
     fn __exit__(
         &mut self,
-        _exc_type: Option<PyObject>,
-        _exc_value: Option<PyObject>,
-        _traceback: Option<PyObject>,
+        _exc_type: Option<Bound<'_, PyAny>>,
+        _exc_value: Option<Bound<'_, PyAny>>,
+        _traceback: Option<Bound<'_, PyAny>>,
     ) -> PyResult<bool> {
         self.close()?;
         Ok(false)
diff --git a/bindings/python/src/metadata.rs b/bindings/python/src/metadata.rs
index 66748ab..bc5f288 100644
--- a/bindings/python/src/metadata.rs
+++ b/bindings/python/src/metadata.rs
@@ -106,7 +106,7 @@ impl Schema {
     #[new]
     #[pyo3(signature = (schema, primary_keys=None))]
     pub fn new(
-        schema: PyObject, // PyArrow schema
+        schema: Py<PyAny>, // PyArrow schema
         primary_keys: Option<Vec<String>>,
     ) -> PyResult<Self> {
         let arrow_schema = crate::utils::Utils::pyarrow_to_arrow_schema(&schema)?;
@@ -553,7 +553,7 @@ impl LakeSnapshot {
 
     /// Get table bucket offsets as a Python dictionary with TableBucket keys
     #[getter]
-    pub fn table_buckets_offset(&self, py: Python) -> PyResult<PyObject> {
+    pub fn table_buckets_offset(&self, py: Python) -> PyResult<Py<PyAny>> {
         let dict = PyDict::new(py);
         for (bucket, offset) in &self.table_buckets_offset {
             let py_bucket = TableBucket::from_core(bucket.clone());
@@ -569,7 +569,7 @@ impl LakeSnapshot {
     }
 
     /// Get all table buckets
-    pub fn get_table_buckets(&self, py: Python) -> PyResult<Vec<PyObject>> {
+    pub fn get_table_buckets(&self, py: Python) -> PyResult<Vec<Py<PyAny>>> {
         let mut buckets = Vec::new();
         for bucket in self.table_buckets_offset.keys() {
             let py_bucket = TableBucket::from_core(bucket.clone());
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index c255fa6..2a8df25 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -51,7 +51,7 @@ impl FlussTable {
 
             let py_writer = AppendWriter::from_core(rust_writer);
 
-            Python::with_gil(|py| Py::new(py, py_writer))
+            Python::attach(|py| Py::new(py, py_writer))
         })
     }
 
@@ -75,7 +75,7 @@ impl FlussTable {
                 .map_err(|e| FlussError::new_err(e.to_string()))?;
 
             let py_scanner = LogScanner::from_core(rust_scanner, admin, table_info.clone());
-            Python::with_gil(|py| Py::new(py, py_scanner))
+            Python::attach(|py| Py::new(py, py_scanner))
         })
     }
 
@@ -131,10 +131,10 @@ pub struct AppendWriter {
 #[pymethods]
 impl AppendWriter {
     /// Write Arrow table data
-    pub fn write_arrow(&mut self, py: Python, table: PyObject) -> PyResult<()> {
+    pub fn write_arrow(&mut self, py: Python, table: Py<PyAny>) -> PyResult<()> {
         // Convert Arrow Table to batches and write each batch
         let batches = table.call_method0(py, "to_batches")?;
-        let batch_list: Vec<PyObject> = batches.extract(py)?;
+        let batch_list: Vec<Py<PyAny>> = batches.extract(py)?;
 
         for batch in batch_list {
             self.write_arrow_batch(py, batch)?;
@@ -143,7 +143,7 @@ impl AppendWriter {
     }
 
     /// Write Arrow batch data
-    pub fn write_arrow_batch(&mut self, py: Python, batch: PyObject) -> PyResult<()> {
+    pub fn write_arrow_batch(&mut self, py: Python, batch: Py<PyAny>) -> PyResult<()> {
         // Extract number of rows and columns from the Arrow batch
         let num_rows: usize = batch.getattr(py, "num_rows")?.extract(py)?;
         let num_columns: usize = batch.getattr(py, "num_columns")?.extract(py)?;
@@ -175,7 +175,7 @@ impl AppendWriter {
     }
 
     /// Write Pandas DataFrame data
-    pub fn write_pandas(&mut self, py: Python, df: PyObject) -> PyResult<()> {
+    pub fn write_pandas(&mut self, py: Python, df: Py<PyAny>) -> PyResult<()> {
         // Import pyarrow module
         let pyarrow = py.import("pyarrow")?;
 
@@ -213,7 +213,7 @@ impl AppendWriter {
     fn convert_python_value_to_datum(
         &self,
         py: Python,
-        value: PyObject,
+        value: Py<PyAny>,
     ) -> PyResult<fcore::row::Datum<'static>> {
         use fcore::row::{Blob, Datum, F32, F64};
 
@@ -321,7 +321,7 @@ impl LogScanner {
     }
 
     /// Convert all data to Arrow Table
-    fn to_arrow(&self, py: Python) -> PyResult<PyObject> {
+    fn to_arrow(&self, py: Python) -> PyResult<Py<PyAny>> {
         use std::collections::HashMap;
         use std::time::Duration;
 
@@ -387,7 +387,7 @@ impl LogScanner {
     }
 
     /// Convert all data to Pandas DataFrame
-    fn to_pandas(&self, py: Python) -> PyResult<PyObject> {
+    fn to_pandas(&self, py: Python) -> PyResult<Py<PyAny>> {
         let arrow_table = self.to_arrow(py)?;
 
         // Convert Arrow Table to Pandas DataFrame using pyarrow
diff --git a/bindings/python/src/utils.rs b/bindings/python/src/utils.rs
index 93933b3..09e6b5f 100644
--- a/bindings/python/src/utils.rs
+++ b/bindings/python/src/utils.rs
@@ -16,8 +16,8 @@
 // under the License.
 
 use crate::*;
-use arrow::datatypes::{Schema as ArrowSchema, SchemaRef};
-use arrow_pyarrow::ToPyArrow;
+use arrow_pyarrow::{FromPyArrow, ToPyArrow};
+use arrow_schema::SchemaRef;
 use std::sync::Arc;
 
 /// Utilities for schema conversion between PyArrow, Arrow, and Fluss
@@ -25,11 +25,10 @@ pub struct Utils;
 
 impl Utils {
     /// Convert PyArrow schema to Rust Arrow schema
-    pub fn pyarrow_to_arrow_schema(py_schema: &PyObject) -> PyResult<SchemaRef> {
-        Python::with_gil(|py| {
+    pub fn pyarrow_to_arrow_schema(py_schema: &Py<PyAny>) -> PyResult<SchemaRef> {
+        Python::attach(|py| {
             let schema_bound = py_schema.bind(py);
-
-            let schema: ArrowSchema = arrow_pyarrow::FromPyArrow::from_pyarrow_bound(schema_bound)
+            let schema: arrow_schema::Schema = FromPyArrow::from_pyarrow_bound(schema_bound)
                 .map_err(|e| {
                     FlussError::new_err(format!("Failed to convert PyArrow schema: {e}"))
                 })?;
@@ -172,14 +171,21 @@ impl Utils {
     pub fn combine_batches_to_table(
         py: Python,
         batches: Vec<Arc<arrow::record_batch::RecordBatch>>,
-    ) -> PyResult<PyObject> {
-        // Convert Rust Arrow RecordBatch to PyObject
-        let py_batches: Result<Vec<PyObject>, _> = batches
+    ) -> PyResult<Py<PyAny>> {
+        use arrow_array::RecordBatch as ArrowArrayRecordBatch;
+
+        let py_batches: Result<Vec<Py<PyAny>>, _> = batches
             .iter()
             .map(|batch| {
-                batch.as_ref().to_pyarrow(py).map_err(|e| {
-                    FlussError::new_err(format!("Failed to convert RecordBatch to PyObject: {e}"))
-                })
+                ArrowArrayRecordBatch::try_new(batch.schema().clone(), batch.columns().to_vec())
+                    .map_err(|e| FlussError::new_err(format!("Failed to convert RecordBatch: {e}")))
+                    .and_then(|b| {
+                        ToPyArrow::to_pyarrow(&b, py)
+                            .map(|x| x.into())
+                            .map_err(|e| {
+                                FlussError::new_err(format!("Failed to convert to PyObject: {e}"))
+                            })
+                    })
             })
             .collect();
 
diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml
index ab1efc2..af77037 100644
--- a/crates/fluss/Cargo.toml
+++ b/crates/fluss/Cargo.toml
@@ -24,7 +24,7 @@ build = "src/build.rs"
 
 [dependencies]
 arrow = { workspace = true }
-arrow-schema = "55.1.0"
+arrow-schema = "57.0.0"
 byteorder = "1.5"
 futures = "0.3"
 clap = { workspace = true }

Reply via email to