This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new c35df66 [feat] Create Python bindings for metadata (#8)
c35df66 is described below
commit c35df66603033ec3594f4ce151d61f7549c20249
Author: naivedogger <[email protected]>
AuthorDate: Wed Sep 17 15:00:33 2025 +0800
[feat] Create Python bindings for metadata (#8)
---
bindings/python/src/config.rs | 117 ++++++++
bindings/python/src/metadata.rs | 581 ++++++++++++++++++++++++++++++++++++++++
2 files changed, 698 insertions(+)
diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs
new file mode 100644
index 0000000..08b20b4
--- /dev/null
+++ b/bindings/python/src/config.rs
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use pyo3::prelude::*;
+use pyo3::types::PyDict;
+use crate::*;
+
+/// Configuration for Fluss client
+#[pyclass]
+#[derive(Clone)]
+pub struct Config {
+ inner: fcore::config::Config,
+}
+
+#[pymethods]
+impl Config {
+ /// Create a new Config with optional properties from a dictionary
+ #[new]
+ #[pyo3(signature = (properties = None))]
+ fn new(properties: Option<&Bound<'_, PyDict>>) -> PyResult<Self> {
+ let mut config = fcore::config::Config::default();
+
+ if let Some(props) = properties {
+ for item in props.iter() {
+ let key: String = item.0.extract()?;
+ let value: String = item.1.extract()?;
+
+ match key.as_str() {
+ "bootstrap.servers" => {
+ config.bootstrap_server = Some(value);
+ },
+ "request.max.size" => {
+ if let Ok(size) = value.parse::<i32>() {
+ config.request_max_size = size;
+ }
+ },
+ "writer.acks" => {
+ config.writer_acks = value;
+ },
+ "writer.retries" => {
+ if let Ok(retries) = value.parse::<i32>() {
+ config.writer_retries = retries;
+ }
+ },
+ "writer.batch.size" => {
+ if let Ok(size) = value.parse::<i32>() {
+ config.writer_batch_size = size;
+ }
+ },
+ _ => {
+ return Err(FlussError::new_err(format!("Unknown
property: {}", key)));
+ }
+ }
+ }
+ }
+
+ Ok(Self {
+ inner: config,
+ })
+ }
+
+ /// Get the bootstrap server
+ #[getter]
+ fn bootstrap_server(&self) -> Option<String> {
+ self.inner.bootstrap_server.clone()
+ }
+
+ /// Set the bootstrap server
+ #[setter]
+ fn set_bootstrap_server(&mut self, server: String) {
+ self.inner.bootstrap_server = Some(server);
+ }
+
+ /// Get the request max size
+ #[getter]
+ fn request_max_size(&self) -> i32 {
+ self.inner.request_max_size
+ }
+
+ /// Set the request max size
+ #[setter]
+ fn set_request_max_size(&mut self, size: i32) {
+ self.inner.request_max_size = size;
+ }
+
+ /// Get the writer batch size
+ #[getter]
+ fn writer_batch_size(&self) -> i32 {
+ self.inner.writer_batch_size
+ }
+
+ /// Set the writer batch size
+ #[setter]
+ fn set_writer_batch_size(&mut self, size: i32) {
+ self.inner.writer_batch_size = size;
+ }
+}
+
+impl Config {
+ pub fn get_core_config(&self) -> fcore::config::Config {
+ self.inner.clone()
+ }
+}
diff --git a/bindings/python/src/metadata.rs b/bindings/python/src/metadata.rs
new file mode 100644
index 0000000..238dde2
--- /dev/null
+++ b/bindings/python/src/metadata.rs
@@ -0,0 +1,581 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use pyo3::prelude::*;
+use crate::*;
+use pyo3::types::PyDict;
+use std::collections::HashMap;
+
+/// Represents a table path with database and table name
+#[pyclass]
+#[derive(Clone)]
+pub struct TablePath {
+ database_name: String,
+ table_name: String,
+}
+
+#[pymethods]
+impl TablePath {
+ /// Create a new TablePath
+ #[new]
+ pub fn new(database_name: String, table_name: String) -> Self {
+ Self {
+ database_name,
+ table_name,
+ }
+ }
+
+ /// Get the database name
+ #[getter]
+ pub fn database_name(&self) -> String {
+ self.database_name.clone()
+ }
+
+ /// Get the table name
+ #[getter]
+ pub fn table_name(&self) -> String {
+ self.table_name.clone()
+ }
+
+ /// Get table path as string
+ pub fn table_path_str(&self) -> String {
+ format!("{}.{}", self.database_name, self.table_name)
+ }
+
+ pub fn __str__(&self) -> String {
+ self.table_path_str()
+ }
+
+ fn __repr__(&self) -> String {
+ format!("TablePath('{}', '{}')", self.database_name, self.table_name)
+ }
+
+ /// Hash implementation for Python
+ pub fn __hash__(&self) -> u64 {
+ use std::collections::hash_map::DefaultHasher;
+ use std::hash::{Hash, Hasher};
+
+ let mut hasher = DefaultHasher::new();
+ self.database_name.hash(&mut hasher);
+ self.table_name.hash(&mut hasher);
+ hasher.finish()
+ }
+
+ /// Equality implementation for Python
+ pub fn __eq__(&self, other: &TablePath) -> bool {
+ self.database_name == other.database_name
+ && self.table_name == other.table_name
+ }
+}
+
+impl TablePath {
+ /// Convert to core TablePath
+ pub fn to_core(&self) -> fcore::metadata::TablePath {
+ fcore::metadata::TablePath::new(self.database_name.clone(),
self.table_name.clone())
+ }
+
+ pub fn from_core(core_path: fcore::metadata::TablePath) -> Self {
+ Self {
+ database_name: core_path.database().to_string(),
+ table_name: core_path.table().to_string(),
+ }
+ }
+}
+
+/// Schema wrapper for Fluss table schema
+#[pyclass]
+pub struct Schema {
+ __schema: fcore::metadata::Schema,
+}
+
+#[pymethods]
+impl Schema {
+ /// Create a new Schema from PyArrow schema with optional primary keys
+ #[new]
+ #[pyo3(signature = (schema, primary_keys=None))]
+ pub fn new(
+ schema: PyObject, // PyArrow schema
+ primary_keys: Option<Vec<String>>,
+ ) -> PyResult<Self> {
+ let arrow_schema =
crate::utils::Utils::pyarrow_to_arrow_schema(&schema)?;
+
+ let mut builder = fcore::metadata::Schema::builder();
+
+ for field in arrow_schema.fields() {
+ let fluss_data_type =
crate::utils::Utils::arrow_type_to_fluss_type(field.data_type())?;
+ builder = builder.column(field.name(), fluss_data_type);
+
+ if let Some(comment) = field.metadata().get("comment") {
+ builder = builder.with_comment(comment);
+ }
+ }
+
+ if let Some(pk_columns) = primary_keys {
+ if !pk_columns.is_empty() {
+ builder = builder.primary_key(pk_columns);
+ }
+ }
+
+ let fluss_schema = builder.build()
+ .map_err(|e| FlussError::new_err(format!("Failed to build schema:
{}", e)))?;
+
+ Ok(Self {
+ __schema: fluss_schema,
+ })
+ }
+
+ /// Get column names
+ fn get_column_names(&self) -> Vec<String> {
+ self.__schema.columns().iter().map(|col|
col.name().to_string()).collect()
+ }
+
+ /// Get column types
+ fn get_column_types(&self) -> Vec<String> {
+ self.__schema.columns().iter()
+ .map(|col| Utils::datatype_to_string(col.data_type()))
+ .collect()
+ }
+
+ /// Get columns as (name, type) pairs
+ fn get_columns(&self) -> Vec<(String, String)> {
+ self.__schema.columns().iter()
+ .map(|col| (col.name().to_string(),
Utils::datatype_to_string(col.data_type())))
+ .collect()
+ }
+
+ // TODO: support primaryKey
+
+ fn __str__(&self) -> String {
+ format!("Schema: columns={:?}", self.get_columns())
+ }
+}
+
+impl Schema {
+ /// Convert to core Schema
+ pub fn to_core(&self) -> &fcore::metadata::Schema {
+ &self.__schema
+ }
+}
+
+/// Table distribution configuration
+#[pyclass]
+pub struct TableDistribution {
+ inner: fcore::metadata::TableDistribution,
+}
+
+#[pymethods]
+impl TableDistribution {
+ /// Get bucket keys
+ fn bucket_keys(&self) -> Vec<String> {
+ self.inner.bucket_keys().to_vec()
+ }
+
+ /// Get bucket count
+ fn bucket_count(&self) -> Option<i32> {
+ self.inner.bucket_count()
+ }
+}
+
+
+/// Table descriptor containing schema and metadata
+#[pyclass]
+#[derive(Clone)]
+pub struct TableDescriptor {
+ __tbl_desc: fcore::metadata::TableDescriptor,
+}
+
+#[pymethods]
+impl TableDescriptor {
+ /// Create a new TableDescriptor
+ #[new]
+ #[pyo3(signature = (schema, **kwargs))]
+ pub fn new(
+ schema: &Schema, // fluss schema
+ kwargs: Option<&Bound<'_, PyDict>>,
+ ) -> PyResult<Self> {
+ let mut partition_keys = Vec::new();
+ let mut bucket_count = None;
+ let mut bucket_keys = Vec::new();
+ let mut properties = std::collections::HashMap::new();
+ let mut custom_properties = std::collections::HashMap::new();
+ let mut comment: Option<String> = None;
+ let mut log_format = None;
+ let mut kv_format = None;
+
+ if let Some(kwargs) = kwargs {
+ if let Ok(Some(pkeys)) = kwargs.get_item("partition_keys") {
+ partition_keys = pkeys.extract()?;
+ }
+ if let Ok(Some(bcount)) = kwargs.get_item("bucket_count") {
+ bucket_count = Some(bcount.extract()?);
+ }
+ if let Ok(Some(bkeys)) = kwargs.get_item("bucket_keys") {
+ bucket_keys = bkeys.extract()?;
+ }
+ if let Ok(Some(props)) = kwargs.get_item("properties") {
+ properties = props.extract()?;
+ }
+ if let Ok(Some(cprops)) = kwargs.get_item("custom_properties") {
+ custom_properties = cprops.extract()?;
+ }
+ if let Ok(Some(comm)) = kwargs.get_item("comment") {
+ comment = Some(comm.extract()?);
+ }
+ if let Ok(Some(lformat)) = kwargs.get_item("log_format") {
+ let format_str: String = lformat.extract()?;
+ log_format =
Some(fcore::metadata::LogFormat::parse(&format_str)
+ .map_err(|e| FlussError::new_err(e.to_string()))?);
+ }
+ if let Ok(Some(kformat)) = kwargs.get_item("kv_format") {
+ let format_str: String = kformat.extract()?;
+ kv_format = Some(fcore::metadata::KvFormat::parse(&format_str)
+ .map_err(|e| FlussError::new_err(e.to_string()))?);
+ }
+ }
+
+ let fluss_schema = schema.to_core().clone();
+
+ let mut builder = fcore::metadata::TableDescriptor::builder()
+ .schema(fluss_schema)
+ .properties(properties)
+ .custom_properties(custom_properties)
+ .partitioned_by(partition_keys)
+ .distributed_by(bucket_count, bucket_keys);
+
+ if let Some(comment) = comment {
+ builder = builder.comment(&comment);
+ }
+ if let Some(log_format) = log_format {
+ builder = builder.log_format(log_format);
+ }
+ if let Some(kv_format) = kv_format {
+ builder = builder.kv_format(kv_format);
+ }
+
+ let core_descriptor = builder.build()
+ .map_err(|e| FlussError::new_err(format!("Failed to build
TableDescriptor: {}", e)))?;
+
+ Ok(Self {
+ __tbl_desc: core_descriptor,
+ })
+ }
+
+ /// Get the schema of this table descriptor
+ pub fn get_schema(&self) -> PyResult<Schema> {
+ Ok(Schema {
+ __schema: self.__tbl_desc.schema().clone(),
+ })
+ }
+}
+
+impl TableDescriptor {
+ /// Convert to core TableDescriptor
+ pub fn to_core(&self) -> &fcore::metadata::TableDescriptor {
+ &self.__tbl_desc
+ }
+}
+
+/// Information about a Fluss table
+#[pyclass]
+#[derive(Clone)]
+pub struct TableInfo {
+ __table_info: fcore::metadata::TableInfo,
+}
+
+#[pymethods]
+impl TableInfo {
+ /// Get the table ID
+ #[getter]
+ pub fn table_id(&self) -> i64 {
+ self.__table_info.get_table_id()
+ }
+
+ /// Get the schema ID
+ #[getter]
+ pub fn schema_id(&self) -> i32 {
+ self.__table_info.get_schema_id()
+ }
+
+ /// Get the table path
+ #[getter]
+ pub fn table_path(&self) -> TablePath {
+ TablePath::from_core(self.__table_info.get_table_path().clone())
+ }
+
+ /// Get the created time
+ #[getter]
+ pub fn created_time(&self) -> i64 {
+ self.__table_info.get_created_time()
+ }
+
+ /// Get the modified time
+ #[getter]
+ pub fn modified_time(&self) -> i64 {
+ self.__table_info.get_modified_time()
+ }
+
+ /// Get the primary keys
+ pub fn get_primary_keys(&self) -> Vec<String> {
+ self.__table_info.get_primary_keys().clone()
+ }
+
+ /// Get the bucket keys
+ pub fn get_bucket_keys(&self) -> Vec<String> {
+ self.__table_info.get_bucket_keys().to_vec()
+ }
+
+ /// Get the partition keys
+ pub fn get_partition_keys(&self) -> Vec<String> {
+ self.__table_info.get_partition_keys().to_vec()
+ }
+
+ /// Get number of buckets
+ #[getter]
+ pub fn num_buckets(&self) -> i32 {
+ self.__table_info.get_num_buckets()
+ }
+
+ /// Check if table has primary key
+ pub fn has_primary_key(&self) -> bool {
+ self.__table_info.has_primary_key()
+ }
+
+ /// Check if table is partitioned
+ pub fn is_partitioned(&self) -> bool {
+ self.__table_info.is_partitioned()
+ }
+
+ /// Get properties
+ pub fn get_properties(&self) -> std::collections::HashMap<String, String> {
+ self.__table_info.get_properties().clone()
+ }
+
+ /// Get custom properties
+ pub fn get_custom_properties(&self) -> std::collections::HashMap<String,
String> {
+ self.__table_info.get_custom_properties().clone()
+ }
+
+ /// Get comment
+ #[getter]
+ pub fn comment(&self) -> Option<String> {
+ self.__table_info.get_comment().map(|s| s.to_string())
+ }
+
+ /// Get the Schema
+ pub fn get_schema(&self) -> Schema {
+ Schema {
+ __schema: self.__table_info.get_schema().clone(),
+ }
+ }
+
+ /// Get column names
+ pub fn get_column_names(&self) -> Vec<String> {
+ self.__table_info.get_schema().columns().iter()
+ .map(|col| col.name().to_string())
+ .collect()
+ }
+
+ /// Get column count
+ pub fn get_column_count(&self) -> usize {
+ self.__table_info.get_schema().columns().len()
+ }
+}
+
+impl TableInfo {
+ /// Create from core TableInfo (internal use)
+ pub fn from_core(info: fcore::metadata::TableInfo) -> Self {
+ Self {
+ __table_info: info,
+ }
+ }
+}
+
/// Represents a lake snapshot with snapshot ID and table bucket offsets
#[pyclass]
#[derive(Clone)]
pub struct LakeSnapshot {
    // Identifier of this lake snapshot.
    snapshot_id: i64,
    // Log offset recorded per core TableBucket at the time of the snapshot.
    table_buckets_offset: HashMap<fcore::metadata::TableBucket, i64>,
}
+
+/// Represents a table bucket with table ID, partition ID, and bucket ID
+#[pyclass]
+#[derive(Clone)]
+pub struct TableBucket {
+ table_id: i64,
+ partition_id: Option<i64>,
+ bucket: i32,
+}
+
+#[pymethods]
+impl TableBucket {
+ /// Create a new TableBucket
+ #[new]
+ pub fn new(table_id: i64, bucket: i32) -> Self {
+ Self {
+ table_id,
+ partition_id: None,
+ bucket,
+ }
+ }
+
+ /// Create a new TableBucket with partition
+ #[staticmethod]
+ pub fn with_partition(table_id: i64, partition_id: i64, bucket: i32) ->
Self {
+ Self {
+ table_id,
+ partition_id: Some(partition_id),
+ bucket,
+ }
+ }
+
+ /// Get table ID
+ #[getter]
+ pub fn table_id(&self) -> i64 {
+ self.table_id
+ }
+
+ /// Get bucket ID
+ #[getter]
+ pub fn bucket_id(&self) -> i32 {
+ self.bucket
+ }
+
+ /// Get partition ID
+ #[getter]
+ pub fn partition_id(&self) -> Option<i64> {
+ self.partition_id
+ }
+
+ /// String representation
+ pub fn __str__(&self) -> String {
+ if let Some(partition_id) = self.partition_id {
+ format!("TableBucket(table_id={}, partition_id={}, bucket={})",
+ self.table_id, partition_id, self.bucket)
+ } else {
+ format!("TableBucket(table_id={}, bucket={})",
+ self.table_id, self.bucket)
+ }
+ }
+
+ /// String representation
+ pub fn __repr__(&self) -> String {
+ self.__str__()
+ }
+
+ /// Hash implementation for Python
+ pub fn __hash__(&self) -> u64 {
+ use std::collections::hash_map::DefaultHasher;
+ use std::hash::{Hash, Hasher};
+
+ let mut hasher = DefaultHasher::new();
+ self.table_id.hash(&mut hasher);
+ self.partition_id.hash(&mut hasher);
+ self.bucket.hash(&mut hasher);
+ hasher.finish()
+ }
+
+ /// Equality implementation for Python
+ pub fn __eq__(&self, other: &TableBucket) -> bool {
+ self.table_id == other.table_id
+ && self.partition_id == other.partition_id
+ && self.bucket == other.bucket
+ }
+}
+
+impl TableBucket {
+ /// Create from core TableBucket (internal use)
+ pub fn from_core(bucket: fcore::metadata::TableBucket) -> Self {
+ Self {
+ table_id: bucket.table_id(),
+ partition_id: bucket.partition_id(),
+ bucket: bucket.bucket_id(),
+ }
+ }
+
+ /// Convert to core TableBucket (internal use)
+ pub fn to_core(&self) -> fcore::metadata::TableBucket {
+ fcore::metadata::TableBucket::new(self.table_id, self.partition_id,
self.bucket)
+ }
+}
+
+#[pymethods]
+impl LakeSnapshot {
+ /// Create a new LakeSnapshot
+ #[new]
+ pub fn new(snapshot_id: i64) -> Self {
+ Self {
+ snapshot_id,
+ table_buckets_offset: HashMap::new(),
+ }
+ }
+
+ /// Get snapshot ID
+ #[getter]
+ pub fn snapshot_id(&self) -> i64 {
+ self.snapshot_id
+ }
+
+ /// Get table bucket offsets as a Python dictionary with TableBucket keys
+ #[getter]
+ pub fn table_buckets_offset(&self, py: Python) -> PyResult<PyObject> {
+ let dict = PyDict::new(py);
+ for (bucket, offset) in &self.table_buckets_offset {
+ let py_bucket = TableBucket::from_core(bucket.clone());
+ dict.set_item(Py::new(py, py_bucket)?, *offset)?;
+ }
+ Ok(dict.into())
+ }
+
+ /// Get offset for a specific table bucket
+ pub fn get_bucket_offset(&self, bucket: &TableBucket) -> Option<i64> {
+ let core_bucket = bucket.to_core();
+ self.table_buckets_offset.get(&core_bucket).copied()
+ }
+
+ /// Get all table buckets
+ pub fn get_table_buckets(&self, py: Python) -> PyResult<Vec<PyObject>> {
+ let mut buckets = Vec::new();
+ for bucket in self.table_buckets_offset.keys() {
+ let py_bucket = TableBucket::from_core(bucket.clone());
+ buckets.push(Py::new(py, py_bucket)?.into());
+ }
+ Ok(buckets)
+ }
+
+ /// String representation
+ pub fn __str__(&self) -> String {
+ format!("LakeSnapshot(snapshot_id={}, buckets_count={})",
+ self.snapshot_id, self.table_buckets_offset.len())
+ }
+
+ /// String representation
+ pub fn __repr__(&self) -> String {
+ self.__str__()
+ }
+}
+
+impl LakeSnapshot {
+ /// Create from core LakeSnapshot (internal use)
+ pub fn from_core(snapshot: fcore::metadata::LakeSnapshot) -> Self {
+ Self {
+ snapshot_id: snapshot.snapshot_id,
+ table_buckets_offset: snapshot.table_buckets_offset,
+ }
+ }
+}
+