This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new ad40acc feat: expose catalog metadata api for python (#306)
ad40acc is described below
commit ad40acc0c74e94d42b312daae22d92a3000db595
Author: SeungMin <[email protected]>
AuthorDate: Sun May 24 22:22:43 2026 +0900
feat: expose catalog metadata api for python (#306)
---
.../python/python/pypaimon_rust/datafusion.pyi | 21 +++++
bindings/python/src/context.rs | 51 +++++++++---
bindings/python/src/lib.rs | 2 +
bindings/python/src/schema.rs | 92 ++++++++++++++++++++++
bindings/python/src/{lib.rs => table.rs} | 36 +++++++--
bindings/python/tests/test_datafusion.py | 22 ++++++
6 files changed, 207 insertions(+), 17 deletions(-)
diff --git a/bindings/python/python/pypaimon_rust/datafusion.pyi b/bindings/python/python/pypaimon_rust/datafusion.pyi
index dfa63c7..172c1fe 100644
--- a/bindings/python/python/pypaimon_rust/datafusion.pyi
+++ b/bindings/python/python/pypaimon_rust/datafusion.pyi
@@ -23,9 +23,30 @@ ArrowTypeLike: TypeAlias = Union[pyarrow.DataType, pyarrow.Field, str]
InputFieldsLike: TypeAlias = Union[ArrowTypeLike, Sequence[ArrowTypeLike]]
VolatilityLike: TypeAlias = Union[str, Any]
+class DataField:
+ def name(self) -> str: ...
+ def field_type(self) -> str: ...
+ def is_nullable(self) -> bool: ...
+ def description(self) -> Optional[str]: ...
+
+class TableSchema:
+ def fields(self) -> List[DataField]: ...
+ def partition_keys(self) -> List[str]: ...
+ def primary_keys(self) -> List[str]: ...
+ def options(self) -> Dict[str, str]: ...
+ def comment(self) -> Optional[str]: ...
+
+class Table:
+ def identifier(self) -> str: ...
+ def location(self) -> str: ...
+ def schema(self) -> TableSchema: ...
+
class PaimonCatalog:
def __init__(self, catalog_options: Dict[str, str]) -> None: ...
def __datafusion_catalog_provider__(self, session: Any) -> object: ...
+ def list_databases(self) -> List[str]: ...
+ def list_tables(self, database_name: str) -> List[str]: ...
+ def get_table(self, identifier: str) -> Table: ...
class PythonScalarUDF:
def __init__(
diff --git a/bindings/python/src/context.rs b/bindings/python/src/context.rs
index f65d6a1..cc855ee 100644
--- a/bindings/python/src/context.rs
+++ b/bindings/python/src/context.rs
@@ -24,25 +24,25 @@ use datafusion::catalog::CatalogProvider;
use datafusion::logical_expr::{Signature, TypeSignature, Volatility};
use datafusion_ffi::catalog_provider::FFI_CatalogProvider;
use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
-use paimon::{CatalogFactory, Options};
+use paimon::catalog::Identifier;
+use paimon::{Catalog, CatalogFactory, Options};
use paimon_datafusion::{PaimonCatalogProvider, SQLContext};
-use pyo3::exceptions::PyRuntimeWarning;
+use pyo3::exceptions::{PyRuntimeWarning, PyValueError};
use pyo3::prelude::*;
use pyo3::types::PyCapsule;
use crate::blob::PyBlobReaderRegistry;
use crate::error::{df_to_py_err, to_py_err};
+use crate::table::PyTable;
use crate::udf::{build_python_scalar_udf, udf, PyPythonScalarUDFObject};
use paimon_datafusion::runtime::runtime;
-fn build_paimon_catalog_provider(
- catalog_options: HashMap<String, String>,
-) -> PyResult<Arc<PaimonCatalogProvider>> {
+fn build_paimon_catalog(catalog_options: HashMap<String, String>) -> PyResult<Arc<dyn Catalog>> {
let rt = runtime();
rt.block_on(async {
let options = Options::from_map(catalog_options);
 let catalog = CatalogFactory::create(options).await.map_err(to_py_err)?;
- Ok::<_, PyErr>(Arc::new(PaimonCatalogProvider::new(catalog)))
+ Ok::<_, PyErr>(catalog)
})
}
@@ -65,6 +65,7 @@ fn ffi_logical_codec_from_pycapsule(obj: Bound<'_, PyAny>) -> PyResult<FFI_Logic
/// A Paimon catalog exportable to Python DataFusion `SessionContext`.
#[pyclass(name = "PaimonCatalog")]
pub struct PaimonCatalog {
+ catalog: Arc<dyn Catalog>,
provider: Arc<PaimonCatalogProvider>,
}
@@ -73,9 +74,9 @@ impl PaimonCatalog {
 /// Create a Paimon catalog that can be registered into a DataFusion session.
#[new]
fn new(catalog_options: HashMap<String, String>) -> PyResult<Self> {
- Ok(Self {
- provider: build_paimon_catalog_provider(catalog_options)?,
- })
+ let catalog = build_paimon_catalog(catalog_options)?;
+ let provider = Arc::new(PaimonCatalogProvider::new(Arc::clone(&catalog)));
+ Ok(Self { catalog, provider })
}
/// Export this catalog as a DataFusion catalog provider PyCapsule.
@@ -90,6 +91,35 @@ impl PaimonCatalog {
 let provider = FFI_CatalogProvider::new_with_ffi_codec(provider, Some(runtime()), codec);
PyCapsule::new(py, provider, Some(name))
}
+
+ /// List all databases in this catalog.
+ fn list_databases(&self) -> PyResult<Vec<String>> {
+ runtime()
+ .block_on(self.catalog.list_databases())
+ .map_err(to_py_err)
+ }
+
+ /// List all tables in the given database.
+ fn list_tables(&self, database_name: &str) -> PyResult<Vec<String>> {
+ runtime()
+ .block_on(self.catalog.list_tables(database_name))
+ .map_err(to_py_err)
+ }
+
+ /// Get a table handle by `"db.table"` identifier.
+ fn get_table(&self, identifier: &str) -> PyResult<PyTable> {
+ let parts: Vec<&str> = identifier.splitn(2, '.').collect();
+ if parts.len() != 2 || parts[0].is_empty() || parts[1].is_empty() {
+ return Err(PyValueError::new_err(format!(
+ "expected identifier in 'db.table' format, got '{identifier}'"
+ )));
+ }
+ let id = Identifier::new(parts[0], parts[1]);
+ let table = runtime()
+ .block_on(self.catalog.get_table(&id))
+ .map_err(to_py_err)?;
+ Ok(PyTable::new(Arc::new(table)))
+ }
}
/// A SQL context that supports registering multiple Paimon catalogs and executing SQL.
@@ -226,6 +256,9 @@ impl PySQLContext {
pub fn register_module(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
let this = PyModule::new(py, "datafusion")?;
this.add_class::<PaimonCatalog>()?;
+ this.add_class::<crate::table::PyTable>()?;
+ this.add_class::<crate::schema::PyTableSchema>()?;
+ this.add_class::<crate::schema::PyDataField>()?;
this.add_class::<PyPythonScalarUDFObject>()?;
this.add_class::<PySQLContext>()?;
this.add_function(wrap_pyfunction!(udf, &this)?)?;
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
index 5f8d17a..d0d2002 100644
--- a/bindings/python/src/lib.rs
+++ b/bindings/python/src/lib.rs
@@ -20,6 +20,8 @@ use pyo3::prelude::*;
mod blob;
mod context;
mod error;
+mod schema;
+mod table;
mod udf;
#[pymodule]
diff --git a/bindings/python/src/schema.rs b/bindings/python/src/schema.rs
new file mode 100644
index 0000000..194cb3a
--- /dev/null
+++ b/bindings/python/src/schema.rs
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use paimon::spec::{DataField, TableSchema};
+use pyo3::prelude::*;
+
+#[pyclass(name = "TableSchema", module = "pypaimon_rust.datafusion")]
+pub struct PyTableSchema {
+ inner: TableSchema,
+}
+
+impl PyTableSchema {
+ pub fn new(inner: TableSchema) -> Self {
+ Self { inner }
+ }
+}
+
+#[pymethods]
+impl PyTableSchema {
+ fn fields(&self) -> Vec<PyDataField> {
+ self.inner
+ .fields()
+ .iter()
+ .cloned()
+ .map(PyDataField::new)
+ .collect()
+ }
+
+ fn partition_keys(&self) -> Vec<String> {
+ self.inner.partition_keys().to_vec()
+ }
+
+ fn primary_keys(&self) -> Vec<String> {
+ self.inner.primary_keys().to_vec()
+ }
+
+ fn options(&self) -> HashMap<String, String> {
+ self.inner.options().clone()
+ }
+
+ fn comment(&self) -> Option<String> {
+ self.inner.comment().map(str::to_string)
+ }
+}
+
+#[pyclass(name = "DataField", module = "pypaimon_rust.datafusion")]
+pub struct PyDataField {
+ inner: DataField,
+}
+
+impl PyDataField {
+ pub fn new(inner: DataField) -> Self {
+ Self { inner }
+ }
+}
+
+#[pymethods]
+impl PyDataField {
+ fn name(&self) -> String {
+ self.inner.name().to_string()
+ }
+
+ fn field_type(&self) -> String {
+ // TODO(#284 follow-up): mirror Java DataType.asSQLString() once
+ // a Display impl is added to paimon::spec::DataType.
+ format!("{:?}", self.inner.data_type())
+ }
+
+ fn is_nullable(&self) -> bool {
+ self.inner.data_type().is_nullable()
+ }
+
+ fn description(&self) -> Option<String> {
+ self.inner.description().map(str::to_string)
+ }
+}
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/table.rs
similarity index 55%
copy from bindings/python/src/lib.rs
copy to bindings/python/src/table.rs
index 5f8d17a..0a0c35c 100644
--- a/bindings/python/src/lib.rs
+++ b/bindings/python/src/table.rs
@@ -15,15 +15,35 @@
// specific language governing permissions and limitations
// under the License.
+use std::sync::Arc;
+
use pyo3::prelude::*;
-mod blob;
-mod context;
-mod error;
-mod udf;
+use crate::schema::PyTableSchema;
+
+#[pyclass(name = "Table", module = "pypaimon_rust.datafusion")]
+pub struct PyTable {
+ pub(crate) inner: Arc<paimon::table::Table>,
+}
+
+impl PyTable {
+ pub fn new(inner: Arc<paimon::table::Table>) -> Self {
+ Self { inner }
+ }
+}
+
+#[pymethods]
+impl PyTable {
+ fn identifier(&self) -> String {
+ let id = self.inner.identifier();
+ format!("{}.{}", id.database(), id.object())
+ }
+
+ fn location(&self) -> String {
+ self.inner.location().to_string()
+ }
-#[pymodule]
-fn pypaimon_rust(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
- context::register_module(py, m)?;
- Ok(())
+ fn schema(&self) -> PyTableSchema {
+ PyTableSchema::new(self.inner.schema().clone())
+ }
}
diff --git a/bindings/python/tests/test_datafusion.py b/bindings/python/tests/test_datafusion.py
index d120a94..236b130 100644
--- a/bindings/python/tests/test_datafusion.py
+++ b/bindings/python/tests/test_datafusion.py
@@ -294,6 +294,7 @@ def test_query_simple_table_via_catalog_provider():
]
+
def test_sql_context_ddl_dml():
with tempfile.TemporaryDirectory() as warehouse:
ctx = SQLContext()
@@ -663,3 +664,24 @@ def test_table_functions_registered_with_catalog():
pytest.fail(f"expected {fn} to reject a single argument")
except Exception as e:
assert "requires 4 arguments" in str(e), str(e)
+
+
+def test_list_databases_and_tables():
+ catalog = PaimonCatalog({"warehouse": WAREHOUSE})
+
+ assert "default" in catalog.list_databases()
+
+ tables = catalog.list_tables("default")
+ assert "simple_log_table" in tables
+
+ table = catalog.get_table("default.simple_log_table")
+ assert table.identifier() == "default.simple_log_table"
+ assert table.location().endswith("/default.db/simple_log_table") or table.location()
+
+ schema = table.schema()
+ field_names = [f.name() for f in schema.fields()]
+ assert "id" in field_names
+ assert "name" in field_names
+ # simple_log_table is non-partitioned, so partition keys are empty.
+ assert schema.partition_keys() == []
+