This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new bbf916d1d [KYUUBI #3529] Supple DDL tests for Spark Hive connector and 
fix consistent issues w/ V1 implementation
bbf916d1d is described below

commit bbf916d1debc120d1cc99d1495c456405d14945a
Author: yikf <[email protected]>
AuthorDate: Sat Oct 15 22:45:22 2022 +0800

    [KYUUBI #3529] Supple DDL tests for Spark Hive connector and fix consistent 
issues w/ V1 implementation
    
    ### _Why are the changes needed?_
    
    Fix https://github.com/apache/incubator-kyuubi/issues/3529
    
    The intent of this PR is the following:
    - Add tests related to catalog, including the listTables, loadTable, and 
listNamespaces methods;
    - Initialize the DDL test framework.
    - Add CreateNamespaceSuite, DropNamespaceSuite and ShowTablesSuite to check 
for consistency with V1 in hive connector.
    - Fix cascading namespace deletion: when a namespace is dropped with 
CASCADE, no longer throw the "namespace is not empty" exception if tables exist in it.
    - Fix the table name reported by HiveTable, which should include the 
namespace name.
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
 locally before making a pull request
    
    Closes #3530 from Yikf/hivev2-test.
    
    Closes #3529
    
    d0af0760 [yikf] Add tests to check for consistency with V1
    
    Authored-by: yikf <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../kyuubi/spark/connector/hive/HiveTable.scala    |   2 +-
 .../spark/connector/hive/HiveTableCatalog.scala    |   2 +-
 .../hive/kyuubi/connector/HiveBridgeHelper.scala   |   4 +
 .../spark/connector/hive/HiveCatalogSuite.scala    | 276 ++++++++++++++++++++-
 .../spark/connector/hive/KyuubiHiveTest.scala      |  37 ++-
 .../hive/command/CreateNamespaceSuite.scala        | 150 +++++++++++
 .../hive/command/DDLCommandTestUtils.scala         | 134 ++++++++++
 .../hive/command/DropNamespaceSuite.scala          | 118 +++++++++
 .../connector/hive/command/ShowTablesSuite.scala   | 115 +++++++++
 9 files changed, 824 insertions(+), 14 deletions(-)

diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala
index 646a4a080..3d1d4f8c4 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala
@@ -59,7 +59,7 @@ case class HiveTable(
       catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
   }
 
-  override def name(): String = catalogTable.identifier.table
+  override def name(): String = catalogTable.identifier.unquotedString
 
   override def schema(): StructType = catalogTable.schema
 
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
index e58a2d871..4d0f48e57 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
@@ -346,7 +346,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
       namespace: Array[String],
       cascade: Boolean): Boolean = namespace match {
     case Array(db) if catalog.databaseExists(db) =>
-      if (catalog.listTables(db).nonEmpty) {
+      if (catalog.listTables(db).nonEmpty && !cascade) {
         throw new IllegalStateException(s"Namespace ${namespace.quoted} is not 
empty")
       }
       catalog.dropDatabase(db, ignoreIfNotExists = false, cascade)
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
index 6ba1e8cfa..88b0305dd 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
@@ -23,6 +23,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.rdd.InputFileBlockHolder
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, ExternalCatalogEvent}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal}
+import org.apache.spark.sql.catalyst.util.quoteIfNeeded
 import org.apache.spark.sql.connector.catalog.CatalogV2Util
 import org.apache.spark.sql.connector.expressions.{BucketTransform, 
FieldReference, IdentityTransform, LogicalExpressions, Transform}
 import org.apache.spark.sql.connector.expressions.LogicalExpressions.{bucket, 
reference}
@@ -114,4 +115,7 @@ object HiveBridgeHelper {
     case l => l.sql
   }
 
+  implicit class NamespaceHelper(namespace: Array[String]) {
+    def quoted: String = namespace.map(quoteIfNeeded).mkString(".")
+  }
 }
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
index fae0be65e..7a1eb86dc 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
@@ -17,11 +17,83 @@
 
 package org.apache.kyuubi.spark.connector.hive
 
+import java.net.URI
+import java.util
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import com.google.common.collect.Maps
+import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, 
NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper._
+import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
+import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.IdentifierHelper
+
 class HiveCatalogSuite extends KyuubiHiveTest {
 
+  val emptyProps: util.Map[String, String] = Collections.emptyMap[String, 
String]
+  val schema: StructType = new StructType()
+    .add("id", IntegerType)
+    .add("data", StringType)
+
+  val testNs: Array[String] = Array("db")
+  val defaultNs: Array[String] = Array("default")
+  val testIdent: Identifier = Identifier.of(testNs, "test_table")
+
+  var catalog: HiveTableCatalog = _
+
+  private def newCatalog(): HiveTableCatalog = {
+    val catalog = new HiveTableCatalog
+    val catalogName = "hive"
+    val properties = Maps.newHashMap[String, String]()
+    properties.put("javax.jdo.option.ConnectionURL", 
"jdbc:derby:memory:memorydb;create=true")
+    properties.put("javax.jdo.option.ConnectionDriverName", 
"org.apache.derby.jdbc.EmbeddedDriver")
+    catalog.initialize(catalogName, new CaseInsensitiveStringMap(properties))
+    catalog
+  }
+
+  private def filterV2TableProperties(
+      properties: util.Map[String, String]): Map[String, String] = {
+    properties.asScala.filter(kv => !TABLE_RESERVED_PROPERTIES.contains(kv._1))
+      .filter(!_._1.startsWith(TableCatalog.OPTION_PREFIX)).toMap
+  }
+
+  def makeQualifiedPathWithWarehouse(path: String): URI = {
+    val p = new Path(catalog.conf.warehousePath, path)
+    val fs = p.getFileSystem(catalog.hadoopConfiguration())
+    fs.makeQualified(p).toUri
+  }
+
+  private def checkMetadata(expected: Map[String, String], actual: Map[String, 
String]): Unit = {
+    // remove location and comment that are automatically added by HMS unless 
they are expected
+    val toRemove =
+      NAMESPACE_RESERVED_PROPERTIES.filter(expected.contains)
+    assert(expected -- toRemove === actual)
+  }
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    catalog = newCatalog()
+    catalog.createNamespace(Array("ns"), emptyProps)
+    catalog.createNamespace(Array("ns2"), emptyProps)
+    catalog.createNamespace(Array("db"), emptyProps)
+  }
+
+  override def afterEach(): Unit = {
+    super.afterEach()
+    catalog.dropNamespace(Array("ns"), true)
+    catalog.dropNamespace(Array("ns2"), true)
+    catalog.dropNamespace(Array("db"), true)
+    catalog = null
+  }
+
   test("get catalog name") {
     withSparkSession() { spark =>
       val catalog = new HiveTableCatalog
@@ -35,10 +107,9 @@ class HiveCatalogSuite extends KyuubiHiveTest {
     withSparkSession() { spark =>
       try {
         spark.sql("USE hive")
-        spark.sql("CREATE NAMESPACE IF NOT EXISTS ns1")
-        assert(spark.sql(s"SHOW NAMESPACES").collect().length == 2)
+        assert(Try { spark.sql("CREATE NAMESPACE IF NOT EXISTS snns1") 
}.isSuccess)
       } finally {
-        spark.sql("DROP NAMESPACE IF EXISTS ns1")
+        spark.sql("DROP NAMESPACE IF EXISTS snns1")
       }
     }
   }
@@ -51,4 +122,203 @@ class HiveCatalogSuite extends KyuubiHiveTest {
       assert(exception.message === "Table or view not found: 
hive.ns1.nonexistent_table")
     }
   }
+
+  test("listTables") {
+
+    val ident1 = Identifier.of(Array("ns"), "test_table_1")
+    val ident2 = Identifier.of(Array("ns"), "test_table_2")
+    val ident3 = Identifier.of(Array("ns2"), "test_table_1")
+
+    assert(catalog.listTables(Array("ns")).isEmpty)
+
+    catalog.createTable(ident1, schema, Array.empty, emptyProps)
+
+    assert(catalog.listTables(Array("ns")).toSet == Set(ident1))
+    assert(catalog.listTables(Array("ns2")).isEmpty)
+
+    catalog.createTable(ident3, schema, Array.empty, emptyProps)
+    catalog.createTable(ident2, schema, Array.empty, emptyProps)
+
+    assert(catalog.listTables(Array("ns")).toSet == Set(ident1, ident2))
+    assert(catalog.listTables(Array("ns2")).toSet == Set(ident3))
+
+    catalog.dropTable(ident1)
+
+    assert(catalog.listTables(Array("ns")).toSet == Set(ident2))
+
+    catalog.dropTable(ident2)
+
+    assert(catalog.listTables(Array("ns")).isEmpty)
+    assert(catalog.listTables(Array("ns2")).toSet == Set(ident3))
+
+    catalog.dropTable(ident3)
+  }
+
+  test("createTable") {
+    assert(!catalog.tableExists(testIdent))
+
+    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
+    assert(parsed == Seq("db", "test_table"))
+    assert(table.schema == schema)
+    assert(filterV2TableProperties(table.properties) == Map())
+
+    assert(catalog.tableExists(testIdent))
+    catalog.dropTable(testIdent)
+  }
+
+  test("createTable: with properties") {
+    val properties = new util.HashMap[String, String]()
+    properties.put("property", "value")
+
+    assert(!catalog.tableExists(testIdent))
+
+    val table = catalog.createTable(testIdent, schema, Array.empty, properties)
+
+    val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
+    assert(parsed == Seq("db", "test_table"))
+    assert(table.schema == schema)
+    assert(filterV2TableProperties(table.properties).asJava == properties)
+
+    assert(catalog.tableExists(testIdent))
+    catalog.dropTable(testIdent)
+  }
+
+  test("createTable: table already exists") {
+    assert(!catalog.tableExists(testIdent))
+
+    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    val exc = intercept[TableAlreadyExistsException] {
+      catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    }
+
+    assert(exc.message.contains(table.name()))
+    assert(exc.message.contains("already exists"))
+
+    assert(catalog.tableExists(testIdent))
+    catalog.dropTable(testIdent)
+  }
+
+  test("tableExists") {
+    assert(!catalog.tableExists(testIdent))
+
+    catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    assert(catalog.tableExists(testIdent))
+
+    catalog.dropTable(testIdent)
+
+    assert(!catalog.tableExists(testIdent))
+  }
+
+  test("createTable: location") {
+    val properties = new util.HashMap[String, String]()
+    assert(!catalog.tableExists(testIdent))
+
+    // default location
+    val t1 = catalog.createTable(testIdent, schema, Array.empty, 
properties).asInstanceOf[HiveTable]
+    assert(t1.catalogTable.location ===
+      catalog.catalog.defaultTablePath(testIdent.asTableIdentifier))
+    catalog.dropTable(testIdent)
+
+    // relative path
+    properties.put(TableCatalog.PROP_LOCATION, "relative/path")
+    val t2 = catalog.createTable(testIdent, schema, Array.empty, 
properties).asInstanceOf[HiveTable]
+    assert(t2.catalogTable.location === 
makeQualifiedPathWithWarehouse("db.db/relative/path"))
+    catalog.dropTable(testIdent)
+
+    // absolute path without scheme
+    properties.put(TableCatalog.PROP_LOCATION, "/absolute/path")
+    val t3 = catalog.createTable(testIdent, schema, Array.empty, 
properties).asInstanceOf[HiveTable]
+    assert(t3.catalogTable.location.toString === "file:/absolute/path")
+    catalog.dropTable(testIdent)
+
+    // absolute path with scheme
+    properties.put(TableCatalog.PROP_LOCATION, "file:/absolute/path")
+    val t4 = catalog.createTable(testIdent, schema, Array.empty, 
properties).asInstanceOf[HiveTable]
+    assert(t4.catalogTable.location.toString === "file:/absolute/path")
+    catalog.dropTable(testIdent)
+  }
+
+  test("loadTable") {
+    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    val loaded = catalog.loadTable(testIdent)
+
+    assert(table.name == loaded.name)
+    assert(table.schema == loaded.schema)
+    assert(table.properties == loaded.properties)
+    catalog.dropTable(testIdent)
+  }
+
+  test("loadTable: table does not exist") {
+    val exc = intercept[NoSuchTableException] {
+      catalog.loadTable(testIdent)
+    }
+
+    assert(exc.message.contains("Table or view 'test_table' not found in 
database 'db'"))
+  }
+
+  test("invalidateTable") {
+    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    // The Hive V2 catalog doesn't cache tables
+    catalog.invalidateTable(testIdent)
+
+    val loaded = catalog.loadTable(testIdent)
+
+    assert(table.name == loaded.name)
+    assert(table.schema == loaded.schema)
+    assert(table.properties == loaded.properties)
+    catalog.dropTable(testIdent)
+  }
+
+  test("listNamespaces: fail if missing namespace") {
+    catalog.dropNamespace(testNs, true)
+    assert(catalog.namespaceExists(testNs) === false)
+
+    val exc = intercept[NoSuchNamespaceException] {
+      assert(catalog.listNamespaces(testNs) === Array())
+    }
+
+    assert(exc.getMessage.contains(testNs.quoted))
+    assert(catalog.namespaceExists(testNs) === false)
+  }
+
+  test("loadNamespaceMetadata: fail missing namespace") {
+    catalog.dropNamespace(testNs, true)
+    val exc = intercept[NoSuchNamespaceException] {
+      catalog.loadNamespaceMetadata(testNs)
+    }
+
+    assert(exc.getMessage.contains(testNs.quoted))
+  }
+
+  test("loadNamespaceMetadata: non-empty metadata") {
+    catalog.dropNamespace(testNs, true)
+    assert(catalog.namespaceExists(testNs) === false)
+
+    catalog.createNamespace(testNs, Map("property" -> "value").asJava)
+
+    val metadata = catalog.loadNamespaceMetadata(testNs)
+
+    assert(catalog.namespaceExists(testNs) === true)
+    checkMetadata(metadata.asScala.toMap, Map("property" -> "value"))
+
+    catalog.dropNamespace(testNs, cascade = false)
+  }
+
+  test("loadNamespaceMetadata: empty metadata") {
+    catalog.dropNamespace(testNs, true)
+    assert(catalog.namespaceExists(testNs) === false)
+
+    catalog.createNamespace(testNs, emptyProps)
+
+    val metadata = catalog.loadNamespaceMetadata(testNs)
+
+    assert(catalog.namespaceExists(testNs) === true)
+    checkMetadata(metadata.asScala.toMap, emptyProps.asScala.toMap)
+
+    catalog.dropNamespace(testNs, cascade = false)
+  }
 }
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala
index 0f5ecc021..d0b17dc05 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala
@@ -19,14 +19,31 @@ package org.apache.kyuubi.spark.connector.hive
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{QueryTest, SparkSession}
+import org.apache.spark.sql.connector.catalog.{SupportsNamespaces, 
TableCatalog}
 
-import org.apache.kyuubi.KyuubiFunSuite
 import org.apache.kyuubi.spark.connector.common.LocalSparkSession
 
-abstract class KyuubiHiveTest extends KyuubiFunSuite with Logging {
+abstract class KyuubiHiveTest extends QueryTest with Logging {
 
-  private var spark: SparkSession = _
+  private var innerSpark: SparkSession = _
+
+  protected val TABLE_RESERVED_PROPERTIES =
+    Seq(
+      TableCatalog.PROP_COMMENT,
+      TableCatalog.PROP_LOCATION,
+      TableCatalog.PROP_PROVIDER,
+      TableCatalog.PROP_OWNER,
+      TableCatalog.PROP_EXTERNAL,
+      TableCatalog.PROP_IS_MANAGED_LOCATION)
+
+  protected val NAMESPACE_RESERVED_PROPERTIES =
+    Seq(
+      SupportsNamespaces.PROP_COMMENT,
+      SupportsNamespaces.PROP_LOCATION,
+      SupportsNamespaces.PROP_OWNER)
+
+  protected def catalogName: String = "hive"
 
   override def beforeEach(): Unit = {
     super.beforeAll()
@@ -35,7 +52,7 @@ abstract class KyuubiHiveTest extends KyuubiFunSuite with 
Logging {
 
   override def afterEach(): Unit = {
     super.afterAll()
-    LocalSparkSession.stop(spark)
+    LocalSparkSession.stop(innerSpark)
   }
 
   def getOrCreateSpark(): Unit = {
@@ -47,15 +64,17 @@ abstract class KyuubiHiveTest extends KyuubiFunSuite with 
Logging {
       .set("javax.jdo.option.ConnectionURL", 
"jdbc:derby:memory:memorydb;create=true")
       .set("javax.jdo.option.ConnectionDriverName", 
"org.apache.derby.jdbc.EmbeddedDriver")
 
-    spark = SparkSession.builder.config(sparkConf).getOrCreate()
+    innerSpark = SparkSession.builder.config(sparkConf).getOrCreate()
   }
 
   def withSparkSession[T](conf: Map[String, String] = Map.empty[String, 
String])(
       f: SparkSession => T): T = {
-    assert(spark != null)
+    assert(innerSpark != null)
     conf.foreach {
-      case (k, v) => spark.sessionState.conf.setConfString(k, v)
+      case (k, v) => innerSpark.sessionState.conf.setConfString(k, v)
     }
-    f(spark)
+    f(innerSpark)
   }
+
+  override def spark: SparkSession = innerSpark
 }
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/CreateNamespaceSuite.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/CreateNamespaceSuite.scala
new file mode 100644
index 000000000..855eb0c67
--- /dev/null
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/CreateNamespaceSuite.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive.command
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces
+import org.apache.spark.sql.internal.SQLConf
+
+import 
org.apache.kyuubi.spark.connector.hive.command.DDLCommandTestUtils.{V1_COMMAND_VERSION,
 V2_COMMAND_VERSION}
+
+trait CreateNamespaceSuiteBase extends DDLCommandTestUtils {
+  override protected def command: String = "CREATE NAMESPACE"
+
+  protected def namespaceArray: Array[String] = namespace.split('.')
+
+  protected def notFoundMsgPrefix: String =
+    if (commandVersion == V1_COMMAND_VERSION) "Database" else "Namespace"
+
+  private def namespace: String = "fakens"
+
+  override def afterEach(): Unit = {
+    sql(s"DROP NAMESPACE IF EXISTS $catalogName.$namespace")
+    super.afterEach()
+  }
+
+  test("basic test") {
+    val ns = s"$catalogName.$namespace"
+    withNamespace(ns) {
+      sql(s"CREATE NAMESPACE $ns")
+      
assert(getCatalog(catalogName).asNamespaceCatalog.namespaceExists(namespaceArray))
+    }
+  }
+
+  test("namespace with location") {
+    val ns = s"$catalogName.$namespace"
+    withNamespace(ns) {
+      withTempDir { tmpDir =>
+        // The generated temp path is not qualified.
+        val path = tmpDir.getCanonicalPath
+        assert(!path.startsWith("file:/"))
+
+        val e = intercept[IllegalArgumentException] {
+          sql(s"CREATE NAMESPACE $ns LOCATION ''")
+        }
+        assert(e.getMessage.contains("Can not create a Path from an empty 
string"))
+
+        val uri = new Path(path).toUri
+        sql(s"CREATE NAMESPACE $ns LOCATION '$uri'")
+
+        // Make sure the location is qualified.
+        val expected = makeQualifiedPath(tmpDir.toString)
+        assert("file" === expected.getScheme)
+        assert(new Path(getNamespaceLocation(catalogName, 
namespaceArray)).toUri === expected)
+      }
+    }
+  }
+
+  test("Namespace already exists") {
+    val ns = s"$catalogName.$namespace"
+    withNamespace(ns) {
+      sql(s"CREATE NAMESPACE $ns")
+
+      val e = intercept[NamespaceAlreadyExistsException] {
+        sql(s"CREATE NAMESPACE $ns")
+      }
+      assert(e.getMessage.contains(s"Namespace '$namespace' already exists"))
+
+      // The following will be no-op since the namespace already exists.
+      Try { sql(s"CREATE NAMESPACE IF NOT EXISTS $ns") }.isSuccess
+    }
+  }
+
+  test("CreateNameSpace: reserved properties") {
+    import SupportsNamespaces._
+    val ns = s"$catalogName.$namespace"
+    withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "false")) {
+      NAMESPACE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key 
=>
+        val exception = intercept[ParseException] {
+          sql(s"CREATE NAMESPACE $ns WITH DBPROPERTIES('$key'='dummyVal')")
+        }
+        assert(exception.getMessage.contains(s"$key is a reserved namespace 
property"))
+      }
+    }
+    withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "true")) {
+      NAMESPACE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key 
=>
+        withNamespace(ns) {
+          sql(s"CREATE NAMESPACE $ns WITH DBPROPERTIES('$key'='foo')")
+          assert(
+            sql(s"DESC NAMESPACE EXTENDED $ns")
+              .toDF("k", "v")
+              .where("k='Properties'")
+              .where("v=''")
+              .count == 1,
+            s"$key is a reserved namespace property and ignored")
+          val meta =
+            
getCatalog(catalogName).asNamespaceCatalog.loadNamespaceMetadata(namespaceArray)
+          assert(
+            meta.get(key) == null || !meta.get(key).contains("foo"),
+            "reserved properties should not have side effects")
+        }
+      }
+    }
+  }
+
+  protected def getNamespaceLocation(catalog: String, namespace: 
Array[String]): String = {
+    val metadata = getCatalog(catalog).asNamespaceCatalog
+      .loadNamespaceMetadata(namespace).asScala
+    metadata(SupportsNamespaces.PROP_LOCATION)
+  }
+}
+
+class CreateNamespaceV2Suite extends CreateNamespaceSuiteBase {
+
+  override protected def catalogName: String = super.catalogName
+
+  override protected def catalogVersion: String = "Hive V2"
+
+  override protected def commandVersion: String = V2_COMMAND_VERSION
+}
+
+class CreateNamespaceV1Suite extends CreateNamespaceSuiteBase {
+
+  val SESSION_CATALOG_NAME: String = "spark_catalog"
+
+  override protected def catalogName: String = SESSION_CATALOG_NAME
+
+  override protected def catalogVersion: String = "V1"
+
+  override protected def commandVersion: String = V1_COMMAND_VERSION
+}
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DDLCommandTestUtils.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DDLCommandTestUtils.scala
new file mode 100644
index 000000000..83fd95b6b
--- /dev/null
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DDLCommandTestUtils.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive.command
+
+import java.net.URI
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, 
SupportsNamespaces}
+import org.scalactic.source
+import org.scalatest.Tag
+
+import org.apache.kyuubi.spark.connector.hive.{KyuubiHiveConnectorException, 
KyuubiHiveTest}
+
+trait DDLCommandTestUtils extends KyuubiHiveTest {
+  // The version of the catalog under testing such as "V1", "V2", "Hive V1", 
"Hive V2".
+  protected def catalogVersion: String
+  // The version of the SQL command under testing such as "V1", "V2".
+  protected def commandVersion: String
+  // Name of the command as SQL statement, for instance "SHOW PARTITIONS"
+  protected def command: String
+
+  // Overrides the `test` method, and adds a prefix to easily identify 
the catalog to which
+  // the failed test in logs belongs.
+  override def test(testName: String, testTags: Tag*)(testFun: => Any /* 
Assertion */ )(implicit
+      pos: source.Position): Unit = {
+    val testNamePrefix = s"$command using $catalogVersion catalog 
$commandVersion command"
+    super.test(s"$testNamePrefix: $testName", testTags: _*)(testFun)
+  }
+
+  // The metadata cannot be distinguished between hive catalog and 
spark_catalog
+  // because the catalogOption currently supported by spark is case-insensitive,
+  // so they share a metadata derby db.
+  protected val builtinNamespace: Seq[String] = Seq("default")
+
+  protected def withNamespace(namespaces: String*)(f: => Unit): Unit = {
+    try {
+      f
+    } finally {
+      withSparkSession() { spark =>
+        namespaces.foreach { name =>
+          spark.sql(s"DROP NAMESPACE IF EXISTS $name CASCADE")
+        }
+      }
+    }
+  }
+
+  protected def sql(sql: String): DataFrame = {
+    withSparkSession() { spark =>
+      spark.sql(sql)
+    }
+  }
+
+  protected def getCatalog(name: String): CatalogPlugin = {
+    withSparkSession() { spark =>
+      spark.sessionState.catalogManager.catalog(name)
+    }
+  }
+
+  protected def makeQualifiedPath(path: String): URI = {
+    val hadoopPath = new Path(path)
+    val fs = hadoopPath.getFileSystem(spark.sessionState.newHadoopConf())
+    fs.makeQualified(hadoopPath).toUri
+  }
+
+  /**
+   * Drops the tables `tableNames` after calling `f`.
+   */
+  protected def withTable(tableNames: String*)(f: => Unit): Unit = {
+    try {
+      f
+    } finally {
+      tableNames.foreach { name =>
+        spark.sql(s"DROP TABLE IF EXISTS $name")
+      }
+    }
+  }
+
+  protected def withNamespaceAndTable(
+      ns: String,
+      tableName: String,
+      cat: String = catalogName)(f: String => Unit): Unit = {
+    val nsCat = s"$cat.$ns"
+    withNamespace(nsCat) {
+      sql(s"CREATE NAMESPACE $nsCat")
+      val t = s"$nsCat.$tableName"
+      withTable(t) {
+        f(t)
+      }
+    }
+  }
+
+  /**
+   * Restores the current catalog/database after calling `f`.
+   */
+  protected def withCurrentCatalogAndNamespace(f: => Unit): Unit = {
+    val curCatalog = sql("select current_catalog()").head().getString(0)
+    val curDatabase = sql("select current_database()").head().getString(0)
+    try {
+      f
+    } finally {
+      spark.sql(s"USE $curCatalog.$curDatabase")
+    }
+  }
+
+  implicit class CatalogHelper(plugin: CatalogPlugin) {
+    def asNamespaceCatalog: SupportsNamespaces = plugin match {
+      case namespaceCatalog: SupportsNamespaces =>
+        namespaceCatalog
+      case _ =>
+        throw KyuubiHiveConnectorException(s"Catalog ${plugin.name} does not 
support namespaces")
+    }
+  }
+}
+
+object DDLCommandTestUtils {
+  val V1_COMMAND_VERSION = "V1"
+  val V2_COMMAND_VERSION = "V2"
+}
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DropNamespaceSuite.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DropNamespaceSuite.scala
new file mode 100644
index 000000000..66eb42c86
--- /dev/null
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DropNamespaceSuite.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive.command
+
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.types.{StringType, StructType}
+
+import org.apache.kyuubi.spark.connector.hive.command.DDLCommandTestUtils.{V1_COMMAND_VERSION, V2_COMMAND_VERSION}
+
+trait DropNamespaceSuiteBase extends DDLCommandTestUtils {
+  override protected def command: String = "DROP NAMESPACE"
+
+  private def namespace: String = "fakens"
+
+  protected def notFoundMsgPrefix: String =
+    if (commandVersion == V1_COMMAND_VERSION) "Database" else "Namespace"
+
+  protected def checkNamespace(expected: Seq[String]) = {
+    val df = sql(s"SHOW NAMESPACES IN $catalogName")
+    assert(df.schema === new StructType().add("namespace", StringType, false))
+    checkAnswer(df, expected.map(Row(_)))
+  }
+
+  override def afterEach(): Unit = {
+    sql(s"DROP NAMESPACE IF EXISTS $catalogName.$namespace CASCADE")
+    super.afterEach()
+  }
+
+  test("basic tests") {
+    sql(s"CREATE NAMESPACE $catalogName.$namespace")
+    checkNamespace(Seq(namespace) ++ builtinNamespace)
+
+    sql(s"DROP NAMESPACE $catalogName.$namespace")
+    checkNamespace(builtinNamespace)
+  }
+
+  test("test handling of 'IF EXISTS'") {
+    // It must not throw any exceptions
+    sql(s"DROP NAMESPACE IF EXISTS $catalogName.unknown")
+    checkNamespace(builtinNamespace)
+  }
+
+  test("namespace does not exist") {
+    // Namespace $catalog.unknown does not exist.
+    val message = intercept[AnalysisException] {
+      sql(s"DROP NAMESPACE $catalogName.unknown")
+    }.getMessage
+    assert(message.contains(s"'unknown' not found"))
+  }
+
+  test("drop non-empty namespace with a non-cascading mode") {
+    sql(s"CREATE NAMESPACE $catalogName.$namespace")
+    sql(s"CREATE TABLE $catalogName.$namespace.table (id bigint) USING parquet")
+    checkNamespace(Seq(namespace) ++ builtinNamespace)
+
+    // $catalog.ns.table is present, thus $catalog.ns cannot be dropped.
+    val e = intercept[IllegalStateException] {
+      sql(s"DROP NAMESPACE $catalogName.$namespace")
+    }
+    assert(e.getMessage.contains(s"Namespace $namespace is not empty"))
+    sql(s"DROP TABLE $catalogName.$namespace.table")
+
+    // Now that $catalog.ns is empty, it can be dropped.
+    sql(s"DROP NAMESPACE $catalogName.$namespace")
+    checkNamespace(builtinNamespace)
+  }
+
+  test("drop non-empty namespace with a cascade mode") {
+    sql(s"CREATE NAMESPACE $catalogName.$namespace")
+    sql(s"CREATE TABLE $catalogName.$namespace.table (id bigint) USING parquet")
+    checkNamespace(Seq(namespace) ++ builtinNamespace)
+
+    sql(s"DROP NAMESPACE $catalogName.$namespace CASCADE")
+    checkNamespace(builtinNamespace)
+  }
+
+  test("drop current namespace") {
+    sql(s"CREATE NAMESPACE $catalogName.$namespace")
+    sql(s"USE $catalogName.$namespace")
+    sql(s"DROP NAMESPACE $catalogName.$namespace")
+    checkNamespace(builtinNamespace)
+  }
+}
+
+class DropNamespaceV2Suite extends DropNamespaceSuiteBase {
+
+  override protected def catalogName: String = super.catalogName
+
+  override protected def catalogVersion: String = "Hive V2"
+
+  override protected def commandVersion: String = V2_COMMAND_VERSION
+}
+
+class DropNamespaceV1Suite extends DropNamespaceSuiteBase {
+
+  val SESSION_CATALOG_NAME: String = "spark_catalog"
+
+  override protected def catalogName: String = SESSION_CATALOG_NAME
+
+  override protected def catalogVersion: String = "V1"
+
+  override protected def commandVersion: String = V1_COMMAND_VERSION
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/ShowTablesSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/ShowTablesSuite.scala
new file mode 100644
index 000000000..bff47c9de
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/ShowTablesSuite.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive.command
+
+import org.apache.spark.sql.Row
+
+import org.apache.kyuubi.spark.connector.hive.command.DDLCommandTestUtils.{V1_COMMAND_VERSION, V2_COMMAND_VERSION}
+
+trait ShowTablesSuiteBase extends DDLCommandTestUtils {
+  override protected def command: String = "SHOW TABLES"
+
+  private val defaultUsing = "USING parquet"
+
+  protected def runShowTablesSql(sqlText: String, expected: Seq[Row]): Unit = {
+    val df = spark.sql(sqlText)
+    checkAnswer(df, expected)
+  }
+
+  test("show an existing table") {
+    withNamespaceAndTable("ns", "table") { t =>
+      sql(s"CREATE TABLE $t (name STRING, id INT) $defaultUsing")
+      runShowTablesSql(s"SHOW TABLES IN $catalogName.ns", Seq(Row("ns", "table", false)))
+    }
+  }
+
+  test("show tables with a pattern") {
+    withNamespace(s"$catalogName.ns1", s"$catalogName.ns2") {
+      sql(s"CREATE NAMESPACE $catalogName.ns1")
+      sql(s"CREATE NAMESPACE $catalogName.ns2")
+      withTable(
+        s"$catalogName.ns1.table",
+        s"$catalogName.ns1.table_name_1a",
+        s"$catalogName.ns1.table_name_2b",
+        s"$catalogName.ns2.table_name_2b") {
+        sql(s"CREATE TABLE $catalogName.ns1.table (id bigint, data string) $defaultUsing")
+        sql(s"CREATE TABLE $catalogName.ns1.table_name_1a (id bigint, data string) $defaultUsing")
+        sql(s"CREATE TABLE $catalogName.ns1.table_name_2b (id bigint, data string) $defaultUsing")
+        sql(s"CREATE TABLE $catalogName.ns2.table_name_2b (id bigint, data string) $defaultUsing")
+
+        runShowTablesSql(
+          s"SHOW TABLES FROM $catalogName.ns1",
+          Seq(
+            Row("ns1", "table", false),
+            Row("ns1", "table_name_1a", false),
+            Row("ns1", "table_name_2b", false)))
+
+        runShowTablesSql(
+          s"SHOW TABLES FROM $catalogName.ns1 LIKE '*name*'",
+          Seq(
+            Row("ns1", "table_name_1a", false),
+            Row("ns1", "table_name_2b", false)))
+
+        runShowTablesSql(
+          s"SHOW TABLES FROM $catalogName.ns1 LIKE 'table_name_1*|table_name_2*'",
+          Seq(
+            Row("ns1", "table_name_1a", false),
+            Row("ns1", "table_name_2b", false)))
+
+        runShowTablesSql(
+          s"SHOW TABLES FROM $catalogName.ns1 LIKE '*2b'",
+          Seq(Row("ns1", "table_name_2b", false)))
+      }
+    }
+  }
+
+  test("change current catalog and namespace with USE statements") {
+    withCurrentCatalogAndNamespace {
+      withNamespaceAndTable("ns", "table") { t =>
+        sql(s"CREATE TABLE $t (name STRING, id INT) $defaultUsing")
+
+        sql(s"USE $catalogName")
+        runShowTablesSql("SHOW TABLES", Seq())
+
+        // Update the current namespace to match "ns.table".
+        sql(s"USE $catalogName.ns")
+        runShowTablesSql("SHOW TABLES", Seq(Row("ns", "table", false)))
+      }
+    }
+  }
+}
+
+class ShowTablesV2Suite extends ShowTablesSuiteBase {
+
+  override protected def catalogName: String = super.catalogName
+
+  override protected def catalogVersion: String = "Hive V2"
+
+  override protected def commandVersion: String = V2_COMMAND_VERSION
+}
+
+class ShowTablesV1Suite extends ShowTablesSuiteBase {
+
+  val SESSION_CATALOG_NAME: String = "spark_catalog"
+
+  override protected def catalogName: String = SESSION_CATALOG_NAME
+
+  override protected def catalogVersion: String = "V1"
+
+  override protected def commandVersion: String = V1_COMMAND_VERSION
+}


Reply via email to