This is an automated email from the ASF dual-hosted git repository.
bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 23d255a06 [KYUUBI #4937] Cleanup spark catalog shim and renamed to catalog utils
23d255a06 is described below
commit 23d255a06f6300be1afd390c92916613ce8b4ba3
Author: liangbowen <[email protected]>
AuthorDate: Thu Jun 15 20:21:51 2023 +0800
[KYUUBI #4937] Cleanup spark catalog shim and renamed to catalog utils
### _Why are the changes needed?_
To close #4937.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4963 from bowenliang123/remove-spark-shim.
Closes #4937
e6593b474 [liangbowen] remove unnecessary row
2481e3317 [liangbowen] remove SparkCatalogShim
Authored-by: liangbowen <[email protected]>
Signed-off-by: liangbowen <[email protected]>
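At a glance, the patch replaces the version-dispatched `SparkCatalogShim()` (which picked `CatalogShim_v2_4` or `CatalogShim_v3_0` by Spark version) with direct calls to the new `SparkCatalogUtils` object. Below is a minimal sketch of the resulting call sites, assembled from the hunks that follow; the wrapper object is illustrative only and not part of the commit:

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils

// Illustrative wrapper, not part of this commit.
object CatalogCallSiteSketch {
  // Before: SparkCatalogShim().getCatalogs(spark).toList
  // After: one static utility object, no per-version dispatch.
  def listCatalogs(spark: SparkSession): Seq[Row] =
    SparkCatalogUtils.getCatalogs(spark)

  // Setting the current database no longer goes through the shim; it is
  // inlined against Spark's CatalogManager, as in SetCurrentDatabase below.
  def useDatabase(spark: SparkSession, database: String): Unit =
    spark.sessionState.catalogManager.setCurrentNamespace(Array(database))
}
```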
---
.../engine/spark/operation/GetCatalogs.scala | 4 +-
.../kyuubi/engine/spark/operation/GetColumns.scala | 6 +-
.../engine/spark/operation/GetCurrentCatalog.scala | 5 +-
.../spark/operation/GetCurrentDatabase.scala | 7 +-
.../kyuubi/engine/spark/operation/GetSchemas.scala | 4 +-
.../engine/spark/operation/GetTableTypes.scala | 4 +-
.../kyuubi/engine/spark/operation/GetTables.scala | 7 +-
.../engine/spark/operation/SetCurrentCatalog.scala | 4 +-
.../spark/operation/SetCurrentDatabase.scala | 3 +-
.../spark/operation/SparkSQLOperationManager.scala | 4 +-
.../engine/spark/session/SparkSessionImpl.scala | 6 +-
.../engine/spark/shim/CatalogShim_v2_4.scala | 184 ----------
.../engine/spark/shim/CatalogShim_v3_0.scala | 216 ------------
.../engine/spark/shim/SparkCatalogShim.scala | 183 ----------
.../engine/spark/util/SparkCatalogUtils.scala | 375 +++++++++++++++++++++
.../SparkCatalogDatabaseOperationSuite.scala | 4 +-
.../spark/operation/SparkOperationSuite.scala | 10 +-
.../apache/kyuubi/jdbc/KyuubiHiveDriverSuite.scala | 8 +-
18 files changed, 414 insertions(+), 620 deletions(-)
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala
index 6d818e53e..c8e587300 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.spark.operation
import org.apache.spark.sql.types.StructType
-import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
+import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils
import org.apache.kyuubi.operation.IterableFetchIterator
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
import org.apache.kyuubi.session.Session
@@ -33,7 +33,7 @@ class GetCatalogs(session: Session) extends
SparkOperation(session) {
override protected def runInternal(): Unit = {
try {
- iter = new
IterableFetchIterator(SparkCatalogShim().getCatalogs(spark).toList)
+ iter = new IterableFetchIterator(SparkCatalogUtils.getCatalogs(spark))
} catch onError()
}
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetColumns.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetColumns.scala
index e78516981..3a0ab7d5b 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetColumns.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetColumns.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.spark.operation
import org.apache.spark.sql.types._
-import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
+import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils
import org.apache.kyuubi.operation.IterableFetchIterator
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
@@ -115,8 +115,8 @@ class GetColumns(
val schemaPattern = toJavaRegex(schemaName)
val tablePattern = toJavaRegex(tableName)
val columnPattern = toJavaRegex(columnName)
- iter = new IterableFetchIterator(SparkCatalogShim()
- .getColumns(spark, catalogName, schemaPattern, tablePattern,
columnPattern).toList)
+ iter = new IterableFetchIterator(SparkCatalogUtils
+ .getColumns(spark, catalogName, schemaPattern, tablePattern,
columnPattern))
} catch {
onError()
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCurrentCatalog.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCurrentCatalog.scala
index 96e013284..1d85d3d5a 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCurrentCatalog.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCurrentCatalog.scala
@@ -17,9 +17,9 @@
package org.apache.kyuubi.engine.spark.operation
+import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
-import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
import org.apache.kyuubi.operation.IterableFetchIterator
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
@@ -38,7 +38,8 @@ class GetCurrentCatalog(session: Session) extends
SparkOperation(session) {
override protected def runInternal(): Unit = {
try {
- iter = new
IterableFetchIterator(Seq(SparkCatalogShim().getCurrentCatalog(spark)))
+ val currentCatalogName =
spark.sessionState.catalogManager.currentCatalog.name()
+ iter = new IterableFetchIterator(Seq(Row(currentCatalogName)))
} catch onError()
}
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCurrentDatabase.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCurrentDatabase.scala
index 10b325d76..2478fb6a4 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCurrentDatabase.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCurrentDatabase.scala
@@ -17,9 +17,10 @@
package org.apache.kyuubi.engine.spark.operation
+import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
-import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
+import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils.quoteIfNeeded
import org.apache.kyuubi.operation.IterableFetchIterator
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_SCHEM
@@ -38,7 +39,9 @@ class GetCurrentDatabase(session: Session) extends
SparkOperation(session) {
override protected def runInternal(): Unit = {
try {
- iter = new
IterableFetchIterator(Seq(SparkCatalogShim().getCurrentDatabase(spark)))
+ val currentDatabaseName =
+
spark.sessionState.catalogManager.currentNamespace.map(quoteIfNeeded).mkString(".")
+ iter = new IterableFetchIterator(Seq(Row(currentDatabaseName)))
} catch onError()
}
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetSchemas.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetSchemas.scala
index 3937f528d..46dc7634a 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetSchemas.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetSchemas.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.spark.operation
import org.apache.spark.sql.types.StructType
-import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
+import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils
import org.apache.kyuubi.operation.IterableFetchIterator
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
@@ -40,7 +40,7 @@ class GetSchemas(session: Session, catalogName: String,
schema: String)
override protected def runInternal(): Unit = {
try {
val schemaPattern = toJavaRegex(schema)
- val rows = SparkCatalogShim().getSchemas(spark, catalogName,
schemaPattern)
+ val rows = SparkCatalogUtils.getSchemas(spark, catalogName,
schemaPattern)
iter = new IterableFetchIterator(rows)
} catch onError()
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala
index 1d2cae381..1029175b2 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.engine.spark.operation
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
-import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
+import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils
import org.apache.kyuubi.operation.IterableFetchIterator
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
@@ -33,6 +33,6 @@ class GetTableTypes(session: Session)
}
override protected def runInternal(): Unit = {
- iter = new
IterableFetchIterator(SparkCatalogShim.sparkTableTypes.map(Row(_)).toList)
+ iter = new
IterableFetchIterator(SparkCatalogUtils.sparkTableTypes.map(Row(_)).toList)
}
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala
index 40642b825..980e4fdb1 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.engine.spark.operation
import org.apache.spark.sql.types.StructType
import
org.apache.kyuubi.config.KyuubiConf.OPERATION_GET_TABLES_IGNORE_TABLE_PROPERTIES
-import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
+import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils
import org.apache.kyuubi.operation.IterableFetchIterator
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
@@ -73,9 +73,8 @@ class GetTables(
try {
val schemaPattern = toJavaRegex(schema)
val tablePattern = toJavaRegex(tableName)
- val sparkShim = SparkCatalogShim()
val catalogTablesAndViews =
- sparkShim.getCatalogTablesOrViews(
+ SparkCatalogUtils.getCatalogTablesOrViews(
spark,
catalog,
schemaPattern,
@@ -86,7 +85,7 @@ class GetTables(
val allTableAndViews =
if (tableTypes.exists("VIEW".equalsIgnoreCase)) {
catalogTablesAndViews ++
- sparkShim.getTempViews(spark, catalog, schemaPattern, tablePattern)
+ SparkCatalogUtils.getTempViews(spark, catalog, schemaPattern,
tablePattern)
} else {
catalogTablesAndViews
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentCatalog.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentCatalog.scala
index 7571c3e32..88105b086 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentCatalog.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentCatalog.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.spark.operation
import org.apache.spark.sql.types.StructType
-import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
+import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
@@ -35,7 +35,7 @@ class SetCurrentCatalog(session: Session, catalog: String)
extends SparkOperatio
override protected def runInternal(): Unit = {
try {
- SparkCatalogShim().setCurrentCatalog(spark, catalog)
+ SparkCatalogUtils.setCurrentCatalog(spark, catalog)
setHasResultSet(false)
} catch onError()
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentDatabase.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentDatabase.scala
index 2112f544a..d227f5fd2 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentDatabase.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentDatabase.scala
@@ -19,7 +19,6 @@ package org.apache.kyuubi.engine.spark.operation
import org.apache.spark.sql.types.StructType
-import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
@@ -36,7 +35,7 @@ class SetCurrentDatabase(session: Session, database: String)
override protected def runInternal(): Unit = {
try {
- SparkCatalogShim().setCurrentDatabase(spark, database)
+ spark.sessionState.catalogManager.setCurrentNamespace(Array(database))
setHasResultSet(false)
} catch onError()
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
index cb444aa77..cd9302cbf 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
@@ -26,7 +26,7 @@ import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_OPERATION_HANDLE_KEY
import org.apache.kyuubi.engine.spark.repl.KyuubiSparkILoop
import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
-import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
+import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils
import org.apache.kyuubi.operation.{NoneMode, Operation, OperationHandle,
OperationManager, PlanOnlyMode}
import org.apache.kyuubi.session.{Session, SessionHandle}
@@ -179,7 +179,7 @@ class SparkSQLOperationManager private (name: String)
extends OperationManager(n
tableTypes: java.util.List[String]): Operation = {
val tTypes =
if (tableTypes == null || tableTypes.isEmpty) {
- SparkCatalogShim.sparkTableTypes
+ SparkCatalogUtils.sparkTableTypes
} else {
tableTypes.asScala.toSet
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
index ddc0370d9..8d9012cbd 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
@@ -25,8 +25,8 @@ import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.spark.events.SessionEvent
import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
-import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
import org.apache.kyuubi.engine.spark.udf.KDFRegistry
+import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils
import org.apache.kyuubi.events.EventBus
import org.apache.kyuubi.operation.{Operation, OperationHandle}
import org.apache.kyuubi.session._
@@ -62,7 +62,7 @@ class SparkSessionImpl(
useCatalogAndDatabaseConf.get(USE_CATALOG).foreach { catalog =>
try {
- SparkCatalogShim().setCurrentCatalog(spark, catalog)
+ SparkCatalogUtils.setCurrentCatalog(spark, catalog)
} catch {
case e if e.getMessage.contains("Cannot find catalog plugin class for
catalog") =>
warn(e.getMessage())
@@ -71,7 +71,7 @@ class SparkSessionImpl(
useCatalogAndDatabaseConf.get("use:database").foreach { database =>
try {
- SparkCatalogShim().setCurrentDatabase(spark, database)
+ spark.sessionState.catalogManager.setCurrentNamespace(Array(database))
} catch {
case e
if database == "default" &&
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/CatalogShim_v2_4.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/CatalogShim_v2_4.scala
deleted file mode 100644
index ea72dd156..000000000
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/CatalogShim_v2_4.scala
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.engine.spark.shim
-
-import java.util.regex.Pattern
-
-import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
-
-class CatalogShim_v2_4 extends SparkCatalogShim {
-
- override def getCatalogs(spark: SparkSession): Seq[Row] = {
- Seq(Row(SparkCatalogShim.SESSION_CATALOG))
- }
-
- override protected def catalogExists(spark: SparkSession, catalog: String):
Boolean = false
-
- override def setCurrentCatalog(spark: SparkSession, catalog: String): Unit =
{}
-
- override def getCurrentCatalog(spark: SparkSession): Row = {
- Row(SparkCatalogShim.SESSION_CATALOG)
- }
-
- override def getSchemas(
- spark: SparkSession,
- catalogName: String,
- schemaPattern: String): Seq[Row] = {
- (spark.sessionState.catalog.listDatabases(schemaPattern) ++
- getGlobalTempViewManager(spark, schemaPattern)).map(Row(_,
SparkCatalogShim.SESSION_CATALOG))
- }
-
- def setCurrentDatabase(spark: SparkSession, databaseName: String): Unit = {
- spark.sessionState.catalog.setCurrentDatabase(databaseName)
- }
-
- def getCurrentDatabase(spark: SparkSession): Row = {
- Row(spark.sessionState.catalog.getCurrentDatabase)
- }
-
- override protected def getGlobalTempViewManager(
- spark: SparkSession,
- schemaPattern: String): Seq[String] = {
- val database = spark.sharedState.globalTempViewManager.database
- Option(database).filter(_.matches(schemaPattern)).toSeq
- }
-
- override def getCatalogTablesOrViews(
- spark: SparkSession,
- catalogName: String,
- schemaPattern: String,
- tablePattern: String,
- tableTypes: Set[String],
- ignoreTableProperties: Boolean): Seq[Row] = {
- val catalog = spark.sessionState.catalog
- val databases = catalog.listDatabases(schemaPattern)
-
- databases.flatMap { db =>
- val identifiers = catalog.listTables(db, tablePattern,
includeLocalTempViews = false)
- catalog.getTablesByName(identifiers)
- .filter(t => matched(tableTypes, t.tableType.name)).map { t =>
- val typ = if (t.tableType.name == "VIEW") "VIEW" else "TABLE"
- Row(
- catalogName,
- t.database,
- t.identifier.table,
- typ,
- t.comment.getOrElse(""),
- null,
- null,
- null,
- null,
- null)
- }
- }
- }
-
- override def getTempViews(
- spark: SparkSession,
- catalogName: String,
- schemaPattern: String,
- tablePattern: String): Seq[Row] = {
- val views = getViews(spark, schemaPattern, tablePattern)
- views.map { ident =>
- Row(catalogName, ident.database.orNull, ident.table, "VIEW", "", null,
null, null, null, null)
- }
- }
-
- override protected def getViews(
- spark: SparkSession,
- schemaPattern: String,
- tablePattern: String): Seq[TableIdentifier] = {
- val db = getGlobalTempViewManager(spark, schemaPattern)
- if (db.nonEmpty) {
- spark.sessionState.catalog.listTables(db.head, tablePattern)
- } else {
- spark.sessionState.catalog.listLocalTempViews(tablePattern)
- }
- }
-
- override def getColumns(
- spark: SparkSession,
- catalogName: String,
- schemaPattern: String,
- tablePattern: String,
- columnPattern: String): Seq[Row] = {
-
- val cp = columnPattern.r.pattern
- val byCatalog = getColumnsByCatalog(spark, catalogName, schemaPattern,
tablePattern, cp)
- val byGlobalTmpDB = getColumnsByGlobalTempViewManager(spark,
schemaPattern, tablePattern, cp)
- val byLocalTmp = getColumnsByLocalTempViews(spark, tablePattern, cp)
-
- byCatalog ++ byGlobalTmpDB ++ byLocalTmp
- }
-
- protected def getColumnsByCatalog(
- spark: SparkSession,
- catalogName: String,
- schemaPattern: String,
- tablePattern: String,
- columnPattern: Pattern): Seq[Row] = {
- val catalog = spark.sessionState.catalog
-
- val databases = catalog.listDatabases(schemaPattern)
-
- databases.flatMap { db =>
- val identifiers = catalog.listTables(db, tablePattern,
includeLocalTempViews = true)
- catalog.getTablesByName(identifiers).flatMap { t =>
- t.schema.zipWithIndex.filter(f =>
columnPattern.matcher(f._1.name).matches())
- .map { case (f, i) => toColumnResult(catalogName, t.database,
t.identifier.table, f, i) }
- }
- }
- }
-
- protected def getColumnsByGlobalTempViewManager(
- spark: SparkSession,
- schemaPattern: String,
- tablePattern: String,
- columnPattern: Pattern): Seq[Row] = {
- val catalog = spark.sessionState.catalog
-
- getGlobalTempViewManager(spark, schemaPattern).flatMap { globalTmpDb =>
- catalog.globalTempViewManager.listViewNames(tablePattern).flatMap { v =>
- catalog.globalTempViewManager.get(v).map { plan =>
- plan.schema.zipWithIndex.filter(f =>
columnPattern.matcher(f._1.name).matches())
- .map { case (f, i) =>
- toColumnResult(SparkCatalogShim.SESSION_CATALOG, globalTmpDb, v,
f, i)
- }
- }
- }.flatten
- }
- }
-
- protected def getColumnsByLocalTempViews(
- spark: SparkSession,
- tablePattern: String,
- columnPattern: Pattern): Seq[Row] = {
- val catalog = spark.sessionState.catalog
-
- catalog.listLocalTempViews(tablePattern)
- .map(v => (v, catalog.getTempView(v.table).get))
- .flatMap { case (v, plan) =>
- plan.schema.zipWithIndex
- .filter(f => columnPattern.matcher(f._1.name).matches())
- .map { case (f, i) =>
- toColumnResult(SparkCatalogShim.SESSION_CATALOG, null, v.table, f,
i)
- }
- }
- }
-}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/CatalogShim_v3_0.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/CatalogShim_v3_0.scala
deleted file mode 100644
index 27c524f30..000000000
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/CatalogShim_v3_0.scala
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.engine.spark.shim
-
-import java.util.regex.Pattern
-
-import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.connector.catalog.{CatalogExtension,
CatalogPlugin, SupportsNamespaces, TableCatalog}
-
-import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim.SESSION_CATALOG
-
-class CatalogShim_v3_0 extends CatalogShim_v2_4 {
-
- override def getCatalogs(spark: SparkSession): Seq[Row] = {
-
- // A [[CatalogManager]] is session unique
- val catalogMgr = spark.sessionState.catalogManager
- // get the custom v2 session catalog or default spark_catalog
- val sessionCatalog = invoke(catalogMgr, "v2SessionCatalog")
- val defaultCatalog = catalogMgr.currentCatalog
-
- val defaults = Seq(sessionCatalog, defaultCatalog).distinct
- .map(invoke(_, "name").asInstanceOf[String])
- val catalogs = getField(catalogMgr, "catalogs")
- .asInstanceOf[scala.collection.Map[String, _]]
- (catalogs.keys ++: defaults).distinct.map(Row(_))
- }
-
- private def getCatalog(spark: SparkSession, catalogName: String):
CatalogPlugin = {
- val catalogManager = spark.sessionState.catalogManager
- if (catalogName == null || catalogName.isEmpty) {
- catalogManager.currentCatalog
- } else {
- catalogManager.catalog(catalogName)
- }
- }
-
- override def catalogExists(spark: SparkSession, catalog: String): Boolean = {
- spark.sessionState.catalogManager.isCatalogRegistered(catalog)
- }
-
- override def setCurrentCatalog(spark: SparkSession, catalog: String): Unit =
{
- // SPARK-36841(3.3.0) Ensure setCurrentCatalog method catalog must exist
- if (spark.sessionState.catalogManager.isCatalogRegistered(catalog)) {
- spark.sessionState.catalogManager.setCurrentCatalog(catalog)
- } else {
- throw new IllegalArgumentException(s"Cannot find catalog plugin class
for catalog '$catalog'")
- }
- }
-
- override def getCurrentCatalog(spark: SparkSession): Row = {
- Row(spark.sessionState.catalogManager.currentCatalog.name())
- }
-
- private def listAllNamespaces(
- catalog: SupportsNamespaces,
- namespaces: Array[Array[String]]): Array[Array[String]] = {
- val children = namespaces.flatMap { ns =>
- catalog.listNamespaces(ns)
- }
- if (children.isEmpty) {
- namespaces
- } else {
- namespaces ++: listAllNamespaces(catalog, children)
- }
- }
-
- private def listAllNamespaces(catalog: CatalogPlugin): Array[Array[String]]
= {
- catalog match {
- case catalog: CatalogExtension =>
- // DSv2 does not support pass schemaPattern transparently
- catalog.defaultNamespace() +: catalog.listNamespaces(Array())
- case catalog: SupportsNamespaces =>
- val rootSchema = catalog.listNamespaces()
- val allSchemas = listAllNamespaces(catalog, rootSchema)
- allSchemas
- }
- }
-
- /**
- * Forked from Apache Spark's
org.apache.spark.sql.connector.catalog.CatalogV2Implicits
- */
- private def quoteIfNeeded(part: String): String = {
- if (part.contains(".") || part.contains("`")) {
- s"`${part.replace("`", "``")}`"
- } else {
- part
- }
- }
-
- private def listNamespacesWithPattern(
- catalog: CatalogPlugin,
- schemaPattern: String): Array[Array[String]] = {
- val p = schemaPattern.r.pattern
- listAllNamespaces(catalog).filter { ns =>
- val quoted = ns.map(quoteIfNeeded).mkString(".")
- p.matcher(quoted).matches()
- }.distinct
- }
-
- private def getSchemasWithPattern(catalog: CatalogPlugin, schemaPattern:
String): Seq[String] = {
- val p = schemaPattern.r.pattern
- listAllNamespaces(catalog).flatMap { ns =>
- val quoted = ns.map(quoteIfNeeded).mkString(".")
- if (p.matcher(quoted).matches()) {
- Some(quoted)
- } else {
- None
- }
- }.distinct
- }
-
- override def getSchemas(
- spark: SparkSession,
- catalogName: String,
- schemaPattern: String): Seq[Row] = {
- if (catalogName == SparkCatalogShim.SESSION_CATALOG) {
- super.getSchemas(spark, catalogName, schemaPattern)
- } else {
- val catalog = getCatalog(spark, catalogName)
- getSchemasWithPattern(catalog, schemaPattern).map(Row(_, catalog.name))
- }
- }
-
- override def setCurrentDatabase(spark: SparkSession, databaseName: String):
Unit = {
- spark.sessionState.catalogManager.setCurrentNamespace(Array(databaseName))
- }
-
- override def getCurrentDatabase(spark: SparkSession): Row = {
-
Row(spark.sessionState.catalogManager.currentNamespace.map(quoteIfNeeded).mkString("."))
- }
-
- override def getCatalogTablesOrViews(
- spark: SparkSession,
- catalogName: String,
- schemaPattern: String,
- tablePattern: String,
- tableTypes: Set[String],
- ignoreTableProperties: Boolean = false): Seq[Row] = {
- val catalog = getCatalog(spark, catalogName)
- val namespaces = listNamespacesWithPattern(catalog, schemaPattern)
- catalog match {
- case builtin if builtin.name() == SESSION_CATALOG =>
- super.getCatalogTablesOrViews(
- spark,
- SESSION_CATALOG,
- schemaPattern,
- tablePattern,
- tableTypes,
- ignoreTableProperties)
- case tc: TableCatalog =>
- val tp = tablePattern.r.pattern
- val identifiers = namespaces.flatMap { ns =>
- tc.listTables(ns).filter(i =>
tp.matcher(quoteIfNeeded(i.name())).matches())
- }
- identifiers.map { ident =>
- // TODO: restore view type for session catalog
- val comment = if (ignoreTableProperties) ""
- else
tc.loadTable(ident).properties().getOrDefault(TableCatalog.PROP_COMMENT, "")
- val schema = ident.namespace().map(quoteIfNeeded).mkString(".")
- val tableName = quoteIfNeeded(ident.name())
- Row(catalog.name(), schema, tableName, "TABLE", comment, null, null,
null, null, null)
- }
- case _ => Seq.empty[Row]
- }
- }
-
- override protected def getColumnsByCatalog(
- spark: SparkSession,
- catalogName: String,
- schemaPattern: String,
- tablePattern: String,
- columnPattern: Pattern): Seq[Row] = {
- val catalog = getCatalog(spark, catalogName)
-
- catalog match {
- case tc: TableCatalog =>
- val namespaces = listNamespacesWithPattern(catalog, schemaPattern)
- val tp = tablePattern.r.pattern
- val identifiers = namespaces.flatMap { ns =>
- tc.listTables(ns).filter(i =>
tp.matcher(quoteIfNeeded(i.name())).matches())
- }
- identifiers.flatMap { ident =>
- val table = tc.loadTable(ident)
- val namespace = ident.namespace().map(quoteIfNeeded).mkString(".")
- val tableName = quoteIfNeeded(ident.name())
-
- table.schema.zipWithIndex.filter(f =>
columnPattern.matcher(f._1.name).matches())
- .map { case (f, i) => toColumnResult(tc.name(), namespace,
tableName, f, i) }
- }
-
- case builtin if builtin.name() == SESSION_CATALOG =>
- super.getColumnsByCatalog(
- spark,
- SESSION_CATALOG,
- schemaPattern,
- tablePattern,
- columnPattern)
- }
- }
-}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/SparkCatalogShim.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/SparkCatalogShim.scala
deleted file mode 100644
index 83c806523..000000000
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/SparkCatalogShim.scala
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.engine.spark.shim
-
-import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.types.StructField
-
-import org.apache.kyuubi.Logging
-import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.sparkMajorMinorVersion
-import org.apache.kyuubi.engine.spark.schema.SchemaHelper
-
-/**
- * A shim that defines the interface interact with Spark's catalogs
- */
-trait SparkCatalogShim extends Logging {
-
- //
///////////////////////////////////////////////////////////////////////////////////////////////
- // Catalog
//
- //
///////////////////////////////////////////////////////////////////////////////////////////////
-
- /**
- * Get all register catalogs in Spark's `CatalogManager`
- */
- def getCatalogs(spark: SparkSession): Seq[Row]
-
- protected def catalogExists(spark: SparkSession, catalog: String): Boolean
-
- def setCurrentCatalog(spark: SparkSession, catalog: String): Unit
-
- def getCurrentCatalog(spark: SparkSession): Row
-
- //
///////////////////////////////////////////////////////////////////////////////////////////////
- // Schema
//
- //
///////////////////////////////////////////////////////////////////////////////////////////////
-
- /**
- * a list of [[Row]]s, with 2 fields `schemaName: String, catalogName:
String`
- */
- def getSchemas(spark: SparkSession, catalogName: String, schemaPattern:
String): Seq[Row]
-
- def setCurrentDatabase(spark: SparkSession, databaseName: String): Unit
-
- def getCurrentDatabase(spark: SparkSession): Row
-
- protected def getGlobalTempViewManager(spark: SparkSession, schemaPattern:
String): Seq[String]
-
- //
///////////////////////////////////////////////////////////////////////////////////////////////
- // Table & View
//
- //
///////////////////////////////////////////////////////////////////////////////////////////////
-
- def getCatalogTablesOrViews(
- spark: SparkSession,
- catalogName: String,
- schemaPattern: String,
- tablePattern: String,
- tableTypes: Set[String],
- ignoreTableProperties: Boolean): Seq[Row]
-
- def getTempViews(
- spark: SparkSession,
- catalogName: String,
- schemaPattern: String,
- tablePattern: String): Seq[Row]
-
- protected def getViews(
- spark: SparkSession,
- schemaPattern: String,
- tablePattern: String): Seq[TableIdentifier]
-
- //
///////////////////////////////////////////////////////////////////////////////////////////////
- // Columns
//
- //
///////////////////////////////////////////////////////////////////////////////////////////////
-
- def getColumns(
- spark: SparkSession,
- catalogName: String,
- schemaPattern: String,
- tablePattern: String,
- columnPattern: String): Seq[Row]
-
- protected def toColumnResult(
- catalog: String,
- db: String,
- table: String,
- col: StructField,
- pos: Int): Row = {
- // format: off
- Row(
- catalog, // TABLE_CAT
- db, // TABLE_SCHEM
- table, // TABLE_NAME
- col.name, // COLUMN_NAME
- SchemaHelper.toJavaSQLType(col.dataType), // DATA_TYPE
- col.dataType.sql, // TYPE_NAME
- SchemaHelper.getColumnSize(col.dataType).orNull, // COLUMN_SIZE
- null, // BUFFER_LENGTH
- SchemaHelper.getDecimalDigits(col.dataType).orNull, // DECIMAL_DIGITS
- SchemaHelper.getNumPrecRadix(col.dataType).orNull, // NUM_PREC_RADIX
- if (col.nullable) 1 else 0, // NULLABLE
- col.getComment().getOrElse(""), // REMARKS
- null, // COLUMN_DEF
- null, // SQL_DATA_TYPE
- null, // SQL_DATETIME_SUB
- null, //
CHAR_OCTET_LENGTH
- pos, // ORDINAL_POSITION
- "YES", // IS_NULLABLE
- null, // SCOPE_CATALOG
- null, // SCOPE_SCHEMA
- null, // SCOPE_TABLE
- null, // SOURCE_DATA_TYPE
- "NO" //
IS_AUTO_INCREMENT
- )
- // format: on
- }
-
- //
///////////////////////////////////////////////////////////////////////////////////////////////
- // Miscellaneous
//
- //
///////////////////////////////////////////////////////////////////////////////////////////////
-
- protected def invoke(
- obj: Any,
- methodName: String,
- args: (Class[_], AnyRef)*): Any = {
- val (types, values) = args.unzip
- val method = obj.getClass.getMethod(methodName, types: _*)
- method.setAccessible(true)
- method.invoke(obj, values.toSeq: _*)
- }
-
- protected def invoke(
- clazz: Class[_],
- obj: AnyRef,
- methodName: String,
- args: (Class[_], AnyRef)*): AnyRef = {
- val (types, values) = args.unzip
- val method = clazz.getMethod(methodName, types: _*)
- method.setAccessible(true)
- method.invoke(obj, values.toSeq: _*)
- }
-
- protected def getField(o: Any, fieldName: String): Any = {
- val field = o.getClass.getDeclaredField(fieldName)
- field.setAccessible(true)
- field.get(o)
- }
-
- protected def matched(tableTypes: Set[String], tableType: String): Boolean =
{
- val typ = if (tableType.equalsIgnoreCase("VIEW")) "VIEW" else "TABLE"
- tableTypes.exists(typ.equalsIgnoreCase)
- }
-
-}
-
-object SparkCatalogShim {
- def apply(): SparkCatalogShim = {
- sparkMajorMinorVersion match {
- case (3, _) => new CatalogShim_v3_0
- case (2, _) => new CatalogShim_v2_4
- case _ =>
- throw new IllegalArgumentException(s"Not Support spark version
$sparkMajorMinorVersion")
- }
- }
-
- val SESSION_CATALOG: String = "spark_catalog"
-
- val sparkTableTypes = Set("VIEW", "TABLE")
-}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala
new file mode 100644
index 000000000..13b87665d
--- /dev/null
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.util
+
+import java.util.regex.Pattern
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.connector.catalog.{CatalogExtension,
CatalogPlugin, SupportsNamespaces, TableCatalog}
+import org.apache.spark.sql.types.StructField
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.engine.spark.schema.SchemaHelper
+import org.apache.kyuubi.util.reflect.DynMethods
+import org.apache.kyuubi.util.reflect.ReflectUtils.{getField, invokeAs}
+
+/**
+ * A shim that defines the interface interact with Spark's catalogs
+ */
+object SparkCatalogUtils extends Logging {
+
+ private val VIEW = "VIEW"
+ private val TABLE = "TABLE"
+
+ val SESSION_CATALOG: String = "spark_catalog"
+ val sparkTableTypes: Set[String] = Set(VIEW, TABLE)
+
+ //
///////////////////////////////////////////////////////////////////////////////////////////////
+ // Catalog
//
+ //
///////////////////////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Get all register catalogs in Spark's `CatalogManager`
+ */
+ def getCatalogs(spark: SparkSession): Seq[Row] = {
+
+ // A [[CatalogManager]] is session unique
+ val catalogMgr = spark.sessionState.catalogManager
+ // get the custom v2 session catalog or default spark_catalog
+ val sessionCatalog = invokeAs[AnyRef](catalogMgr, "v2SessionCatalog")
+ val defaultCatalog = catalogMgr.currentCatalog
+
+ val defaults = Seq(sessionCatalog, defaultCatalog).distinct.map(catalog =>
+
DynMethods.builder("name").impl(catalog.getClass).buildChecked(catalog).invoke[String]())
+ val catalogs = getField[scala.collection.Map[String, _]](catalogMgr,
"catalogs")
+ (catalogs.keys ++: defaults).distinct.map(Row(_))
+ }
+
+ def getCatalog(spark: SparkSession, catalogName: String): CatalogPlugin = {
+ val catalogManager = spark.sessionState.catalogManager
+ if (StringUtils.isBlank(catalogName)) {
+ catalogManager.currentCatalog
+ } else {
+ catalogManager.catalog(catalogName)
+ }
+ }
+
+ def setCurrentCatalog(spark: SparkSession, catalog: String): Unit = {
+ // SPARK-36841(3.3.0) Ensure setCurrentCatalog method catalog must exist
+ if (spark.sessionState.catalogManager.isCatalogRegistered(catalog)) {
+ spark.sessionState.catalogManager.setCurrentCatalog(catalog)
+ } else {
+ throw new IllegalArgumentException(s"Cannot find catalog plugin class
for catalog '$catalog'")
+ }
+ }
+
+ //
///////////////////////////////////////////////////////////////////////////////////////////////
+ // Schema
//
+ //
///////////////////////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * a list of [[Row]]s, with 2 fields `schemaName: String, catalogName:
String`
+ */
+ def getSchemas(
+ spark: SparkSession,
+ catalogName: String,
+ schemaPattern: String): Seq[Row] = {
+ if (catalogName == SparkCatalogUtils.SESSION_CATALOG) {
+ (spark.sessionState.catalog.listDatabases(schemaPattern) ++
+ getGlobalTempViewManager(spark, schemaPattern))
+ .map(Row(_, SparkCatalogUtils.SESSION_CATALOG))
+ } else {
+ val catalog = getCatalog(spark, catalogName)
+ getSchemasWithPattern(catalog, schemaPattern).map(Row(_, catalog.name))
+ }
+ }
+
+ private def getGlobalTempViewManager(
+ spark: SparkSession,
+ schemaPattern: String): Seq[String] = {
+ val database = spark.sharedState.globalTempViewManager.database
+ Option(database).filter(_.matches(schemaPattern)).toSeq
+ }
+
+ private def listAllNamespaces(
+ catalog: SupportsNamespaces,
+ namespaces: Array[Array[String]]): Array[Array[String]] = {
+ val children = namespaces.flatMap { ns =>
+ catalog.listNamespaces(ns)
+ }
+ if (children.isEmpty) {
+ namespaces
+ } else {
+ namespaces ++: listAllNamespaces(catalog, children)
+ }
+ }
+
+ private def listAllNamespaces(catalog: CatalogPlugin): Array[Array[String]]
= {
+ catalog match {
+ case catalog: CatalogExtension =>
+ // DSv2 does not support pass schemaPattern transparently
+ catalog.defaultNamespace() +: catalog.listNamespaces(Array())
+ case catalog: SupportsNamespaces =>
+ val rootSchema = catalog.listNamespaces()
+ val allSchemas = listAllNamespaces(catalog, rootSchema)
+ allSchemas
+ }
+ }
+
+ private def listNamespacesWithPattern(
+ catalog: CatalogPlugin,
+ schemaPattern: String): Array[Array[String]] = {
+ listAllNamespaces(catalog).filter { ns =>
+ val quoted = ns.map(quoteIfNeeded).mkString(".")
+ schemaPattern.r.pattern.matcher(quoted).matches()
+ }.distinct
+ }
+
+ private def getSchemasWithPattern(catalog: CatalogPlugin, schemaPattern:
String): Seq[String] = {
+ val p = schemaPattern.r.pattern
+ listAllNamespaces(catalog).flatMap { ns =>
+ val quoted = ns.map(quoteIfNeeded).mkString(".")
+ if (p.matcher(quoted).matches()) Some(quoted) else None
+ }.distinct
+ }
+
+ //
///////////////////////////////////////////////////////////////////////////////////////////////
+ // Table & View
//
+ //
///////////////////////////////////////////////////////////////////////////////////////////////
+
+ def getCatalogTablesOrViews(
+ spark: SparkSession,
+ catalogName: String,
+ schemaPattern: String,
+ tablePattern: String,
+ tableTypes: Set[String],
+ ignoreTableProperties: Boolean = false): Seq[Row] = {
+ val catalog = getCatalog(spark, catalogName)
+ val namespaces = listNamespacesWithPattern(catalog, schemaPattern)
+ catalog match {
+ case builtin if builtin.name() == SESSION_CATALOG =>
+ val catalog = spark.sessionState.catalog
+ val databases = catalog.listDatabases(schemaPattern)
+
+ def isMatchedTableType(tableTypes: Set[String], tableType: String):
Boolean = {
+ val typ = if (tableType.equalsIgnoreCase(VIEW)) VIEW else TABLE
+ tableTypes.exists(typ.equalsIgnoreCase)
+ }
+
+ databases.flatMap { db =>
+ val identifiers = catalog.listTables(db, tablePattern,
includeLocalTempViews = false)
+ catalog.getTablesByName(identifiers)
+ .filter(t => isMatchedTableType(tableTypes, t.tableType.name)).map
{ t =>
+ val typ = if (t.tableType.name == VIEW) VIEW else TABLE
+ Row(
+ catalogName,
+ t.database,
+ t.identifier.table,
+ typ,
+ t.comment.getOrElse(""),
+ null,
+ null,
+ null,
+ null,
+ null)
+ }
+ }
+ case tc: TableCatalog =>
+ val tp = tablePattern.r.pattern
+ val identifiers = namespaces.flatMap { ns =>
+ tc.listTables(ns).filter(i =>
tp.matcher(quoteIfNeeded(i.name())).matches())
+ }
+ identifiers.map { ident =>
+ // TODO: restore view type for session catalog
+ val comment = if (ignoreTableProperties) ""
+ else { // load table is a time consuming operation
+
tc.loadTable(ident).properties().getOrDefault(TableCatalog.PROP_COMMENT, "")
+ }
+ val schema = ident.namespace().map(quoteIfNeeded).mkString(".")
+ val tableName = quoteIfNeeded(ident.name())
+ Row(catalog.name(), schema, tableName, TABLE, comment, null, null,
null, null, null)
+ }
+ case _ => Seq.empty[Row]
+ }
+ }
+
+ private def getColumnsByCatalog(
+ spark: SparkSession,
+ catalogName: String,
+ schemaPattern: String,
+ tablePattern: String,
+ columnPattern: Pattern): Seq[Row] = {
+ val catalog = getCatalog(spark, catalogName)
+
+ catalog match {
+ case tc: TableCatalog =>
+ val namespaces = listNamespacesWithPattern(catalog, schemaPattern)
+ val tp = tablePattern.r.pattern
+ val identifiers = namespaces.flatMap { ns =>
+ tc.listTables(ns).filter(i =>
tp.matcher(quoteIfNeeded(i.name())).matches())
+ }
+ identifiers.flatMap { ident =>
+ val table = tc.loadTable(ident)
+ val namespace = ident.namespace().map(quoteIfNeeded).mkString(".")
+ val tableName = quoteIfNeeded(ident.name())
+
+ table.schema.zipWithIndex.filter(f =>
columnPattern.matcher(f._1.name).matches())
+ .map { case (f, i) => toColumnResult(tc.name(), namespace,
tableName, f, i) }
+ }
+
+ case builtin if builtin.name() == SESSION_CATALOG =>
+ val catalog = spark.sessionState.catalog
+ val databases = catalog.listDatabases(schemaPattern)
+ databases.flatMap { db =>
+ val identifiers = catalog.listTables(db, tablePattern,
includeLocalTempViews = true)
+ catalog.getTablesByName(identifiers).flatMap { t =>
+ t.schema.zipWithIndex.filter(f =>
columnPattern.matcher(f._1.name).matches())
+ .map { case (f, i) =>
+ toColumnResult(catalogName, t.database, t.identifier.table, f,
i)
+ }
+ }
+ }
+ }
+ }
+
+ def getTempViews(
+ spark: SparkSession,
+ catalogName: String,
+ schemaPattern: String,
+ tablePattern: String): Seq[Row] = {
+ val views = getViews(spark, schemaPattern, tablePattern)
+ views.map { ident =>
+ Row(catalogName, ident.database.orNull, ident.table, VIEW, "", null,
null, null, null, null)
+ }
+ }
+
+ private def getViews(
+ spark: SparkSession,
+ schemaPattern: String,
+ tablePattern: String): Seq[TableIdentifier] = {
+ val db = getGlobalTempViewManager(spark, schemaPattern)
+ if (db.nonEmpty) {
+ spark.sessionState.catalog.listTables(db.head, tablePattern)
+ } else {
+ spark.sessionState.catalog.listLocalTempViews(tablePattern)
+ }
+ }
+
+ //
///////////////////////////////////////////////////////////////////////////////////////////////
+ // Columns
//
+ //
///////////////////////////////////////////////////////////////////////////////////////////////
+
+ def getColumns(
+ spark: SparkSession,
+ catalogName: String,
+ schemaPattern: String,
+ tablePattern: String,
+ columnPattern: String): Seq[Row] = {
+
+ val cp = columnPattern.r.pattern
+ val byCatalog = getColumnsByCatalog(spark, catalogName, schemaPattern,
tablePattern, cp)
+ val byGlobalTmpDB = getColumnsByGlobalTempViewManager(spark,
schemaPattern, tablePattern, cp)
+ val byLocalTmp = getColumnsByLocalTempViews(spark, tablePattern, cp)
+
+ byCatalog ++ byGlobalTmpDB ++ byLocalTmp
+ }
+
+ private def getColumnsByGlobalTempViewManager(
+ spark: SparkSession,
+ schemaPattern: String,
+ tablePattern: String,
+ columnPattern: Pattern): Seq[Row] = {
+ val catalog = spark.sessionState.catalog
+
+ getGlobalTempViewManager(spark, schemaPattern).flatMap { globalTmpDb =>
+ catalog.globalTempViewManager.listViewNames(tablePattern).flatMap { v =>
+ catalog.globalTempViewManager.get(v).map { plan =>
+ plan.schema.zipWithIndex.filter(f =>
columnPattern.matcher(f._1.name).matches())
+ .map { case (f, i) =>
+ toColumnResult(SparkCatalogUtils.SESSION_CATALOG, globalTmpDb,
v, f, i)
+ }
+ }
+ }.flatten
+ }
+ }
+
+ private def getColumnsByLocalTempViews(
+ spark: SparkSession,
+ tablePattern: String,
+ columnPattern: Pattern): Seq[Row] = {
+ val catalog = spark.sessionState.catalog
+
+ catalog.listLocalTempViews(tablePattern)
+ .map(v => (v, catalog.getTempView(v.table).get))
+ .flatMap { case (v, plan) =>
+ plan.schema.zipWithIndex
+ .filter(f => columnPattern.matcher(f._1.name).matches())
+ .map { case (f, i) =>
+ toColumnResult(SparkCatalogUtils.SESSION_CATALOG, null, v.table,
f, i)
+ }
+ }
+ }
+
+ private def toColumnResult(
+ catalog: String,
+ db: String,
+ table: String,
+ col: StructField,
+ pos: Int): Row = {
+ // format: off
+ Row(
+ catalog, // TABLE_CAT
+ db, // TABLE_SCHEM
+ table, // TABLE_NAME
+ col.name, // COLUMN_NAME
+ SchemaHelper.toJavaSQLType(col.dataType), // DATA_TYPE
+ col.dataType.sql, // TYPE_NAME
+ SchemaHelper.getColumnSize(col.dataType).orNull, // COLUMN_SIZE
+ null, // BUFFER_LENGTH
+ SchemaHelper.getDecimalDigits(col.dataType).orNull, // DECIMAL_DIGITS
+ SchemaHelper.getNumPrecRadix(col.dataType).orNull, // NUM_PREC_RADIX
+ if (col.nullable) 1 else 0, // NULLABLE
+ col.getComment().getOrElse(""), // REMARKS
+ null, // COLUMN_DEF
+ null, // SQL_DATA_TYPE
+ null, // SQL_DATETIME_SUB
+ null, //
CHAR_OCTET_LENGTH
+ pos, // ORDINAL_POSITION
+ "YES", // IS_NULLABLE
+ null, // SCOPE_CATALOG
+ null, // SCOPE_SCHEMA
+ null, // SCOPE_TABLE
+ null, // SOURCE_DATA_TYPE
+ "NO" //
IS_AUTO_INCREMENT
+ )
+ // format: on
+ }
+
+ /**
+ * Forked from Apache Spark's
[[org.apache.spark.sql.catalyst.util.quoteIfNeeded]]
+ */
+ def quoteIfNeeded(part: String): String = {
+ if (part.matches("[a-zA-Z0-9_]+") && !part.matches("\\d+")) {
+ part
+ } else {
+ s"`${part.replace("`", "``")}`"
+ }
+ }
+}
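The `quoteIfNeeded` helper added above keeps plain alphanumeric identifiers as-is and backquotes everything else, doubling embedded backticks. A hedged sketch of that rule (the identifiers are made up for illustration):

```scala
import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils.quoteIfNeeded

object QuoteIfNeededSketch extends App {
  assert(quoteIfNeeded("sales_db") == "sales_db")  // plain identifier: unchanged
  assert(quoteIfNeeded("2023") == "`2023`")        // all digits: backquoted
  assert(quoteIfNeeded("a.b") == "`a.b`")          // contains a dot: backquoted
  assert(quoteIfNeeded("we`ird") == "`we``ird`")   // embedded backtick is doubled
}
```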
diff --git
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkCatalogDatabaseOperationSuite.scala
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkCatalogDatabaseOperationSuite.scala
index 46208bff1..69431266b 100644
---
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkCatalogDatabaseOperationSuite.scala
+++
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkCatalogDatabaseOperationSuite.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
import
org.apache.kyuubi.config.KyuubiConf.ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED
import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
-import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
+import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils
import org.apache.kyuubi.operation.HiveJDBCTestHelper
class SparkCatalogDatabaseOperationSuite extends WithSparkSQLEngine with
HiveJDBCTestHelper {
@@ -37,7 +37,7 @@ class SparkCatalogDatabaseOperationSuite extends
WithSparkSQLEngine with HiveJDB
test("set/get current catalog") {
withJdbcStatement() { statement =>
val catalog = statement.getConnection.getCatalog
- assert(catalog == SparkCatalogShim.SESSION_CATALOG)
+ assert(catalog == SparkCatalogUtils.SESSION_CATALOG)
statement.getConnection.setCatalog("dummy")
val changedCatalog = statement.getConnection.getCatalog
assert(changedCatalog == "dummy")
diff --git
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
index cf5044056..650bdabd9 100644
---
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
+++
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.types._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
import org.apache.kyuubi.engine.spark.schema.SchemaHelper.TIMESTAMP_NTZ
-import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
+import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils
import org.apache.kyuubi.operation.{HiveMetadataTests, SparkQueryTests}
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.util.KyuubiHadoopUtils
@@ -49,7 +49,7 @@ class SparkOperationSuite extends WithSparkSQLEngine with
HiveMetadataTests with
withJdbcStatement() { statement =>
val meta = statement.getConnection.getMetaData
val types = meta.getTableTypes
- val expected = SparkCatalogShim.sparkTableTypes.toIterator
+ val expected = SparkCatalogUtils.sparkTableTypes.toIterator
while (types.next()) {
assert(types.getString(TABLE_TYPE) === expected.next())
}
@@ -143,7 +143,7 @@ class SparkOperationSuite extends WithSparkSQLEngine with
HiveMetadataTests with
var pos = 0
while (rowSet.next()) {
- assert(rowSet.getString(TABLE_CAT) ===
SparkCatalogShim.SESSION_CATALOG)
+ assert(rowSet.getString(TABLE_CAT) ===
SparkCatalogUtils.SESSION_CATALOG)
assert(rowSet.getString(TABLE_SCHEM) === defaultSchema)
assert(rowSet.getString(TABLE_NAME) === tableName)
assert(rowSet.getString(COLUMN_NAME) === schema(pos).name)
@@ -201,7 +201,7 @@ class SparkOperationSuite extends WithSparkSQLEngine with
HiveMetadataTests with
val data = statement.getConnection.getMetaData
val rowSet = data.getColumns("", "global_temp", viewName, null)
while (rowSet.next()) {
- assert(rowSet.getString(TABLE_CAT) ===
SparkCatalogShim.SESSION_CATALOG)
+ assert(rowSet.getString(TABLE_CAT) ===
SparkCatalogUtils.SESSION_CATALOG)
assert(rowSet.getString(TABLE_SCHEM) === "global_temp")
assert(rowSet.getString(TABLE_NAME) === viewName)
assert(rowSet.getString(COLUMN_NAME) === "i")
@@ -228,7 +228,7 @@ class SparkOperationSuite extends WithSparkSQLEngine with
HiveMetadataTests with
val data = statement.getConnection.getMetaData
val rowSet = data.getColumns("", "global_temp", viewName, "n")
while (rowSet.next()) {
- assert(rowSet.getString(TABLE_CAT) ===
SparkCatalogShim.SESSION_CATALOG)
+ assert(rowSet.getString(TABLE_CAT) ===
SparkCatalogUtils.SESSION_CATALOG)
assert(rowSet.getString(TABLE_SCHEM) === "global_temp")
assert(rowSet.getString(TABLE_NAME) === viewName)
assert(rowSet.getString(COLUMN_NAME) === "n")
diff --git
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/jdbc/KyuubiHiveDriverSuite.scala
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/jdbc/KyuubiHiveDriverSuite.scala
index 4d3c75498..ae68440df 100644
---
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/jdbc/KyuubiHiveDriverSuite.scala
+++
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/jdbc/KyuubiHiveDriverSuite.scala
@@ -22,7 +22,7 @@ import java.util.Properties
import org.apache.kyuubi.IcebergSuiteMixin
import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
-import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
+import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils
import org.apache.kyuubi.jdbc.hive.{KyuubiConnection, KyuubiStatement}
import org.apache.kyuubi.tags.IcebergTest
@@ -47,15 +47,15 @@ class KyuubiHiveDriverSuite extends WithSparkSQLEngine with
IcebergSuiteMixin {
val metaData = connection.getMetaData
assert(metaData.getClass.getName ===
"org.apache.kyuubi.jdbc.hive.KyuubiDatabaseMetaData")
val statement = connection.createStatement()
- val table1 =
s"${SparkCatalogShim.SESSION_CATALOG}.default.kyuubi_hive_jdbc"
+ val table1 =
s"${SparkCatalogUtils.SESSION_CATALOG}.default.kyuubi_hive_jdbc"
val table2 = s"$catalog.default.hdp_cat_tbl"
try {
statement.execute(s"CREATE TABLE $table1(key int) USING parquet")
statement.execute(s"CREATE TABLE $table2(key int) USING $format")
- val resultSet1 = metaData.getTables(SparkCatalogShim.SESSION_CATALOG,
"default", "%", null)
+ val resultSet1 = metaData.getTables(SparkCatalogUtils.SESSION_CATALOG,
"default", "%", null)
assert(resultSet1.next())
- assert(resultSet1.getString(1) === SparkCatalogShim.SESSION_CATALOG)
+ assert(resultSet1.getString(1) === SparkCatalogUtils.SESSION_CATALOG)
assert(resultSet1.getString(2) === "default")
assert(resultSet1.getString(3) === "kyuubi_hive_jdbc")