This is an automated email from the ASF dual-hosted git repository. yuxia pushed a commit to branch support-database-level-ops in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
commit 744e51d384f495af093686876a3ee34968dccbe6 Author: luoyuxia <[email protected]> AuthorDate: Sun Feb 8 11:45:35 2026 +0800 feat: support database-level ops --- bindings/python/fluss/__init__.pyi | 66 +++++++++++ bindings/python/src/admin.rs | 237 +++++++++++++++++++++++++++++++++++++ bindings/python/src/lib.rs | 2 + bindings/python/src/metadata.rs | 102 ++++++++++++++++ 4 files changed, 407 insertions(+) diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi index 50f3b20..a9ef828 100644 --- a/bindings/python/fluss/__init__.pyi +++ b/bindings/python/fluss/__init__.pyi @@ -121,6 +121,45 @@ class FlussConnection: def __repr__(self) -> str: ... class FlussAdmin: + async def create_database( + self, + database_name: str, + ignore_if_exists: bool = False, + database_descriptor: Optional["DatabaseDescriptor"] = None, + ) -> None: + """Create a database.""" + ... + async def drop_database( + self, + database_name: str, + ignore_if_not_exists: bool = False, + cascade: bool = True, + ) -> None: + """Drop a database.""" + ... + async def list_databases(self) -> List[str]: + """List all databases.""" + ... + async def database_exists(self, database_name: str) -> bool: + """Check if a database exists.""" + ... + async def get_database_info(self, database_name: str) -> "DatabaseInfo": + """Get database information.""" + ... + async def list_tables(self, database_name: str) -> List[str]: + """List all tables in a database.""" + ... + async def table_exists(self, table_path: TablePath) -> bool: + """Check if a table exists.""" + ... + async def drop_partition( + self, + table_path: TablePath, + partition_spec: Dict[str, str], + ignore_if_not_exists: bool = False, + ) -> None: + """Drop a partition from a partitioned table.""" + ... async def create_table( self, table_path: TablePath, @@ -203,6 +242,33 @@ class FlussAdmin: ... def __repr__(self) -> str: ... + +class DatabaseDescriptor: + """Descriptor for a Fluss database (comment and custom properties).""" + + def __init__( + self, + comment: Optional[str] = None, + custom_properties: Optional[Dict[str, str]] = None, + ) -> None: ... + @property + def comment(self) -> Optional[str]: ... + def get_custom_properties(self) -> Dict[str, str]: ... + def __repr__(self) -> str: ... + + +class DatabaseInfo: + """Information about a Fluss database.""" + + @property + def database_name(self) -> str: ... + def get_database_descriptor(self) -> DatabaseDescriptor: ... + @property + def created_time(self) -> int: ... + @property + def modified_time(self) -> int: ... + def __repr__(self) -> str: ... + class TableScan: """Builder for creating log scanners with flexible configuration. diff --git a/bindings/python/src/admin.rs b/bindings/python/src/admin.rs index d28c9c0..d5b4ddb 100644 --- a/bindings/python/src/admin.rs +++ b/bindings/python/src/admin.rs @@ -59,6 +59,243 @@ fn validate_bucket_ids(bucket_ids: &[i32]) -> PyResult<()> { #[pymethods] impl FlussAdmin { + /// Create a database. + /// + /// Args: + /// database_name: Name of the database + /// ignore_if_exists: If True, don't raise error if database already exists + /// database_descriptor: Optional descriptor (comment, custom_properties) + /// + /// Returns: + /// None + #[pyo3(signature = (database_name, ignore_if_exists=false, database_descriptor=None))] + pub fn create_database<'py>( + &self, + py: Python<'py>, + database_name: &str, + ignore_if_exists: bool, + database_descriptor: Option<&DatabaseDescriptor>, + ) -> PyResult<Bound<'py, PyAny>> { + let admin = self.__admin.clone(); + let name = database_name.to_string(); + let descriptor = database_descriptor.map(|d| d.to_core().clone()); + + future_into_py(py, async move { + admin + .create_database(&name, ignore_if_exists, descriptor.as_ref()) + .await + .map_err(|e| FlussError::new_err(format!("Failed to create database: {e}")))?; + + Python::attach(|py| Ok(py.None())) + }) + } + + /// Drop a database. + /// + /// Args: + /// database_name: Name of the database + /// ignore_if_not_exists: If True, don't raise error if database does not exist + /// cascade: If True, drop tables in the database first + /// + /// Returns: + /// None + #[pyo3(signature = (database_name, ignore_if_not_exists=false, cascade=true))] + pub fn drop_database<'py>( + &self, + py: Python<'py>, + database_name: &str, + ignore_if_not_exists: bool, + cascade: bool, + ) -> PyResult<Bound<'py, PyAny>> { + let admin = self.__admin.clone(); + let name = database_name.to_string(); + + future_into_py(py, async move { + admin + .drop_database(&name, ignore_if_not_exists, cascade) + .await + .map_err(|e| FlussError::new_err(format!("Failed to drop database: {e}")))?; + + Python::attach(|py| Ok(py.None())) + }) + } + + /// List all databases. + /// + /// Returns: + /// List[str]: Names of all databases + pub fn list_databases<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> { + let admin = self.__admin.clone(); + + future_into_py(py, async move { + let names = admin + .list_databases() + .await + .map_err(|e| FlussError::new_err(format!("Failed to list databases: {e}")))?; + + Python::attach(|py| { + let py_list = pyo3::types::PyList::empty(py); + for name in names { + py_list.append(name)?; + } + Ok(py_list.unbind()) + }) + }) + } + + /// Check if a database exists. + /// + /// Args: + /// database_name: Name of the database + /// + /// Returns: + /// bool: True if the database exists + pub fn database_exists<'py>( + &self, + py: Python<'py>, + database_name: &str, + ) -> PyResult<Bound<'py, PyAny>> { + let admin = self.__admin.clone(); + let name = database_name.to_string(); + + future_into_py(py, async move { + let exists = admin + .database_exists(&name) + .await + .map_err(|e| FlussError::new_err(format!("Failed to check database exists: {e}")))?; + + Python::attach(|py| { + let builtins = py.import("builtins")?; + let val = if exists { + builtins.getattr("True")? + } else { + builtins.getattr("False")? + }; + Ok(val.unbind()) + }) + }) + } + + /// Get database information. + /// + /// Args: + /// database_name: Name of the database + /// + /// Returns: + /// DatabaseInfo: Database metadata + pub fn get_database_info<'py>( + &self, + py: Python<'py>, + database_name: &str, + ) -> PyResult<Bound<'py, PyAny>> { + let admin = self.__admin.clone(); + let name = database_name.to_string(); + + future_into_py(py, async move { + let info = admin + .get_database_info(&name) + .await + .map_err(|e| FlussError::new_err(format!("Failed to get database info: {e}")))?; + + Python::attach(|py| Py::new(py, DatabaseInfo::from_core(info))) + }) + } + + /// List all tables in a database. + /// + /// Args: + /// database_name: Name of the database + /// + /// Returns: + /// List[str]: Names of all tables in the database + pub fn list_tables<'py>( + &self, + py: Python<'py>, + database_name: &str, + ) -> PyResult<Bound<'py, PyAny>> { + let admin = self.__admin.clone(); + let name = database_name.to_string(); + + future_into_py(py, async move { + let names = admin + .list_tables(&name) + .await + .map_err(|e| FlussError::new_err(format!("Failed to list tables: {e}")))?; + + Python::attach(|py| { + let py_list = pyo3::types::PyList::empty(py); + for name in names { + py_list.append(name)?; + } + Ok(py_list.unbind()) + }) + }) + } + + /// Check if a table exists. + /// + /// Args: + /// table_path: Path to the table (database, table) + /// + /// Returns: + /// bool: True if the table exists + pub fn table_exists<'py>( + &self, + py: Python<'py>, + table_path: &TablePath, + ) -> PyResult<Bound<'py, PyAny>> { + let core_table_path = table_path.to_core(); + let admin = self.__admin.clone(); + + future_into_py(py, async move { + let exists = admin + .table_exists(&core_table_path) + .await + .map_err(|e| FlussError::new_err(format!("Failed to check table exists: {e}")))?; + + Python::attach(|py| { + let builtins = py.import("builtins")?; + let val = if exists { + builtins.getattr("True")? + } else { + builtins.getattr("False")? + }; + Ok(val.unbind()) + }) + }) + } + + /// Drop a partition from a partitioned table. + /// + /// Args: + /// table_path: Path to the table + /// partition_spec: Dict mapping partition column name to value (e.g., {"region": "US"}) + /// ignore_if_not_exists: If True, don't raise error if partition does not exist + /// + /// Returns: + /// None + #[pyo3(signature = (table_path, partition_spec, ignore_if_not_exists=false))] + pub fn drop_partition<'py>( + &self, + py: Python<'py>, + table_path: &TablePath, + partition_spec: std::collections::HashMap<String, String>, + ignore_if_not_exists: bool, + ) -> PyResult<Bound<'py, PyAny>> { + let core_table_path = table_path.to_core(); + let admin = self.__admin.clone(); + let core_partition_spec = fcore::metadata::PartitionSpec::new(partition_spec); + + future_into_py(py, async move { + admin + .drop_partition(&core_table_path, &core_partition_spec, ignore_if_not_exists) + .await + .map_err(|e| FlussError::new_err(format!("Failed to drop partition: {e}")))?; + + Python::attach(|py| Ok(py.None())) + }) + } + /// Create a table with the given schema #[pyo3(signature = (table_path, table_descriptor, ignore_if_exists=None))] pub fn create_table<'py>( diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs index f1f4ee6..41f8de5 100644 --- a/bindings/python/src/lib.rs +++ b/bindings/python/src/lib.rs @@ -91,6 +91,8 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::<PartitionInfo>()?; m.add_class::<OffsetType>()?; m.add_class::<WriteResultHandle>()?; + m.add_class::<DatabaseDescriptor>()?; + m.add_class::<DatabaseInfo>()?; // Register constants m.add("EARLIEST_OFFSET", fcore::client::EARLIEST_OFFSET)?; diff --git a/bindings/python/src/metadata.rs b/bindings/python/src/metadata.rs index f39f9d4..93cbebf 100644 --- a/bindings/python/src/metadata.rs +++ b/bindings/python/src/metadata.rs @@ -657,3 +657,105 @@ impl LakeSnapshot { } } } + +/// Descriptor for a Fluss database (comment and custom properties) +#[pyclass] +#[derive(Clone)] +pub struct DatabaseDescriptor { + __descriptor: fcore::metadata::DatabaseDescriptor, +} + +#[pymethods] +impl DatabaseDescriptor { + /// Create a new DatabaseDescriptor + #[new] + #[pyo3(signature = (comment=None, custom_properties=None))] + pub fn new( + comment: Option<String>, + custom_properties: Option<HashMap<String, String>>, + ) -> PyResult<Self> { + let mut builder = fcore::metadata::DatabaseDescriptor::builder(); + if let Some(c) = comment { + builder = builder.comment(&c); + } + if let Some(props) = custom_properties { + builder = builder.custom_properties(props); + } + let __descriptor = builder.build(); + Ok(Self { __descriptor }) + } + + /// Get comment if set + #[getter] + pub fn comment(&self) -> Option<String> { + self.__descriptor.comment().map(|s| s.to_string()) + } + + /// Get custom properties + pub fn get_custom_properties(&self) -> HashMap<String, String> { + self.__descriptor.custom_properties().clone() + } + + fn __repr__(&self) -> String { + format!( + "DatabaseDescriptor(comment={:?}, custom_properties={:?})", + self.comment(), + self.get_custom_properties() + ) + } +} + +impl DatabaseDescriptor { + pub fn to_core(&self) -> &fcore::metadata::DatabaseDescriptor { + &self.__descriptor + } +} + +/// Information about a Fluss database +#[pyclass] +pub struct DatabaseInfo { + __info: fcore::metadata::DatabaseInfo, +} + +#[pymethods] +impl DatabaseInfo { + /// Get the database name + #[getter] + pub fn database_name(&self) -> String { + self.__info.database_name().to_string() + } + + /// Get the database descriptor + pub fn get_database_descriptor(&self) -> DatabaseDescriptor { + DatabaseDescriptor { + __descriptor: self.__info.database_descriptor().clone(), + } + } + + /// Get created time + #[getter] + pub fn created_time(&self) -> i64 { + self.__info.created_time() + } + + /// Get modified time + #[getter] + pub fn modified_time(&self) -> i64 { + self.__info.modified_time() + } + + fn __repr__(&self) -> String { + format!( + "DatabaseInfo(database_name='{}', created_time={}, modified_time={})", + self.database_name(), + self.created_time(), + self.modified_time() + ) + } +} + +impl DatabaseInfo { + pub fn from_core(info: fcore::metadata::DatabaseInfo) -> Self { + Self { __info: info } + } +}
