This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new bbf916d1d [KYUUBI #3529] Supplement DDL tests for Spark Hive connector and fix consistency issues w/ V1 implementation
bbf916d1d is described below
commit bbf916d1debc120d1cc99d1495c456405d14945a
Author: yikf <[email protected]>
AuthorDate: Sat Oct 15 22:45:22 2022 +0800
[KYUUBI #3529] Supplement DDL tests for Spark Hive connector and fix consistency issues w/ V1 implementation
### _Why are the changes needed?_
Fix https://github.com/apache/incubator-kyuubi/issues/3529
The intent of this PR is the following:
- Add catalog tests covering the listTables, loadTable, and listNamespaces methods.
- Initialize the DDL test framework.
- Add CreateNamespaceSuite, DropNamespaceSuite, and ShowTablesSuite to check for consistency with the V1 implementation in the Hive connector.
- Fix the bug in cascading namespace drops: when a namespace is dropped with CASCADE, tables still existing in it must not raise an error (see the sketch below).
- Fix the table name of HiveTable, which should include the namespace.
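
For context, a minimal sketch of the corrected `dropNamespace` semantics, mirroring the `HiveTableCatalog` hunk in the diff below (the `true`/`false` return values and the surrounding match arms are illustrative assumptions, not the verbatim implementation):

```scala
override def dropNamespace(
    namespace: Array[String],
    cascade: Boolean): Boolean = namespace match {
  case Array(db) if catalog.databaseExists(db) =>
    // Only reject a non-empty namespace when CASCADE was not requested;
    // with CASCADE, the contained tables are dropped together with it.
    if (catalog.listTables(db).nonEmpty && !cascade) {
      throw new IllegalStateException(s"Namespace ${namespace.quoted} is not empty")
    }
    catalog.dropDatabase(db, ignoreIfNotExists = false, cascade)
    true // assumed return value, for illustration only
  case _ => false // assumed fallback, for illustration only
}
```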
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #3530 from Yikf/hivev2-test.
Closes #3529
d0af0760 [yikf] Add tests to check for consistency with V1
Authored-by: yikf <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../kyuubi/spark/connector/hive/HiveTable.scala | 2 +-
.../spark/connector/hive/HiveTableCatalog.scala | 2 +-
.../hive/kyuubi/connector/HiveBridgeHelper.scala | 4 +
.../spark/connector/hive/HiveCatalogSuite.scala | 276 ++++++++++++++++++++-
.../spark/connector/hive/KyuubiHiveTest.scala | 37 ++-
.../hive/command/CreateNamespaceSuite.scala | 150 +++++++++++
.../hive/command/DDLCommandTestUtils.scala | 134 ++++++++++
.../hive/command/DropNamespaceSuite.scala | 118 +++++++++
.../connector/hive/command/ShowTablesSuite.scala | 115 +++++++++
9 files changed, 824 insertions(+), 14 deletions(-)
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala
index 646a4a080..3d1d4f8c4 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala
@@ -59,7 +59,7 @@ case class HiveTable(
catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
}
- override def name(): String = catalogTable.identifier.table
+ override def name(): String = catalogTable.identifier.unquotedString
override def schema(): StructType = catalogTable.schema
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
index e58a2d871..4d0f48e57 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
@@ -346,7 +346,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
namespace: Array[String],
cascade: Boolean): Boolean = namespace match {
case Array(db) if catalog.databaseExists(db) =>
-      if (catalog.listTables(db).nonEmpty) {
+      if (catalog.listTables(db).nonEmpty && !cascade) {
        throw new IllegalStateException(s"Namespace ${namespace.quoted} is not empty")
      }
catalog.dropDatabase(db, ignoreIfNotExists = false, cascade)
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
index 6ba1e8cfa..88b0305dd 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
@@ -23,6 +23,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.rdd.InputFileBlockHolder
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, ExternalCatalogEvent}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal}
+import org.apache.spark.sql.catalyst.util.quoteIfNeeded
import org.apache.spark.sql.connector.catalog.CatalogV2Util
import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, IdentityTransform, LogicalExpressions, Transform}
import org.apache.spark.sql.connector.expressions.LogicalExpressions.{bucket, reference}
@@ -114,4 +115,7 @@ object HiveBridgeHelper {
case l => l.sql
}
+ implicit class NamespaceHelper(namespace: Array[String]) {
+ def quoted: String = namespace.map(quoteIfNeeded).mkString(".")
+ }
}
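
For reference, a quick illustration of what the new `NamespaceHelper.quoted` helper produces; the example values are hypothetical, and `quoteIfNeeded` backtick-quotes any part that is not a plain identifier:

```scala
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper._

Array("db", "ns").quoted    // "db.ns"
Array("my-db", "ns").quoted // "`my-db`.ns" -- quoted because of the hyphen
```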
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
index fae0be65e..7a1eb86dc 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
@@ -17,11 +17,83 @@
package org.apache.kyuubi.spark.connector.hive
+import java.net.URI
+import java.util
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import com.google.common.collect.Maps
+import org.apache.hadoop.fs.Path
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper._
+import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.IdentifierHelper
+
class HiveCatalogSuite extends KyuubiHiveTest {
+  val emptyProps: util.Map[String, String] = Collections.emptyMap[String, String]
+ val schema: StructType = new StructType()
+ .add("id", IntegerType)
+ .add("data", StringType)
+
+ val testNs: Array[String] = Array("db")
+ val defaultNs: Array[String] = Array("default")
+ val testIdent: Identifier = Identifier.of(testNs, "test_table")
+
+ var catalog: HiveTableCatalog = _
+
+ private def newCatalog(): HiveTableCatalog = {
+ val catalog = new HiveTableCatalog
+ val catalogName = "hive"
+ val properties = Maps.newHashMap[String, String]()
+    properties.put("javax.jdo.option.ConnectionURL", "jdbc:derby:memory:memorydb;create=true")
+    properties.put("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver")
+ catalog.initialize(catalogName, new CaseInsensitiveStringMap(properties))
+ catalog
+ }
+
+ private def filterV2TableProperties(
+ properties: util.Map[String, String]): Map[String, String] = {
+ properties.asScala.filter(kv => !TABLE_RESERVED_PROPERTIES.contains(kv._1))
+ .filter(!_._1.startsWith(TableCatalog.OPTION_PREFIX)).toMap
+ }
+
+ def makeQualifiedPathWithWarehouse(path: String): URI = {
+ val p = new Path(catalog.conf.warehousePath, path)
+ val fs = p.getFileSystem(catalog.hadoopConfiguration())
+ fs.makeQualified(p).toUri
+ }
+
+  private def checkMetadata(expected: Map[String, String], actual: Map[String, String]): Unit = {
+    // remove location and comment that are automatically added by HMS unless they are expected
+ val toRemove =
+ NAMESPACE_RESERVED_PROPERTIES.filter(expected.contains)
+ assert(expected -- toRemove === actual)
+ }
+
+ override def beforeEach(): Unit = {
+ super.beforeEach()
+ catalog = newCatalog()
+ catalog.createNamespace(Array("ns"), emptyProps)
+ catalog.createNamespace(Array("ns2"), emptyProps)
+ catalog.createNamespace(Array("db"), emptyProps)
+ }
+
+ override def afterEach(): Unit = {
+ super.afterEach()
+ catalog.dropNamespace(Array("ns"), true)
+ catalog.dropNamespace(Array("ns2"), true)
+ catalog.dropNamespace(Array("db"), true)
+ catalog = null
+ }
+
test("get catalog name") {
withSparkSession() { spark =>
val catalog = new HiveTableCatalog
@@ -35,10 +107,9 @@ class HiveCatalogSuite extends KyuubiHiveTest {
withSparkSession() { spark =>
try {
spark.sql("USE hive")
- spark.sql("CREATE NAMESPACE IF NOT EXISTS ns1")
- assert(spark.sql(s"SHOW NAMESPACES").collect().length == 2)
+      assert(Try { spark.sql("CREATE NAMESPACE IF NOT EXISTS snns1") }.isSuccess)
} finally {
- spark.sql("DROP NAMESPACE IF EXISTS ns1")
+ spark.sql("DROP NAMESPACE IF EXISTS snns1")
}
}
}
@@ -51,4 +122,203 @@ class HiveCatalogSuite extends KyuubiHiveTest {
      assert(exception.message === "Table or view not found: hive.ns1.nonexistent_table")
}
}
+
+ test("listTables") {
+
+ val ident1 = Identifier.of(Array("ns"), "test_table_1")
+ val ident2 = Identifier.of(Array("ns"), "test_table_2")
+ val ident3 = Identifier.of(Array("ns2"), "test_table_1")
+
+ assert(catalog.listTables(Array("ns")).isEmpty)
+
+ catalog.createTable(ident1, schema, Array.empty, emptyProps)
+
+ assert(catalog.listTables(Array("ns")).toSet == Set(ident1))
+ assert(catalog.listTables(Array("ns2")).isEmpty)
+
+ catalog.createTable(ident3, schema, Array.empty, emptyProps)
+ catalog.createTable(ident2, schema, Array.empty, emptyProps)
+
+ assert(catalog.listTables(Array("ns")).toSet == Set(ident1, ident2))
+ assert(catalog.listTables(Array("ns2")).toSet == Set(ident3))
+
+ catalog.dropTable(ident1)
+
+ assert(catalog.listTables(Array("ns")).toSet == Set(ident2))
+
+ catalog.dropTable(ident2)
+
+ assert(catalog.listTables(Array("ns")).isEmpty)
+ assert(catalog.listTables(Array("ns2")).toSet == Set(ident3))
+
+ catalog.dropTable(ident3)
+ }
+
+ test("createTable") {
+ assert(!catalog.tableExists(testIdent))
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
+ assert(parsed == Seq("db", "test_table"))
+ assert(table.schema == schema)
+ assert(filterV2TableProperties(table.properties) == Map())
+
+ assert(catalog.tableExists(testIdent))
+ catalog.dropTable(testIdent)
+ }
+
+ test("createTable: with properties") {
+ val properties = new util.HashMap[String, String]()
+ properties.put("property", "value")
+
+ assert(!catalog.tableExists(testIdent))
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, properties)
+
+ val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
+ assert(parsed == Seq("db", "test_table"))
+ assert(table.schema == schema)
+ assert(filterV2TableProperties(table.properties).asJava == properties)
+
+ assert(catalog.tableExists(testIdent))
+ catalog.dropTable(testIdent)
+ }
+
+ test("createTable: table already exists") {
+ assert(!catalog.tableExists(testIdent))
+
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ val exc = intercept[TableAlreadyExistsException] {
+ catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ }
+
+ assert(exc.message.contains(table.name()))
+ assert(exc.message.contains("already exists"))
+
+ assert(catalog.tableExists(testIdent))
+ catalog.dropTable(testIdent)
+ }
+
+ test("tableExists") {
+ assert(!catalog.tableExists(testIdent))
+
+ catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+ assert(catalog.tableExists(testIdent))
+
+ catalog.dropTable(testIdent)
+
+ assert(!catalog.tableExists(testIdent))
+ }
+
+ test("createTable: location") {
+ val properties = new util.HashMap[String, String]()
+ assert(!catalog.tableExists(testIdent))
+
+ // default location
+    val t1 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[HiveTable]
+ assert(t1.catalogTable.location ===
+ catalog.catalog.defaultTablePath(testIdent.asTableIdentifier))
+ catalog.dropTable(testIdent)
+
+ // relative path
+ properties.put(TableCatalog.PROP_LOCATION, "relative/path")
+    val t2 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[HiveTable]
+    assert(t2.catalogTable.location === makeQualifiedPathWithWarehouse("db.db/relative/path"))
+ catalog.dropTable(testIdent)
+
+ // absolute path without scheme
+ properties.put(TableCatalog.PROP_LOCATION, "/absolute/path")
+    val t3 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[HiveTable]
+ assert(t3.catalogTable.location.toString === "file:/absolute/path")
+ catalog.dropTable(testIdent)
+
+ // absolute path with scheme
+ properties.put(TableCatalog.PROP_LOCATION, "file:/absolute/path")
+    val t4 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[HiveTable]
+ assert(t4.catalogTable.location.toString === "file:/absolute/path")
+ catalog.dropTable(testIdent)
+ }
+
+ test("loadTable") {
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+ val loaded = catalog.loadTable(testIdent)
+
+ assert(table.name == loaded.name)
+ assert(table.schema == loaded.schema)
+ assert(table.properties == loaded.properties)
+ catalog.dropTable(testIdent)
+ }
+
+ test("loadTable: table does not exist") {
+ val exc = intercept[NoSuchTableException] {
+ catalog.loadTable(testIdent)
+ }
+
+    assert(exc.message.contains("Table or view 'test_table' not found in database 'db'"))
+ }
+
+ test("invalidateTable") {
+ val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    // Hive V2 doesn't cache tables
+ catalog.invalidateTable(testIdent)
+
+ val loaded = catalog.loadTable(testIdent)
+
+ assert(table.name == loaded.name)
+ assert(table.schema == loaded.schema)
+ assert(table.properties == loaded.properties)
+ catalog.dropTable(testIdent)
+ }
+
+ test("listNamespaces: fail if missing namespace") {
+ catalog.dropNamespace(testNs, true)
+ assert(catalog.namespaceExists(testNs) === false)
+
+ val exc = intercept[NoSuchNamespaceException] {
+ assert(catalog.listNamespaces(testNs) === Array())
+ }
+
+ assert(exc.getMessage.contains(testNs.quoted))
+ assert(catalog.namespaceExists(testNs) === false)
+ }
+
+ test("loadNamespaceMetadata: fail missing namespace") {
+ catalog.dropNamespace(testNs, true)
+ val exc = intercept[NoSuchNamespaceException] {
+ catalog.loadNamespaceMetadata(testNs)
+ }
+
+ assert(exc.getMessage.contains(testNs.quoted))
+ }
+
+ test("loadNamespaceMetadata: non-empty metadata") {
+ catalog.dropNamespace(testNs, true)
+ assert(catalog.namespaceExists(testNs) === false)
+
+ catalog.createNamespace(testNs, Map("property" -> "value").asJava)
+
+ val metadata = catalog.loadNamespaceMetadata(testNs)
+
+ assert(catalog.namespaceExists(testNs) === true)
+ checkMetadata(metadata.asScala.toMap, Map("property" -> "value"))
+
+ catalog.dropNamespace(testNs, cascade = false)
+ }
+
+ test("loadNamespaceMetadata: empty metadata") {
+ catalog.dropNamespace(testNs, true)
+ assert(catalog.namespaceExists(testNs) === false)
+
+ catalog.createNamespace(testNs, emptyProps)
+
+ val metadata = catalog.loadNamespaceMetadata(testNs)
+
+ assert(catalog.namespaceExists(testNs) === true)
+ checkMetadata(metadata.asScala.toMap, emptyProps.asScala.toMap)
+
+ catalog.dropNamespace(testNs, cascade = false)
+ }
}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala
index 0f5ecc021..d0b17dc05 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala
@@ -19,14 +19,31 @@ package org.apache.kyuubi.spark.connector.hive
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{QueryTest, SparkSession}
+import org.apache.spark.sql.connector.catalog.{SupportsNamespaces, TableCatalog}
-import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.spark.connector.common.LocalSparkSession
-abstract class KyuubiHiveTest extends KyuubiFunSuite with Logging {
+abstract class KyuubiHiveTest extends QueryTest with Logging {
- private var spark: SparkSession = _
+ private var innerSpark: SparkSession = _
+
+ protected val TABLE_RESERVED_PROPERTIES =
+ Seq(
+ TableCatalog.PROP_COMMENT,
+ TableCatalog.PROP_LOCATION,
+ TableCatalog.PROP_PROVIDER,
+ TableCatalog.PROP_OWNER,
+ TableCatalog.PROP_EXTERNAL,
+ TableCatalog.PROP_IS_MANAGED_LOCATION)
+
+ protected val NAMESPACE_RESERVED_PROPERTIES =
+ Seq(
+ SupportsNamespaces.PROP_COMMENT,
+ SupportsNamespaces.PROP_LOCATION,
+ SupportsNamespaces.PROP_OWNER)
+
+ protected def catalogName: String = "hive"
override def beforeEach(): Unit = {
super.beforeAll()
@@ -35,7 +52,7 @@ abstract class KyuubiHiveTest extends KyuubiFunSuite with Logging {
override def afterEach(): Unit = {
super.afterAll()
- LocalSparkSession.stop(spark)
+ LocalSparkSession.stop(innerSpark)
}
def getOrCreateSpark(): Unit = {
@@ -47,15 +64,17 @@ abstract class KyuubiHiveTest extends KyuubiFunSuite with Logging {
      .set("javax.jdo.option.ConnectionURL", "jdbc:derby:memory:memorydb;create=true")
      .set("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver")
- spark = SparkSession.builder.config(sparkConf).getOrCreate()
+ innerSpark = SparkSession.builder.config(sparkConf).getOrCreate()
}
  def withSparkSession[T](conf: Map[String, String] = Map.empty[String, String])(
f: SparkSession => T): T = {
- assert(spark != null)
+ assert(innerSpark != null)
conf.foreach {
- case (k, v) => spark.sessionState.conf.setConfString(k, v)
+ case (k, v) => innerSpark.sessionState.conf.setConfString(k, v)
}
- f(spark)
+ f(innerSpark)
}
+
+ override def spark: SparkSession = innerSpark
}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/CreateNamespaceSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/CreateNamespaceSuite.scala
new file mode 100644
index 000000000..855eb0c67
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/CreateNamespaceSuite.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive.command
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces
+import org.apache.spark.sql.internal.SQLConf
+
+import org.apache.kyuubi.spark.connector.hive.command.DDLCommandTestUtils.{V1_COMMAND_VERSION, V2_COMMAND_VERSION}
+
+trait CreateNamespaceSuiteBase extends DDLCommandTestUtils {
+ override protected def command: String = "CREATE NAMESPACE"
+
+ protected def namespaceArray: Array[String] = namespace.split('.')
+
+ protected def notFoundMsgPrefix: String =
+ if (commandVersion == V1_COMMAND_VERSION) "Database" else "Namespace"
+
+ private def namespace: String = "fakens"
+
+ override def afterEach(): Unit = {
+ sql(s"DROP NAMESPACE IF EXISTS $catalogName.$namespace")
+ super.afterEach()
+ }
+
+ test("basic test") {
+ val ns = s"$catalogName.$namespace"
+ withNamespace(ns) {
+ sql(s"CREATE NAMESPACE $ns")
+      assert(getCatalog(catalogName).asNamespaceCatalog.namespaceExists(namespaceArray))
+ }
+ }
+
+ test("namespace with location") {
+ val ns = s"$catalogName.$namespace"
+ withNamespace(ns) {
+ withTempDir { tmpDir =>
+ // The generated temp path is not qualified.
+ val path = tmpDir.getCanonicalPath
+ assert(!path.startsWith("file:/"))
+
+ val e = intercept[IllegalArgumentException] {
+ sql(s"CREATE NAMESPACE $ns LOCATION ''")
+ }
+        assert(e.getMessage.contains("Can not create a Path from an empty string"))
+
+ val uri = new Path(path).toUri
+ sql(s"CREATE NAMESPACE $ns LOCATION '$uri'")
+
+ // Make sure the location is qualified.
+ val expected = makeQualifiedPath(tmpDir.toString)
+ assert("file" === expected.getScheme)
+        assert(new Path(getNamespaceLocation(catalogName, namespaceArray)).toUri === expected)
+ }
+ }
+ }
+
+ test("Namespace already exists") {
+ val ns = s"$catalogName.$namespace"
+ withNamespace(ns) {
+ sql(s"CREATE NAMESPACE $ns")
+
+ val e = intercept[NamespaceAlreadyExistsException] {
+ sql(s"CREATE NAMESPACE $ns")
+ }
+ assert(e.getMessage.contains(s"Namespace '$namespace' already exists"))
+
+ // The following will be no-op since the namespace already exists.
+ Try { sql(s"CREATE NAMESPACE IF NOT EXISTS $ns") }.isSuccess
+ }
+ }
+
+ test("CreateNameSpace: reserved properties") {
+ import SupportsNamespaces._
+ val ns = s"$catalogName.$namespace"
+ withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "false")) {
+      NAMESPACE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key =>
+ val exception = intercept[ParseException] {
+ sql(s"CREATE NAMESPACE $ns WITH DBPROPERTIES('$key'='dummyVal')")
+ }
+        assert(exception.getMessage.contains(s"$key is a reserved namespace property"))
+ }
+ }
+ withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "true")) {
+      NAMESPACE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key =>
+ withNamespace(ns) {
+ sql(s"CREATE NAMESPACE $ns WITH DBPROPERTIES('$key'='foo')")
+ assert(
+ sql(s"DESC NAMESPACE EXTENDED $ns")
+ .toDF("k", "v")
+ .where("k='Properties'")
+ .where("v=''")
+ .count == 1,
+ s"$key is a reserved namespace property and ignored")
+ val meta =
+            getCatalog(catalogName).asNamespaceCatalog.loadNamespaceMetadata(namespaceArray)
+ assert(
+ meta.get(key) == null || !meta.get(key).contains("foo"),
+ "reserved properties should not have side effects")
+ }
+ }
+ }
+ }
+
+  protected def getNamespaceLocation(catalog: String, namespace: Array[String]): String = {
+ val metadata = getCatalog(catalog).asNamespaceCatalog
+ .loadNamespaceMetadata(namespace).asScala
+ metadata(SupportsNamespaces.PROP_LOCATION)
+ }
+}
+
+class CreateNamespaceV2Suite extends CreateNamespaceSuiteBase {
+
+ override protected def catalogName: String = super.catalogName
+
+ override protected def catalogVersion: String = "Hive V2"
+
+ override protected def commandVersion: String = V2_COMMAND_VERSION
+}
+
+class CreateNamespaceV1Suite extends CreateNamespaceSuiteBase {
+
+ val SESSION_CATALOG_NAME: String = "spark_catalog"
+
+ override protected def catalogName: String = SESSION_CATALOG_NAME
+
+ override protected def catalogVersion: String = "V1"
+
+ override protected def commandVersion: String = V1_COMMAND_VERSION
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DDLCommandTestUtils.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DDLCommandTestUtils.scala
new file mode 100644
index 000000000..83fd95b6b
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DDLCommandTestUtils.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive.command
+
+import java.net.URI
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, SupportsNamespaces}
+import org.scalactic.source
+import org.scalatest.Tag
+
+import org.apache.kyuubi.spark.connector.hive.{KyuubiHiveConnectorException, KyuubiHiveTest}
+
+trait DDLCommandTestUtils extends KyuubiHiveTest {
+  // The version of the catalog under testing such as "V1", "V2", "Hive V1", "Hive V2".
+ protected def catalogVersion: String
+ // The version of the SQL command under testing such as "V1", "V2".
+ protected def commandVersion: String
+ // Name of the command as SQL statement, for instance "SHOW PARTITIONS"
+ protected def command: String
+
+  // Overrides the `test` method and adds a prefix that makes it easy to identify,
+  // in the logs, the catalog to which a failed test belongs.
+  override def test(testName: String, testTags: Tag*)(testFun: => Any /* Assertion */ )(implicit
+      pos: source.Position): Unit = {
+    val testNamePrefix = s"$command using $catalogVersion catalog $commandVersion command"
+ super.test(s"$testNamePrefix: $testName", testTags: _*)(testFun)
+ }
+
+  // The metadata of the hive catalog and spark_catalog cannot be distinguished,
+  // because the catalog option currently supported by Spark is case-insensitive,
+  // so the two catalogs share a single Derby metadata db.
+ protected val builtinNamespace: Seq[String] = Seq("default")
+
+ protected def withNamespace(namespaces: String*)(f: => Unit): Unit = {
+ try {
+ f
+ } finally {
+ withSparkSession() { spark =>
+ namespaces.foreach { name =>
+ spark.sql(s"DROP NAMESPACE IF EXISTS $name CASCADE")
+ }
+ }
+ }
+ }
+
+ protected def sql(sql: String): DataFrame = {
+ withSparkSession() { spark =>
+ spark.sql(sql)
+ }
+ }
+
+ protected def getCatalog(name: String): CatalogPlugin = {
+ withSparkSession() { spark =>
+ spark.sessionState.catalogManager.catalog(name)
+ }
+ }
+
+ protected def makeQualifiedPath(path: String): URI = {
+ val hadoopPath = new Path(path)
+ val fs = hadoopPath.getFileSystem(spark.sessionState.newHadoopConf())
+ fs.makeQualified(hadoopPath).toUri
+ }
+
+  /**
+   * Drops the tables in `tableNames` after calling `f`.
+   */
+ protected def withTable(tableNames: String*)(f: => Unit): Unit = {
+ try {
+ f
+ } finally {
+ tableNames.foreach { name =>
+ spark.sql(s"DROP TABLE IF EXISTS $name")
+ }
+ }
+ }
+
+ protected def withNamespaceAndTable(
+ ns: String,
+ tableName: String,
+ cat: String = catalogName)(f: String => Unit): Unit = {
+ val nsCat = s"$cat.$ns"
+ withNamespace(nsCat) {
+ sql(s"CREATE NAMESPACE $nsCat")
+ val t = s"$nsCat.$tableName"
+ withTable(t) {
+ f(t)
+ }
+ }
+ }
+
+ /**
+ * Restores the current catalog/database after calling `f`.
+ */
+ protected def withCurrentCatalogAndNamespace(f: => Unit): Unit = {
+ val curCatalog = sql("select current_catalog()").head().getString(0)
+ val curDatabase = sql("select current_database()").head().getString(0)
+ try {
+ f
+ } finally {
+ spark.sql(s"USE $curCatalog.$curDatabase")
+ }
+ }
+
+ implicit class CatalogHelper(plugin: CatalogPlugin) {
+ def asNamespaceCatalog: SupportsNamespaces = plugin match {
+ case namespaceCatalog: SupportsNamespaces =>
+ namespaceCatalog
+ case _ =>
+        throw KyuubiHiveConnectorException(s"Catalog ${plugin.name} does not support namespaces")
+ }
+ }
+}
+
+object DDLCommandTestUtils {
+ val V1_COMMAND_VERSION = "V1"
+ val V2_COMMAND_VERSION = "V2"
+}
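
For reference, a concrete suite wires into this framework by overriding the three descriptors, and the `test` override above prefixes every test name accordingly. A minimal, hypothetical sketch (suite name, namespace, and test body are illustrative only; the real suites follow in the files below):

```scala
import org.apache.kyuubi.spark.connector.hive.command.DDLCommandTestUtils.V2_COMMAND_VERSION

class ExampleDropNamespaceSuite extends DDLCommandTestUtils {
  override protected def command: String = "DROP NAMESPACE"
  override protected def catalogVersion: String = "Hive V2"
  override protected def commandVersion: String = V2_COMMAND_VERSION

  // Registered as "DROP NAMESPACE using Hive V2 catalog V2 command: basic"
  test("basic") {
    sql(s"CREATE NAMESPACE $catalogName.tmp_ns") // tmp_ns is a hypothetical name
    sql(s"DROP NAMESPACE $catalogName.tmp_ns")
  }
}
```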
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DropNamespaceSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DropNamespaceSuite.scala
new file mode 100644
index 000000000..66eb42c86
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DropNamespaceSuite.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive.command
+
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.types.{StringType, StructType}
+
+import org.apache.kyuubi.spark.connector.hive.command.DDLCommandTestUtils.{V1_COMMAND_VERSION, V2_COMMAND_VERSION}
+
+trait DropNamespaceSuiteBase extends DDLCommandTestUtils {
+ override protected def command: String = "DROP NAMESPACE"
+
+ private def namespace: String = "fakens"
+
+ protected def notFoundMsgPrefix: String =
+ if (commandVersion == V1_COMMAND_VERSION) "Database" else "Namespace"
+
+ protected def checkNamespace(expected: Seq[String]) = {
+ val df = sql(s"SHOW NAMESPACES IN $catalogName")
+ assert(df.schema === new StructType().add("namespace", StringType, false))
+ checkAnswer(df, expected.map(Row(_)))
+ }
+
+ override def afterEach(): Unit = {
+ sql(s"DROP NAMESPACE IF EXISTS $catalogName.$namespace CASCADE")
+ super.afterEach()
+ }
+
+ test("basic tests") {
+ sql(s"CREATE NAMESPACE $catalogName.$namespace")
+ checkNamespace(Seq(namespace) ++ builtinNamespace)
+
+ sql(s"DROP NAMESPACE $catalogName.$namespace")
+ checkNamespace(builtinNamespace)
+ }
+
+ test("test handling of 'IF EXISTS'") {
+ // It must not throw any exceptions
+ sql(s"DROP NAMESPACE IF EXISTS $catalogName.unknown")
+ checkNamespace(builtinNamespace)
+ }
+
+ test("namespace does not exist") {
+ // Namespace $catalog.unknown does not exist.
+ val message = intercept[AnalysisException] {
+ sql(s"DROP NAMESPACE $catalogName.unknown")
+ }.getMessage
+ assert(message.contains(s"'unknown' not found"))
+ }
+
+ test("drop non-empty namespace with a non-cascading mode") {
+ sql(s"CREATE NAMESPACE $catalogName.$namespace")
+    sql(s"CREATE TABLE $catalogName.$namespace.table (id bigint) USING parquet")
+ checkNamespace(Seq(namespace) ++ builtinNamespace)
+
+ // $catalog.ns.table is present, thus $catalog.ns cannot be dropped.
+ val e = intercept[IllegalStateException] {
+ sql(s"DROP NAMESPACE $catalogName.$namespace")
+ }
+ assert(e.getMessage.contains(s"Namespace $namespace is not empty"))
+ sql(s"DROP TABLE $catalogName.$namespace.table")
+
+ // Now that $catalog.ns is empty, it can be dropped.
+ sql(s"DROP NAMESPACE $catalogName.$namespace")
+ checkNamespace(builtinNamespace)
+ }
+
+ test("drop non-empty namespace with a cascade mode") {
+ sql(s"CREATE NAMESPACE $catalogName.$namespace")
+    sql(s"CREATE TABLE $catalogName.$namespace.table (id bigint) USING parquet")
+ checkNamespace(Seq(namespace) ++ builtinNamespace)
+
+ sql(s"DROP NAMESPACE $catalogName.$namespace CASCADE")
+ checkNamespace(builtinNamespace)
+ }
+
+ test("drop current namespace") {
+ sql(s"CREATE NAMESPACE $catalogName.$namespace")
+ sql(s"USE $catalogName.$namespace")
+ sql(s"DROP NAMESPACE $catalogName.$namespace")
+ checkNamespace(builtinNamespace)
+ }
+}
+
+class DropNamespaceV2Suite extends DropNamespaceSuiteBase {
+
+ override protected def catalogName: String = super.catalogName
+
+ override protected def catalogVersion: String = "Hive V2"
+
+ override protected def commandVersion: String = V2_COMMAND_VERSION
+}
+
+class DropNamespaceV1Suite extends DropNamespaceSuiteBase {
+
+ val SESSION_CATALOG_NAME: String = "spark_catalog"
+
+ override protected def catalogName: String = SESSION_CATALOG_NAME
+
+ override protected def catalogVersion: String = "V1"
+
+ override protected def commandVersion: String = V1_COMMAND_VERSION
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/ShowTablesSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/ShowTablesSuite.scala
new file mode 100644
index 000000000..bff47c9de
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/ShowTablesSuite.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive.command
+
+import org.apache.spark.sql.Row
+
+import org.apache.kyuubi.spark.connector.hive.command.DDLCommandTestUtils.{V1_COMMAND_VERSION, V2_COMMAND_VERSION}
+
+trait ShowTablesSuiteBase extends DDLCommandTestUtils {
+ override protected def command: String = "SHOW TABLES"
+
+ private val defaultUsing = "USING parquet"
+
+ protected def runShowTablesSql(sqlText: String, expected: Seq[Row]): Unit = {
+ val df = spark.sql(sqlText)
+ checkAnswer(df, expected)
+ }
+
+ test("show an existing table") {
+ withNamespaceAndTable("ns", "table") { t =>
+ sql(s"CREATE TABLE $t (name STRING, id INT) $defaultUsing")
+      runShowTablesSql(s"SHOW TABLES IN $catalogName.ns", Seq(Row("ns", "table", false)))
+ }
+ }
+
+ test("show tables with a pattern") {
+ withNamespace(s"$catalogName.ns1", s"$catalogName.ns2") {
+ sql(s"CREATE NAMESPACE $catalogName.ns1")
+ sql(s"CREATE NAMESPACE $catalogName.ns2")
+ withTable(
+ s"$catalogName.ns1.table",
+ s"$catalogName.ns1.table_name_1a",
+ s"$catalogName.ns1.table_name_2b",
+ s"$catalogName.ns2.table_name_2b") {
+        sql(s"CREATE TABLE $catalogName.ns1.table (id bigint, data string) $defaultUsing")
+        sql(s"CREATE TABLE $catalogName.ns1.table_name_1a (id bigint, data string) $defaultUsing")
+        sql(s"CREATE TABLE $catalogName.ns1.table_name_2b (id bigint, data string) $defaultUsing")
+        sql(s"CREATE TABLE $catalogName.ns2.table_name_2b (id bigint, data string) $defaultUsing")
+
+ runShowTablesSql(
+ s"SHOW TABLES FROM $catalogName.ns1",
+ Seq(
+ Row("ns1", "table", false),
+ Row("ns1", "table_name_1a", false),
+ Row("ns1", "table_name_2b", false)))
+
+ runShowTablesSql(
+ s"SHOW TABLES FROM $catalogName.ns1 LIKE '*name*'",
+ Seq(
+ Row("ns1", "table_name_1a", false),
+ Row("ns1", "table_name_2b", false)))
+
+ runShowTablesSql(
+          s"SHOW TABLES FROM $catalogName.ns1 LIKE 'table_name_1*|table_name_2*'",
+ Seq(
+ Row("ns1", "table_name_1a", false),
+ Row("ns1", "table_name_2b", false)))
+
+ runShowTablesSql(
+ s"SHOW TABLES FROM $catalogName.ns1 LIKE '*2b'",
+ Seq(Row("ns1", "table_name_2b", false)))
+ }
+ }
+ }
+
+ test("change current catalog and namespace with USE statements") {
+ withCurrentCatalogAndNamespace {
+ withNamespaceAndTable("ns", "table") { t =>
+ sql(s"CREATE TABLE $t (name STRING, id INT) $defaultUsing")
+
+ sql(s"USE $catalogName")
+ runShowTablesSql("SHOW TABLES", Seq())
+
+        // Update the current namespace to match "ns.table".
+ sql(s"USE $catalogName.ns")
+ runShowTablesSql("SHOW TABLES", Seq(Row("ns", "table", false)))
+ }
+ }
+ }
+}
+
+class ShowTablesV2Suite extends ShowTablesSuiteBase {
+
+ override protected def catalogName: String = super.catalogName
+
+ override protected def catalogVersion: String = "Hive V2"
+
+ override protected def commandVersion: String = V2_COMMAND_VERSION
+}
+
+class ShowTablesV1Suite extends ShowTablesSuiteBase {
+
+ val SESSION_CATALOG_NAME: String = "spark_catalog"
+
+ override protected def catalogName: String = SESSION_CATALOG_NAME
+
+ override protected def catalogVersion: String = "V1"
+
+ override protected def commandVersion: String = V1_COMMAND_VERSION
+}