This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new a75540b  feat: support database-level ops in python (#279)
a75540b is described below

commit a75540bd3c00091e7b6c0f12add104d654aac700
Author: yuxia Luo <[email protected]>
AuthorDate: Sun Feb 8 12:59:34 2026 +0800

    feat: support database-level ops in python (#279)
---
 bindings/cpp/src/types.rs          |   2 +-
 bindings/python/fluss/__init__.pyi |  66 +++++++++++
 bindings/python/src/admin.rs       | 227 ++++++++++++++++++++++++++++++++++++-
 bindings/python/src/lib.rs         |   2 +
 bindings/python/src/metadata.rs    | 104 ++++++++++++++++-
 bindings/python/src/table.rs       |   6 +-
 6 files changed, 398 insertions(+), 9 deletions(-)

diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs
index 7837032..05d3d6a 100644
--- a/bindings/cpp/src/types.rs
+++ b/bindings/cpp/src/types.rs
@@ -257,7 +257,7 @@ fn get_decimal_type(idx: usize, schema: Option<&fcore::metadata::Schema>) -> Res
         .ok_or_else(|| anyhow!("Schema not available for decimal column {idx}"))?;
     match col.data_type() {
         fcore::metadata::DataType::Decimal(dt) => Ok((dt.precision(), dt.scale())),
-        other => Err(anyhow!("Column {idx} is {:?}, not Decimal", other)),
+        other => Err(anyhow!("Column {idx} is {other:?}, not Decimal")),
     }
 }
 
diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi
index 50f3b20..a9ef828 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -121,6 +121,45 @@ class FlussConnection:
     def __repr__(self) -> str: ...
 
 class FlussAdmin:
+    async def create_database(
+        self,
+        database_name: str,
+        ignore_if_exists: bool = False,
+        database_descriptor: Optional["DatabaseDescriptor"] = None,
+    ) -> None:
+        """Create a database."""
+        ...
+    async def drop_database(
+        self,
+        database_name: str,
+        ignore_if_not_exists: bool = False,
+        cascade: bool = True,
+    ) -> None:
+        """Drop a database."""
+        ...
+    async def list_databases(self) -> List[str]:
+        """List all databases."""
+        ...
+    async def database_exists(self, database_name: str) -> bool:
+        """Check if a database exists."""
+        ...
+    async def get_database_info(self, database_name: str) -> "DatabaseInfo":
+        """Get database information."""
+        ...
+    async def list_tables(self, database_name: str) -> List[str]:
+        """List all tables in a database."""
+        ...
+    async def table_exists(self, table_path: TablePath) -> bool:
+        """Check if a table exists."""
+        ...
+    async def drop_partition(
+        self,
+        table_path: TablePath,
+        partition_spec: Dict[str, str],
+        ignore_if_not_exists: bool = False,
+    ) -> None:
+        """Drop a partition from a partitioned table."""
+        ...
     async def create_table(
         self,
         table_path: TablePath,
@@ -203,6 +242,33 @@ class FlussAdmin:
         ...
     def __repr__(self) -> str: ...
 
+
+class DatabaseDescriptor:
+    """Descriptor for a Fluss database (comment and custom properties)."""
+
+    def __init__(
+        self,
+        comment: Optional[str] = None,
+        custom_properties: Optional[Dict[str, str]] = None,
+    ) -> None: ...
+    @property
+    def comment(self) -> Optional[str]: ...
+    def get_custom_properties(self) -> Dict[str, str]: ...
+    def __repr__(self) -> str: ...
+
+
+class DatabaseInfo:
+    """Information about a Fluss database."""
+
+    @property
+    def database_name(self) -> str: ...
+    def get_database_descriptor(self) -> DatabaseDescriptor: ...
+    @property
+    def created_time(self) -> int: ...
+    @property
+    def modified_time(self) -> int: ...
+    def __repr__(self) -> str: ...
+
 class TableScan:
     """Builder for creating log scanners with flexible configuration.
 
diff --git a/bindings/python/src/admin.rs b/bindings/python/src/admin.rs
index d28c9c0..335aa24 100644
--- a/bindings/python/src/admin.rs
+++ b/bindings/python/src/admin.rs
@@ -17,6 +17,7 @@
 
 use crate::*;
 use fcore::rpc::message::OffsetSpec;
+use pyo3::conversion::IntoPyObject;
 use pyo3_async_runtimes::tokio::future_into_py;
 use std::sync::Arc;
 
@@ -38,8 +39,7 @@ fn parse_offset_spec(offset_type: &str, timestamp: Option<i64>) -> PyResult<Offs
             Ok(OffsetSpec::Timestamp(ts))
         }
         _ => Err(FlussError::new_err(format!(
-            "Invalid offset_type: '{}'. Must be 'earliest', 'latest', or 'timestamp'",
-            offset_type
+            "Invalid offset_type: '{offset_type}'. Must be 'earliest', 'latest', or 'timestamp'"
         ))),
     }
 }
@@ -49,8 +49,7 @@ fn validate_bucket_ids(bucket_ids: &[i32]) -> PyResult<()> {
     for &bucket_id in bucket_ids {
         if bucket_id < 0 {
             return Err(FlussError::new_err(format!(
-                "Invalid bucket_id: {}. Bucket IDs must be non-negative",
-                bucket_id
+                "Invalid bucket_id: {bucket_id}. Bucket IDs must be non-negative"
             )));
         }
     }
@@ -59,6 +58,226 @@ fn validate_bucket_ids(bucket_ids: &[i32]) -> PyResult<()> {
 
 #[pymethods]
 impl FlussAdmin {
+    /// Create a database.
+    ///
+    /// Args:
+    ///     database_name: Name of the database
+    ///     ignore_if_exists: If True, don't raise error if database already exists
+    ///     database_descriptor: Optional descriptor (comment, custom_properties)
+    ///
+    /// Returns:
+    ///     None
+    #[pyo3(signature = (database_name, ignore_if_exists=false, database_descriptor=None))]
+    pub fn create_database<'py>(
+        &self,
+        py: Python<'py>,
+        database_name: &str,
+        ignore_if_exists: bool,
+        database_descriptor: Option<&DatabaseDescriptor>,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        let admin = self.__admin.clone();
+        let name = database_name.to_string();
+        let descriptor = database_descriptor.map(|d| d.to_core().clone());
+
+        future_into_py(py, async move {
+            admin
+                .create_database(&name, ignore_if_exists, descriptor.as_ref())
+                .await
+                .map_err(|e| FlussError::new_err(format!("Failed to create database: {e}")))?;
+
+            Python::attach(|py| Ok(py.None()))
+        })
+    }
+
+    /// Drop a database.
+    ///
+    /// Args:
+    ///     database_name: Name of the database
+    ///     ignore_if_not_exists: If True, don't raise error if database does not exist
+    ///     cascade: If True, drop tables in the database first
+    ///
+    /// Returns:
+    ///     None
+    #[pyo3(signature = (database_name, ignore_if_not_exists=false, cascade=true))]
+    pub fn drop_database<'py>(
+        &self,
+        py: Python<'py>,
+        database_name: &str,
+        ignore_if_not_exists: bool,
+        cascade: bool,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        let admin = self.__admin.clone();
+        let name = database_name.to_string();
+
+        future_into_py(py, async move {
+            admin
+                .drop_database(&name, ignore_if_not_exists, cascade)
+                .await
+                .map_err(|e| FlussError::new_err(format!("Failed to drop database: {e}")))?;
+
+            Python::attach(|py| Ok(py.None()))
+        })
+    }
+
+    /// List all databases.
+    ///
+    /// Returns:
+    ///     List[str]: Names of all databases
+    pub fn list_databases<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
+        let admin = self.__admin.clone();
+
+        future_into_py(py, async move {
+            let names = admin
+                .list_databases()
+                .await
+                .map_err(|e| FlussError::new_err(format!("Failed to list databases: {e}")))?;
+
+            Python::attach(|py| {
+                let py_list = pyo3::types::PyList::empty(py);
+                for name in names {
+                    py_list.append(name)?;
+                }
+                Ok(py_list.unbind())
+            })
+        })
+    }
+
+    /// Check if a database exists.
+    ///
+    /// Args:
+    ///     database_name: Name of the database
+    ///
+    /// Returns:
+    ///     bool: True if the database exists
+    pub fn database_exists<'py>(
+        &self,
+        py: Python<'py>,
+        database_name: &str,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        let admin = self.__admin.clone();
+        let name = database_name.to_string();
+
+        future_into_py(py, async move {
+            let exists = admin.database_exists(&name).await.map_err(|e| {
+                FlussError::new_err(format!("Failed to check database exists: {e}"))
+            })?;
+
+            Python::attach(|py| Ok(exists.into_pyobject(py)?.to_owned().into_any().unbind()))
+        })
+    }
+
+    /// Get database information.
+    ///
+    /// Args:
+    ///     database_name: Name of the database
+    ///
+    /// Returns:
+    ///     DatabaseInfo: Database metadata
+    pub fn get_database_info<'py>(
+        &self,
+        py: Python<'py>,
+        database_name: &str,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        let admin = self.__admin.clone();
+        let name = database_name.to_string();
+
+        future_into_py(py, async move {
+            let info = admin
+                .get_database_info(&name)
+                .await
+                .map_err(|e| FlussError::new_err(format!("Failed to get database info: {e}")))?;
+
+            Python::attach(|py| Py::new(py, DatabaseInfo::from_core(info)))
+        })
+    }
+
+    /// List all tables in a database.
+    ///
+    /// Args:
+    ///     database_name: Name of the database
+    ///
+    /// Returns:
+    ///     List[str]: Names of all tables in the database
+    pub fn list_tables<'py>(
+        &self,
+        py: Python<'py>,
+        database_name: &str,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        let admin = self.__admin.clone();
+        let name = database_name.to_string();
+
+        future_into_py(py, async move {
+            let names = admin
+                .list_tables(&name)
+                .await
+                .map_err(|e| FlussError::new_err(format!("Failed to list tables: {e}")))?;
+
+            Python::attach(|py| {
+                let py_list = pyo3::types::PyList::empty(py);
+                for name in names {
+                    py_list.append(name)?;
+                }
+                Ok(py_list.unbind())
+            })
+        })
+    }
+
+    /// Check if a table exists.
+    ///
+    /// Args:
+    ///     table_path: Path to the table (database, table)
+    ///
+    /// Returns:
+    ///     bool: True if the table exists
+    pub fn table_exists<'py>(
+        &self,
+        py: Python<'py>,
+        table_path: &TablePath,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        let core_table_path = table_path.to_core();
+        let admin = self.__admin.clone();
+
+        future_into_py(py, async move {
+            let exists = admin
+                .table_exists(&core_table_path)
+                .await
+                .map_err(|e| FlussError::new_err(format!("Failed to check table exists: {e}")))?;
+
+            Python::attach(|py| Ok(exists.into_pyobject(py)?.to_owned().into_any().unbind()))
+        })
+    }
+
+    /// Drop a partition from a partitioned table.
+    ///
+    /// Args:
+    ///     table_path: Path to the table
+    ///     partition_spec: Dict mapping partition column name to value (e.g., {"region": "US"})
+    ///     ignore_if_not_exists: If True, don't raise error if partition does not exist
+    ///
+    /// Returns:
+    ///     None
+    #[pyo3(signature = (table_path, partition_spec, ignore_if_not_exists=false))]
+    pub fn drop_partition<'py>(
+        &self,
+        py: Python<'py>,
+        table_path: &TablePath,
+        partition_spec: std::collections::HashMap<String, String>,
+        ignore_if_not_exists: bool,
+    ) -> PyResult<Bound<'py, PyAny>> {
+        let core_table_path = table_path.to_core();
+        let admin = self.__admin.clone();
+        let core_partition_spec = fcore::metadata::PartitionSpec::new(partition_spec);
+
+        future_into_py(py, async move {
+            admin
+                .drop_partition(&core_table_path, &core_partition_spec, ignore_if_not_exists)
+                .await
+                .map_err(|e| FlussError::new_err(format!("Failed to drop partition: {e}")))?;
+
+            Python::attach(|py| Ok(py.None()))
+        })
+    }
+
     /// Create a table with the given schema
     #[pyo3(signature = (table_path, table_descriptor, ignore_if_exists=None))]
     pub fn create_table<'py>(
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
index f1f4ee6..41f8de5 100644
--- a/bindings/python/src/lib.rs
+++ b/bindings/python/src/lib.rs
@@ -91,6 +91,8 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> {
     m.add_class::<PartitionInfo>()?;
     m.add_class::<OffsetType>()?;
     m.add_class::<WriteResultHandle>()?;
+    m.add_class::<DatabaseDescriptor>()?;
+    m.add_class::<DatabaseInfo>()?;
 
     // Register constants
     m.add("EARLIEST_OFFSET", fcore::client::EARLIEST_OFFSET)?;
diff --git a/bindings/python/src/metadata.rs b/bindings/python/src/metadata.rs
index f39f9d4..d6b122d 100644
--- a/bindings/python/src/metadata.rs
+++ b/bindings/python/src/metadata.rs
@@ -53,7 +53,7 @@ impl ChangeType {
     }
 
     fn __repr__(&self) -> String {
-        format!("ChangeType.{:?}", self)
+        format!("ChangeType.{self:?}")
     }
 }
 
@@ -657,3 +657,105 @@ impl LakeSnapshot {
         }
     }
 }
+
+/// Descriptor for a Fluss database (comment and custom properties)
+#[pyclass]
+#[derive(Clone)]
+pub struct DatabaseDescriptor {
+    __descriptor: fcore::metadata::DatabaseDescriptor,
+}
+
+#[pymethods]
+impl DatabaseDescriptor {
+    /// Create a new DatabaseDescriptor
+    #[new]
+    #[pyo3(signature = (comment=None, custom_properties=None))]
+    pub fn new(
+        comment: Option<String>,
+        custom_properties: Option<HashMap<String, String>>,
+    ) -> PyResult<Self> {
+        let mut builder = fcore::metadata::DatabaseDescriptor::builder();
+        if let Some(c) = comment {
+            builder = builder.comment(&c);
+        }
+        if let Some(props) = custom_properties {
+            builder = builder.custom_properties(props);
+        }
+        let __descriptor = builder.build();
+        Ok(Self { __descriptor })
+    }
+
+    /// Get comment if set
+    #[getter]
+    pub fn comment(&self) -> Option<String> {
+        self.__descriptor.comment().map(|s| s.to_string())
+    }
+
+    /// Get custom properties
+    pub fn get_custom_properties(&self) -> HashMap<String, String> {
+        self.__descriptor.custom_properties().clone()
+    }
+
+    fn __repr__(&self) -> String {
+        format!(
+            "DatabaseDescriptor(comment={:?}, custom_properties={:?})",
+            self.comment(),
+            self.get_custom_properties()
+        )
+    }
+}
+
+impl DatabaseDescriptor {
+    pub fn to_core(&self) -> &fcore::metadata::DatabaseDescriptor {
+        &self.__descriptor
+    }
+}
+
+/// Information about a Fluss database
+#[pyclass]
+pub struct DatabaseInfo {
+    __info: fcore::metadata::DatabaseInfo,
+}
+
+#[pymethods]
+impl DatabaseInfo {
+    /// Get the database name
+    #[getter]
+    pub fn database_name(&self) -> String {
+        self.__info.database_name().to_string()
+    }
+
+    /// Get the database descriptor
+    pub fn get_database_descriptor(&self) -> DatabaseDescriptor {
+        DatabaseDescriptor {
+            __descriptor: self.__info.database_descriptor().clone(),
+        }
+    }
+
+    /// Get created time
+    #[getter]
+    pub fn created_time(&self) -> i64 {
+        self.__info.created_time()
+    }
+
+    /// Get modified time
+    #[getter]
+    pub fn modified_time(&self) -> i64 {
+        self.__info.modified_time()
+    }
+
+    fn __repr__(&self) -> String {
+        format!(
+            "DatabaseInfo(database_name='{}', created_time={}, modified_time={})",
+            self.database_name(),
+            self.created_time(),
+            self.modified_time()
+        )
+    }
+}
+
+impl DatabaseInfo {
+    pub fn from_core(info: fcore::metadata::DatabaseInfo) -> Self {
+        Self { __info: info }
+    }
+}
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index d926596..cb203dc 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -315,7 +315,7 @@ fn resolve_projection_indices(
                 let idx = columns
                     .iter()
                     .position(|c| c.name() == name)
-                    .ok_or_else(|| FlussError::new_err(format!("Column '{}' not found", name)))?;
+                    .ok_or_else(|| FlussError::new_err(format!("Column '{name}' not found")))?;
                 indices.push(idx);
             }
             Ok(Some(indices))
@@ -796,9 +796,9 @@ pub fn python_pk_to_generic_row(
                 let field: &fcore::metadata::DataField = &fields[*pk_idx];
                 let value = dict
                     .get_item(pk_name)?
-                    .ok_or_else(|| FlussError::new_err(format!("Missing PK field: {}", pk_name)))?;
+                    .ok_or_else(|| FlussError::new_err(format!("Missing PK field: {pk_name}")))?;
                 datums[*pk_idx] = python_value_to_datum(&value, field.data_type())
-                    .map_err(|e| FlussError::new_err(format!("PK field '{}': {}", pk_name, e)))?;
+                    .map_err(|e| FlussError::new_err(format!("PK field '{pk_name}': {e}")))?;
             }
         }
 

Reply via email to