This is an automated email from the ASF dual-hosted git repository.

bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 23d255a06 [KYUUBI #4937] Clean up Spark catalog shim and rename it to catalog utils
23d255a06 is described below

commit 23d255a06f6300be1afd390c92916613ce8b4ba3
Author: liangbowen <[email protected]>
AuthorDate: Thu Jun 15 20:21:51 2023 +0800

    [KYUUBI #4937] Clean up Spark catalog shim and rename it to catalog utils
    
    ### _Why are the changes needed?_
    
    To close #4937.
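
    A minimal caller-side sketch of the new entry point (assumptions: the kyuubi-spark-sql-engine module is on the classpath, and `listCatalogNames` is a hypothetical helper named here only for illustration); the previous equivalent call was `SparkCatalogShim().getCatalogs(spark)`:

    ```scala
    import org.apache.spark.sql.SparkSession
    import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils

    // Hypothetical helper: list registered catalog names via the new utility object.
    // SparkCatalogUtils.getCatalogs returns Seq[Row], each Row holding one catalog name.
    def listCatalogNames(spark: SparkSession): Seq[String] =
      SparkCatalogUtils.getCatalogs(spark).map(_.getString(0))
    ```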
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4963 from bowenliang123/remove-spark-shim.
    
    Closes #4937
    
    e6593b474 [liangbowen] remove unnecessary row
    2481e3317 [liangbowen] remove SparkCatalogShim
    
    Authored-by: liangbowen <[email protected]>
    Signed-off-by: liangbowen <[email protected]>
---
 .../engine/spark/operation/GetCatalogs.scala       |   4 +-
 .../kyuubi/engine/spark/operation/GetColumns.scala |   6 +-
 .../engine/spark/operation/GetCurrentCatalog.scala |   5 +-
 .../spark/operation/GetCurrentDatabase.scala       |   7 +-
 .../kyuubi/engine/spark/operation/GetSchemas.scala |   4 +-
 .../engine/spark/operation/GetTableTypes.scala     |   4 +-
 .../kyuubi/engine/spark/operation/GetTables.scala  |   7 +-
 .../engine/spark/operation/SetCurrentCatalog.scala |   4 +-
 .../spark/operation/SetCurrentDatabase.scala       |   3 +-
 .../spark/operation/SparkSQLOperationManager.scala |   4 +-
 .../engine/spark/session/SparkSessionImpl.scala    |   6 +-
 .../engine/spark/shim/CatalogShim_v2_4.scala       | 184 ----------
 .../engine/spark/shim/CatalogShim_v3_0.scala       | 216 ------------
 .../engine/spark/shim/SparkCatalogShim.scala       | 183 ----------
 .../engine/spark/util/SparkCatalogUtils.scala      | 375 +++++++++++++++++++++
 .../SparkCatalogDatabaseOperationSuite.scala       |   4 +-
 .../spark/operation/SparkOperationSuite.scala      |  10 +-
 .../apache/kyuubi/jdbc/KyuubiHiveDriverSuite.scala |   8 +-
 18 files changed, 414 insertions(+), 620 deletions(-)

diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala
index 6d818e53e..c8e587300 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.spark.operation
 
 import org.apache.spark.sql.types.StructType
 
-import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
+import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils
 import org.apache.kyuubi.operation.IterableFetchIterator
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
 import org.apache.kyuubi.session.Session
@@ -33,7 +33,7 @@ class GetCatalogs(session: Session) extends 
SparkOperation(session) {
 
   override protected def runInternal(): Unit = {
     try {
-      iter = new 
IterableFetchIterator(SparkCatalogShim().getCatalogs(spark).toList)
+      iter = new IterableFetchIterator(SparkCatalogUtils.getCatalogs(spark))
     } catch onError()
   }
 }
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetColumns.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetColumns.scala
index e78516981..3a0ab7d5b 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetColumns.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetColumns.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.spark.operation
 
 import org.apache.spark.sql.types._
 
-import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
+import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils
 import org.apache.kyuubi.operation.IterableFetchIterator
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
 import org.apache.kyuubi.session.Session
@@ -115,8 +115,8 @@ class GetColumns(
       val schemaPattern = toJavaRegex(schemaName)
       val tablePattern = toJavaRegex(tableName)
       val columnPattern = toJavaRegex(columnName)
-      iter = new IterableFetchIterator(SparkCatalogShim()
-        .getColumns(spark, catalogName, schemaPattern, tablePattern, 
columnPattern).toList)
+      iter = new IterableFetchIterator(SparkCatalogUtils
+        .getColumns(spark, catalogName, schemaPattern, tablePattern, 
columnPattern))
     } catch {
       onError()
     }
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCurrentCatalog.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCurrentCatalog.scala
index 96e013284..1d85d3d5a 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCurrentCatalog.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCurrentCatalog.scala
@@ -17,9 +17,9 @@
 
 package org.apache.kyuubi.engine.spark.operation
 
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.StructType
 
-import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
 import org.apache.kyuubi.operation.IterableFetchIterator
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
@@ -38,7 +38,8 @@ class GetCurrentCatalog(session: Session) extends 
SparkOperation(session) {
 
   override protected def runInternal(): Unit = {
     try {
-      iter = new 
IterableFetchIterator(Seq(SparkCatalogShim().getCurrentCatalog(spark)))
+      val currentCatalogName = 
spark.sessionState.catalogManager.currentCatalog.name()
+      iter = new IterableFetchIterator(Seq(Row(currentCatalogName)))
     } catch onError()
   }
 }
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCurrentDatabase.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCurrentDatabase.scala
index 10b325d76..2478fb6a4 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCurrentDatabase.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCurrentDatabase.scala
@@ -17,9 +17,10 @@
 
 package org.apache.kyuubi.engine.spark.operation
 
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.StructType
 
-import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
+import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils.quoteIfNeeded
 import org.apache.kyuubi.operation.IterableFetchIterator
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_SCHEM
@@ -38,7 +39,9 @@ class GetCurrentDatabase(session: Session) extends 
SparkOperation(session) {
 
   override protected def runInternal(): Unit = {
     try {
-      iter = new 
IterableFetchIterator(Seq(SparkCatalogShim().getCurrentDatabase(spark)))
+      val currentDatabaseName =
+        
spark.sessionState.catalogManager.currentNamespace.map(quoteIfNeeded).mkString(".")
+      iter = new IterableFetchIterator(Seq(Row(currentDatabaseName)))
     } catch onError()
   }
 }
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetSchemas.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetSchemas.scala
index 3937f528d..46dc7634a 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetSchemas.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetSchemas.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.spark.operation
 
 import org.apache.spark.sql.types.StructType
 
-import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
+import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils
 import org.apache.kyuubi.operation.IterableFetchIterator
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
 import org.apache.kyuubi.session.Session
@@ -40,7 +40,7 @@ class GetSchemas(session: Session, catalogName: String, 
schema: String)
   override protected def runInternal(): Unit = {
     try {
       val schemaPattern = toJavaRegex(schema)
-      val rows = SparkCatalogShim().getSchemas(spark, catalogName, 
schemaPattern)
+      val rows = SparkCatalogUtils.getSchemas(spark, catalogName, 
schemaPattern)
       iter = new IterableFetchIterator(rows)
     } catch onError()
   }
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala
index 1d2cae381..1029175b2 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.engine.spark.operation
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.StructType
 
-import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
+import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils
 import org.apache.kyuubi.operation.IterableFetchIterator
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
 import org.apache.kyuubi.session.Session
@@ -33,6 +33,6 @@ class GetTableTypes(session: Session)
   }
 
   override protected def runInternal(): Unit = {
-    iter = new 
IterableFetchIterator(SparkCatalogShim.sparkTableTypes.map(Row(_)).toList)
+    iter = new 
IterableFetchIterator(SparkCatalogUtils.sparkTableTypes.map(Row(_)).toList)
   }
 }
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala
index 40642b825..980e4fdb1 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.engine.spark.operation
 import org.apache.spark.sql.types.StructType
 
 import 
org.apache.kyuubi.config.KyuubiConf.OPERATION_GET_TABLES_IGNORE_TABLE_PROPERTIES
-import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
+import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils
 import org.apache.kyuubi.operation.IterableFetchIterator
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
 import org.apache.kyuubi.session.Session
@@ -73,9 +73,8 @@ class GetTables(
     try {
       val schemaPattern = toJavaRegex(schema)
       val tablePattern = toJavaRegex(tableName)
-      val sparkShim = SparkCatalogShim()
       val catalogTablesAndViews =
-        sparkShim.getCatalogTablesOrViews(
+        SparkCatalogUtils.getCatalogTablesOrViews(
           spark,
           catalog,
           schemaPattern,
@@ -86,7 +85,7 @@ class GetTables(
       val allTableAndViews =
         if (tableTypes.exists("VIEW".equalsIgnoreCase)) {
           catalogTablesAndViews ++
-            sparkShim.getTempViews(spark, catalog, schemaPattern, tablePattern)
+            SparkCatalogUtils.getTempViews(spark, catalog, schemaPattern, 
tablePattern)
         } else {
           catalogTablesAndViews
         }
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentCatalog.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentCatalog.scala
index 7571c3e32..88105b086 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentCatalog.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentCatalog.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.spark.operation
 
 import org.apache.spark.sql.types.StructType
 
-import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
+import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
 
@@ -35,7 +35,7 @@ class SetCurrentCatalog(session: Session, catalog: String) 
extends SparkOperatio
 
   override protected def runInternal(): Unit = {
     try {
-      SparkCatalogShim().setCurrentCatalog(spark, catalog)
+      SparkCatalogUtils.setCurrentCatalog(spark, catalog)
       setHasResultSet(false)
     } catch onError()
   }
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentDatabase.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentDatabase.scala
index 2112f544a..d227f5fd2 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentDatabase.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentDatabase.scala
@@ -19,7 +19,6 @@ package org.apache.kyuubi.engine.spark.operation
 
 import org.apache.spark.sql.types.StructType
 
-import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
 
@@ -36,7 +35,7 @@ class SetCurrentDatabase(session: Session, database: String)
 
   override protected def runInternal(): Unit = {
     try {
-      SparkCatalogShim().setCurrentDatabase(spark, database)
+      spark.sessionState.catalogManager.setCurrentNamespace(Array(database))
       setHasResultSet(false)
     } catch onError()
   }
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
index cb444aa77..cd9302cbf 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
@@ -26,7 +26,7 @@ import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_OPERATION_HANDLE_KEY
 import org.apache.kyuubi.engine.spark.repl.KyuubiSparkILoop
 import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
-import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
+import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils
 import org.apache.kyuubi.operation.{NoneMode, Operation, OperationHandle, 
OperationManager, PlanOnlyMode}
 import org.apache.kyuubi.session.{Session, SessionHandle}
 
@@ -179,7 +179,7 @@ class SparkSQLOperationManager private (name: String) 
extends OperationManager(n
       tableTypes: java.util.List[String]): Operation = {
     val tTypes =
       if (tableTypes == null || tableTypes.isEmpty) {
-        SparkCatalogShim.sparkTableTypes
+        SparkCatalogUtils.sparkTableTypes
       } else {
         tableTypes.asScala.toSet
       }
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
index ddc0370d9..8d9012cbd 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
@@ -25,8 +25,8 @@ import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
 import org.apache.kyuubi.engine.spark.events.SessionEvent
 import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
-import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
 import org.apache.kyuubi.engine.spark.udf.KDFRegistry
+import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils
 import org.apache.kyuubi.events.EventBus
 import org.apache.kyuubi.operation.{Operation, OperationHandle}
 import org.apache.kyuubi.session._
@@ -62,7 +62,7 @@ class SparkSessionImpl(
 
     useCatalogAndDatabaseConf.get(USE_CATALOG).foreach { catalog =>
       try {
-        SparkCatalogShim().setCurrentCatalog(spark, catalog)
+        SparkCatalogUtils.setCurrentCatalog(spark, catalog)
       } catch {
         case e if e.getMessage.contains("Cannot find catalog plugin class for 
catalog") =>
           warn(e.getMessage())
@@ -71,7 +71,7 @@ class SparkSessionImpl(
 
     useCatalogAndDatabaseConf.get("use:database").foreach { database =>
       try {
-        SparkCatalogShim().setCurrentDatabase(spark, database)
+        spark.sessionState.catalogManager.setCurrentNamespace(Array(database))
       } catch {
         case e
             if database == "default" &&
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/CatalogShim_v2_4.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/CatalogShim_v2_4.scala
deleted file mode 100644
index ea72dd156..000000000
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/CatalogShim_v2_4.scala
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.engine.spark.shim
-
-import java.util.regex.Pattern
-
-import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
-
-class CatalogShim_v2_4 extends SparkCatalogShim {
-
-  override def getCatalogs(spark: SparkSession): Seq[Row] = {
-    Seq(Row(SparkCatalogShim.SESSION_CATALOG))
-  }
-
-  override protected def catalogExists(spark: SparkSession, catalog: String): 
Boolean = false
-
-  override def setCurrentCatalog(spark: SparkSession, catalog: String): Unit = 
{}
-
-  override def getCurrentCatalog(spark: SparkSession): Row = {
-    Row(SparkCatalogShim.SESSION_CATALOG)
-  }
-
-  override def getSchemas(
-      spark: SparkSession,
-      catalogName: String,
-      schemaPattern: String): Seq[Row] = {
-    (spark.sessionState.catalog.listDatabases(schemaPattern) ++
-      getGlobalTempViewManager(spark, schemaPattern)).map(Row(_, 
SparkCatalogShim.SESSION_CATALOG))
-  }
-
-  def setCurrentDatabase(spark: SparkSession, databaseName: String): Unit = {
-    spark.sessionState.catalog.setCurrentDatabase(databaseName)
-  }
-
-  def getCurrentDatabase(spark: SparkSession): Row = {
-    Row(spark.sessionState.catalog.getCurrentDatabase)
-  }
-
-  override protected def getGlobalTempViewManager(
-      spark: SparkSession,
-      schemaPattern: String): Seq[String] = {
-    val database = spark.sharedState.globalTempViewManager.database
-    Option(database).filter(_.matches(schemaPattern)).toSeq
-  }
-
-  override def getCatalogTablesOrViews(
-      spark: SparkSession,
-      catalogName: String,
-      schemaPattern: String,
-      tablePattern: String,
-      tableTypes: Set[String],
-      ignoreTableProperties: Boolean): Seq[Row] = {
-    val catalog = spark.sessionState.catalog
-    val databases = catalog.listDatabases(schemaPattern)
-
-    databases.flatMap { db =>
-      val identifiers = catalog.listTables(db, tablePattern, 
includeLocalTempViews = false)
-      catalog.getTablesByName(identifiers)
-        .filter(t => matched(tableTypes, t.tableType.name)).map { t =>
-          val typ = if (t.tableType.name == "VIEW") "VIEW" else "TABLE"
-          Row(
-            catalogName,
-            t.database,
-            t.identifier.table,
-            typ,
-            t.comment.getOrElse(""),
-            null,
-            null,
-            null,
-            null,
-            null)
-        }
-    }
-  }
-
-  override def getTempViews(
-      spark: SparkSession,
-      catalogName: String,
-      schemaPattern: String,
-      tablePattern: String): Seq[Row] = {
-    val views = getViews(spark, schemaPattern, tablePattern)
-    views.map { ident =>
-      Row(catalogName, ident.database.orNull, ident.table, "VIEW", "", null, 
null, null, null, null)
-    }
-  }
-
-  override protected def getViews(
-      spark: SparkSession,
-      schemaPattern: String,
-      tablePattern: String): Seq[TableIdentifier] = {
-    val db = getGlobalTempViewManager(spark, schemaPattern)
-    if (db.nonEmpty) {
-      spark.sessionState.catalog.listTables(db.head, tablePattern)
-    } else {
-      spark.sessionState.catalog.listLocalTempViews(tablePattern)
-    }
-  }
-
-  override def getColumns(
-      spark: SparkSession,
-      catalogName: String,
-      schemaPattern: String,
-      tablePattern: String,
-      columnPattern: String): Seq[Row] = {
-
-    val cp = columnPattern.r.pattern
-    val byCatalog = getColumnsByCatalog(spark, catalogName, schemaPattern, 
tablePattern, cp)
-    val byGlobalTmpDB = getColumnsByGlobalTempViewManager(spark, 
schemaPattern, tablePattern, cp)
-    val byLocalTmp = getColumnsByLocalTempViews(spark, tablePattern, cp)
-
-    byCatalog ++ byGlobalTmpDB ++ byLocalTmp
-  }
-
-  protected def getColumnsByCatalog(
-      spark: SparkSession,
-      catalogName: String,
-      schemaPattern: String,
-      tablePattern: String,
-      columnPattern: Pattern): Seq[Row] = {
-    val catalog = spark.sessionState.catalog
-
-    val databases = catalog.listDatabases(schemaPattern)
-
-    databases.flatMap { db =>
-      val identifiers = catalog.listTables(db, tablePattern, 
includeLocalTempViews = true)
-      catalog.getTablesByName(identifiers).flatMap { t =>
-        t.schema.zipWithIndex.filter(f => 
columnPattern.matcher(f._1.name).matches())
-          .map { case (f, i) => toColumnResult(catalogName, t.database, 
t.identifier.table, f, i) }
-      }
-    }
-  }
-
-  protected def getColumnsByGlobalTempViewManager(
-      spark: SparkSession,
-      schemaPattern: String,
-      tablePattern: String,
-      columnPattern: Pattern): Seq[Row] = {
-    val catalog = spark.sessionState.catalog
-
-    getGlobalTempViewManager(spark, schemaPattern).flatMap { globalTmpDb =>
-      catalog.globalTempViewManager.listViewNames(tablePattern).flatMap { v =>
-        catalog.globalTempViewManager.get(v).map { plan =>
-          plan.schema.zipWithIndex.filter(f => 
columnPattern.matcher(f._1.name).matches())
-            .map { case (f, i) =>
-              toColumnResult(SparkCatalogShim.SESSION_CATALOG, globalTmpDb, v, 
f, i)
-            }
-        }
-      }.flatten
-    }
-  }
-
-  protected def getColumnsByLocalTempViews(
-      spark: SparkSession,
-      tablePattern: String,
-      columnPattern: Pattern): Seq[Row] = {
-    val catalog = spark.sessionState.catalog
-
-    catalog.listLocalTempViews(tablePattern)
-      .map(v => (v, catalog.getTempView(v.table).get))
-      .flatMap { case (v, plan) =>
-        plan.schema.zipWithIndex
-          .filter(f => columnPattern.matcher(f._1.name).matches())
-          .map { case (f, i) =>
-            toColumnResult(SparkCatalogShim.SESSION_CATALOG, null, v.table, f, 
i)
-          }
-      }
-  }
-}
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/CatalogShim_v3_0.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/CatalogShim_v3_0.scala
deleted file mode 100644
index 27c524f30..000000000
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/CatalogShim_v3_0.scala
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.engine.spark.shim
-
-import java.util.regex.Pattern
-
-import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.connector.catalog.{CatalogExtension, 
CatalogPlugin, SupportsNamespaces, TableCatalog}
-
-import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim.SESSION_CATALOG
-
-class CatalogShim_v3_0 extends CatalogShim_v2_4 {
-
-  override def getCatalogs(spark: SparkSession): Seq[Row] = {
-
-    // A [[CatalogManager]] is session unique
-    val catalogMgr = spark.sessionState.catalogManager
-    // get the custom v2 session catalog or default spark_catalog
-    val sessionCatalog = invoke(catalogMgr, "v2SessionCatalog")
-    val defaultCatalog = catalogMgr.currentCatalog
-
-    val defaults = Seq(sessionCatalog, defaultCatalog).distinct
-      .map(invoke(_, "name").asInstanceOf[String])
-    val catalogs = getField(catalogMgr, "catalogs")
-      .asInstanceOf[scala.collection.Map[String, _]]
-    (catalogs.keys ++: defaults).distinct.map(Row(_))
-  }
-
-  private def getCatalog(spark: SparkSession, catalogName: String): 
CatalogPlugin = {
-    val catalogManager = spark.sessionState.catalogManager
-    if (catalogName == null || catalogName.isEmpty) {
-      catalogManager.currentCatalog
-    } else {
-      catalogManager.catalog(catalogName)
-    }
-  }
-
-  override def catalogExists(spark: SparkSession, catalog: String): Boolean = {
-    spark.sessionState.catalogManager.isCatalogRegistered(catalog)
-  }
-
-  override def setCurrentCatalog(spark: SparkSession, catalog: String): Unit = 
{
-    // SPARK-36841(3.3.0) Ensure setCurrentCatalog method catalog must exist
-    if (spark.sessionState.catalogManager.isCatalogRegistered(catalog)) {
-      spark.sessionState.catalogManager.setCurrentCatalog(catalog)
-    } else {
-      throw new IllegalArgumentException(s"Cannot find catalog plugin class 
for catalog '$catalog'")
-    }
-  }
-
-  override def getCurrentCatalog(spark: SparkSession): Row = {
-    Row(spark.sessionState.catalogManager.currentCatalog.name())
-  }
-
-  private def listAllNamespaces(
-      catalog: SupportsNamespaces,
-      namespaces: Array[Array[String]]): Array[Array[String]] = {
-    val children = namespaces.flatMap { ns =>
-      catalog.listNamespaces(ns)
-    }
-    if (children.isEmpty) {
-      namespaces
-    } else {
-      namespaces ++: listAllNamespaces(catalog, children)
-    }
-  }
-
-  private def listAllNamespaces(catalog: CatalogPlugin): Array[Array[String]] 
= {
-    catalog match {
-      case catalog: CatalogExtension =>
-        // DSv2 does not support pass schemaPattern transparently
-        catalog.defaultNamespace() +: catalog.listNamespaces(Array())
-      case catalog: SupportsNamespaces =>
-        val rootSchema = catalog.listNamespaces()
-        val allSchemas = listAllNamespaces(catalog, rootSchema)
-        allSchemas
-    }
-  }
-
-  /**
-   * Forked from Apache Spark's 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits
-   */
-  private def quoteIfNeeded(part: String): String = {
-    if (part.contains(".") || part.contains("`")) {
-      s"`${part.replace("`", "``")}`"
-    } else {
-      part
-    }
-  }
-
-  private def listNamespacesWithPattern(
-      catalog: CatalogPlugin,
-      schemaPattern: String): Array[Array[String]] = {
-    val p = schemaPattern.r.pattern
-    listAllNamespaces(catalog).filter { ns =>
-      val quoted = ns.map(quoteIfNeeded).mkString(".")
-      p.matcher(quoted).matches()
-    }.distinct
-  }
-
-  private def getSchemasWithPattern(catalog: CatalogPlugin, schemaPattern: 
String): Seq[String] = {
-    val p = schemaPattern.r.pattern
-    listAllNamespaces(catalog).flatMap { ns =>
-      val quoted = ns.map(quoteIfNeeded).mkString(".")
-      if (p.matcher(quoted).matches()) {
-        Some(quoted)
-      } else {
-        None
-      }
-    }.distinct
-  }
-
-  override def getSchemas(
-      spark: SparkSession,
-      catalogName: String,
-      schemaPattern: String): Seq[Row] = {
-    if (catalogName == SparkCatalogShim.SESSION_CATALOG) {
-      super.getSchemas(spark, catalogName, schemaPattern)
-    } else {
-      val catalog = getCatalog(spark, catalogName)
-      getSchemasWithPattern(catalog, schemaPattern).map(Row(_, catalog.name))
-    }
-  }
-
-  override def setCurrentDatabase(spark: SparkSession, databaseName: String): 
Unit = {
-    spark.sessionState.catalogManager.setCurrentNamespace(Array(databaseName))
-  }
-
-  override def getCurrentDatabase(spark: SparkSession): Row = {
-    
Row(spark.sessionState.catalogManager.currentNamespace.map(quoteIfNeeded).mkString("."))
-  }
-
-  override def getCatalogTablesOrViews(
-      spark: SparkSession,
-      catalogName: String,
-      schemaPattern: String,
-      tablePattern: String,
-      tableTypes: Set[String],
-      ignoreTableProperties: Boolean = false): Seq[Row] = {
-    val catalog = getCatalog(spark, catalogName)
-    val namespaces = listNamespacesWithPattern(catalog, schemaPattern)
-    catalog match {
-      case builtin if builtin.name() == SESSION_CATALOG =>
-        super.getCatalogTablesOrViews(
-          spark,
-          SESSION_CATALOG,
-          schemaPattern,
-          tablePattern,
-          tableTypes,
-          ignoreTableProperties)
-      case tc: TableCatalog =>
-        val tp = tablePattern.r.pattern
-        val identifiers = namespaces.flatMap { ns =>
-          tc.listTables(ns).filter(i => 
tp.matcher(quoteIfNeeded(i.name())).matches())
-        }
-        identifiers.map { ident =>
-          // TODO: restore view type for session catalog
-          val comment = if (ignoreTableProperties) ""
-          else 
tc.loadTable(ident).properties().getOrDefault(TableCatalog.PROP_COMMENT, "")
-          val schema = ident.namespace().map(quoteIfNeeded).mkString(".")
-          val tableName = quoteIfNeeded(ident.name())
-          Row(catalog.name(), schema, tableName, "TABLE", comment, null, null, 
null, null, null)
-        }
-      case _ => Seq.empty[Row]
-    }
-  }
-
-  override protected def getColumnsByCatalog(
-      spark: SparkSession,
-      catalogName: String,
-      schemaPattern: String,
-      tablePattern: String,
-      columnPattern: Pattern): Seq[Row] = {
-    val catalog = getCatalog(spark, catalogName)
-
-    catalog match {
-      case tc: TableCatalog =>
-        val namespaces = listNamespacesWithPattern(catalog, schemaPattern)
-        val tp = tablePattern.r.pattern
-        val identifiers = namespaces.flatMap { ns =>
-          tc.listTables(ns).filter(i => 
tp.matcher(quoteIfNeeded(i.name())).matches())
-        }
-        identifiers.flatMap { ident =>
-          val table = tc.loadTable(ident)
-          val namespace = ident.namespace().map(quoteIfNeeded).mkString(".")
-          val tableName = quoteIfNeeded(ident.name())
-
-          table.schema.zipWithIndex.filter(f => 
columnPattern.matcher(f._1.name).matches())
-            .map { case (f, i) => toColumnResult(tc.name(), namespace, 
tableName, f, i) }
-        }
-
-      case builtin if builtin.name() == SESSION_CATALOG =>
-        super.getColumnsByCatalog(
-          spark,
-          SESSION_CATALOG,
-          schemaPattern,
-          tablePattern,
-          columnPattern)
-    }
-  }
-}
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/SparkCatalogShim.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/SparkCatalogShim.scala
deleted file mode 100644
index 83c806523..000000000
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/SparkCatalogShim.scala
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.engine.spark.shim
-
-import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.types.StructField
-
-import org.apache.kyuubi.Logging
-import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.sparkMajorMinorVersion
-import org.apache.kyuubi.engine.spark.schema.SchemaHelper
-
-/**
- * A shim that defines the interface interact with Spark's catalogs
- */
-trait SparkCatalogShim extends Logging {
-
-  // 
///////////////////////////////////////////////////////////////////////////////////////////////
-  //                                          Catalog                          
                  //
-  // 
///////////////////////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Get all register catalogs in Spark's `CatalogManager`
-   */
-  def getCatalogs(spark: SparkSession): Seq[Row]
-
-  protected def catalogExists(spark: SparkSession, catalog: String): Boolean
-
-  def setCurrentCatalog(spark: SparkSession, catalog: String): Unit
-
-  def getCurrentCatalog(spark: SparkSession): Row
-
-  // 
///////////////////////////////////////////////////////////////////////////////////////////////
-  //                                           Schema                          
                  //
-  // 
///////////////////////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * a list of [[Row]]s, with 2 fields `schemaName: String, catalogName: 
String`
-   */
-  def getSchemas(spark: SparkSession, catalogName: String, schemaPattern: 
String): Seq[Row]
-
-  def setCurrentDatabase(spark: SparkSession, databaseName: String): Unit
-
-  def getCurrentDatabase(spark: SparkSession): Row
-
-  protected def getGlobalTempViewManager(spark: SparkSession, schemaPattern: 
String): Seq[String]
-
-  // 
///////////////////////////////////////////////////////////////////////////////////////////////
-  //                                        Table & View                       
                  //
-  // 
///////////////////////////////////////////////////////////////////////////////////////////////
-
-  def getCatalogTablesOrViews(
-      spark: SparkSession,
-      catalogName: String,
-      schemaPattern: String,
-      tablePattern: String,
-      tableTypes: Set[String],
-      ignoreTableProperties: Boolean): Seq[Row]
-
-  def getTempViews(
-      spark: SparkSession,
-      catalogName: String,
-      schemaPattern: String,
-      tablePattern: String): Seq[Row]
-
-  protected def getViews(
-      spark: SparkSession,
-      schemaPattern: String,
-      tablePattern: String): Seq[TableIdentifier]
-
-  // 
///////////////////////////////////////////////////////////////////////////////////////////////
-  //                                          Columns                          
                  //
-  // 
///////////////////////////////////////////////////////////////////////////////////////////////
-
-  def getColumns(
-      spark: SparkSession,
-      catalogName: String,
-      schemaPattern: String,
-      tablePattern: String,
-      columnPattern: String): Seq[Row]
-
-  protected def toColumnResult(
-      catalog: String,
-      db: String,
-      table: String,
-      col: StructField,
-      pos: Int): Row = {
-    // format: off
-    Row(
-      catalog,                                              // TABLE_CAT
-      db,                                                   // TABLE_SCHEM
-      table,                                                // TABLE_NAME
-      col.name,                                             // COLUMN_NAME
-      SchemaHelper.toJavaSQLType(col.dataType),             // DATA_TYPE
-      col.dataType.sql,                                     // TYPE_NAME
-      SchemaHelper.getColumnSize(col.dataType).orNull,      // COLUMN_SIZE
-      null,                                                 // BUFFER_LENGTH
-      SchemaHelper.getDecimalDigits(col.dataType).orNull,   // DECIMAL_DIGITS
-      SchemaHelper.getNumPrecRadix(col.dataType).orNull,    // NUM_PREC_RADIX
-      if (col.nullable) 1 else 0,                           // NULLABLE
-      col.getComment().getOrElse(""),                       // REMARKS
-      null,                                                 // COLUMN_DEF
-      null,                                                 // SQL_DATA_TYPE
-      null,                                                 // SQL_DATETIME_SUB
-      null,                                                 // 
CHAR_OCTET_LENGTH
-      pos,                                                  // ORDINAL_POSITION
-      "YES",                                                // IS_NULLABLE
-      null,                                                 // SCOPE_CATALOG
-      null,                                                 // SCOPE_SCHEMA
-      null,                                                 // SCOPE_TABLE
-      null,                                                 // SOURCE_DATA_TYPE
-      "NO"                                                  // 
IS_AUTO_INCREMENT
-    )
-    // format: on
-  }
-
-  // 
///////////////////////////////////////////////////////////////////////////////////////////////
-  //                                         Miscellaneous                     
                  //
-  // 
///////////////////////////////////////////////////////////////////////////////////////////////
-
-  protected def invoke(
-      obj: Any,
-      methodName: String,
-      args: (Class[_], AnyRef)*): Any = {
-    val (types, values) = args.unzip
-    val method = obj.getClass.getMethod(methodName, types: _*)
-    method.setAccessible(true)
-    method.invoke(obj, values.toSeq: _*)
-  }
-
-  protected def invoke(
-      clazz: Class[_],
-      obj: AnyRef,
-      methodName: String,
-      args: (Class[_], AnyRef)*): AnyRef = {
-    val (types, values) = args.unzip
-    val method = clazz.getMethod(methodName, types: _*)
-    method.setAccessible(true)
-    method.invoke(obj, values.toSeq: _*)
-  }
-
-  protected def getField(o: Any, fieldName: String): Any = {
-    val field = o.getClass.getDeclaredField(fieldName)
-    field.setAccessible(true)
-    field.get(o)
-  }
-
-  protected def matched(tableTypes: Set[String], tableType: String): Boolean = 
{
-    val typ = if (tableType.equalsIgnoreCase("VIEW")) "VIEW" else "TABLE"
-    tableTypes.exists(typ.equalsIgnoreCase)
-  }
-
-}
-
-object SparkCatalogShim {
-  def apply(): SparkCatalogShim = {
-    sparkMajorMinorVersion match {
-      case (3, _) => new CatalogShim_v3_0
-      case (2, _) => new CatalogShim_v2_4
-      case _ =>
-        throw new IllegalArgumentException(s"Not Support spark version 
$sparkMajorMinorVersion")
-    }
-  }
-
-  val SESSION_CATALOG: String = "spark_catalog"
-
-  val sparkTableTypes = Set("VIEW", "TABLE")
-}
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala
new file mode 100644
index 000000000..13b87665d
--- /dev/null
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.util
+
+import java.util.regex.Pattern
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.connector.catalog.{CatalogExtension, 
CatalogPlugin, SupportsNamespaces, TableCatalog}
+import org.apache.spark.sql.types.StructField
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.engine.spark.schema.SchemaHelper
+import org.apache.kyuubi.util.reflect.DynMethods
+import org.apache.kyuubi.util.reflect.ReflectUtils.{getField, invokeAs}
+
+/**
+ * A shim that defines the interface interact with Spark's catalogs
+ */
+object SparkCatalogUtils extends Logging {
+
+  private val VIEW = "VIEW"
+  private val TABLE = "TABLE"
+
+  val SESSION_CATALOG: String = "spark_catalog"
+  val sparkTableTypes: Set[String] = Set(VIEW, TABLE)
+
+  // 
///////////////////////////////////////////////////////////////////////////////////////////////
+  //                                          Catalog                          
                  //
+  // 
///////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Get all register catalogs in Spark's `CatalogManager`
+   */
+  def getCatalogs(spark: SparkSession): Seq[Row] = {
+
+    // A [[CatalogManager]] is session unique
+    val catalogMgr = spark.sessionState.catalogManager
+    // get the custom v2 session catalog or default spark_catalog
+    val sessionCatalog = invokeAs[AnyRef](catalogMgr, "v2SessionCatalog")
+    val defaultCatalog = catalogMgr.currentCatalog
+
+    val defaults = Seq(sessionCatalog, defaultCatalog).distinct.map(catalog =>
+      
DynMethods.builder("name").impl(catalog.getClass).buildChecked(catalog).invoke[String]())
+    val catalogs = getField[scala.collection.Map[String, _]](catalogMgr, 
"catalogs")
+    (catalogs.keys ++: defaults).distinct.map(Row(_))
+  }
+
+  def getCatalog(spark: SparkSession, catalogName: String): CatalogPlugin = {
+    val catalogManager = spark.sessionState.catalogManager
+    if (StringUtils.isBlank(catalogName)) {
+      catalogManager.currentCatalog
+    } else {
+      catalogManager.catalog(catalogName)
+    }
+  }
+
+  def setCurrentCatalog(spark: SparkSession, catalog: String): Unit = {
+    // SPARK-36841(3.3.0) Ensure setCurrentCatalog method catalog must exist
+    if (spark.sessionState.catalogManager.isCatalogRegistered(catalog)) {
+      spark.sessionState.catalogManager.setCurrentCatalog(catalog)
+    } else {
+      throw new IllegalArgumentException(s"Cannot find catalog plugin class 
for catalog '$catalog'")
+    }
+  }
+
+  // 
///////////////////////////////////////////////////////////////////////////////////////////////
+  //                                           Schema                          
                  //
+  // 
///////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * a list of [[Row]]s, with 2 fields `schemaName: String, catalogName: 
String`
+   */
+  def getSchemas(
+      spark: SparkSession,
+      catalogName: String,
+      schemaPattern: String): Seq[Row] = {
+    if (catalogName == SparkCatalogUtils.SESSION_CATALOG) {
+      (spark.sessionState.catalog.listDatabases(schemaPattern) ++
+        getGlobalTempViewManager(spark, schemaPattern))
+        .map(Row(_, SparkCatalogUtils.SESSION_CATALOG))
+    } else {
+      val catalog = getCatalog(spark, catalogName)
+      getSchemasWithPattern(catalog, schemaPattern).map(Row(_, catalog.name))
+    }
+  }
+
+  private def getGlobalTempViewManager(
+      spark: SparkSession,
+      schemaPattern: String): Seq[String] = {
+    val database = spark.sharedState.globalTempViewManager.database
+    Option(database).filter(_.matches(schemaPattern)).toSeq
+  }
+
+  private def listAllNamespaces(
+      catalog: SupportsNamespaces,
+      namespaces: Array[Array[String]]): Array[Array[String]] = {
+    val children = namespaces.flatMap { ns =>
+      catalog.listNamespaces(ns)
+    }
+    if (children.isEmpty) {
+      namespaces
+    } else {
+      namespaces ++: listAllNamespaces(catalog, children)
+    }
+  }
+
+  private def listAllNamespaces(catalog: CatalogPlugin): Array[Array[String]] 
= {
+    catalog match {
+      case catalog: CatalogExtension =>
+        // DSv2 does not support pass schemaPattern transparently
+        catalog.defaultNamespace() +: catalog.listNamespaces(Array())
+      case catalog: SupportsNamespaces =>
+        val rootSchema = catalog.listNamespaces()
+        val allSchemas = listAllNamespaces(catalog, rootSchema)
+        allSchemas
+    }
+  }
+
+  private def listNamespacesWithPattern(
+      catalog: CatalogPlugin,
+      schemaPattern: String): Array[Array[String]] = {
+    listAllNamespaces(catalog).filter { ns =>
+      val quoted = ns.map(quoteIfNeeded).mkString(".")
+      schemaPattern.r.pattern.matcher(quoted).matches()
+    }.distinct
+  }
+
+  private def getSchemasWithPattern(catalog: CatalogPlugin, schemaPattern: 
String): Seq[String] = {
+    val p = schemaPattern.r.pattern
+    listAllNamespaces(catalog).flatMap { ns =>
+      val quoted = ns.map(quoteIfNeeded).mkString(".")
+      if (p.matcher(quoted).matches()) Some(quoted) else None
+    }.distinct
+  }
+
+  // 
///////////////////////////////////////////////////////////////////////////////////////////////
+  //                                        Table & View                       
                  //
+  // 
///////////////////////////////////////////////////////////////////////////////////////////////
+
+  def getCatalogTablesOrViews(
+      spark: SparkSession,
+      catalogName: String,
+      schemaPattern: String,
+      tablePattern: String,
+      tableTypes: Set[String],
+      ignoreTableProperties: Boolean = false): Seq[Row] = {
+    val catalog = getCatalog(spark, catalogName)
+    val namespaces = listNamespacesWithPattern(catalog, schemaPattern)
+    catalog match {
+      case builtin if builtin.name() == SESSION_CATALOG =>
+        val catalog = spark.sessionState.catalog
+        val databases = catalog.listDatabases(schemaPattern)
+
+        def isMatchedTableType(tableTypes: Set[String], tableType: String): 
Boolean = {
+          val typ = if (tableType.equalsIgnoreCase(VIEW)) VIEW else TABLE
+          tableTypes.exists(typ.equalsIgnoreCase)
+        }
+
+        databases.flatMap { db =>
+          val identifiers = catalog.listTables(db, tablePattern, 
includeLocalTempViews = false)
+          catalog.getTablesByName(identifiers)
+            .filter(t => isMatchedTableType(tableTypes, t.tableType.name)).map 
{ t =>
+              val typ = if (t.tableType.name == VIEW) VIEW else TABLE
+              Row(
+                catalogName,
+                t.database,
+                t.identifier.table,
+                typ,
+                t.comment.getOrElse(""),
+                null,
+                null,
+                null,
+                null,
+                null)
+            }
+        }
+      case tc: TableCatalog =>
+        val tp = tablePattern.r.pattern
+        val identifiers = namespaces.flatMap { ns =>
+          tc.listTables(ns).filter(i => 
tp.matcher(quoteIfNeeded(i.name())).matches())
+        }
+        identifiers.map { ident =>
+          // TODO: restore view type for session catalog
+          val comment = if (ignoreTableProperties) ""
+          else { // load table is a time consuming operation
+            
tc.loadTable(ident).properties().getOrDefault(TableCatalog.PROP_COMMENT, "")
+          }
+          val schema = ident.namespace().map(quoteIfNeeded).mkString(".")
+          val tableName = quoteIfNeeded(ident.name())
+          Row(catalog.name(), schema, tableName, TABLE, comment, null, null, 
null, null, null)
+        }
+      case _ => Seq.empty[Row]
+    }
+  }
+
+  private def getColumnsByCatalog(
+      spark: SparkSession,
+      catalogName: String,
+      schemaPattern: String,
+      tablePattern: String,
+      columnPattern: Pattern): Seq[Row] = {
+    val catalog = getCatalog(spark, catalogName)
+
+    catalog match {
+      case tc: TableCatalog =>
+        val namespaces = listNamespacesWithPattern(catalog, schemaPattern)
+        val tp = tablePattern.r.pattern
+        val identifiers = namespaces.flatMap { ns =>
+          tc.listTables(ns).filter(i => 
tp.matcher(quoteIfNeeded(i.name())).matches())
+        }
+        identifiers.flatMap { ident =>
+          val table = tc.loadTable(ident)
+          val namespace = ident.namespace().map(quoteIfNeeded).mkString(".")
+          val tableName = quoteIfNeeded(ident.name())
+
+          table.schema.zipWithIndex.filter(f => 
columnPattern.matcher(f._1.name).matches())
+            .map { case (f, i) => toColumnResult(tc.name(), namespace, 
tableName, f, i) }
+        }
+
+      case builtin if builtin.name() == SESSION_CATALOG =>
+        val catalog = spark.sessionState.catalog
+        val databases = catalog.listDatabases(schemaPattern)
+        databases.flatMap { db =>
+          val identifiers = catalog.listTables(db, tablePattern, 
includeLocalTempViews = true)
+          catalog.getTablesByName(identifiers).flatMap { t =>
+            t.schema.zipWithIndex.filter(f => 
columnPattern.matcher(f._1.name).matches())
+              .map { case (f, i) =>
+                toColumnResult(catalogName, t.database, t.identifier.table, f, 
i)
+              }
+          }
+        }
+    }
+  }
+
+  def getTempViews(
+      spark: SparkSession,
+      catalogName: String,
+      schemaPattern: String,
+      tablePattern: String): Seq[Row] = {
+    val views = getViews(spark, schemaPattern, tablePattern)
+    views.map { ident =>
+      Row(catalogName, ident.database.orNull, ident.table, VIEW, "", null, 
null, null, null, null)
+    }
+  }
+
+  private def getViews(
+      spark: SparkSession,
+      schemaPattern: String,
+      tablePattern: String): Seq[TableIdentifier] = {
+    val db = getGlobalTempViewManager(spark, schemaPattern)
+    if (db.nonEmpty) {
+      spark.sessionState.catalog.listTables(db.head, tablePattern)
+    } else {
+      spark.sessionState.catalog.listLocalTempViews(tablePattern)
+    }
+  }
+
+  // 
///////////////////////////////////////////////////////////////////////////////////////////////
+  //                                          Columns                          
                  //
+  // 
///////////////////////////////////////////////////////////////////////////////////////////////
+
+  def getColumns(
+      spark: SparkSession,
+      catalogName: String,
+      schemaPattern: String,
+      tablePattern: String,
+      columnPattern: String): Seq[Row] = {
+
+    val cp = columnPattern.r.pattern
+    val byCatalog = getColumnsByCatalog(spark, catalogName, schemaPattern, 
tablePattern, cp)
+    val byGlobalTmpDB = getColumnsByGlobalTempViewManager(spark, 
schemaPattern, tablePattern, cp)
+    val byLocalTmp = getColumnsByLocalTempViews(spark, tablePattern, cp)
+
+    byCatalog ++ byGlobalTmpDB ++ byLocalTmp
+  }
+
+  private def getColumnsByGlobalTempViewManager(
+      spark: SparkSession,
+      schemaPattern: String,
+      tablePattern: String,
+      columnPattern: Pattern): Seq[Row] = {
+    val catalog = spark.sessionState.catalog
+
+    getGlobalTempViewManager(spark, schemaPattern).flatMap { globalTmpDb =>
+      catalog.globalTempViewManager.listViewNames(tablePattern).flatMap { v =>
+        catalog.globalTempViewManager.get(v).map { plan =>
+          plan.schema.zipWithIndex.filter(f => 
columnPattern.matcher(f._1.name).matches())
+            .map { case (f, i) =>
+              toColumnResult(SparkCatalogUtils.SESSION_CATALOG, globalTmpDb, 
v, f, i)
+            }
+        }
+      }.flatten
+    }
+  }
+
+  private def getColumnsByLocalTempViews(
+      spark: SparkSession,
+      tablePattern: String,
+      columnPattern: Pattern): Seq[Row] = {
+    val catalog = spark.sessionState.catalog
+
+    catalog.listLocalTempViews(tablePattern)
+      .map(v => (v, catalog.getTempView(v.table).get))
+      .flatMap { case (v, plan) =>
+        plan.schema.zipWithIndex
+          .filter(f => columnPattern.matcher(f._1.name).matches())
+          .map { case (f, i) =>
+            toColumnResult(SparkCatalogUtils.SESSION_CATALOG, null, v.table, 
f, i)
+          }
+      }
+  }
+
+  private def toColumnResult(
+      catalog: String,
+      db: String,
+      table: String,
+      col: StructField,
+      pos: Int): Row = {
+    // format: off
+    Row(
+      catalog,                                              // TABLE_CAT
+      db,                                                   // TABLE_SCHEM
+      table,                                                // TABLE_NAME
+      col.name,                                             // COLUMN_NAME
+      SchemaHelper.toJavaSQLType(col.dataType),             // DATA_TYPE
+      col.dataType.sql,                                     // TYPE_NAME
+      SchemaHelper.getColumnSize(col.dataType).orNull,      // COLUMN_SIZE
+      null,                                                 // BUFFER_LENGTH
+      SchemaHelper.getDecimalDigits(col.dataType).orNull,   // DECIMAL_DIGITS
+      SchemaHelper.getNumPrecRadix(col.dataType).orNull,    // NUM_PREC_RADIX
+      if (col.nullable) 1 else 0,                           // NULLABLE
+      col.getComment().getOrElse(""),                       // REMARKS
+      null,                                                 // COLUMN_DEF
+      null,                                                 // SQL_DATA_TYPE
+      null,                                                 // SQL_DATETIME_SUB
+      null,                                                 // 
CHAR_OCTET_LENGTH
+      pos,                                                  // ORDINAL_POSITION
+      "YES",                                                // IS_NULLABLE
+      null,                                                 // SCOPE_CATALOG
+      null,                                                 // SCOPE_SCHEMA
+      null,                                                 // SCOPE_TABLE
+      null,                                                 // SOURCE_DATA_TYPE
+      "NO"                                                  // 
IS_AUTO_INCREMENT
+    )
+    // format: on
+  }
+
+  /**
+   * Forked from Apache Spark's [[org.apache.spark.sql.catalyst.util.quoteIfNeeded]]
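+   *
+   * e.g. quoteIfNeeded("col_1") returns col_1, while quoteIfNeeded("col 1") and
+   * quoteIfNeeded("1") return the backtick-quoted forms `col 1` and `1` respectively.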
+   */
+  def quoteIfNeeded(part: String): String = {
+    if (part.matches("[a-zA-Z0-9_]+") && !part.matches("\\d+")) {
+      part
+    } else {
+      s"`${part.replace("`", "``")}`"
+    }
+  }
+}
diff --git 
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkCatalogDatabaseOperationSuite.scala
 
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkCatalogDatabaseOperationSuite.scala
index 46208bff1..69431266b 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkCatalogDatabaseOperationSuite.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkCatalogDatabaseOperationSuite.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 import org.apache.kyuubi.config.KyuubiConf.ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED
 import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
-import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
+import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils
 import org.apache.kyuubi.operation.HiveJDBCTestHelper
 
 class SparkCatalogDatabaseOperationSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {
@@ -37,7 +37,7 @@ class SparkCatalogDatabaseOperationSuite extends 
WithSparkSQLEngine with HiveJDB
   test("set/get current catalog") {
     withJdbcStatement() { statement =>
       val catalog = statement.getConnection.getCatalog
-      assert(catalog == SparkCatalogShim.SESSION_CATALOG)
+      assert(catalog == SparkCatalogUtils.SESSION_CATALOG)
       statement.getConnection.setCatalog("dummy")
       val changedCatalog = statement.getConnection.getCatalog
       assert(changedCatalog == "dummy")
diff --git 
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
 
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
index cf5044056..650bdabd9 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.types._
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
 import org.apache.kyuubi.engine.spark.schema.SchemaHelper.TIMESTAMP_NTZ
-import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
+import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils
 import org.apache.kyuubi.operation.{HiveMetadataTests, SparkQueryTests}
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
 import org.apache.kyuubi.util.KyuubiHadoopUtils
@@ -49,7 +49,7 @@ class SparkOperationSuite extends WithSparkSQLEngine with 
HiveMetadataTests with
     withJdbcStatement() { statement =>
       val meta = statement.getConnection.getMetaData
       val types = meta.getTableTypes
-      val expected = SparkCatalogShim.sparkTableTypes.toIterator
+      val expected = SparkCatalogUtils.sparkTableTypes.toIterator
       while (types.next()) {
         assert(types.getString(TABLE_TYPE) === expected.next())
       }
@@ -143,7 +143,7 @@ class SparkOperationSuite extends WithSparkSQLEngine with 
HiveMetadataTests with
         var pos = 0
 
         while (rowSet.next()) {
-          assert(rowSet.getString(TABLE_CAT) === SparkCatalogShim.SESSION_CATALOG)
+          assert(rowSet.getString(TABLE_CAT) === SparkCatalogUtils.SESSION_CATALOG)
           assert(rowSet.getString(TABLE_SCHEM) === defaultSchema)
           assert(rowSet.getString(TABLE_NAME) === tableName)
           assert(rowSet.getString(COLUMN_NAME) === schema(pos).name)
@@ -201,7 +201,7 @@ class SparkOperationSuite extends WithSparkSQLEngine with 
HiveMetadataTests with
       val data = statement.getConnection.getMetaData
       val rowSet = data.getColumns("", "global_temp", viewName, null)
       while (rowSet.next()) {
-        assert(rowSet.getString(TABLE_CAT) === SparkCatalogShim.SESSION_CATALOG)
+        assert(rowSet.getString(TABLE_CAT) === SparkCatalogUtils.SESSION_CATALOG)
         assert(rowSet.getString(TABLE_SCHEM) === "global_temp")
         assert(rowSet.getString(TABLE_NAME) === viewName)
         assert(rowSet.getString(COLUMN_NAME) === "i")
@@ -228,7 +228,7 @@ class SparkOperationSuite extends WithSparkSQLEngine with 
HiveMetadataTests with
       val data = statement.getConnection.getMetaData
       val rowSet = data.getColumns("", "global_temp", viewName, "n")
       while (rowSet.next()) {
-        assert(rowSet.getString(TABLE_CAT) === SparkCatalogShim.SESSION_CATALOG)
+        assert(rowSet.getString(TABLE_CAT) === SparkCatalogUtils.SESSION_CATALOG)
         assert(rowSet.getString(TABLE_SCHEM) === "global_temp")
         assert(rowSet.getString(TABLE_NAME) === viewName)
         assert(rowSet.getString(COLUMN_NAME) === "n")
diff --git 
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/jdbc/KyuubiHiveDriverSuite.scala
 
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/jdbc/KyuubiHiveDriverSuite.scala
index 4d3c75498..ae68440df 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/jdbc/KyuubiHiveDriverSuite.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/jdbc/KyuubiHiveDriverSuite.scala
@@ -22,7 +22,7 @@ import java.util.Properties
 
 import org.apache.kyuubi.IcebergSuiteMixin
 import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
-import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
+import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils
 import org.apache.kyuubi.jdbc.hive.{KyuubiConnection, KyuubiStatement}
 import org.apache.kyuubi.tags.IcebergTest
 
@@ -47,15 +47,15 @@ class KyuubiHiveDriverSuite extends WithSparkSQLEngine with 
IcebergSuiteMixin {
     val metaData = connection.getMetaData
     assert(metaData.getClass.getName === "org.apache.kyuubi.jdbc.hive.KyuubiDatabaseMetaData")
     val statement = connection.createStatement()
-    val table1 = s"${SparkCatalogShim.SESSION_CATALOG}.default.kyuubi_hive_jdbc"
+    val table1 = s"${SparkCatalogUtils.SESSION_CATALOG}.default.kyuubi_hive_jdbc"
     val table2 = s"$catalog.default.hdp_cat_tbl"
     try {
       statement.execute(s"CREATE TABLE $table1(key int) USING parquet")
       statement.execute(s"CREATE TABLE $table2(key int) USING $format")
 
-      val resultSet1 = metaData.getTables(SparkCatalogShim.SESSION_CATALOG, "default", "%", null)
+      val resultSet1 = metaData.getTables(SparkCatalogUtils.SESSION_CATALOG, "default", "%", null)
       assert(resultSet1.next())
-      assert(resultSet1.getString(1) === SparkCatalogShim.SESSION_CATALOG)
+      assert(resultSet1.getString(1) === SparkCatalogUtils.SESSION_CATALOG)
       assert(resultSet1.getString(2) === "default")
       assert(resultSet1.getString(3) === "kyuubi_hive_jdbc")
 
