This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 65b6d48 [feat] Create Python bindings for Fluss Admin (#6)
65b6d48 is described below
commit 65b6d481759133a9d59f022bd8f9a3e2f6512092
Author: naivedogger <[email protected]>
AuthorDate: Thu Sep 18 12:09:53 2025 +0800
[feat] Create Python bindings for Fluss Admin (#6)
---
bindings/python/fluss/__init__.py | 20 +++++
bindings/python/src/admin.rs | 107 +++++++++++++++++++++++
bindings/python/src/connection.rs | 117 +++++++++++++++++++++++++
bindings/python/src/error.rs | 39 +++++++++
bindings/python/src/lib.rs | 67 ++++++++++++++
bindings/python/src/utils.rs | 178 ++++++++++++++++++++++++++++++++++++++
6 files changed, 528 insertions(+)
diff --git a/bindings/python/fluss/__init__.py b/bindings/python/fluss/__init__.py
new file mode 100644
index 0000000..cceee10
--- /dev/null
+++ b/bindings/python/fluss/__init__.py
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from .fluss_python import *
+
+# Version string of the `fluss` Python package.
+__version__ = "0.1.0"
diff --git a/bindings/python/src/admin.rs b/bindings/python/src/admin.rs
new file mode 100644
index 0000000..7ec6eee
--- /dev/null
+++ b/bindings/python/src/admin.rs
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use pyo3::prelude::*;
+use pyo3_async_runtimes::tokio::future_into_py;
+use crate::*;
+use std::sync::Arc;
+
+/// Administrative client for managing Fluss tables.
+///
+/// Exposed to Python as `FlussAdmin`. Instances are built on the Rust side via
+/// `FlussAdmin::from_core` (e.g. by `FlussConnection.get_admin`).
+#[pyclass]
+pub struct FlussAdmin {
+    // Held in an `Arc` so each async method can move a cheap clone into its
+    // `'static` future without borrowing `self`.
+    __admin: Arc<fcore::client::FlussAdmin>,
+}
+
+#[pymethods]
+impl FlussAdmin {
+    /// Create a table at `table_path` described by `table_descriptor`.
+    ///
+    /// Returns a Python awaitable that resolves to `None` on success. When
+    /// `ignore_if_exists` is omitted or `None` it is treated as `False`.
+    /// A failure reported by the core admin client is raised as `FlussError`.
+    #[pyo3(signature = (table_path, table_descriptor, ignore_if_exists=None))]
+    pub fn create_table<'py>(
+        &self,
+        py: Python<'py>,
+        table_path: &TablePath,
+        table_descriptor: &TableDescriptor,
+        ignore_if_exists: Option<bool>,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        // The future must be `'static`, so it receives owned copies of its inputs.
+        let ignore = matches!(ignore_if_exists, Some(true));
+        let path = table_path.to_core().clone();
+        let descriptor = table_descriptor.to_core().clone();
+        let handle = Arc::clone(&self.__admin);
+
+        future_into_py(py, async move {
+            if let Err(e) = handle.create_table(&path, &descriptor, ignore).await {
+                return Err(FlussError::new_err(e.to_string()));
+            }
+            // Return an explicit Python `None` as the awaitable's result.
+            Python::with_gil(|py| Ok(py.None()))
+        })
+    }
+
+    /// Fetch metadata for the table at `table_path`.
+    ///
+    /// Returns a Python awaitable resolving to a `TableInfo`. Failures are
+    /// raised as `FlussError` prefixed with "Failed to get table".
+    pub fn get_table<'py>(
+        &self,
+        py: Python<'py>,
+        table_path: &TablePath,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        let path = table_path.to_core().clone();
+        let handle = Arc::clone(&self.__admin);
+
+        future_into_py(py, async move {
+            let info = match handle.get_table(&path).await {
+                Ok(info) => info,
+                Err(e) => {
+                    return Err(FlussError::new_err(format!("Failed to get table: {}", e)));
+                }
+            };
+            Python::with_gil(|py| Py::new(py, TableInfo::from_core(info)))
+        })
+    }
+
+    /// Fetch the latest lake snapshot recorded for the table at `table_path`.
+    ///
+    /// Returns a Python awaitable resolving to a `LakeSnapshot`. Failures are
+    /// raised as `FlussError` prefixed with "Failed to get lake snapshot".
+    pub fn get_latest_lake_snapshot<'py>(
+        &self,
+        py: Python<'py>,
+        table_path: &TablePath,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        let path = table_path.to_core().clone();
+        let handle = Arc::clone(&self.__admin);
+
+        future_into_py(py, async move {
+            let snapshot = match handle.get_latest_lake_snapshot(&path).await {
+                Ok(snapshot) => snapshot,
+                Err(e) => {
+                    return Err(FlussError::new_err(format!(
+                        "Failed to get lake snapshot: {}",
+                        e
+                    )));
+                }
+            };
+            Python::with_gil(|py| Py::new(py, LakeSnapshot::from_core(snapshot)))
+        })
+    }
+
+    fn __repr__(&self) -> String {
+        String::from("FlussAdmin()")
+    }
+}
+
+impl FlussAdmin {
+    /// Wrap a core `fcore::client::FlussAdmin` so it can be handed to Python.
+    ///
+    /// Rust-only constructor: it is outside `#[pymethods]`, so Python cannot call it.
+    pub fn from_core(admin: fcore::client::FlussAdmin) -> Self {
+        let shared: Arc<fcore::client::FlussAdmin> = Arc::from(admin);
+        Self { __admin: shared }
+    }
+}
diff --git a/bindings/python/src/connection.rs b/bindings/python/src/connection.rs
new file mode 100644
index 0000000..ba1fa50
--- /dev/null
+++ b/bindings/python/src/connection.rs
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use pyo3::prelude::*;
+use crate::*;
+use std::sync::Arc;
+use pyo3_async_runtimes::tokio::future_into_py;
+
+/// Connection to a Fluss cluster.
+///
+/// Created from Python with the async static method `FlussConnection.connect(config)`.
+#[pyclass]
+pub struct FlussConnection {
+    // Shared via `Arc` so async methods can move a clone into their `'static`
+    // futures, and so tables created from this connection can keep it alive.
+    inner: Arc<fcore::client::FlussConnection>,
+}
+
+#[pymethods]
+impl FlussConnection {
+    /// Connect to a Fluss cluster using `config`.
+    ///
+    /// Returns a Python awaitable resolving to a new `FlussConnection`.
+    /// Raises `FlussError` if the connection cannot be established.
+    #[staticmethod]
+    fn connect<'py>(py: Python<'py>, config: &Config) -> PyResult<Bound<'py, PyAny>> {
+        let rust_config = config.get_core_config();
+
+        future_into_py(py, async move {
+            let connection = fcore::client::FlussConnection::new(rust_config)
+                .await
+                .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+            let py_connection = FlussConnection {
+                inner: Arc::new(connection),
+            };
+
+            Python::with_gil(|py| Py::new(py, py_connection))
+        })
+    }
+
+    /// Get the admin interface for this connection.
+    ///
+    /// Returns a Python awaitable resolving to a `FlussAdmin`.
+    /// Raises `FlussError` if the core client fails to provide one.
+    fn get_admin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
+        let client = Arc::clone(&self.inner);
+
+        future_into_py(py, async move {
+            let admin = client
+                .get_admin()
+                .await
+                .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+            let py_admin = FlussAdmin::from_core(admin);
+
+            Python::with_gil(|py| Py::new(py, py_admin))
+        })
+    }
+
+    /// Get a handle to the table at `table_path`.
+    ///
+    /// Returns a Python awaitable resolving to a `FlussTable` that shares this
+    /// connection. Raises `FlussError` if the table cannot be loaded.
+    fn get_table<'py>(
+        &self,
+        py: Python<'py>,
+        table_path: &TablePath,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        let client = Arc::clone(&self.inner);
+        let core_path = table_path.to_core().clone();
+
+        future_into_py(py, async move {
+            let core_table = client
+                .get_table(&core_path)
+                .await
+                .map_err(|e| FlussError::new_err(e.to_string()))?;
+
+            // The table keeps its own reference to the shared connection.
+            let py_table = FlussTable::new_table(
+                client,
+                core_table.metadata,
+                core_table.table_info,
+                core_table.table_path,
+                core_table.has_primary_key,
+            );
+
+            Python::with_gil(|py| Py::new(py, py_table))
+        })
+    }
+
+    /// Close the connection.
+    ///
+    /// Currently this performs no cleanup and always succeeds. The core
+    /// connection is shared through an `Arc`, so any cleanup it does happens
+    /// only when the last reference is dropped.
+    fn close(&self) -> PyResult<()> {
+        // Takes `&self`: nothing is mutated, so there is no need for an exclusive
+        // borrow of the Python object.
+        Ok(())
+    }
+
+    /// Enter a `with` block; returns the connection itself.
+    fn __enter__(slf: PyRef<Self>) -> PyRef<Self> {
+        slf
+    }
+
+    /// Leave a `with` block by calling `close()`.
+    ///
+    /// Returns `False` so that any exception raised inside the block propagates.
+    #[pyo3(signature = (_exc_type=None, _exc_value=None, _traceback=None))]
+    fn __exit__(
+        &self,
+        _exc_type: Option<PyObject>,
+        _exc_value: Option<PyObject>,
+        _traceback: Option<PyObject>,
+    ) -> PyResult<bool> {
+        self.close()?;
+        Ok(false)
+    }
+
+    fn __repr__(&self) -> String {
+        "FlussConnection()".to_string()
+    }
+}
diff --git a/bindings/python/src/error.rs b/bindings/python/src/error.rs
new file mode 100644
index 0000000..2db2991
--- /dev/null
+++ b/bindings/python/src/error.rs
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use pyo3::prelude::*;
+
+/// Python exception type for errors reported by the Fluss bindings.
+///
+/// Extends Python's built-in `Exception` (`extends=PyException`).
+#[pyclass(extends=PyException)]
+#[derive(Debug, Clone)]
+pub struct FlussError {
+    /// Error text; readable (not writable) from Python as `err.message`.
+    #[pyo3(get)]
+    pub message: String,
+}
+
+#[pymethods]
+impl FlussError {
+    /// Construct a `FlussError` carrying `message`.
+    ///
+    /// `FlussError::new_err` builds errors with `PyErr::new::<FlussError, _>(message)`.
+    /// When such an error is raised, Python calls `FlussError(message)`. Without a
+    /// `#[new]` constructor, PyO3 gives the class a constructor that raises
+    /// `TypeError`, which would replace every intended `FlussError` with a
+    /// `TypeError` and leave `message` unset.
+    #[new]
+    fn new(message: String) -> Self {
+        Self { message }
+    }
+
+    /// Text used by `str(err)`: the message prefixed with "FlussError: ".
+    fn __str__(&self) -> String {
+        format!("FlussError: {}", self.message)
+    }
+}
+
+impl FlussError {
+    /// Build a lazily-evaluated `PyErr` whose type is `FlussError`.
+    ///
+    /// `message` is stringified and passed as the exception constructor argument.
+    pub fn new_err(message: impl ToString) -> PyErr {
+        let text: String = message.to_string();
+        PyErr::new::<Self, String>(text)
+    }
+}
\ No newline at end of file
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
new file mode 100644
index 0000000..0d8b7a5
--- /dev/null
+++ b/bindings/python/src/lib.rs
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub use ::fluss as fcore;
+use pyo3::prelude::*;
+use once_cell::sync::Lazy;
+use tokio::runtime::Runtime;
+
+mod config;
+mod connection;
+mod table;
+mod admin;
+mod types;
+mod error;
+mod utils;
+
+pub use config::*;
+pub use connection::*;
+pub use table::*;
+pub use admin::*;
+pub use types::*;
+pub use error::*;
+pub use utils::*;
+
+/// Process-wide multi-threaded Tokio runtime.
+///
+/// It is built on first access with all drivers enabled, and panics if it
+/// cannot be created.
+///
+/// NOTE(review): nothing in lib.rs references this static, and the async methods
+/// in admin.rs / connection.rs go through `pyo3_async_runtimes::tokio::future_into_py`.
+/// Confirm that another module uses it; otherwise it is dead code.
+static TOKIO_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
+    let mut builder = tokio::runtime::Builder::new_multi_thread();
+    builder.enable_all();
+    builder.build().expect("Failed to create Tokio runtime")
+});
+
+/// Entry point of the `fluss_python` extension module.
+///
+/// Registers every Python-visible class, then exposes the `FlussError`
+/// exception type as a module attribute.
+#[pymodule]
+fn fluss_python(m: &Bound<'_, PyModule>) -> PyResult<()> {
+    let py = m.py();
+
+    // Classes, registered in a fixed order.
+    m.add_class::<Config>()?;
+    m.add_class::<FlussConnection>()?;
+    m.add_class::<TablePath>()?;
+    m.add_class::<TableInfo>()?;
+    m.add_class::<TableDescriptor>()?;
+    m.add_class::<FlussAdmin>()?;
+    m.add_class::<FlussTable>()?;
+    m.add_class::<AppendWriter>()?;
+    m.add_class::<Schema>()?;
+    m.add_class::<LogScanner>()?;
+    m.add_class::<LakeSnapshot>()?;
+    m.add_class::<TableBucket>()?;
+
+    // Exception types.
+    // TODO: maybe implement a separate module for exceptions
+    let error_type = py.get_type::<FlussError>();
+    m.add("FlussError", error_type)?;
+
+    Ok(())
+}
diff --git a/bindings/python/src/utils.rs b/bindings/python/src/utils.rs
new file mode 100644
index 0000000..c40104b
--- /dev/null
+++ b/bindings/python/src/utils.rs
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use pyo3::prelude::*;
+use arrow::datatypes::{Schema as ArrowSchema, SchemaRef};
+use std::sync::Arc;
+use arrow_pyarrow::ToPyArrow;
+use crate::*;
+
+/// Utilities for schema conversion between PyArrow, Arrow, and Fluss.
+///
+/// A stateless unit struct used only as a namespace for associated functions.
+pub struct Utils;
+
+impl Utils {
+    /// Convert a PyArrow schema object into a Rust Arrow [`SchemaRef`].
+    ///
+    /// Acquires the GIL itself, so callers do not need a `Python` token.
+    ///
+    /// # Errors
+    /// Returns a `FlussError` if `arrow_pyarrow` cannot convert `py_schema`.
+    pub fn pyarrow_to_arrow_schema(py_schema: &PyObject) -> PyResult<SchemaRef> {
+        Python::with_gil(|py| {
+            let schema_bound = py_schema.bind(py);
+
+            let schema: ArrowSchema =
+                arrow_pyarrow::FromPyArrow::from_pyarrow_bound(schema_bound).map_err(|e| {
+                    FlussError::new_err(format!("Failed to convert PyArrow schema: {}", e))
+                })?;
+            Ok(Arc::new(schema))
+        })
+    }
+
+    /// Convert an Arrow data type to the corresponding Fluss data type.
+    ///
+    /// The mapping is intentionally coarse:
+    /// - unsigned integers map to the Fluss integer type of the same bit width;
+    /// - `Date64` maps to `date`, like `Date32`;
+    /// - Arrow time/timestamp units and timestamp time zones are not carried over.
+    ///
+    /// # Errors
+    /// Returns a `FlussError` for Arrow types without a mapping, and for
+    /// `Decimal128` types with a negative scale.
+    pub fn arrow_type_to_fluss_type(
+        arrow_type: &arrow::datatypes::DataType,
+    ) -> PyResult<fcore::metadata::DataType> {
+        use arrow::datatypes::DataType as ArrowDataType;
+        use fcore::metadata::DataTypes;
+
+        let fluss_type = match arrow_type {
+            ArrowDataType::Boolean => DataTypes::boolean(),
+            ArrowDataType::Int8 => DataTypes::tinyint(),
+            ArrowDataType::Int16 => DataTypes::smallint(),
+            ArrowDataType::Int32 => DataTypes::int(),
+            ArrowDataType::Int64 => DataTypes::bigint(),
+            // NOTE(review): unsigned types reuse the same-width Fluss type. If those
+            // Fluss types are signed (as in Java Fluss), values above the signed
+            // maximum (e.g. UInt8 > 127) cannot round-trip. Confirm this is intended.
+            ArrowDataType::UInt8 => DataTypes::tinyint(),
+            ArrowDataType::UInt16 => DataTypes::smallint(),
+            ArrowDataType::UInt32 => DataTypes::int(),
+            ArrowDataType::UInt64 => DataTypes::bigint(),
+            ArrowDataType::Float32 => DataTypes::float(),
+            ArrowDataType::Float64 => DataTypes::double(),
+            ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 => DataTypes::string(),
+            ArrowDataType::Binary | ArrowDataType::LargeBinary => DataTypes::bytes(),
+            ArrowDataType::Date32 => DataTypes::date(),
+            ArrowDataType::Date64 => DataTypes::date(),
+            ArrowDataType::Time32(_) | ArrowDataType::Time64(_) => DataTypes::time(),
+            // NOTE(review): the Arrow time unit and time zone are dropped here. A zoned
+            // timestamp may belong in TIMESTAMP_LTZ instead. Confirm.
+            ArrowDataType::Timestamp(_, _) => DataTypes::timestamp(),
+            ArrowDataType::Decimal128(precision, scale) => {
+                // Arrow decimal scales are `i8` and may be negative. A plain `as u32`
+                // cast would silently wrap e.g. -2 into 4294967294, so reject them.
+                let scale = u32::try_from(*scale).map_err(|_| {
+                    FlussError::new_err(format!(
+                        "Unsupported Arrow data type: {:?} (negative decimal scale)",
+                        arrow_type
+                    ))
+                })?;
+                DataTypes::decimal(u32::from(*precision), scale)
+            }
+            _ => {
+                return Err(FlussError::new_err(format!(
+                    "Unsupported Arrow data type: {:?}",
+                    arrow_type
+                )));
+            }
+        };
+
+        Ok(fluss_type)
+    }
+
+    /// Render a Fluss data type as a lowercase SQL-like type string.
+    ///
+    /// `time` with precision 0 and `timestamp` / `timestamp_ltz` with precision 6
+    /// are printed without a suffix. Other precisions appear in parentheses.
+    /// Array, map and row types are rendered recursively.
+    pub fn datatype_to_string(data_type: &fcore::metadata::DataType) -> String {
+        use fcore::metadata::DataType as FlussDataType;
+
+        match data_type {
+            FlussDataType::Boolean(_) => "boolean".to_string(),
+            FlussDataType::TinyInt(_) => "tinyint".to_string(),
+            FlussDataType::SmallInt(_) => "smallint".to_string(),
+            FlussDataType::Int(_) => "int".to_string(),
+            FlussDataType::BigInt(_) => "bigint".to_string(),
+            FlussDataType::Float(_) => "float".to_string(),
+            FlussDataType::Double(_) => "double".to_string(),
+            FlussDataType::String(_) => "string".to_string(),
+            FlussDataType::Bytes(_) => "bytes".to_string(),
+            FlussDataType::Date(_) => "date".to_string(),
+            FlussDataType::Time(t) => {
+                if t.precision() == 0 {
+                    "time".to_string()
+                } else {
+                    format!("time({})", t.precision())
+                }
+            }
+            FlussDataType::Timestamp(t) => {
+                if t.precision() == 6 {
+                    "timestamp".to_string()
+                } else {
+                    format!("timestamp({})", t.precision())
+                }
+            }
+            FlussDataType::TimestampLTz(t) => {
+                if t.precision() == 6 {
+                    "timestamp_ltz".to_string()
+                } else {
+                    format!("timestamp_ltz({})", t.precision())
+                }
+            }
+            FlussDataType::Char(c) => format!("char({})", c.length()),
+            FlussDataType::Decimal(d) => format!("decimal({},{})", d.precision(), d.scale()),
+            FlussDataType::Binary(b) => format!("binary({})", b.length()),
+            FlussDataType::Array(arr) => {
+                format!("array<{}>", Self::datatype_to_string(arr.get_element_type()))
+            }
+            FlussDataType::Map(map) => format!(
+                "map<{},{}>",
+                Self::datatype_to_string(map.key_type()),
+                Self::datatype_to_string(map.value_type())
+            ),
+            FlussDataType::Row(row) => {
+                let fields: Vec<String> = row
+                    .fields()
+                    .iter()
+                    .map(|field| {
+                        format!("{}: {}", field.name(), Self::datatype_to_string(field.data_type()))
+                    })
+                    .collect();
+                format!("row<{}>", fields.join(", "))
+            }
+        }
+    }
+
+    /// Parse a log format string into a `LogFormat`.
+    ///
+    /// # Errors
+    /// Returns a `FlussError` naming the rejected string if parsing fails.
+    pub fn parse_log_format(format_str: &str) -> PyResult<fcore::metadata::LogFormat> {
+        fcore::metadata::LogFormat::parse(format_str)
+            .map_err(|e| FlussError::new_err(format!("Invalid log format '{}': {}", format_str, e)))
+    }
+
+    /// Parse a kv format string into a `KvFormat`.
+    ///
+    /// # Errors
+    /// Returns a `FlussError` naming the rejected string if parsing fails.
+    pub fn parse_kv_format(format_str: &str) -> PyResult<fcore::metadata::KvFormat> {
+        fcore::metadata::KvFormat::parse(format_str)
+            .map_err(|e| FlussError::new_err(format!("Invalid kv format '{}': {}", format_str, e)))
+    }
+
+    /// Collect the Arrow record batches that back a set of scan records.
+    ///
+    /// Each record exposes the batch its row lives in. A batch is collected when
+    /// the record with row id 0 is seen, which presumably yields each batch once.
+    ///
+    /// NOTE(review): a batch is skipped entirely if its row-id-0 record is not among
+    /// the returned records (e.g. a scan starting mid-batch). A batch is also
+    /// returned whole even if only some of its rows were scanned. Confirm this
+    /// against `ScanRecords` semantics.
+    pub fn convert_scan_records_to_arrow(
+        scan_records: fcore::record::ScanRecords,
+    ) -> Vec<Arc<arrow::record_batch::RecordBatch>> {
+        let mut result = Vec::new();
+        for (_, records) in scan_records.into_records() {
+            for record in records {
+                let columnar_row = record.row();
+                if columnar_row.get_row_id() == 0 {
+                    result.push(columnar_row.get_record_batch().clone());
+                }
+            }
+        }
+        result
+    }
+
+    /// Combine Arrow record batches into a single `pyarrow.Table`.
+    ///
+    /// Each batch is converted to a PyArrow object, and the list is passed to
+    /// `pyarrow.Table.from_batches`.
+    ///
+    /// # Errors
+    /// Returns a `FlussError` if `batches` is empty or a batch fails to convert.
+    /// Failures importing `pyarrow` or inside `from_batches` are propagated as
+    /// raised by Python.
+    pub fn combine_batches_to_table(
+        py: Python,
+        batches: Vec<Arc<arrow::record_batch::RecordBatch>>,
+    ) -> PyResult<PyObject> {
+        if batches.is_empty() {
+            // `Table.from_batches` needs a schema when given no batches, and none is
+            // available here.
+            return Err(FlussError::new_err("No batches to combine"));
+        }
+
+        // Convert each Rust Arrow RecordBatch into a PyArrow object; stop at the first failure.
+        let py_batches = batches
+            .iter()
+            .map(|batch| {
+                batch.as_ref().to_pyarrow(py).map_err(|e| {
+                    FlussError::new_err(format!("Failed to convert RecordBatch to PyObject: {}", e))
+                })
+            })
+            .collect::<PyResult<Vec<PyObject>>>()?;
+
+        let pyarrow = py.import("pyarrow")?;
+
+        // Use pyarrow.Table.from_batches to combine batches
+        let table = pyarrow
+            .getattr("Table")?
+            .call_method1("from_batches", (py_batches,))?;
+
+        Ok(table.into())
+    }
+}