This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0345f11  [SPARK-27661][SQL] Add SupportsNamespaces API
0345f11 is described below

commit 0345f1174d6019374ed5451140e01c224508bc0e
Author: Ryan Blue <b...@apache.org>
AuthorDate: Sun Aug 4 21:29:40 2019 -0700

    [SPARK-27661][SQL] Add SupportsNamespaces API
    
    ## What changes were proposed in this pull request?
    
    This adds an interface for catalog plugins that exposes namespace 
operations:
    * `listNamespaces`
    * `namespaceExists`
    * `loadNamespaceMetadata`
    * `createNamespace`
    * `alterNamespace`
    * `dropNamespace`
    
    ## How was this patch tested?
    
    API only. Existing tests for regressions.
    
    Closes #24560 from rdblue/SPARK-27661-add-catalog-namespace-api.
    
    Authored-by: Ryan Blue <b...@apache.org>
    Signed-off-by: Burak Yavuz <brk...@gmail.com>
---
 .../spark/sql/catalog/v2/NamespaceChange.java      |  97 ++++++++++
 .../spark/sql/catalog/v2/SupportsNamespaces.java   | 145 +++++++++++++++
 .../spark/sql/catalog/v2/utils/CatalogV2Util.scala |  33 +++-
 .../spark/sql/catalog/v2/TableCatalogSuite.scala   | 204 ++++++++++++++++++++-
 .../spark/sql/catalog/v2/TestTableCatalog.scala    |  70 ++++++-
 5 files changed, 543 insertions(+), 6 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/NamespaceChange.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/NamespaceChange.java
