This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 65b6d48  [feat] Create Python bindings for Fluss Admin (#6)
65b6d48 is described below

commit 65b6d481759133a9d59f022bd8f9a3e2f6512092
Author: naivedogger <[email protected]>
AuthorDate: Thu Sep 18 12:09:53 2025 +0800

    [feat] Create Python bindings for Fluss Admin (#6)
---
 bindings/python/fluss/__init__.py |  20 +++++
 bindings/python/src/admin.rs      | 107 +++++++++++++++++++++++
 bindings/python/src/connection.rs | 117 +++++++++++++++++++++++++
 bindings/python/src/error.rs      |  39 +++++++++
 bindings/python/src/lib.rs        |  67 ++++++++++++++
 bindings/python/src/utils.rs      | 178 ++++++++++++++++++++++++++++++++++++++
 6 files changed, 528 insertions(+)

diff --git a/bindings/python/fluss/__init__.py 
b/bindings/python/fluss/__init__.py
new file mode 100644
index 0000000..cceee10
--- /dev/null
+++ b/bindings/python/fluss/__init__.py
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from .fluss_python import *
+
+# Version string exposed by the `fluss` Python package.
+__version__ = "0.1.0"
diff --git a/bindings/python/src/admin.rs b/bindings/python/src/admin.rs
new file mode 100644
index 0000000..7ec6eee
--- /dev/null
+++ b/bindings/python/src/admin.rs
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use pyo3::prelude::*;
+use pyo3_async_runtimes::tokio::future_into_py;
+use crate::*;
+use std::sync::Arc;
+
+/// Administrative client for managing Fluss tables.
+///
+/// Python-visible wrapper (`#[pyclass]`) around `fcore::client::FlussAdmin`.
+#[pyclass]
+pub struct FlussAdmin {
+    // Core admin client. Kept in an `Arc` so each async method can clone a
+    // handle into the future it hands back to Python.
+    __admin: Arc<fcore::client::FlussAdmin>,
+}
+
+#[pymethods]
+impl FlussAdmin {
+    /// Create the table at `table_path` as described by `table_descriptor`.
+    ///
+    /// `ignore_if_exists` is treated as `false` when omitted or `None`.
+    /// Returns an awaitable that resolves to `None`; a failure reported by the
+    /// core admin client is raised as `FlussError`.
+    #[pyo3(signature = (table_path, table_descriptor, ignore_if_exists=None))]
+    pub fn create_table<'py>(
+        &self,
+        py: Python<'py>,
+        table_path: &TablePath,
+        table_descriptor: &TableDescriptor,
+        ignore_if_exists: Option<bool>,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        // Take owned copies so the future borrows nothing from `self` or the
+        // Python-owned arguments.
+        let admin = Arc::clone(&self.__admin);
+        let path = table_path.to_core().clone();
+        let descriptor = table_descriptor.to_core().clone();
+        let skip_existing = matches!(ignore_if_exists, Some(true));
+
+        future_into_py(py, async move {
+            match admin.create_table(&path, &descriptor, skip_existing).await {
+                Ok(_) => Python::with_gil(|py| Ok(py.None())),
+                Err(e) => Err(FlussError::new_err(e.to_string())),
+            }
+        })
+    }
+
+    /// Get table information.
+    ///
+    /// Returns an awaitable that resolves to a `TableInfo` built from the core
+    /// table info for `table_path`. A failed lookup is reported through
+    /// `FlussError::new_err` with a "Failed to get table" message.
+    pub fn get_table<'py>(
+        &self,
+        py: Python<'py>,
+        table_path: &TablePath,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        // Owned copies, so the future does not borrow from `self` or the argument.
+        let core_table_path = table_path.to_core().clone();
+        let admin = self.__admin.clone();
+        
+        future_into_py(py, async move {
+            let core_table_info = admin.get_table(&core_table_path).await
+                .map_err(|e| FlussError::new_err(format!("Failed to get table: 
{}", e)))?;
+
+            // The GIL is needed to allocate the Python-side `TableInfo` object.
+            Python::with_gil(|py| {
+                let table_info = TableInfo::from_core(core_table_info);
+                Py::new(py, table_info)
+            })
+        })
+    }
+
+    /// Get the latest lake snapshot for a table.
+    ///
+    /// Returns an awaitable that resolves to a `LakeSnapshot` built from the
+    /// core snapshot for `table_path`. A failure is reported through
+    /// `FlussError::new_err` with a "Failed to get lake snapshot" message.
+    pub fn get_latest_lake_snapshot<'py>(
+        &self,
+        py: Python<'py>,
+        table_path: &TablePath,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        // Owned copies, so the future does not borrow from `self` or the argument.
+        let core_table_path = table_path.to_core().clone();
+        let admin = self.__admin.clone();
+        
+        future_into_py(py, async move {
+            let core_lake_snapshot = 
admin.get_latest_lake_snapshot(&core_table_path).await
+                .map_err(|e| FlussError::new_err(format!("Failed to get lake 
snapshot: {}", e)))?;
+
+            // The GIL is needed to allocate the Python-side `LakeSnapshot` object.
+            Python::with_gil(|py| {
+                let lake_snapshot = 
LakeSnapshot::from_core(core_lake_snapshot);
+                Py::new(py, lake_snapshot)
+            })
+        })
+    }
+
+    // Fixed text shown by Python's `repr()`; carries no instance state.
+    fn __repr__(&self) -> String {
+        String::from("FlussAdmin()")
+    }
+}
+
+impl FlussAdmin {
+    /// Wrap a core admin client so it can be handed to Python.
+    ///
+    /// Rust-only constructor; it is not part of the `#[pymethods]` block.
+    pub fn from_core(admin: fcore::client::FlussAdmin) -> Self {
+        let __admin = Arc::new(admin);
+        Self { __admin }
+    }
+}
diff --git a/bindings/python/src/connection.rs 
b/bindings/python/src/connection.rs
new file mode 100644
index 0000000..ba1fa50
--- /dev/null
+++ b/bindings/python/src/connection.rs
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use pyo3::prelude::*;
+use crate::*;
+use std::sync::Arc;
+use pyo3_async_runtimes::tokio::future_into_py;
+
+/// Connection to a Fluss cluster.
+///
+/// Python-visible wrapper (`#[pyclass]`) around `fcore::client::FlussConnection`.
+#[pyclass]
+pub struct FlussConnection {
+    // Core connection. Kept in an `Arc` so async methods can clone a handle
+    // into the futures they return.
+    inner: Arc<fcore::client::FlussConnection>,
+}
+
+#[pymethods]
+impl FlussConnection {
+    /// Asynchronously open a connection using the settings in `config`.
+    ///
+    /// Returns an awaitable that resolves to a `FlussConnection`; a failure to
+    /// create the core connection is raised as `FlussError`.
+    #[staticmethod]
+    fn connect<'py>(py: Python<'py>, config: &Config) -> PyResult<Bound<'py, PyAny>> {
+        let core_config = config.get_core_config();
+
+        future_into_py(py, async move {
+            let inner = fcore::client::FlussConnection::new(core_config)
+                .await
+                .map(Arc::new)
+                .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+            // The GIL is needed to allocate the Python-side object.
+            Python::with_gil(|py| Py::new(py, FlussConnection { inner }))
+        })
+    }
+    
+    /// Asynchronously obtain the admin client for this connection.
+    ///
+    /// Returns an awaitable that resolves to a `FlussAdmin`; a failure is
+    /// raised as `FlussError`.
+    fn get_admin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
+        let connection = Arc::clone(&self.inner);
+
+        future_into_py(py, async move {
+            let py_admin = connection
+                .get_admin()
+                .await
+                .map(FlussAdmin::from_core)
+                .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+            Python::with_gil(|py| Py::new(py, py_admin))
+        })
+    }
+
+    /// Asynchronously look up the table at `table_path`.
+    ///
+    /// Returns an awaitable that resolves to a `FlussTable`; a lookup failure
+    /// is raised as `FlussError`.
+    fn get_table<'py>(
+        &self,
+        py: Python<'py>,
+        table_path: &TablePath,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        let connection = Arc::clone(&self.inner);
+        let path = table_path.to_core().clone();
+
+        future_into_py(py, async move {
+            let core_table = match connection.get_table(&path).await {
+                Ok(table) => table,
+                Err(e) => return Err(FlussError::new_err(e.to_string())),
+            };
+
+            // The Python table is assembled from the connection handle plus the
+            // individual parts of the core table.
+            let py_table = FlussTable::new_table(
+                connection,
+                core_table.metadata,
+                core_table.table_info,
+                core_table.table_path,
+                core_table.has_primary_key,
+            );
+
+            Python::with_gil(|py| Py::new(py, py_table))
+        })
+    }
+
+    // Close the connection.
+    //
+    // Currently a no-op: it always returns `Ok(())` and releases nothing.
+    fn close(&mut self) -> PyResult<()> {
+        Ok(())
+    }
+
+    // Enter the runtime context (for 'with' statement).
+    // Returns the same object it was called on, so `with conn as c` binds
+    // `c` to this connection.
+    fn __enter__(slf: PyRef<Self>) -> PyRef<Self> {
+        slf
+    }
+    
+    // Leave the runtime context (for 'with' statement).
+    //
+    // Calls `close()` and, on success, returns `false` so an exception raised
+    // inside the `with` body is not suppressed. The exception arguments are
+    // accepted but not inspected.
+    #[pyo3(signature = (_exc_type=None, _exc_value=None, _traceback=None))]
+    fn __exit__(
+        &mut self,
+        _exc_type: Option<PyObject>,
+        _exc_value: Option<PyObject>,
+        _traceback: Option<PyObject>,
+    ) -> PyResult<bool> {
+        self.close().map(|()| false)
+    }
+
+    // Fixed text shown by Python's `repr()`; carries no instance state.
+    fn __repr__(&self) -> String {
+        String::from("FlussConnection()")
+    }
+}
diff --git a/bindings/python/src/error.rs b/bindings/python/src/error.rs
new file mode 100644
index 0000000..2db2991
--- /dev/null
+++ b/bindings/python/src/error.rs
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use pyo3::prelude::*;
+
+/// Exception type raised to Python for errors reported by the Fluss bindings.
+///
+/// Subclasses Python's built-in `Exception`. The base class is written as a
+/// fully qualified path because `pyo3::prelude` does not export `PyException`.
+#[pyclass(extends=pyo3::exceptions::PyException)]
+#[derive(Debug, Clone)]
+pub struct FlussError {
+    /// Human-readable description of the failure (readable from Python).
+    #[pyo3(get)]
+    pub message: String,
+}
+
+#[pymethods]
+impl FlussError {
+    /// Build the exception from its message.
+    ///
+    /// Required for `new_err` to work: `PyErr::new::<FlussError, _>(msg)`
+    /// instantiates the class with `msg` as its argument when the error is
+    /// raised, which fails if the class exposes no constructor.
+    #[new]
+    fn new(message: String) -> Self {
+        Self { message }
+    }
+
+    /// Text shown by `str(exc)`: the message prefixed with `FlussError: `.
+    fn __str__(&self) -> String {
+        format!("FlussError: {}", self.message)
+    }
+}
+
+impl FlussError {
+    /// Create a `PyErr` that raises `FlussError` carrying `message`.
+    pub fn new_err(message: impl ToString) -> PyErr {
+        PyErr::new::<FlussError, _>(message.to_string())
+    }
+}
\ No newline at end of file
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
new file mode 100644
index 0000000..0d8b7a5
--- /dev/null
+++ b/bindings/python/src/lib.rs
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub use ::fluss as fcore;
+use pyo3::prelude::*;
+use once_cell::sync::Lazy;
+use tokio::runtime::Runtime;
+
+mod config;
+mod connection;
+mod table;
+mod admin;
+mod types;
+mod error;
+mod utils;
+
+pub use config::*;
+pub use connection::*;
+pub use table::*;
+pub use admin::*;
+pub use types::*;
+pub use error::*;
+pub use utils::*;
+
+/// Tokio runtime shared by the extension module, built lazily on first access.
+///
+/// Uses the multi-threaded scheduler with all drivers enabled. Panics on
+/// first access if the runtime cannot be created.
+static TOKIO_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
+    let mut builder = tokio::runtime::Builder::new_multi_thread();
+    builder.enable_all();
+    builder
+        .build()
+        .expect("Failed to create Tokio runtime")
+});
+
+// Entry point of the `fluss_python` extension module: registers every
+// Python-visible class plus the `FlussError` exception type on `m`.
+// Any registration failure is propagated to the importer via `?`.
+#[pymodule]
+fn fluss_python(m: &Bound<'_, PyModule>) -> PyResult<()> {
+    // Register all classes
+    m.add_class::<Config>()?;
+    m.add_class::<FlussConnection>()?;
+    m.add_class::<TablePath>()?;
+    m.add_class::<TableInfo>()?;
+    m.add_class::<TableDescriptor>()?;
+    m.add_class::<FlussAdmin>()?;
+    m.add_class::<FlussTable>()?;
+    m.add_class::<AppendWriter>()?;
+    m.add_class::<Schema>()?;
+    m.add_class::<LogScanner>()?;
+    m.add_class::<LakeSnapshot>()?;
+    m.add_class::<TableBucket>()?;
+    
+    // Register exception types
+    // TODO: maybe implement a separate module for exceptions
+    // Added by name with `m.add` (not `add_class`), exposing the type object
+    // as `fluss_python.FlussError`.
+    m.add("FlussError", m.py().get_type::<FlussError>())?;
+    
+    Ok(())
+}
diff --git a/bindings/python/src/utils.rs b/bindings/python/src/utils.rs
new file mode 100644
index 0000000..c40104b
--- /dev/null
+++ b/bindings/python/src/utils.rs
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use pyo3::prelude::*;
+use arrow::datatypes::{Schema as ArrowSchema, SchemaRef};
+use std::sync::Arc;
+use arrow_pyarrow::ToPyArrow;
+use crate::*;
+
+/// Utilities for schema conversion between PyArrow, Arrow, and Fluss.
+///
+/// Stateless namespace: every helper is an associated function, so no
+/// instance is ever needed.
+pub struct Utils;
+
+impl Utils {
+    /// Convert PyArrow schema to Rust Arrow schema.
+    ///
+    /// Acquires the GIL, binds `py_schema`, and converts it through
+    /// `arrow_pyarrow::FromPyArrow`. A conversion failure is reported through
+    /// `FlussError::new_err`; on success the schema is returned in an `Arc`.
+    pub fn pyarrow_to_arrow_schema(py_schema: &PyObject) -> 
PyResult<SchemaRef> {
+        Python::with_gil(|py| {
+            let schema_bound = py_schema.bind(py);
+            
+            let schema: ArrowSchema = 
arrow_pyarrow::FromPyArrow::from_pyarrow_bound(&schema_bound)
+                .map_err(|e| FlussError::new_err(format!("Failed to convert 
PyArrow schema: {}", e)))?;
+            Ok(Arc::new(schema))
+        })
+    }
+
+    /// Convert Arrow DataType to Fluss DataType.
+    ///
+    /// # Errors
+    /// Returns a `FlussError` for Arrow types that have no mapping below, and
+    /// for `Decimal128` types whose scale is negative.
+    pub fn arrow_type_to_fluss_type(
+        arrow_type: &arrow::datatypes::DataType,
+    ) -> PyResult<fcore::metadata::DataType> {
+        use arrow::datatypes::DataType as ArrowDataType;
+        use fcore::metadata::DataTypes;
+
+        let fluss_type = match arrow_type {
+            ArrowDataType::Boolean => DataTypes::boolean(),
+            ArrowDataType::Int8 => DataTypes::tinyint(),
+            ArrowDataType::Int16 => DataTypes::smallint(),
+            ArrowDataType::Int32 => DataTypes::int(),
+            ArrowDataType::Int64 => DataTypes::bigint(),
+            // NOTE(review): unsigned types map to the Fluss integer type used for
+            // the same-width signed Arrow type; values above the signed maximum
+            // presumably cannot be represented — confirm this is intended.
+            ArrowDataType::UInt8 => DataTypes::tinyint(),
+            ArrowDataType::UInt16 => DataTypes::smallint(),
+            ArrowDataType::UInt32 => DataTypes::int(),
+            ArrowDataType::UInt64 => DataTypes::bigint(),
+            ArrowDataType::Float32 => DataTypes::float(),
+            ArrowDataType::Float64 => DataTypes::double(),
+            ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 => DataTypes::string(),
+            ArrowDataType::Binary | ArrowDataType::LargeBinary => DataTypes::bytes(),
+            ArrowDataType::Date32 => DataTypes::date(),
+            ArrowDataType::Date64 => DataTypes::date(),
+            // NOTE(review): the Arrow time unit and timestamp timezone are ignored
+            // by the two arms below — confirm this is intended.
+            ArrowDataType::Time32(_) | ArrowDataType::Time64(_) => DataTypes::time(),
+            ArrowDataType::Timestamp(_, _) => DataTypes::timestamp(),
+            ArrowDataType::Decimal128(precision, scale) => {
+                // Arrow stores the scale as a signed `i8`. Casting a negative
+                // scale with `as u32` would silently wrap to a huge value, so
+                // it is rejected as unsupported instead.
+                let scale = u32::try_from(*scale).map_err(|_| {
+                    FlussError::new_err(format!(
+                        "Unsupported Arrow data type: {:?}", arrow_type
+                    ))
+                })?;
+                DataTypes::decimal(u32::from(*precision), scale)
+            }
+            _ => {
+                return Err(FlussError::new_err(format!(
+                    "Unsupported Arrow data type: {:?}", arrow_type
+                )));
+            }
+        };
+
+        Ok(fluss_type)
+    }
+
+    /// Render a Fluss data type as its textual type name.
+    ///
+    /// `time` omits its precision when it is 0; `timestamp` and
+    /// `timestamp_ltz` omit theirs when it is 6. Array, map and row types are
+    /// rendered recursively.
+    pub fn datatype_to_string(data_type: &fcore::metadata::DataType) -> String {
+        use fcore::metadata::DataType as FlussType;
+
+        match data_type {
+            FlussType::Boolean(_) => String::from("boolean"),
+            FlussType::TinyInt(_) => String::from("tinyint"),
+            FlussType::SmallInt(_) => String::from("smallint"),
+            FlussType::Int(_) => String::from("int"),
+            FlussType::BigInt(_) => String::from("bigint"),
+            FlussType::Float(_) => String::from("float"),
+            FlussType::Double(_) => String::from("double"),
+            FlussType::String(_) => String::from("string"),
+            FlussType::Bytes(_) => String::from("bytes"),
+            FlussType::Date(_) => String::from("date"),
+            // Precision-carrying types: the guarded arm handles the precision
+            // that is left implicit, the unguarded arm spells it out.
+            FlussType::Time(t) if t.precision() == 0 => String::from("time"),
+            FlussType::Time(t) => format!("time({})", t.precision()),
+            FlussType::Timestamp(t) if t.precision() == 6 => String::from("timestamp"),
+            FlussType::Timestamp(t) => format!("timestamp({})", t.precision()),
+            FlussType::TimestampLTz(t) if t.precision() == 6 => String::from("timestamp_ltz"),
+            FlussType::TimestampLTz(t) => format!("timestamp_ltz({})", t.precision()),
+            FlussType::Char(c) => format!("char({})", c.length()),
+            FlussType::Decimal(d) => format!("decimal({},{})", d.precision(), d.scale()),
+            FlussType::Binary(b) => format!("binary({})", b.length()),
+            FlussType::Array(arr) => {
+                let element = Self::datatype_to_string(arr.get_element_type());
+                format!("array<{}>", element)
+            }
+            FlussType::Map(map) => {
+                let key = Self::datatype_to_string(map.key_type());
+                let value = Self::datatype_to_string(map.value_type());
+                format!("map<{},{}>", key, value)
+            }
+            FlussType::Row(row) => {
+                let rendered = row
+                    .fields()
+                    .iter()
+                    .map(|field| {
+                        let field_type = Self::datatype_to_string(field.data_type());
+                        format!("{}: {}", field.name(), field_type)
+                    })
+                    .collect::<Vec<String>>()
+                    .join(", ");
+                format!("row<{}>", rendered)
+            }
+        }
+    }
+
+    /// Parse log format string to LogFormat enum.
+    ///
+    /// Delegates to `fcore::metadata::LogFormat::parse`; a parse failure is
+    /// reported through `FlussError::new_err` with a message that includes
+    /// the rejected `format_str`.
+    pub fn parse_log_format(format_str: &str) -> 
PyResult<fcore::metadata::LogFormat> {
+        fcore::metadata::LogFormat::parse(format_str)
+            .map_err(|e| FlussError::new_err(format!("Invalid log format '{}': 
{}", format_str, e)))
+    }
+
+    /// Parse kv format string to KvFormat enum.
+    ///
+    /// Delegates to `fcore::metadata::KvFormat::parse`; a parse failure is
+    /// reported through `FlussError::new_err` with a message that includes
+    /// the rejected `format_str`.
+    pub fn parse_kv_format(format_str: &str) -> 
PyResult<fcore::metadata::KvFormat> {
+        fcore::metadata::KvFormat::parse(format_str)
+            .map_err(|e| FlussError::new_err(format!("Invalid kv format '{}': 
{}", format_str, e)))
+    }
+
+    /// Collect the Arrow record batches referenced by a set of scan records.
+    ///
+    /// A batch is pushed only for records whose row id is 0, so each record
+    /// with row id 0 contributes its batch once and all other records are
+    /// skipped.
+    // NOTE(review): presumably row id 0 marks the first row of each batch,
+    // making this a de-duplication step — confirm against `ScanRecords`.
+    pub fn convert_scan_records_to_arrow(
+        _scan_records: fcore::record::ScanRecords,
+    ) -> Vec<Arc<arrow::record_batch::RecordBatch>> {
+        let mut batches = Vec::new();
+        for (_, bucket_records) in _scan_records.into_records() {
+            for record in bucket_records {
+                let row = record.row();
+                if row.get_row_id() != 0 {
+                    continue;
+                }
+                batches.push(row.get_record_batch().clone());
+            }
+        }
+        batches
+    }
+    
+    /// Combine multiple Arrow batches into a single Table.
+    ///
+    /// Each batch is converted to a PyArrow object and the list is passed to
+    /// `pyarrow.Table.from_batches`.
+    ///
+    /// Errors: a `FlussError` when `batches` is empty or a batch fails to
+    /// convert; any Python error from importing or calling `pyarrow` is
+    /// propagated unchanged.
+    pub fn combine_batches_to_table(py: Python, batches: 
Vec<Arc<arrow::record_batch::RecordBatch>>) -> PyResult<PyObject> {
+        if batches.is_empty() {
+            return Err(FlussError::new_err("No batches to combine"));
+        }
+        
+        // Convert Rust Arrow RecordBatch to PyObject
+        let py_batches: Result<Vec<PyObject>, _> = batches.iter()
+            .map(|batch| {
+                batch.as_ref().to_pyarrow(py)
+                    .map_err(|e| FlussError::new_err(format!("Failed to 
convert RecordBatch to PyObject: {}", e)))
+            })
+            .collect();
+        
+        // Collecting into `Result` stops at the first failed conversion.
+        let py_batches = py_batches?;
+        
+        let pyarrow = py.import("pyarrow")?;
+        
+        // Use pyarrow.Table.from_batches to combine batches
+        let table = pyarrow
+            .getattr("Table")?
+            .call_method1("from_batches", (py_batches,))?;
+        
+        Ok(table.into())
+    }
+}

Reply via email to