This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new ad40acc  feat: expose catalog metadata api for python (#306)
ad40acc is described below

commit ad40acc0c74e94d42b312daae22d92a3000db595
Author: SeungMin <[email protected]>
AuthorDate: Sun May 24 22:22:43 2026 +0900

    feat: expose catalog metadata api for python (#306)
---
 .../python/python/pypaimon_rust/datafusion.pyi     | 21 +++++
 bindings/python/src/context.rs                     | 51 +++++++++---
 bindings/python/src/lib.rs                         |  2 +
 bindings/python/src/schema.rs                      | 92 ++++++++++++++++++++++
 bindings/python/src/{lib.rs => table.rs}           | 36 +++++++--
 bindings/python/tests/test_datafusion.py           | 22 ++++++
 6 files changed, 207 insertions(+), 17 deletions(-)

diff --git a/bindings/python/python/pypaimon_rust/datafusion.pyi b/bindings/python/python/pypaimon_rust/datafusion.pyi
index dfa63c7..172c1fe 100644
--- a/bindings/python/python/pypaimon_rust/datafusion.pyi
+++ b/bindings/python/python/pypaimon_rust/datafusion.pyi
@@ -23,9 +23,30 @@ ArrowTypeLike: TypeAlias = Union[pyarrow.DataType, pyarrow.Field, str]
 InputFieldsLike: TypeAlias = Union[ArrowTypeLike, Sequence[ArrowTypeLike]]
 VolatilityLike: TypeAlias = Union[str, Any]
 
+class DataField:
+    def name(self) -> str: ...
+    def field_type(self) -> str: ...
+    def is_nullable(self) -> bool: ...
+    def description(self) -> Optional[str]: ...
+
+class TableSchema:
+    def fields(self) -> List[DataField]: ...
+    def partition_keys(self) -> List[str]: ...
+    def primary_keys(self) -> List[str]: ...
+    def options(self) -> Dict[str, str]: ...
+    def comment(self) -> Optional[str]: ...
+
+class Table:
+    def identifier(self) -> str: ...
+    def location(self) -> str: ...
+    def schema(self) -> TableSchema: ...
+
 class PaimonCatalog:
     def __init__(self, catalog_options: Dict[str, str]) -> None: ...
     def __datafusion_catalog_provider__(self, session: Any) -> object: ...
+    def list_databases(self) -> List[str]: ...
+    def list_tables(self, database_name: str) -> List[str]: ...
+    def get_table(self, identifier: str) -> Table: ...
 
 class PythonScalarUDF:
     def __init__(
diff --git a/bindings/python/src/context.rs b/bindings/python/src/context.rs
index f65d6a1..cc855ee 100644
--- a/bindings/python/src/context.rs
+++ b/bindings/python/src/context.rs
@@ -24,25 +24,25 @@ use datafusion::catalog::CatalogProvider;
 use datafusion::logical_expr::{Signature, TypeSignature, Volatility};
 use datafusion_ffi::catalog_provider::FFI_CatalogProvider;
 use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
-use paimon::{CatalogFactory, Options};
+use paimon::catalog::Identifier;
+use paimon::{Catalog, CatalogFactory, Options};
 use paimon_datafusion::{PaimonCatalogProvider, SQLContext};
-use pyo3::exceptions::PyRuntimeWarning;
+use pyo3::exceptions::{PyRuntimeWarning, PyValueError};
 use pyo3::prelude::*;
 use pyo3::types::PyCapsule;
 
 use crate::blob::PyBlobReaderRegistry;
 use crate::error::{df_to_py_err, to_py_err};
+use crate::table::PyTable;
 use crate::udf::{build_python_scalar_udf, udf, PyPythonScalarUDFObject};
 use paimon_datafusion::runtime::runtime;
 
-fn build_paimon_catalog_provider(
-    catalog_options: HashMap<String, String>,
-) -> PyResult<Arc<PaimonCatalogProvider>> {
+fn build_paimon_catalog(catalog_options: HashMap<String, String>) -> PyResult<Arc<dyn Catalog>> {
     let rt = runtime();
     rt.block_on(async {
         let options = Options::from_map(catalog_options);
         let catalog = CatalogFactory::create(options).await.map_err(to_py_err)?;
-        Ok::<_, PyErr>(Arc::new(PaimonCatalogProvider::new(catalog)))
+        Ok::<_, PyErr>(catalog)
     })
 }
 
@@ -65,6 +65,7 @@ fn ffi_logical_codec_from_pycapsule(obj: Bound<'_, PyAny>) -> PyResult<FFI_Logic
 /// A Paimon catalog exportable to Python DataFusion `SessionContext`.
 #[pyclass(name = "PaimonCatalog")]
 pub struct PaimonCatalog {
+    catalog: Arc<dyn Catalog>,
     provider: Arc<PaimonCatalogProvider>,
 }
 
@@ -73,9 +74,9 @@ impl PaimonCatalog {
     /// Create a Paimon catalog that can be registered into a DataFusion session.
     #[new]
     fn new(catalog_options: HashMap<String, String>) -> PyResult<Self> {
-        Ok(Self {
-            provider: build_paimon_catalog_provider(catalog_options)?,
-        })
+        let catalog = build_paimon_catalog(catalog_options)?;
+        let provider = Arc::new(PaimonCatalogProvider::new(Arc::clone(&catalog)));
+        Ok(Self { catalog, provider })
     }
 
     /// Export this catalog as a DataFusion catalog provider PyCapsule.
@@ -90,6 +91,35 @@ impl PaimonCatalog {
         let provider = FFI_CatalogProvider::new_with_ffi_codec(provider, Some(runtime()), codec);
         PyCapsule::new(py, provider, Some(name))
     }
+
+    /// List all databases in this catalog.
+    fn list_databases(&self) -> PyResult<Vec<String>> {
+        runtime()
+            .block_on(self.catalog.list_databases())
+            .map_err(to_py_err)
+    }
+
+    /// List all tables in the given database.
+    fn list_tables(&self, database_name: &str) -> PyResult<Vec<String>> {
+        runtime()
+            .block_on(self.catalog.list_tables(database_name))
+            .map_err(to_py_err)
+    }
+
+    /// Get a table handle by `"db.table"` identifier.
+    fn get_table(&self, identifier: &str) -> PyResult<PyTable> {
+        let parts: Vec<&str> = identifier.splitn(2, '.').collect();
+        if parts.len() != 2 || parts[0].is_empty() || parts[1].is_empty() {
+            return Err(PyValueError::new_err(format!(
+                "expected identifier in 'db.table' format, got '{identifier}'"
+            )));
+        }
+        let id = Identifier::new(parts[0], parts[1]);
+        let table = runtime()
+            .block_on(self.catalog.get_table(&id))
+            .map_err(to_py_err)?;
+        Ok(PyTable::new(Arc::new(table)))
+    }
 }
 
 /// A SQL context that supports registering multiple Paimon catalogs and executing SQL.
@@ -226,6 +256,9 @@ impl PySQLContext {
 pub fn register_module(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
     let this = PyModule::new(py, "datafusion")?;
     this.add_class::<PaimonCatalog>()?;
+    this.add_class::<crate::table::PyTable>()?;
+    this.add_class::<crate::schema::PyTableSchema>()?;
+    this.add_class::<crate::schema::PyDataField>()?;
     this.add_class::<PyPythonScalarUDFObject>()?;
     this.add_class::<PySQLContext>()?;
     this.add_function(wrap_pyfunction!(udf, &this)?)?;
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
index 5f8d17a..d0d2002 100644
--- a/bindings/python/src/lib.rs
+++ b/bindings/python/src/lib.rs
@@ -20,6 +20,8 @@ use pyo3::prelude::*;
 mod blob;
 mod context;
 mod error;
+mod schema;
+mod table;
 mod udf;
 
 #[pymodule]
diff --git a/bindings/python/src/schema.rs b/bindings/python/src/schema.rs
new file mode 100644
index 0000000..194cb3a
--- /dev/null
+++ b/bindings/python/src/schema.rs
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use paimon::spec::{DataField, TableSchema};
+use pyo3::prelude::*;
+
+#[pyclass(name = "TableSchema", module = "pypaimon_rust.datafusion")]
+pub struct PyTableSchema {
+    inner: TableSchema,
+}
+
+impl PyTableSchema {
+    pub fn new(inner: TableSchema) -> Self {
+        Self { inner }
+    }
+}
+
+#[pymethods]
+impl PyTableSchema {
+    fn fields(&self) -> Vec<PyDataField> {
+        self.inner
+            .fields()
+            .iter()
+            .cloned()
+            .map(PyDataField::new)
+            .collect()
+    }
+
+    fn partition_keys(&self) -> Vec<String> {
+        self.inner.partition_keys().to_vec()
+    }
+
+    fn primary_keys(&self) -> Vec<String> {
+        self.inner.primary_keys().to_vec()
+    }
+
+    fn options(&self) -> HashMap<String, String> {
+        self.inner.options().clone()
+    }
+
+    fn comment(&self) -> Option<String> {
+        self.inner.comment().map(str::to_string)
+    }
+}
+
+#[pyclass(name = "DataField", module = "pypaimon_rust.datafusion")]
+pub struct PyDataField {
+    inner: DataField,
+}
+
+impl PyDataField {
+    pub fn new(inner: DataField) -> Self {
+        Self { inner }
+    }
+}
+
+#[pymethods]
+impl PyDataField {
+    fn name(&self) -> String {
+        self.inner.name().to_string()
+    }
+
+    fn field_type(&self) -> String {
+        // TODO(#284 follow-up): mirror Java DataType.asSQLString() once
+        // a Display impl is added to paimon::spec::DataType.
+        format!("{:?}", self.inner.data_type())
+    }
+
+    fn is_nullable(&self) -> bool {
+        self.inner.data_type().is_nullable()
+    }
+
+    fn description(&self) -> Option<String> {
+        self.inner.description().map(str::to_string)
+    }
+}
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/table.rs
similarity index 55%
copy from bindings/python/src/lib.rs
copy to bindings/python/src/table.rs
index 5f8d17a..0a0c35c 100644
--- a/bindings/python/src/lib.rs
+++ b/bindings/python/src/table.rs
@@ -15,15 +15,35 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::sync::Arc;
+
 use pyo3::prelude::*;
 
-mod blob;
-mod context;
-mod error;
-mod udf;
+use crate::schema::PyTableSchema;
+
+#[pyclass(name = "Table", module = "pypaimon_rust.datafusion")]
+pub struct PyTable {
+    pub(crate) inner: Arc<paimon::table::Table>,
+}
+
+impl PyTable {
+    pub fn new(inner: Arc<paimon::table::Table>) -> Self {
+        Self { inner }
+    }
+}
+
+#[pymethods]
+impl PyTable {
+    fn identifier(&self) -> String {
+        let id = self.inner.identifier();
+        format!("{}.{}", id.database(), id.object())
+    }
+
+    fn location(&self) -> String {
+        self.inner.location().to_string()
+    }
 
-#[pymodule]
-fn pypaimon_rust(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
-    context::register_module(py, m)?;
-    Ok(())
+    fn schema(&self) -> PyTableSchema {
+        PyTableSchema::new(self.inner.schema().clone())
+    }
 }
diff --git a/bindings/python/tests/test_datafusion.py b/bindings/python/tests/test_datafusion.py
index d120a94..236b130 100644
--- a/bindings/python/tests/test_datafusion.py
+++ b/bindings/python/tests/test_datafusion.py
@@ -294,6 +294,7 @@ def test_query_simple_table_via_catalog_provider():
     ]
 
 
+
 def test_sql_context_ddl_dml():
     with tempfile.TemporaryDirectory() as warehouse:
         ctx = SQLContext()
@@ -663,3 +664,24 @@ def test_table_functions_registered_with_catalog():
                 pytest.fail(f"expected {fn} to reject a single argument")
             except Exception as e:
                 assert "requires 4 arguments" in str(e), str(e)
+
+
+def test_list_databases_and_tables():
+    catalog = PaimonCatalog({"warehouse": WAREHOUSE})
+
+    assert "default" in catalog.list_databases()
+
+    tables = catalog.list_tables("default")
+    assert "simple_log_table" in tables
+
+    table = catalog.get_table("default.simple_log_table")
+    assert table.identifier() == "default.simple_log_table"
+    assert table.location().endswith("/default.db/simple_log_table") or table.location()
+
+    schema = table.schema()
+    field_names = [f.name() for f in schema.fields()]
+    assert "id" in field_names
+    assert "name" in field_names
+    # simple_log_table is non-partitioned, so partition keys are empty.
+    assert schema.partition_keys() == []
+

Reply via email to