This is an automated email from the ASF dual-hosted git repository.

jerry-024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 73cf7ca  [datafusion] Allow callers to skip default-database init in register_catalog (#345)
73cf7ca is described below

commit 73cf7ca1469ae3a40cde816a8b788f81f3a25793
Author: shyjsarah <[email protected]>
AuthorDate: Thu May 28 20:19:11 2026 -0700

    [datafusion] Allow callers to skip default-database init in register_catalog (#345)
---
 bindings/python/src/context.rs                    |  14 +-
 crates/integrations/datafusion/src/sql_context.rs | 245 +++++++++++++++++++++-
 2 files changed, 247 insertions(+), 12 deletions(-)

diff --git a/bindings/python/src/context.rs b/bindings/python/src/context.rs
index cc855ee..2eb9424 100644
--- a/bindings/python/src/context.rs
+++ b/bindings/python/src/context.rs
@@ -184,19 +184,31 @@ impl PySQLContext {
         Ok(ctx)
     }
 
+    /// Registers a Paimon catalog under the given name.
+    ///
+    /// `default_database`: omitted / `None` → use `"default"` (back-compat);
+    /// `""` → skip default-db init (for principals without DESCRIBE on 
`default`);
+    /// `"name"` → use `name`.
+    #[pyo3(signature = (catalog_name, catalog_options, default_database=None))]
     fn register_catalog(
         &mut self,
         py: Python<'_>,
         catalog_name: String,
         catalog_options: HashMap<String, String>,
+        default_database: Option<String>,
     ) -> PyResult<()> {
         let rt = runtime();
         py.detach(|| {
             rt.block_on(async {
                 let options = Options::from_map(catalog_options);
                 let catalog = 
CatalogFactory::create(options).await.map_err(to_py_err)?;
+                let default_db: Option<String> = match default_database {
+                    None => Some("default".to_string()),
+                    Some(s) if s.is_empty() => None,
+                    Some(s) => Some(s),
+                };
                 self.inner
-                    .register_catalog(catalog_name, catalog)
+                    .register_catalog_with_default_db(catalog_name, catalog, 
default_db.as_deref())
                     .await
                     .map_err(df_to_py_err)
             })
diff --git a/crates/integrations/datafusion/src/sql_context.rs b/crates/integrations/datafusion/src/sql_context.rs
index ec93741..96790de 100644
--- a/crates/integrations/datafusion/src/sql_context.rs
+++ b/crates/integrations/datafusion/src/sql_context.rs
@@ -122,18 +122,48 @@ impl SQLContext {
         catalog_name: impl Into<String>,
         catalog: Arc<dyn Catalog>,
     ) -> DFResult<()> {
+        self.register_catalog_with_default_db(catalog_name, catalog, 
Some("default"))
+            .await
+    }
+
+    /// Like [`Self::register_catalog`] but lets the caller control 
default-database init.
+    ///
+    /// `default_db = Some(name)` ensures `name` exists and sets it as current 
on the
+    /// first catalog. `default_db = None` skips both — required for 
principals that
+    /// lack DESCRIBE / CREATEDATABASE on `default`. Mirrors Java 
`FlinkCatalog`'s
+    /// `defaultDatabase` / `DISABLE_CREATE_TABLE_IN_DEFAULT_DB`.
+    ///
+    /// `default_db = Some("")` is rejected — pass `None` to opt out instead.
+    ///
+    /// **Note on built-in TVFs (`vector_search`, `full_text_search`):** when
+    /// `default_db = None`, bare table names inside these functions still 
resolve
+    /// against the literal namespace `"default"` (the fallback in
+    /// [`register_table_functions`]). Callers using `None` must qualify table 
names
+    /// (`'db.table'` or `'catalog.db.table'`) in those calls.
+    pub async fn register_catalog_with_default_db(
+        &mut self,
+        catalog_name: impl Into<String>,
+        catalog: Arc<dyn Catalog>,
+        default_db: Option<&str>,
+    ) -> DFResult<()> {
+        if matches!(default_db, Some("")) {
+            return Err(DataFusionError::Plan(
+                "default_db must not be empty; pass None to skip 
default-database init".to_string(),
+            ));
+        }
         let catalog_name = catalog_name.into();
         let is_first = self.catalogs.is_empty();
-        let default_db = "default";
-        match catalog.get_database(default_db).await {
-            Ok(_) => {}
-            Err(paimon::Error::DatabaseNotExist { .. }) => {
-                catalog
-                    .create_database(default_db, true, Default::default())
-                    .await
-                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
+        if let Some(default_db) = default_db {
+            match catalog.get_database(default_db).await {
+                Ok(_) => {}
+                Err(paimon::Error::DatabaseNotExist { .. }) => {
+                    catalog
+                        .create_database(default_db, true, Default::default())
+                        .await
+                        .map_err(|e| DataFusionError::External(Box::new(e)))?;
+                }
+                Err(e) => return Err(DataFusionError::External(Box::new(e))),
             }
-            Err(e) => return Err(DataFusionError::External(Box::new(e))),
         }
         self.ctx.register_catalog(
             &catalog_name,
@@ -143,11 +173,13 @@ impl SQLContext {
                 self.blob_reader_registry.clone(),
             )),
         );
-        register_table_functions(&self.ctx, &catalog, default_db);
+        register_table_functions(&self.ctx, &catalog, 
default_db.unwrap_or("default"));
         self.catalogs.insert(catalog_name.clone(), catalog);
         if is_first {
             self.set_current_catalog(catalog_name).await?;
-            self.set_current_database(default_db).await?;
+            if let Some(default_db) = default_db {
+                self.set_current_database(default_db).await?;
+            }
         }
         Ok(())
     }
@@ -2466,6 +2498,197 @@ mod tests {
         ctx
     }
 
+    // ==================== register_catalog_with_default_db tests 
====================
+
+    /// Counts get/create_database calls so tests can assert whether the 
default-db
+    /// init path fired. `get_database` returns `Unsupported` rather than
+    /// `DatabaseNotExist` so that *not* skipping the probe surfaces as a hard 
error
+    /// (mimics a "Forbidden: DESCRIBE on DATABASE default" failure).
+    struct ProbeTrackingCatalog {
+        get_calls: std::sync::atomic::AtomicUsize,
+        create_calls: std::sync::atomic::AtomicUsize,
+    }
+
+    impl ProbeTrackingCatalog {
+        fn new() -> Self {
+            Self {
+                get_calls: std::sync::atomic::AtomicUsize::new(0),
+                create_calls: std::sync::atomic::AtomicUsize::new(0),
+            }
+        }
+        fn get_count(&self) -> usize {
+            self.get_calls.load(std::sync::atomic::Ordering::SeqCst)
+        }
+        fn create_count(&self) -> usize {
+            self.create_calls.load(std::sync::atomic::Ordering::SeqCst)
+        }
+    }
+
+    #[async_trait]
+    impl Catalog for ProbeTrackingCatalog {
+        async fn list_databases(&self) -> paimon::Result<Vec<String>> {
+            Ok(vec![])
+        }
+        async fn create_database(
+            &self,
+            _name: &str,
+            _ignore_if_exists: bool,
+            _properties: HashMap<String, String>,
+        ) -> paimon::Result<()> {
+            self.create_calls
+                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+            Ok(())
+        }
+        async fn get_database(&self, _name: &str) -> paimon::Result<Database> {
+            self.get_calls
+                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+            Err(paimon::Error::Unsupported {
+                message: "simulated Forbidden".to_string(),
+            })
+        }
+        async fn drop_database(
+            &self,
+            _name: &str,
+            _ignore_if_not_exists: bool,
+            _cascade: bool,
+        ) -> paimon::Result<()> {
+            Ok(())
+        }
+        async fn get_table(&self, identifier: &Identifier) -> 
paimon::Result<Table> {
+            Err(paimon::Error::TableNotExist {
+                full_name: identifier.to_string(),
+            })
+        }
+        async fn list_tables(&self, _database_name: &str) -> 
paimon::Result<Vec<String>> {
+            Ok(vec![])
+        }
+        async fn create_table(
+            &self,
+            _identifier: &Identifier,
+            _creation: PaimonSchema,
+            _ignore_if_exists: bool,
+        ) -> paimon::Result<()> {
+            Ok(())
+        }
+        async fn drop_table(
+            &self,
+            _identifier: &Identifier,
+            _ignore_if_not_exists: bool,
+        ) -> paimon::Result<()> {
+            Ok(())
+        }
+        async fn rename_table(
+            &self,
+            _from: &Identifier,
+            _to: &Identifier,
+            _ignore_if_not_exists: bool,
+        ) -> paimon::Result<()> {
+            Ok(())
+        }
+        async fn alter_table(
+            &self,
+            _identifier: &Identifier,
+            _changes: Vec<SchemaChange>,
+            _ignore_if_not_exists: bool,
+        ) -> paimon::Result<()> {
+            Ok(())
+        }
+    }
+
+    #[tokio::test]
+    async fn register_catalog_with_none_skips_default_db_probe() {
+        let catalog = Arc::new(ProbeTrackingCatalog::new());
+        let mut ctx = SQLContext::new();
+        ctx.register_catalog_with_default_db("paimon", catalog.clone(), None)
+            .await
+            .expect("None must skip probe so Forbidden-shaped error never 
fires");
+        assert_eq!(catalog.get_count(), 0, "get_database must not be called");
+        assert_eq!(
+            catalog.create_count(),
+            0,
+            "create_database must not be called"
+        );
+        // Current catalog still set; current database left unchanged.
+        assert_eq!(
+            ctx.ctx().state().config().options().catalog.default_catalog,
+            "paimon"
+        );
+    }
+
+    #[tokio::test]
+    async fn register_catalog_with_some_default_propagates_probe_error() {
+        let catalog = Arc::new(ProbeTrackingCatalog::new());
+        let mut ctx = SQLContext::new();
+        let err = ctx
+            .register_catalog_with_default_db("paimon", catalog.clone(), 
Some("default"))
+            .await
+            .expect_err("non-DatabaseNotExist error from get_database must 
propagate");
+        assert!(
+            err.to_string().contains("simulated Forbidden"),
+            "unexpected error: {err}"
+        );
+        assert_eq!(catalog.get_count(), 1);
+        assert_eq!(catalog.create_count(), 0);
+    }
+
+    #[tokio::test]
+    async fn register_catalog_with_some_empty_string_is_rejected() {
+        // Footgun guard: raw Rust callers could pass `Some("")` and silently 
probe
+        // `get_database("")`. Reject at the API boundary; tell them to use 
`None` instead.
+        let catalog = Arc::new(ProbeTrackingCatalog::new());
+        let mut ctx = SQLContext::new();
+        let err = ctx
+            .register_catalog_with_default_db("paimon", catalog.clone(), 
Some(""))
+            .await
+            .expect_err("empty default_db must be rejected at the API");
+        assert!(
+            err.to_string().contains("must not be empty"),
+            "unexpected error: {err}"
+        );
+        assert_eq!(
+            catalog.get_count(),
+            0,
+            "guard must short-circuit before any catalog call"
+        );
+    }
+
+    #[tokio::test]
+    async fn 
register_catalog_with_none_table_function_resolves_bare_name_to_literal_default()
 {
+        // Documents the fallback in register_table_functions: 
`default_db.unwrap_or("default")`.
+        // When the caller opts out of default-db init, bare table names 
inside built-in TVFs
+        // (vector_search / full_text_search) still resolve against the 
literal namespace
+        // `"default"` — so a caller using `None` MUST use fully-qualified 
names with these
+        // functions or they'll hit a `default.<name>` lookup that may not 
exist / be readable.
+        let catalog = Arc::new(ProbeTrackingCatalog::new());
+        let mut ctx = SQLContext::new();
+        ctx.register_catalog_with_default_db("paimon", catalog, None)
+            .await
+            .unwrap();
+
+        let err = ctx
+            .sql("SELECT * FROM vector_search('bare', 'col', '[1.0]', 1)")
+            .await
+            .expect_err("bare name must error out — no `default.bare` table in 
mock catalog");
+        let msg = err.to_string();
+        assert!(
+            msg.contains("default") && msg.contains("bare"),
+            "error must surface the fallback 'default' namespace + bare name, 
got: {msg}"
+        );
+    }
+
+    #[tokio::test]
+    async fn register_catalog_default_wrapper_uses_default_db() {
+        // The bare register_catalog() must delegate with Some("default"), so 
the
+        // probe fires and Forbidden propagates — same as Some("default") 
above.
+        let catalog = Arc::new(ProbeTrackingCatalog::new());
+        let mut ctx = SQLContext::new();
+        assert!(ctx
+            .register_catalog("paimon", catalog.clone())
+            .await
+            .is_err());
+        assert_eq!(catalog.get_count(), 1);
+    }
+
     fn assert_sql_type_to_paimon(
         sql_type: datafusion::sql::sqlparser::ast::DataType,
         expected: PaimonDataType,

Reply via email to