XiaoHongbo-Hope commented on code in PR #293:
URL: https://github.com/apache/paimon-rust/pull/293#discussion_r3175920213


##########
crates/integrations/datafusion/src/sql_context.rs:
##########
@@ -64,54 +64,114 @@ use paimon::spec::{
 use crate::error::to_datafusion_error;
 use crate::DynamicOptions;
 
-/// Wraps a [`SessionContext`] and a Paimon [`Catalog`] to handle DDL statements
-/// that DataFusion does not natively support (e.g. ALTER TABLE).
-///
-/// For all other SQL, it delegates to the inner `SessionContext`.
+/// A SQL context that supports registering multiple Paimon catalogs and executing SQL.
 ///
 /// # Example
 /// ```ignore
-/// let ctx = SessionContext::new();
-/// let handler = PaimonSqlHandler::new(ctx, catalog, "paimon")?;
-/// let df = handler.sql("ALTER TABLE paimon.db.t ADD COLUMN age INT").await?;
+/// let mut ctx = SQLContext::new();
+/// ctx.register_catalog("paimon", catalog).await?;
+/// ctx.set_current_catalog("paimon").await?;
+/// let df = ctx.sql("ALTER TABLE paimon.db.t ADD COLUMN age INT").await?;
 /// ```
-pub struct PaimonSqlHandler {
+pub struct SQLContext {
     ctx: SessionContext,
-    catalog: Arc<dyn Catalog>,
-    /// The catalog name registered in the SessionContext (used to strip the catalog prefix).
-    catalog_name: String,
+    catalogs: HashMap<String, Arc<dyn Catalog>>,
+    current_catalog: String,
     /// Session-scoped dynamic options set via `SET 'paimon.key' = 'value'`.
     dynamic_options: DynamicOptions,
 }
 
-impl PaimonSqlHandler {
-    /// Creates a new handler that registers the Paimon catalog and relation planner
-    /// on the given [`SessionContext`].
+impl Default for SQLContext {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SQLContext {
+    /// Creates a new empty SQL context.
+    pub fn new() -> Self {
+        let ctx = SessionContext::new();
+        ctx.register_relation_planner(Arc::new(
+            crate::relation_planner::PaimonRelationPlanner::new(),
+        ))
+        .expect("failed to register relation planner");
+        Self {
+            ctx,
+            catalogs: HashMap::new(),
+            current_catalog: String::new(),
+            dynamic_options: Default::default(),
+        }
+    }
+
+    /// Registers a Paimon catalog under the given name.
     ///
-    /// Dynamic options for `SET`/`RESET` are managed internally.
-    pub fn new(
-        ctx: SessionContext,
-        catalog: Arc<dyn Catalog>,
+    /// The first registered catalog automatically becomes the current catalog
+    /// for both Paimon-handled SQL and DataFusion-delegated SQL (SELECT, etc.).
+    /// A "default" database is created if it does not already exist (matching
+    /// the behavior of Spark/Flink Paimon catalogs).
+    pub async fn register_catalog(
+        &mut self,
         catalog_name: impl Into<String>,
-    ) -> DFResult<Self> {
+        catalog: Arc<dyn Catalog>,
+    ) -> DFResult<()> {
         let catalog_name = catalog_name.into();
-        let dynamic_options: DynamicOptions = Default::default();
-        ctx.register_catalog(
+        let is_first = self.catalogs.is_empty();
+
+        catalog
+            .create_database("default", true, Default::default())
+            .await
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+        self.ctx.register_catalog(
             &catalog_name,
             
Arc::new(crate::catalog::PaimonCatalogProvider::with_dynamic_options(
                 catalog.clone(),
-                dynamic_options.clone(),
+                self.dynamic_options.clone(),
             )),
         );
-        ctx.register_relation_planner(Arc::new(
-            crate::relation_planner::PaimonRelationPlanner::new(),
-        ))?;
-        Ok(Self {
-            ctx,
-            catalog,
-            catalog_name,
-            dynamic_options,
-        })
+        self.catalogs.insert(catalog_name.clone(), catalog);
+        if is_first {
+            self.set_current_catalog(catalog_name).await?;
+            self.set_current_database("default").await?;
+        }
+        Ok(())
+    }
+
+    /// Sets the current catalog for unqualified table references.
+    pub async fn set_current_catalog(&mut self, catalog_name: impl Into<String>) -> DFResult<()> {
+        let catalog_name = catalog_name.into();
+        if !self.catalogs.contains_key(&catalog_name) {
+            return Err(DataFusionError::Plan(format!(
+                "Unknown catalog '{catalog_name}'"
+            )));
+        }
+        if catalog_name.contains('\'') {
+            return Err(DataFusionError::Plan(
+                "Catalog name must not contain single quotes".to_string(),
+            ));
+        }
+        self.ctx
+            .sql(&format!(
+                "SET datafusion.catalog.default_catalog = '{catalog_name}'"
+            ))
+            .await?;
+        self.current_catalog = catalog_name;

Review Comment:
   `set_current_catalog()` updates both DataFusion's `default_catalog` and
`SQLContext.current_catalog`, but DataFusion's default catalog can also be
changed directly through SQL, e.g.
`SET datafusion.catalog.default_catalog = 'cat2'`. That statement is delegated
to `SessionContext` and does not update `SQLContext.current_catalog`, so the
two values can drift apart: SELECT (resolved by DataFusion) and
SQLContext-handled statements can then resolve unqualified table names against
different catalogs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to