new file mode 100644
index 0000000..6f5895b
--- /dev/null
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/NamespaceChange.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+/**
+ * NamespaceChange subclasses represent requested changes to a namespace. 
These are passed to
+ * {@link SupportsNamespaces#alterNamespace}. For example,
+ * <pre>
+ *   import NamespaceChange._
+ *   val catalog = Catalogs.load(name)
+ *   catalog.alterNamespace(ident,
+ *       setProperty("prop", "value"),
+ *       removeProperty("other_prop")
+ *     )
+ * </pre>
+ */
+public interface NamespaceChange {
+  /**
+   * Create a NamespaceChange for setting a namespace property.
+   * <p>
+   * If the property already exists, it will be replaced with the new value.
+   *
+   * @param property the property name
+   * @param value the new property value
+   * @return a NamespaceChange for the addition
+   */
+  static NamespaceChange setProperty(String property, String value) {
+    return new SetProperty(property, value);
+  }
+
+  /**
+   * Create a NamespaceChange for removing a namespace property.
+   * <p>
+   * If the property does not exist, the change will succeed.
+   *
+   * @param property the property name
+   * @return a NamespaceChange for the addition
+   */
+  static NamespaceChange removeProperty(String property) {
+    return new RemoveProperty(property);
+  }
+
+  /**
+   * A NamespaceChange to set a namespace property.
+   * <p>
+   * If the property already exists, it must be replaced with the new value.
+   */
+  final class SetProperty implements NamespaceChange {
+    private final String property;
+    private final String value;
+
+    private SetProperty(String property, String value) {
+      this.property = property;
+      this.value = value;
+    }
+
+    public String property() {
+      return property;
+    }
+
+    public String value() {
+      return value;
+    }
+  }
+
+  /**
+   * A NamespaceChange to remove a namespace property.
+   * <p>
+   * If the property does not exist, the change should succeed.
+   */
+  final class RemoveProperty implements NamespaceChange {
+    private final String property;
+
+    private RemoveProperty(String property) {
+      this.property = property;
+    }
+
+    public String property() {
+      return property;
+    }
+  }
+}
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java
new file mode 100644
index 0000000..12c2e51
--- /dev/null
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/SupportsNamespaces.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+
+import java.util.Map;
+
+/**
+ * Catalog methods for working with namespaces.
+ * <p>
+ * If an object such as a table, view, or function exists, its parent 
namespaces must also exist
+ * and must be returned by the discovery methods {@link #listNamespaces()} and
+ * {@link #listNamespaces(String[])}.
+ * <p>
+ * Catalog implementations are not required to maintain the existence of 
namespaces independent of
+ * objects in a namespace. For example, a function catalog that loads 
functions using reflection
+ * and uses Java packages as namespaces is not required to support the methods 
to create, alter, or
+ * drop a namespace. Implementations are allowed to discover the existence of 
objects or namespaces
+ * without throwing {@link NoSuchNamespaceException} when no namespace is 
found.
+ */
+public interface SupportsNamespaces extends CatalogPlugin {
+
+  /**
+   * Return a default namespace for the catalog.
+   * <p>
+   * When this catalog is set as the current catalog, the namespace returned 
by this method will be
+   * set as the current namespace.
+   * <p>
+   * The namespace returned by this method is not required to exist.
+   *
+   * @return a multi-part namespace
+   */
+  default String[] defaultNamespace() {
+    return new String[0];
+  }
+
+  /**
+   * List top-level namespaces from the catalog.
+   * <p>
+   * If an object such as a table, view, or function exists, its parent 
namespaces must also exist
+   * and must be returned by this discovery method. For example, if table 
a.b.t exists, this method
+   * must return ["a"] in the result array.
+   *
+   * @return an array of multi-part namespace names
+   */
+  String[][] listNamespaces() throws NoSuchNamespaceException;
+
+  /**
+   * List namespaces in a namespace.
+   * <p>
+   * If an object such as a table, view, or function exists, its parent 
namespaces must also exist
+   * and must be returned by this discovery method. For example, if table 
a.b.t exists, this method
+   * invoked as listNamespaces(["a"]) must return ["a", "b"] in the result 
array.
+   *
+   * @param namespace a multi-part namespace
+   * @return an array of multi-part namespace names
+   * @throws NoSuchNamespaceException If the namespace does not exist 
(optional)
+   */
+  String[][] listNamespaces(String[] namespace) throws 
NoSuchNamespaceException;
+
+  /**
+   * Test whether a namespace exists.
+   * <p>
+   * If an object such as a table, view, or function exists, its parent 
namespaces must also exist.
+   * For example, if table a.b.t exists, this method invoked as 
namespaceExists(["a"]) or
+   * namespaceExists(["a", "b"]) must return true.
+   *
+   * @param namespace a multi-part namespace
+   * @return true if the namespace exists, false otherwise
+   */
+  default boolean namespaceExists(String[] namespace) {
+    try {
+      loadNamespaceMetadata(namespace);
+      return true;
+    } catch (NoSuchNamespaceException e) {
+      return false;
+    }
+  }
+
+  /**
+   * Load metadata properties for a namespace.
+   *
+   * @param namespace a multi-part namespace
+   * @return a string map of properties for the given namespace
+   * @throws NoSuchNamespaceException If the namespace does not exist 
(optional)
+   * @throws UnsupportedOperationException If namespace properties are not 
supported
+   */
+  Map<String, String> loadNamespaceMetadata(String[] namespace) throws 
NoSuchNamespaceException;
+
+  /**
+   * Create a namespace in the catalog.
+   *
+   * @param namespace a multi-part namespace
+   * @param metadata a string map of properties for the given namespace
+   * @throws NamespaceAlreadyExistsException If the namespace already exists
+   * @throws UnsupportedOperationException If create is not a supported 
operation
+   */
+  void createNamespace(
+      String[] namespace,
+      Map<String, String> metadata) throws NamespaceAlreadyExistsException;
+
+  /**
+   * Apply a set of metadata changes to a namespace in the catalog.
+   *
+   * @param namespace a multi-part namespace
+   * @param changes a collection of changes to apply to the namespace
+   * @throws NoSuchNamespaceException If the namespace does not exist 
(optional)
+   * @throws UnsupportedOperationException If namespace properties are not 
supported
+   */
+  void alterNamespace(
+      String[] namespace,
+      NamespaceChange... changes) throws NoSuchNamespaceException;
+
+  /**
+   * Drop a namespace from the catalog.
+   * <p>
+   * This operation may be rejected by the catalog implementation if the 
namespace is not empty by
+   * throwing {@link IllegalStateException}. If the catalog implementation 
does not support this
+   * operation, it may throw {@link UnsupportedOperationException}.
+   *
+   * @param namespace a multi-part namespace
+   * @return true if the namespace was dropped
+   * @throws NoSuchNamespaceException If the namespace does not exist 
(optional)
+   * @throws IllegalStateException If the namespace is not empty
+   * @throws UnsupportedOperationException If drop is not a supported operation
+   */
+  boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException;
+}
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala
index 7cc80c4..cd9bcc0 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala
@@ -22,7 +22,7 @@ import java.util.Collections
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, TableChange}
+import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, 
NamespaceChange, TableChange}
 import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, 
