This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new a75540b feat: support database-level ops in python (#279)
a75540b is described below
commit a75540bd3c00091e7b6c0f12add104d654aac700
Author: yuxia Luo <[email protected]>
AuthorDate: Sun Feb 8 12:59:34 2026 +0800
feat: support database-level ops in python (#279)
---
bindings/cpp/src/types.rs | 2 +-
bindings/python/fluss/__init__.pyi | 66 +++++++++++
bindings/python/src/admin.rs | 227 ++++++++++++++++++++++++++++++++++++-
bindings/python/src/lib.rs | 2 +
bindings/python/src/metadata.rs | 104 ++++++++++++++++-
bindings/python/src/table.rs | 6 +-
6 files changed, 398 insertions(+), 9 deletions(-)
diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs
index 7837032..05d3d6a 100644
--- a/bindings/cpp/src/types.rs
+++ b/bindings/cpp/src/types.rs
@@ -257,7 +257,7 @@ fn get_decimal_type(idx: usize, schema: Option<&fcore::metadata::Schema>) -> Res
.ok_or_else(|| anyhow!("Schema not available for decimal column {idx}"))?;
match col.data_type() {
fcore::metadata::DataType::Decimal(dt) => Ok((dt.precision(), dt.scale())),
- other => Err(anyhow!("Column {idx} is {:?}, not Decimal", other)),
+ other => Err(anyhow!("Column {idx} is {other:?}, not Decimal")),
}
}
diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi
index 50f3b20..a9ef828 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -121,6 +121,45 @@ class FlussConnection:
def __repr__(self) -> str: ...
class FlussAdmin:
+ async def create_database(
+ self,
+ database_name: str,
+ ignore_if_exists: bool = False,
+ database_descriptor: Optional["DatabaseDescriptor"] = None,
+ ) -> None:
+ """Create a database."""
+ ...
+ async def drop_database(
+ self,
+ database_name: str,
+ ignore_if_not_exists: bool = False,
+ cascade: bool = True,
+ ) -> None:
+ """Drop a database."""
+ ...
+ async def list_databases(self) -> List[str]:
+ """List all databases."""
+ ...
+ async def database_exists(self, database_name: str) -> bool:
+ """Check if a database exists."""
+ ...
+ async def get_database_info(self, database_name: str) -> "DatabaseInfo":
+ """Get database information."""
+ ...
+ async def list_tables(self, database_name: str) -> List[str]:
+ """List all tables in a database."""
+ ...
+ async def table_exists(self, table_path: TablePath) -> bool:
+ """Check if a table exists."""
+ ...
+ async def drop_partition(
+ self,
+ table_path: TablePath,
+ partition_spec: Dict[str, str],
+ ignore_if_not_exists: bool = False,
+ ) -> None:
+ """Drop a partition from a partitioned table."""
+ ...
async def create_table(
self,
table_path: TablePath,
@@ -203,6 +242,33 @@ class FlussAdmin:
...
def __repr__(self) -> str: ...
+
+class DatabaseDescriptor:
+ """Descriptor for a Fluss database (comment and custom properties)."""
+
+ def __init__(
+ self,
+ comment: Optional[str] = None,
+ custom_properties: Optional[Dict[str, str]] = None,
+ ) -> None: ...
+ @property
+ def comment(self) -> Optional[str]: ...
+ def get_custom_properties(self) -> Dict[str, str]: ...
+ def __repr__(self) -> str: ...
+
+
+class DatabaseInfo:
+ """Information about a Fluss database."""
+
+ @property
+ def database_name(self) -> str: ...
+ def get_database_descriptor(self) -> DatabaseDescriptor: ...
+ @property
+ def created_time(self) -> int: ...
+ @property
+ def modified_time(self) -> int: ...
+ def __repr__(self) -> str: ...
+
class TableScan:
"""Builder for creating log scanners with flexible configuration.
diff --git a/bindings/python/src/admin.rs b/bindings/python/src/admin.rs
index d28c9c0..335aa24 100644
--- a/bindings/python/src/admin.rs
+++ b/bindings/python/src/admin.rs
@@ -17,6 +17,7 @@
use crate::*;
use fcore::rpc::message::OffsetSpec;
+use pyo3::conversion::IntoPyObject;
use pyo3_async_runtimes::tokio::future_into_py;
use std::sync::Arc;
@@ -38,8 +39,7 @@ fn parse_offset_spec(offset_type: &str, timestamp: Option<i64>) -> PyResult<Offs
Ok(OffsetSpec::Timestamp(ts))
}
_ => Err(FlussError::new_err(format!(
- "Invalid offset_type: '{}'. Must be 'earliest', 'latest', or 'timestamp'",
- offset_type
+ "Invalid offset_type: '{offset_type}'. Must be 'earliest', 'latest', or 'timestamp'"
))),
}
}
@@ -49,8 +49,7 @@ fn validate_bucket_ids(bucket_ids: &[i32]) -> PyResult<()> {
for &bucket_id in bucket_ids {
if bucket_id < 0 {
return Err(FlussError::new_err(format!(
- "Invalid bucket_id: {}. Bucket IDs must be non-negative",
- bucket_id
+ "Invalid bucket_id: {bucket_id}. Bucket IDs must be non-negative"
)));
}
}
@@ -59,6 +58,226 @@ fn validate_bucket_ids(bucket_ids: &[i32]) -> PyResult<()> {
#[pymethods]
impl FlussAdmin {
+ /// Create a database.
+ ///
+ /// Args:
+ /// database_name: Name of the database
+ /// ignore_if_exists: If True, don't raise error if database already exists
+ /// database_descriptor: Optional descriptor (comment, custom_properties)
+ ///
+ /// Returns:
+ /// None
+ #[pyo3(signature = (database_name, ignore_if_exists=false, database_descriptor=None))]
+ pub fn create_database<'py>(
+ &self,
+ py: Python<'py>,
+ database_name: &str,
+ ignore_if_exists: bool,
+ database_descriptor: Option<&DatabaseDescriptor>,
+ ) -> PyResult<Bound<'py, PyAny>> {
+ let admin = self.__admin.clone();
+ let name = database_name.to_string();
+ let descriptor = database_descriptor.map(|d| d.to_core().clone());
+
+ future_into_py(py, async move {
+ admin
+ .create_database(&name, ignore_if_exists, descriptor.as_ref())
+ .await
+ .map_err(|e| FlussError::new_err(format!("Failed to create database: {e}")))?;
+
+ Python::attach(|py| Ok(py.None()))
+ })
+ }
+
+ /// Drop a database.
+ ///
+ /// Args:
+ /// database_name: Name of the database
+ /// ignore_if_not_exists: If True, don't raise error if database does not exist
+ /// cascade: If True, drop tables in the database first
+ ///
+ /// Returns:
+ /// None
+ #[pyo3(signature = (database_name, ignore_if_not_exists=false, cascade=true))]
+ pub fn drop_database<'py>(
+ &self,
+ py: Python<'py>,
+ database_name: &str,
+ ignore_if_not_exists: bool,
+ cascade: bool,
+ ) -> PyResult<Bound<'py, PyAny>> {
+ let admin = self.__admin.clone();
+ let name = database_name.to_string();
+
+ future_into_py(py, async move {
+ admin
+ .drop_database(&name, ignore_if_not_exists, cascade)
+ .await
+ .map_err(|e| FlussError::new_err(format!("Failed to drop database: {e}")))?;
+
+ Python::attach(|py| Ok(py.None()))
+ })
+ }
+
+ /// List all databases.
+ ///
+ /// Returns:
+ /// List[str]: Names of all databases
+ pub fn list_databases<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
+ let admin = self.__admin.clone();
+
+ future_into_py(py, async move {
+ let names = admin
+ .list_databases()
+ .await
+ .map_err(|e| FlussError::new_err(format!("Failed to list databases: {e}")))?;
+
+ Python::attach(|py| {
+ let py_list = pyo3::types::PyList::empty(py);
+ for name in names {
+ py_list.append(name)?;
+ }
+ Ok(py_list.unbind())
+ })
+ })
+ }
+
+ /// Check if a database exists.
+ ///
+ /// Args:
+ /// database_name: Name of the database
+ ///
+ /// Returns:
+ /// bool: True if the database exists
+ pub fn database_exists<'py>(
+ &self,
+ py: Python<'py>,
+ database_name: &str,
+ ) -> PyResult<Bound<'py, PyAny>> {
+ let admin = self.__admin.clone();
+ let name = database_name.to_string();
+
+ future_into_py(py, async move {
+ let exists = admin.database_exists(&name).await.map_err(|e| {
+ FlussError::new_err(format!("Failed to check database exists: {e}"))
+ })?;
+
+ Python::attach(|py| Ok(exists.into_pyobject(py)?.to_owned().into_any().unbind()))
+ })
+ }
+
+ /// Get database information.
+ ///
+ /// Args:
+ /// database_name: Name of the database
+ ///
+ /// Returns:
+ /// DatabaseInfo: Database metadata
+ pub fn get_database_info<'py>(
+ &self,
+ py: Python<'py>,
+ database_name: &str,
+ ) -> PyResult<Bound<'py, PyAny>> {
+ let admin = self.__admin.clone();
+ let name = database_name.to_string();
+
+ future_into_py(py, async move {
+ let info = admin
+ .get_database_info(&name)
+ .await
+ .map_err(|e| FlussError::new_err(format!("Failed to get database info: {e}")))?;
+
+ Python::attach(|py| Py::new(py, DatabaseInfo::from_core(info)))
+ })
+ }
+
+ /// List all tables in a database.
+ ///
+ /// Args:
+ /// database_name: Name of the database
+ ///
+ /// Returns:
+ /// List[str]: Names of all tables in the database
+ pub fn list_tables<'py>(
+ &self,
+ py: Python<'py>,
+ database_name: &str,
+ ) -> PyResult<Bound<'py, PyAny>> {
+ let admin = self.__admin.clone();
+ let name = database_name.to_string();
+
+ future_into_py(py, async move {
+ let names = admin
+ .list_tables(&name)
+ .await
+ .map_err(|e| FlussError::new_err(format!("Failed to list tables: {e}")))?;
+
+ Python::attach(|py| {
+ let py_list = pyo3::types::PyList::empty(py);
+ for name in names {
+ py_list.append(name)?;
+ }
+ Ok(py_list.unbind())
+ })
+ })
+ }
+
+ /// Check if a table exists.
+ ///
+ /// Args:
+ /// table_path: Path to the table (database, table)
+ ///
+ /// Returns:
+ /// bool: True if the table exists
+ pub fn table_exists<'py>(
+ &self,
+ py: Python<'py>,
+ table_path: &TablePath,
+ ) -> PyResult<Bound<'py, PyAny>> {
+ let core_table_path = table_path.to_core();
+ let admin = self.__admin.clone();
+
+ future_into_py(py, async move {
+ let exists = admin
+ .table_exists(&core_table_path)
+ .await
+ .map_err(|e| FlussError::new_err(format!("Failed to check table exists: {e}")))?;
+
+ Python::attach(|py| Ok(exists.into_pyobject(py)?.to_owned().into_any().unbind()))
+ })
+ }
+
+ /// Drop a partition from a partitioned table.
+ ///
+ /// Args:
+ /// table_path: Path to the table
+ /// partition_spec: Dict mapping partition column name to value (e.g., {"region": "US"})
+ /// ignore_if_not_exists: If True, don't raise error if partition does not exist
+ ///
+ /// Returns:
+ /// None
+ #[pyo3(signature = (table_path, partition_spec, ignore_if_not_exists=false))]
+ pub fn drop_partition<'py>(
+ &self,
+ py: Python<'py>,
+ table_path: &TablePath,
+ partition_spec: std::collections::HashMap<String, String>,
+ ignore_if_not_exists: bool,
+ ) -> PyResult<Bound<'py, PyAny>> {
+ let core_table_path = table_path.to_core();
+ let admin = self.__admin.clone();
+ let core_partition_spec = fcore::metadata::PartitionSpec::new(partition_spec);
+
+ future_into_py(py, async move {
+ admin
+ .drop_partition(&core_table_path, &core_partition_spec, ignore_if_not_exists)
+ .await
+ .map_err(|e| FlussError::new_err(format!("Failed to drop partition: {e}")))?;
+
+ Python::attach(|py| Ok(py.None()))
+ })
+ }
+
/// Create a table with the given schema
#[pyo3(signature = (table_path, table_descriptor, ignore_if_exists=None))]
pub fn create_table<'py>(
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
index f1f4ee6..41f8de5 100644
--- a/bindings/python/src/lib.rs
+++ b/bindings/python/src/lib.rs
@@ -91,6 +91,8 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PartitionInfo>()?;
m.add_class::<OffsetType>()?;
m.add_class::<WriteResultHandle>()?;
+ m.add_class::<DatabaseDescriptor>()?;
+ m.add_class::<DatabaseInfo>()?;
// Register constants
m.add("EARLIEST_OFFSET", fcore::client::EARLIEST_OFFSET)?;
diff --git a/bindings/python/src/metadata.rs b/bindings/python/src/metadata.rs
index f39f9d4..d6b122d 100644
--- a/bindings/python/src/metadata.rs
+++ b/bindings/python/src/metadata.rs
@@ -53,7 +53,7 @@ impl ChangeType {
}
fn __repr__(&self) -> String {
- format!("ChangeType.{:?}", self)
+ format!("ChangeType.{self:?}")
}
}
@@ -657,3 +657,105 @@ impl LakeSnapshot {
}
}
}
+
+/// Descriptor for a Fluss database (comment and custom properties)
+#[pyclass]
+#[derive(Clone)]
+pub struct DatabaseDescriptor {
+ __descriptor: fcore::metadata::DatabaseDescriptor,
+}
+
+#[pymethods]
+impl DatabaseDescriptor {
+ /// Create a new DatabaseDescriptor
+ #[new]
+ #[pyo3(signature = (comment=None, custom_properties=None))]
+ pub fn new(
+ comment: Option<String>,
+ custom_properties: Option<HashMap<String, String>>,
+ ) -> PyResult<Self> {
+ let mut builder = fcore::metadata::DatabaseDescriptor::builder();
+ if let Some(c) = comment {
+ builder = builder.comment(&c);
+ }
+ if let Some(props) = custom_properties {
+ builder = builder.custom_properties(props);
+ }
+ let __descriptor = builder.build();
+ Ok(Self { __descriptor })
+ }
+
+ /// Get comment if set
+ #[getter]
+ pub fn comment(&self) -> Option<String> {
+ self.__descriptor.comment().map(|s| s.to_string())
+ }
+
+ /// Get custom properties
+ pub fn get_custom_properties(&self) -> HashMap<String, String> {
+ self.__descriptor.custom_properties().clone()
+ }
+
+ fn __repr__(&self) -> String {
+ format!(
+ "DatabaseDescriptor(comment={:?}, custom_properties={:?})",
+ self.comment(),
+ self.get_custom_properties()
+ )
+ }
+}
+
+impl DatabaseDescriptor {
+ pub fn to_core(&self) -> &fcore::metadata::DatabaseDescriptor {
+ &self.__descriptor
+ }
+}
+
+/// Information about a Fluss database
+#[pyclass]
+pub struct DatabaseInfo {
+ __info: fcore::metadata::DatabaseInfo,
+}
+
+#[pymethods]
+impl DatabaseInfo {
+ /// Get the database name
+ #[getter]
+ pub fn database_name(&self) -> String {
+ self.__info.database_name().to_string()
+ }
+
+ /// Get the database descriptor
+ pub fn get_database_descriptor(&self) -> DatabaseDescriptor {
+ DatabaseDescriptor {
+ __descriptor: self.__info.database_descriptor().clone(),
+ }
+ }
+
+ /// Get created time
+ #[getter]
+ pub fn created_time(&self) -> i64 {
+ self.__info.created_time()
+ }
+
+ /// Get modified time
+ #[getter]
+ pub fn modified_time(&self) -> i64 {
+ self.__info.modified_time()
+ }
+
+ fn __repr__(&self) -> String {
+ format!(
+ "DatabaseInfo(database_name='{}', created_time={}, modified_time={})",
+ self.database_name(),
+ self.created_time(),
+ self.modified_time()
+ )
+ }
+}
+
+impl DatabaseInfo {
+ pub fn from_core(info: fcore::metadata::DatabaseInfo) -> Self {
+ Self { __info: info }
+ }
+}
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index d926596..cb203dc 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -315,7 +315,7 @@ fn resolve_projection_indices(
let idx = columns
.iter()
.position(|c| c.name() == name)
- .ok_or_else(|| FlussError::new_err(format!("Column '{}' not found", name)))?;
+ .ok_or_else(|| FlussError::new_err(format!("Column '{name}' not found")))?;
indices.push(idx);
}
Ok(Some(indices))
@@ -796,9 +796,9 @@ pub fn python_pk_to_generic_row(
let field: &fcore::metadata::DataField = &fields[*pk_idx];
let value = dict
.get_item(pk_name)?
- .ok_or_else(|| FlussError::new_err(format!("Missing PK field: {}", pk_name)))?;
+ .ok_or_else(|| FlussError::new_err(format!("Missing PK field: {pk_name}")))?;
datums[*pk_idx] = python_value_to_datum(&value, field.data_type())
- .map_err(|e| FlussError::new_err(format!("PK field '{}': {}", pk_name, e)))?;
+ .map_err(|e| FlussError::new_err(format!("PK field '{pk_name}': {e}")))?;
}
}