This is an automated email from the ASF dual-hosted git repository.
jerry-024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 73cf7ca [datafusion] Allow callers to skip default-database init in
register_catalog (#345)
73cf7ca is described below
commit 73cf7ca1469ae3a40cde816a8b788f81f3a25793
Author: shyjsarah <[email protected]>
AuthorDate: Thu May 28 20:19:11 2026 -0700
[datafusion] Allow callers to skip default-database init in
register_catalog (#345)
---
bindings/python/src/context.rs | 14 +-
crates/integrations/datafusion/src/sql_context.rs | 245 +++++++++++++++++++++-
2 files changed, 247 insertions(+), 12 deletions(-)
diff --git a/bindings/python/src/context.rs b/bindings/python/src/context.rs
index cc855ee..2eb9424 100644
--- a/bindings/python/src/context.rs
+++ b/bindings/python/src/context.rs
@@ -184,19 +184,31 @@ impl PySQLContext {
Ok(ctx)
}
+ /// Registers a Paimon catalog under the given name.
+ ///
+ /// `default_database`: omitted / `None` → use `"default"` (back-compat);
+ /// `""` → skip default-db init (for principals without DESCRIBE on `default`);
+ /// `"name"` → use `name`.
+ #[pyo3(signature = (catalog_name, catalog_options, default_database=None))]
fn register_catalog(
&mut self,
py: Python<'_>,
catalog_name: String,
catalog_options: HashMap<String, String>,
+ default_database: Option<String>,
) -> PyResult<()> {
let rt = runtime();
py.detach(|| {
rt.block_on(async {
let options = Options::from_map(catalog_options);
let catalog =
CatalogFactory::create(options).await.map_err(to_py_err)?;
+ let default_db: Option<String> = match default_database {
+ None => Some("default".to_string()),
+ Some(s) if s.is_empty() => None,
+ Some(s) => Some(s),
+ };
self.inner
- .register_catalog(catalog_name, catalog)
+ .register_catalog_with_default_db(catalog_name, catalog, default_db.as_deref())
.await
.map_err(df_to_py_err)
})
diff --git a/crates/integrations/datafusion/src/sql_context.rs b/crates/integrations/datafusion/src/sql_context.rs
index ec93741..96790de 100644
--- a/crates/integrations/datafusion/src/sql_context.rs
+++ b/crates/integrations/datafusion/src/sql_context.rs
@@ -122,18 +122,48 @@ impl SQLContext {
catalog_name: impl Into<String>,
catalog: Arc<dyn Catalog>,
) -> DFResult<()> {
+ self.register_catalog_with_default_db(catalog_name, catalog, Some("default"))
+ .await
+ }
+
+ /// Like [`Self::register_catalog`] but lets the caller control default-database init.
+ ///
+ /// `default_db = Some(name)` ensures `name` exists and sets it as current on the
+ /// first catalog. `default_db = None` skips both — required for principals that
+ /// lack DESCRIBE / CREATEDATABASE on `default`. Mirrors Java `FlinkCatalog`'s
+ /// `defaultDatabase` / `DISABLE_CREATE_TABLE_IN_DEFAULT_DB`.
+ ///
+ /// `default_db = Some("")` is rejected — pass `None` to opt out instead.
+ ///
+ /// **Note on built-in TVFs (`vector_search`, `full_text_search`):** when
+ /// `default_db = None`, bare table names inside these functions still resolve
+ /// against the literal namespace `"default"` (the fallback in
+ /// [`register_table_functions`]). Callers using `None` must qualify table names
+ /// (`'db.table'` or `'catalog.db.table'`) in those calls.
+ pub async fn register_catalog_with_default_db(
+ &mut self,
+ catalog_name: impl Into<String>,
+ catalog: Arc<dyn Catalog>,
+ default_db: Option<&str>,
+ ) -> DFResult<()> {
+ if matches!(default_db, Some("")) {
+ return Err(DataFusionError::Plan(
+ "default_db must not be empty; pass None to skip default-database init".to_string(),
+ ));
+ }
let catalog_name = catalog_name.into();
let is_first = self.catalogs.is_empty();
- let default_db = "default";
- match catalog.get_database(default_db).await {
- Ok(_) => {}
- Err(paimon::Error::DatabaseNotExist { .. }) => {
- catalog
- .create_database(default_db, true, Default::default())
- .await
- .map_err(|e| DataFusionError::External(Box::new(e)))?;
+ if let Some(default_db) = default_db {
+ match catalog.get_database(default_db).await {
+ Ok(_) => {}
+ Err(paimon::Error::DatabaseNotExist { .. }) => {
+ catalog
+ .create_database(default_db, true, Default::default())
+ .await
+ .map_err(|e| DataFusionError::External(Box::new(e)))?;
+ }
+ Err(e) => return Err(DataFusionError::External(Box::new(e))),
}
- Err(e) => return Err(DataFusionError::External(Box::new(e))),
}
self.ctx.register_catalog(
&catalog_name,
@@ -143,11 +173,13 @@ impl SQLContext {
self.blob_reader_registry.clone(),
)),
);
- register_table_functions(&self.ctx, &catalog, default_db);
+ register_table_functions(&self.ctx, &catalog, default_db.unwrap_or("default"));
self.catalogs.insert(catalog_name.clone(), catalog);
if is_first {
self.set_current_catalog(catalog_name).await?;
- self.set_current_database(default_db).await?;
+ if let Some(default_db) = default_db {
+ self.set_current_database(default_db).await?;
+ }
}
Ok(())
}
@@ -2466,6 +2498,197 @@ mod tests {
ctx
}
+ // ==================== register_catalog_with_default_db tests ====================
+
+ /// Counts get/create_database calls so tests can assert whether the default-db
+ /// init path fired. `get_database` returns `Unsupported` rather than
+ /// `DatabaseNotExist` so that *not* skipping the probe surfaces as a hard error
+ /// (mimics a "Forbidden: DESCRIBE on DATABASE default" failure).
+ struct ProbeTrackingCatalog {
+ get_calls: std::sync::atomic::AtomicUsize,
+ create_calls: std::sync::atomic::AtomicUsize,
+ }
+
+ impl ProbeTrackingCatalog {
+ fn new() -> Self {
+ Self {
+ get_calls: std::sync::atomic::AtomicUsize::new(0),
+ create_calls: std::sync::atomic::AtomicUsize::new(0),
+ }
+ }
+ fn get_count(&self) -> usize {
+ self.get_calls.load(std::sync::atomic::Ordering::SeqCst)
+ }
+ fn create_count(&self) -> usize {
+ self.create_calls.load(std::sync::atomic::Ordering::SeqCst)
+ }
+ }
+
+ #[async_trait]
+ impl Catalog for ProbeTrackingCatalog {
+ async fn list_databases(&self) -> paimon::Result<Vec<String>> {
+ Ok(vec![])
+ }
+ async fn create_database(
+ &self,
+ _name: &str,
+ _ignore_if_exists: bool,
+ _properties: HashMap<String, String>,
+ ) -> paimon::Result<()> {
+ self.create_calls
+ .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+ Ok(())
+ }
+ async fn get_database(&self, _name: &str) -> paimon::Result<Database> {
+ self.get_calls
+ .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+ Err(paimon::Error::Unsupported {
+ message: "simulated Forbidden".to_string(),
+ })
+ }
+ async fn drop_database(
+ &self,
+ _name: &str,
+ _ignore_if_not_exists: bool,
+ _cascade: bool,
+ ) -> paimon::Result<()> {
+ Ok(())
+ }
+ async fn get_table(&self, identifier: &Identifier) -> paimon::Result<Table> {
+ Err(paimon::Error::TableNotExist {
+ full_name: identifier.to_string(),
+ })
+ }
+ async fn list_tables(&self, _database_name: &str) -> paimon::Result<Vec<String>> {
+ Ok(vec![])
+ }
+ async fn create_table(
+ &self,
+ _identifier: &Identifier,
+ _creation: PaimonSchema,
+ _ignore_if_exists: bool,
+ ) -> paimon::Result<()> {
+ Ok(())
+ }
+ async fn drop_table(
+ &self,
+ _identifier: &Identifier,
+ _ignore_if_not_exists: bool,
+ ) -> paimon::Result<()> {
+ Ok(())
+ }
+ async fn rename_table(
+ &self,
+ _from: &Identifier,
+ _to: &Identifier,
+ _ignore_if_not_exists: bool,
+ ) -> paimon::Result<()> {
+ Ok(())
+ }
+ async fn alter_table(
+ &self,
+ _identifier: &Identifier,
+ _changes: Vec<SchemaChange>,
+ _ignore_if_not_exists: bool,
+ ) -> paimon::Result<()> {
+ Ok(())
+ }
+ }
+
+ #[tokio::test]
+ async fn register_catalog_with_none_skips_default_db_probe() {
+ let catalog = Arc::new(ProbeTrackingCatalog::new());
+ let mut ctx = SQLContext::new();
+ ctx.register_catalog_with_default_db("paimon", catalog.clone(), None)
+ .await
+ .expect("None must skip probe so Forbidden-shaped error never fires");
+ assert_eq!(catalog.get_count(), 0, "get_database must not be called");
+ assert_eq!(
+ catalog.create_count(),
+ 0,
+ "create_database must not be called"
+ );
+ // Current catalog still set; current database left unchanged.
+ assert_eq!(
+ ctx.ctx().state().config().options().catalog.default_catalog,
+ "paimon"
+ );
+ }
+
+ #[tokio::test]
+ async fn register_catalog_with_some_default_propagates_probe_error() {
+ let catalog = Arc::new(ProbeTrackingCatalog::new());
+ let mut ctx = SQLContext::new();
+ let err = ctx
+ .register_catalog_with_default_db("paimon", catalog.clone(), Some("default"))
+ .await
+ .expect_err("non-DatabaseNotExist error from get_database must propagate");
+ assert!(
+ err.to_string().contains("simulated Forbidden"),
+ "unexpected error: {err}"
+ );
+ assert_eq!(catalog.get_count(), 1);
+ assert_eq!(catalog.create_count(), 0);
+ }
+
+ #[tokio::test]
+ async fn register_catalog_with_some_empty_string_is_rejected() {
+ // Footgun guard: raw Rust callers could pass `Some("")` and silently probe
+ // `get_database("")`. Reject at the API boundary; tell them to use `None` instead.
+ let catalog = Arc::new(ProbeTrackingCatalog::new());
+ let mut ctx = SQLContext::new();
+ let err = ctx
+ .register_catalog_with_default_db("paimon", catalog.clone(), Some(""))
+ .await
+ .expect_err("empty default_db must be rejected at the API");
+ assert!(
+ err.to_string().contains("must not be empty"),
+ "unexpected error: {err}"
+ );
+ assert_eq!(
+ catalog.get_count(),
+ 0,
+ "guard must short-circuit before any catalog call"
+ );
+ }
+
+ #[tokio::test]
+ async fn register_catalog_with_none_table_function_resolves_bare_name_to_literal_default() {
+ // Documents the fallback in register_table_functions: `default_db.unwrap_or("default")`.
+ // When the caller opts out of default-db init, bare table names inside built-in TVFs
+ // (vector_search / full_text_search) still resolve against the literal namespace
+ // `"default"` — so a caller using `None` MUST use fully-qualified names with these
+ // functions or they'll hit a `default.<name>` lookup that may not exist / be readable.
+ let catalog = Arc::new(ProbeTrackingCatalog::new());
+ let mut ctx = SQLContext::new();
+ ctx.register_catalog_with_default_db("paimon", catalog, None)
+ .await
+ .unwrap();
+
+ let err = ctx
+ .sql("SELECT * FROM vector_search('bare', 'col', '[1.0]', 1)")
+ .await
+ .expect_err("bare name must error out — no `default.bare` table in mock catalog");
+ let msg = err.to_string();
+ assert!(
+ msg.contains("default") && msg.contains("bare"),
+ "error must surface the fallback 'default' namespace + bare name, got: {msg}"
+ );
+ }
+
+ #[tokio::test]
+ async fn register_catalog_default_wrapper_uses_default_db() {
+ // The bare register_catalog() must delegate with Some("default"), so the
+ // probe fires and Forbidden propagates — same as Some("default") above.
+ let catalog = Arc::new(ProbeTrackingCatalog::new());
+ let mut ctx = SQLContext::new();
+ assert!(ctx
+ .register_catalog("paimon", catalog.clone())
+ .await
+ .is_err());
+ assert_eq!(catalog.get_count(), 1);
+ }
+
fn assert_sql_type_to_paimon(
sql_type: datafusion::sql::sqlparser::ast::DataType,
expected: PaimonDataType,