RemoveProperty, RenameColumn, SetProperty, UpdateColumnComment, 
UpdateColumnType}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.sources.v2.Table
@@ -34,6 +34,37 @@ object CatalogV2Util {
   /**
    * Apply properties changes to a map and return the result.
    */
+  def applyNamespaceChanges(
+      properties: Map[String, String],
+      changes: Seq[NamespaceChange]): Map[String, String] = {
+    applyNamespaceChanges(properties.asJava, changes).asScala.toMap
+  }
+
+  /**
+   * Apply properties changes to a Java map and return the result.
+   */
+  def applyNamespaceChanges(
+      properties: util.Map[String, String],
+      changes: Seq[NamespaceChange]): util.Map[String, String] = {
+    val newProperties = new util.HashMap[String, String](properties)
+
+    changes.foreach {
+      case set: NamespaceChange.SetProperty =>
+        newProperties.put(set.property, set.value)
+
+      case unset: NamespaceChange.RemoveProperty =>
+        newProperties.remove(unset.property)
+
+      case _ =>
+      // ignore non-property changes
+    }
+
+    Collections.unmodifiableMap(newProperties)
+  }
+
+  /**
+   * Apply properties changes to a map and return the result.
+   */
   def applyPropertiesChanges(
       properties: Map[String, String],
       changes: Seq[TableChange]): Map[String, String] = {
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala
index 9c1b9a3..089b4c5 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala
@@ -23,7 +23,7 @@ import java.util.Collections
 import scala.collection.JavaConverters._
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, 
TableAlreadyExistsException}
+import 
org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, 
NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, 
StringType, StructField, StructType, TimestampType}
@@ -37,13 +37,14 @@ class TableCatalogSuite extends SparkFunSuite {
       .add("id", IntegerType)
       .add("data", StringType)
 
-  private def newCatalog(): TableCatalog = {
+  private def newCatalog(): TableCatalog with SupportsNamespaces = {
     val newCatalog = new TestTableCatalog
     newCatalog.initialize("test", CaseInsensitiveStringMap.empty())
     newCatalog
   }
 
-  private val testIdent = Identifier.of(Array("`", "."), "test_table")
+  private val testNs = Array("`", ".")
+  private val testIdent = Identifier.of(testNs, "test_table")
 
   test("Catalogs can load the catalog") {
     val catalog = newCatalog()
@@ -654,4 +655,201 @@ class TableCatalogSuite extends SparkFunSuite {
     assert(!wasDropped)
     assert(!catalog.tableExists(testIdent))
   }
+
+  test("listNamespaces: list namespaces from metadata") {
+    val catalog = newCatalog()
+    catalog.createNamespace(Array("ns1"), Map("property" -> "value").asJava)
+
+    assert(catalog.listNamespaces === Array(Array("ns1")))
+    assert(catalog.listNamespaces(Array()) === Array(Array("ns1")))
+    assert(catalog.listNamespaces(Array("ns1")) === Array())
+  }
+
+  test("listNamespaces: list namespaces from tables") {
+    val catalog = newCatalog()
+    val ident1 = Identifier.of(Array("ns1", "ns2"), "test_table_1")
+    val ident2 = Identifier.of(Array("ns1", "ns2"), "test_table_2")
+
+    catalog.createTable(ident1, schema, Array.empty, emptyProps)
+    catalog.createTable(ident2, schema, Array.empty, emptyProps)
+
+    assert(catalog.listNamespaces === Array(Array("ns1")))
+    assert(catalog.listNamespaces(Array()) === Array(Array("ns1")))
+    assert(catalog.listNamespaces(Array("ns1")) === Array(Array("ns1", "ns2")))
+    assert(catalog.listNamespaces(Array("ns1", "ns2")) === Array())
+  }
+
+  test("listNamespaces: list namespaces from metadata and tables") {
+    val catalog = newCatalog()
+    val ident1 = Identifier.of(Array("ns1", "ns2"), "test_table_1")
+    val ident2 = Identifier.of(Array("ns1", "ns2"), "test_table_2")
+
+    catalog.createNamespace(Array("ns1"), Map("property" -> "value").asJava)
+    catalog.createTable(ident1, schema, Array.empty, emptyProps)
+    catalog.createTable(ident2, schema, Array.empty, emptyProps)
+
+    assert(catalog.listNamespaces === Array(Array("ns1")))
+    assert(catalog.listNamespaces(Array()) === Array(Array("ns1")))
+    assert(catalog.listNamespaces(Array("ns1")) === Array(Array("ns1", "ns2")))
+    assert(catalog.listNamespaces(Array("ns1", "ns2")) === Array())
+  }
+
+  test("loadNamespaceMetadata: fail if no metadata or tables exist") {
+    val catalog = newCatalog()
+
+    val exc = intercept[NoSuchNamespaceException] {
+      catalog.loadNamespaceMetadata(testNs)
+    }
+
+    assert(exc.getMessage.contains(testNs.quoted))
+  }
+
+  test("loadNamespaceMetadata: no metadata, table exists") {
+    val catalog = newCatalog()
+
+    catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    val metadata = catalog.loadNamespaceMetadata(testNs)
+
+    assert(metadata.asScala === Map.empty)
+  }
+
+  test("loadNamespaceMetadata: metadata exists, no tables") {
+    val catalog = newCatalog()
+
+    catalog.createNamespace(testNs, Map("property" -> "value").asJava)
+
+    val metadata = catalog.loadNamespaceMetadata(testNs)
+
+    assert(metadata.asScala === Map("property" -> "value"))
+  }
+
+  test("loadNamespaceMetadata: metadata and table exist") {
+    val catalog = newCatalog()
+
+    catalog.createNamespace(testNs, Map("property" -> "value").asJava)
+    catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    val metadata = catalog.loadNamespaceMetadata(testNs)
+
+    assert(metadata.asScala === Map("property" -> "value"))
+  }
+
+  test("createNamespace: basic behavior") {
+    val catalog = newCatalog()
+
+    catalog.createNamespace(testNs, Map("property" -> "value").asJava)
+
+    assert(catalog.namespaceExists(testNs) === true)
+    assert(catalog.loadNamespaceMetadata(testNs).asScala === Map("property" -> 
"value"))
+  }
+
+  test("createNamespace: fail if metadata already exists") {
+    val catalog = newCatalog()
+
+    catalog.createNamespace(testNs, Map("property" -> "value").asJava)
+
+    val exc = intercept[NamespaceAlreadyExistsException] {
+      catalog.createNamespace(testNs, Map("property" -> "value").asJava)
+    }
+
+    assert(exc.getMessage.contains(testNs.quoted))
+    assert(catalog.namespaceExists(testNs) === true)
+    assert(catalog.loadNamespaceMetadata(testNs).asScala === Map("property" -> 
"value"))
+  }
+
+  test("createNamespace: fail if namespace already exists from table") {
+    val catalog = newCatalog()
+
+    catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    assert(catalog.namespaceExists(testNs) === true)
+    assert(catalog.loadNamespaceMetadata(testNs).asScala === Map.empty)
+
+    val exc = intercept[NamespaceAlreadyExistsException] {
+      catalog.createNamespace(testNs, Map("property" -> "value").asJava)
+    }
+
+    assert(exc.getMessage.contains(testNs.quoted))
+    assert(catalog.namespaceExists(testNs) === true)
+    assert(catalog.loadNamespaceMetadata(testNs).asScala === Map.empty)
+  }
+
+  test("dropNamespace: drop missing namespace") {
+    val catalog = newCatalog()
+
+    assert(catalog.namespaceExists(testNs) === false)
+
+    val ret = catalog.dropNamespace(testNs)
+
+    assert(ret === false)
+  }
+
+  test("dropNamespace: drop empty namespace") {
+    val catalog = newCatalog()
+
+    catalog.createNamespace(testNs, Map("property" -> "value").asJava)
+
+    assert(catalog.namespaceExists(testNs) === true)
+    assert(catalog.loadNamespaceMetadata(testNs).asScala === Map("property" -> 
"value"))
+
+    val ret = catalog.dropNamespace(testNs)
+
+    assert(ret === true)
+    assert(catalog.namespaceExists(testNs) === false)
+  }
+
+  test("dropNamespace: fail if not empty") {
+    val catalog = newCatalog()
+
+    catalog.createNamespace(testNs, Map("property" -> "value").asJava)
+    catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    val exc = intercept[IllegalStateException] {
+      catalog.dropNamespace(testNs)
+    }
+
+    assert(exc.getMessage.contains(testNs.quoted))
+    assert(catalog.namespaceExists(testNs) === true)
+    assert(catalog.loadNamespaceMetadata(testNs).asScala === Map("property" -> 
"value"))
+  }
+
+  test("alterNamespace: basic behavior") {
+    val catalog = newCatalog()
+
+    catalog.createNamespace(testNs, Map("property" -> "value").asJava)
+
+    catalog.alterNamespace(testNs, NamespaceChange.setProperty("property2", 
"value2"))
+    assert(catalog.loadNamespaceMetadata(testNs).asScala === Map(
+      "property" -> "value", "property2" -> "value2"))
+
+    catalog.alterNamespace(testNs,
+      NamespaceChange.removeProperty("property2"),
+      NamespaceChange.setProperty("property3", "value3"))
+    assert(catalog.loadNamespaceMetadata(testNs).asScala === Map(
+      "property" -> "value", "property3" -> "value3"))
+
+    catalog.alterNamespace(testNs, NamespaceChange.removeProperty("property3"))
+    assert(catalog.loadNamespaceMetadata(testNs).asScala === Map("property" -> 
"value"))
+  }
+
+  test("alterNamespace: create metadata if missing and table exists") {
+    val catalog = newCatalog()
+
+    catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    catalog.alterNamespace(testNs, NamespaceChange.setProperty("property", 
"value"))
+
+    assert(catalog.loadNamespaceMetadata(testNs).asScala === Map("property" -> 
"value"))
+  }
+
+  test("alterNamespace: fail if no metadata or table exists") {
+    val catalog = newCatalog()
+
+    val exc = intercept[NoSuchNamespaceException] {
+      catalog.alterNamespace(testNs, NamespaceChange.setProperty("property", 
"value"))
+    }
+
+    assert(exc.getMessage.contains(testNs.quoted))
+  }
 }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala
index 6ba140f..6fdd6e3 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala
@@ -24,14 +24,18 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.catalog.v2.expressions.Transform
 import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util
-import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, 
TableAlreadyExistsException}
+import 
org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, 
NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.sources.v2.{Table, TableCapability}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-class TestTableCatalog extends TableCatalog {
+class TestTableCatalog extends TableCatalog with SupportsNamespaces {
   import CatalogV2Implicits._
 
+  override val defaultNamespace: Array[String] = Array()
+  protected val namespaces: util.Map[List[String], Map[String, String]] =
+    new ConcurrentHashMap[List[String], Map[String, String]]()
+
   private val tables: util.Map[Identifier, Table] = new 
ConcurrentHashMap[Identifier, Table]()
   private var _name: Option[String] = None
 
@@ -88,6 +92,68 @@ class TestTableCatalog extends TableCatalog {
   }
 
   override def dropTable(ident: Identifier): Boolean = 
Option(tables.remove(ident)).isDefined
+
+  private def allNamespaces: Seq[Seq[String]] = {
+    (tables.keySet.asScala.map(_.namespace.toSeq) ++ 
namespaces.keySet.asScala).toSeq.distinct
+  }
+
+  override def namespaceExists(namespace: Array[String]): Boolean = {
+    allNamespaces.exists(_.startsWith(namespace))
+  }
+
+  override def listNamespaces: Array[Array[String]] = {
+    allNamespaces.map(_.head).distinct.map(Array(_)).toArray
+  }
+
+  override def listNamespaces(namespace: Array[String]): Array[Array[String]] 
= {
+    allNamespaces
+        .filter(_.size > namespace.length)
+        .filter(_.startsWith(namespace))
+        .map(_.take(namespace.length + 1))
+        .distinct
+        .map(_.toArray)
+        .toArray
+  }
+
+  override def loadNamespaceMetadata(namespace: Array[String]): 
util.Map[String, String] = {
+    Option(namespaces.get(namespace.toSeq)) match {
+      case Some(metadata) =>
+        metadata.asJava
+      case _ if namespaceExists(namespace) =>
+        util.Collections.emptyMap[String, String]
+      case _ =>
+        throw new NoSuchNamespaceException(namespace)
+    }
+  }
+
+  override def createNamespace(
+      namespace: Array[String],
+      metadata: util.Map[String, String]): Unit = {
+    if (namespaceExists(namespace)) {
+      throw new NamespaceAlreadyExistsException(namespace)
+    }
+
+    Option(namespaces.putIfAbsent(namespace.toList, metadata.asScala.toMap)) 
match {
+      case Some(_) =>
+        throw new NamespaceAlreadyExistsException(namespace)
+      case _ =>
+        // created successfully
+    }
+  }
+
+  override def alterNamespace(
+      namespace: Array[String],
+      changes: NamespaceChange*): Unit = {
+    val metadata = loadNamespaceMetadata(namespace).asScala.toMap
+    namespaces.put(namespace.toList, 
CatalogV2Util.applyNamespaceChanges(metadata, changes))
+  }
+
+  override def dropNamespace(namespace: Array[String]): Boolean = {
+    if (listTables(namespace).nonEmpty) {
+      throw new IllegalStateException(s"Cannot delete non-empty namespace: 
${namespace.quoted}")
+    }
+    Option(namespaces.remove(namespace.toList)).isDefined
+  }
 }
 
 case class InMemoryTable(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to