This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6170e40  [SPARK-24252][SQL] Add v2 catalog plugin system
6170e40 is described below

commit 6170e40c152a9fa9cca32318dc6bcf9d298723b7
Author: Ryan Blue <[email protected]>
AuthorDate: Fri Mar 8 19:31:49 2019 +0800

    [SPARK-24252][SQL] Add v2 catalog plugin system
    
    ## What changes were proposed in this pull request?
    
    This adds a v2 API for adding new catalog plugins to Spark.
    
    * Catalog implementations extend `CatalogPlugin` and are loaded via 
reflection, similar to data sources
    * `Catalogs` loads and initializes catalogs using configuration from a 
`SQLConf`
    * `CaseInsensitiveStringMap` is used to pass configuration to 
`CatalogPlugin` via `initialize`
    
    Catalogs are configured by adding config properties starting with 
`spark.sql.catalog.(name)`. The name property must specify a class that 
implements `CatalogPlugin`. Other properties under the namespace 
(`spark.sql.catalog.(name).(prop)`) are passed to the provider during 
initialization along with the catalog name.
    
    This replaces #21306, which will be implemented in two multiple parts: the 
catalog plugin system (this commit) and specific catalog APIs, like 
`TableCatalog`.
    
    ## How was this patch tested?
    
    Added test suites for `CaseInsensitiveStringMap` and for catalog loading.
    
    Closes #23915 from rdblue/SPARK-24252-add-v2-catalog-plugins.
    
    Authored-by: Ryan Blue <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../apache/spark/sql/catalog/v2/CatalogPlugin.java |  61 ++++++
 .../org/apache/spark/sql/catalog/v2/Catalogs.java  | 109 +++++++++++
 .../spark/sql/util/CaseInsensitiveStringMap.java   | 110 +++++++++++
 .../spark/sql/catalog/v2/CatalogLoadingSuite.java  | 208 +++++++++++++++++++++
 .../sql/util/CaseInsensitiveStringMapSuite.java    |  48 +++++
 .../scala/org/apache/spark/sql/SparkSession.scala  |   8 +
 6 files changed, 544 insertions(+)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogPlugin.java 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogPlugin.java
new file mode 100644
index 0000000..5d4995a
--- /dev/null
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogPlugin.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * A marker interface to provide a catalog implementation for Spark.
+ * <p>
+ * Implementations can provide catalog functions by implementing additional 
interfaces for tables,
+ * views, and functions.
+ * <p>
+ * Catalog implementations must implement this marker interface to be loaded by
+ * {@link Catalogs#load(String, SQLConf)}. The loader will instantiate catalog 
classes using the
+ * required public no-arg constructor. After creating an instance, it will be 
configured by calling
+ * {@link #initialize(String, CaseInsensitiveStringMap)}.
+ * <p>
+ * Catalog implementations are registered to a name by adding a configuration 
option to Spark:
+ * {@code spark.sql.catalog.catalog-name=com.example.YourCatalogClass}. All 
configuration properties
+ * in the Spark configuration that share the catalog name prefix,
+ * {@code spark.sql.catalog.catalog-name.(key)=(value)} will be passed in the 
case insensitive
+ * string map of options in initialization with the prefix removed.
+ * {@code name}, is also passed and is the catalog's name; in this case, 
"catalog-name".
+ */
+@Experimental
+public interface CatalogPlugin {
+  /**
+   * Called to initialize configuration.
+   * <p>
+   * This method is called once, just after the provider is instantiated.
+   *
+   * @param name the name used to identify and load this catalog
+   * @param options a case-insensitive string map of configuration
+   */
+  void initialize(String name, CaseInsensitiveStringMap options);
+
+  /**
+   * Called to get this catalog's name.
+   * <p>
+   * This method is only called after {@link #initialize(String, 
CaseInsensitiveStringMap)} is
+   * called to pass the catalog's name.
+   */
+  String name();
+}
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java
new file mode 100644
index 0000000..efae266
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.SparkException;
+import org.apache.spark.annotation.Private;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.apache.spark.util.Utils;
+
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static scala.collection.JavaConverters.mapAsJavaMapConverter;
+
+@Private
+public class Catalogs {
+  private Catalogs() {
+  }
+
+  /**
+   * Load and configure a catalog by name.
+   * <p>
+   * This loads, instantiates, and initializes the catalog plugin for each 
call; it does not cache
+   * or reuse instances.
+   *
+   * @param name a String catalog name
+   * @param conf a SQLConf
+   * @return an initialized CatalogPlugin
+   * @throws SparkException If the plugin class cannot be found or instantiated
+   */
+  public static CatalogPlugin load(String name, SQLConf conf) throws 
SparkException {
+    String pluginClassName = conf.getConfString("spark.sql.catalog." + name, 
null);
+    if (pluginClassName == null) {
+      throw new SparkException(String.format(
+          "Catalog '%s' plugin class not found: spark.sql.catalog.%s is not 
defined", name, name));
+    }
+
+    ClassLoader loader = Utils.getContextOrSparkClassLoader();
+
+    try {
+      Class<?> pluginClass = loader.loadClass(pluginClassName);
+
+      if (!CatalogPlugin.class.isAssignableFrom(pluginClass)) {
+        throw new SparkException(String.format(
+            "Plugin class for catalog '%s' does not implement CatalogPlugin: 
%s",
+            name, pluginClassName));
+      }
+
+      CatalogPlugin plugin = 
CatalogPlugin.class.cast(pluginClass.newInstance());
+
+      plugin.initialize(name, catalogOptions(name, conf));
+
+      return plugin;
+
+    } catch (ClassNotFoundException e) {
+      throw new SparkException(String.format(
+          "Cannot find catalog plugin class for catalog '%s': %s", name, 
pluginClassName));
+
+    } catch (IllegalAccessException e) {
+      throw new SparkException(String.format(
+          "Failed to call public no-arg constructor for catalog '%s': %s", 
name, pluginClassName),
+          e);
+
+    } catch (InstantiationException e) {
+      throw new SparkException(String.format(
+          "Failed while instantiating plugin for catalog '%s': %s", name, 
pluginClassName),
+          e.getCause());
+    }
+  }
+
+  /**
+   * Extracts a named catalog's configuration from a SQLConf.
+   *
+   * @param name a catalog name
+   * @param conf a SQLConf
+   * @return a case insensitive string map of options starting with 
spark.sql.catalog.(name).
+   */
+  private static CaseInsensitiveStringMap catalogOptions(String name, SQLConf 
conf) {
+    Map<String, String> allConfs = 
mapAsJavaMapConverter(conf.getAllConfs()).asJava();
+    Pattern prefix = Pattern.compile("^spark\\.sql\\.catalog\\." + name + 
"\\.(.+)");
+
+    CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
+    for (Map.Entry<String, String> entry : allConfs.entrySet()) {
+      Matcher matcher = prefix.matcher(entry.getKey());
+      if (matcher.matches() && matcher.groupCount() > 0) {
+        options.put(matcher.group(1), entry.getValue());
+      }
+    }
+
+    return options;
+  }
+}
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
new file mode 100644
index 0000000..8c5a6c6
--- /dev/null
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util;
+
+import org.apache.spark.annotation.Experimental;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Case-insensitive map of string keys to string values.
+ * <p>
+ * This is used to pass options to v2 implementations to ensure consistent 
case insensitivity.
+ * <p>
+ * Methods that return keys in this map, like {@link #entrySet()} and {@link 
#keySet()}, return
+ * keys converted to lower case.
+ */
+@Experimental
+public class CaseInsensitiveStringMap implements Map<String, String> {
+
+  public static CaseInsensitiveStringMap empty() {
+    return new CaseInsensitiveStringMap();
+  }
+
+  private final Map<String, String> delegate;
+
+  private CaseInsensitiveStringMap() {
+    this.delegate = new HashMap<>();
+  }
+
+  @Override
+  public int size() {
+    return delegate.size();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return delegate.isEmpty();
+  }
+
+  @Override
+  public boolean containsKey(Object key) {
+    return delegate.containsKey(key.toString().toLowerCase(Locale.ROOT));
+  }
+
+  @Override
+  public boolean containsValue(Object value) {
+    return delegate.containsValue(value);
+  }
+
+  @Override
+  public String get(Object key) {
+    return delegate.get(key.toString().toLowerCase(Locale.ROOT));
+  }
+
+  @Override
+  public String put(String key, String value) {
+    return delegate.put(key.toLowerCase(Locale.ROOT), value);
+  }
+
+  @Override
+  public String remove(Object key) {
+    return delegate.remove(key.toString().toLowerCase(Locale.ROOT));
+  }
+
+  @Override
+  public void putAll(Map<? extends String, ? extends String> m) {
+    for (Map.Entry<? extends String, ? extends String> entry : m.entrySet()) {
+      put(entry.getKey(), entry.getValue());
+    }
+  }
+
+  @Override
+  public void clear() {
+    delegate.clear();
+  }
+
+  @Override
+  public Set<String> keySet() {
+    return delegate.keySet();
+  }
+
+  @Override
+  public Collection<String> values() {
+    return delegate.values();
+  }
+
+  @Override
+  public Set<Map.Entry<String, String>> entrySet() {
+    return delegate.entrySet();
+  }
+}
diff --git 
a/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogLoadingSuite.java
 
b/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogLoadingSuite.java
new file mode 100644
index 0000000..2f55da8
--- /dev/null
+++ 
b/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogLoadingSuite.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.SparkException;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.Callable;
+
+public class CatalogLoadingSuite {
+  @Test
+  public void testLoad() throws SparkException {
+    SQLConf conf = new SQLConf();
+    conf.setConfString("spark.sql.catalog.test-name", 
TestCatalogPlugin.class.getCanonicalName());
+
+    CatalogPlugin plugin = Catalogs.load("test-name", conf);
+    Assert.assertNotNull("Should instantiate a non-null plugin", plugin);
+    Assert.assertEquals("Plugin should have correct implementation",
+        TestCatalogPlugin.class, plugin.getClass());
+
+    TestCatalogPlugin testPlugin = (TestCatalogPlugin) plugin;
+    Assert.assertEquals("Options should contain no keys", 0, 
testPlugin.options.size());
+    Assert.assertEquals("Catalog should have correct name", "test-name", 
testPlugin.name());
+  }
+
+  @Test
+  public void testInitializationOptions() throws SparkException {
+    SQLConf conf = new SQLConf();
+    conf.setConfString("spark.sql.catalog.test-name", 
TestCatalogPlugin.class.getCanonicalName());
+    conf.setConfString("spark.sql.catalog.test-name.name", "not-catalog-name");
+    conf.setConfString("spark.sql.catalog.test-name.kEy", "valUE");
+
+    CatalogPlugin plugin = Catalogs.load("test-name", conf);
+    Assert.assertNotNull("Should instantiate a non-null plugin", plugin);
+    Assert.assertEquals("Plugin should have correct implementation",
+        TestCatalogPlugin.class, plugin.getClass());
+
+    TestCatalogPlugin testPlugin = (TestCatalogPlugin) plugin;
+
+    Assert.assertEquals("Options should contain only two keys", 2, 
testPlugin.options.size());
+    Assert.assertEquals("Options should contain correct value for name (not 
overwritten)",
+        "not-catalog-name", testPlugin.options.get("name"));
+    Assert.assertEquals("Options should contain correct value for key",
+        "valUE", testPlugin.options.get("key"));
+  }
+
+  @Test
+  public void testLoadWithoutConfig() {
+    SQLConf conf = new SQLConf();
+
+    SparkException exc = intercept(SparkException.class, () -> 
Catalogs.load("missing", conf));
+
+    Assert.assertTrue("Should complain that implementation is not configured",
+        exc.getMessage()
+            .contains("plugin class not found: spark.sql.catalog.missing is 
not defined"));
+    Assert.assertTrue("Should identify the catalog by name",
+        exc.getMessage().contains("missing"));
+  }
+
+  @Test
+  public void testLoadMissingClass() {
+    SQLConf conf = new SQLConf();
+    conf.setConfString("spark.sql.catalog.missing", 
"com.example.NoSuchCatalogPlugin");
+
+    SparkException exc = intercept(SparkException.class, () -> 
Catalogs.load("missing", conf));
+
+    Assert.assertTrue("Should complain that the class is not found",
+        exc.getMessage().contains("Cannot find catalog plugin class"));
+    Assert.assertTrue("Should identify the catalog by name",
+        exc.getMessage().contains("missing"));
+    Assert.assertTrue("Should identify the missing class",
+        exc.getMessage().contains("com.example.NoSuchCatalogPlugin"));
+  }
+
+  @Test
+  public void testLoadNonCatalogPlugin() {
+    SQLConf conf = new SQLConf();
+    String invalidClassName = InvalidCatalogPlugin.class.getCanonicalName();
+    conf.setConfString("spark.sql.catalog.invalid", invalidClassName);
+
+    SparkException exc = intercept(SparkException.class, () -> 
Catalogs.load("invalid", conf));
+
+    Assert.assertTrue("Should complain that class does not implement 
CatalogPlugin",
+        exc.getMessage().contains("does not implement CatalogPlugin"));
+    Assert.assertTrue("Should identify the catalog by name",
+        exc.getMessage().contains("invalid"));
+    Assert.assertTrue("Should identify the class",
+        exc.getMessage().contains(invalidClassName));
+  }
+
+  @Test
+  public void testLoadConstructorFailureCatalogPlugin() {
+    SQLConf conf = new SQLConf();
+    String invalidClassName = 
ConstructorFailureCatalogPlugin.class.getCanonicalName();
+    conf.setConfString("spark.sql.catalog.invalid", invalidClassName);
+
+    RuntimeException exc = intercept(RuntimeException.class, () -> 
Catalogs.load("invalid", conf));
+
+    Assert.assertTrue("Should have expected error message",
+        exc.getMessage().contains("Expected failure"));
+  }
+
+  @Test
+  public void testLoadAccessErrorCatalogPlugin() {
+    SQLConf conf = new SQLConf();
+    String invalidClassName = 
AccessErrorCatalogPlugin.class.getCanonicalName();
+    conf.setConfString("spark.sql.catalog.invalid", invalidClassName);
+
+    SparkException exc = intercept(SparkException.class, () -> 
Catalogs.load("invalid", conf));
+
+    Assert.assertTrue("Should complain that no public constructor is provided",
+        exc.getMessage().contains("Failed to call public no-arg constructor 
for catalog"));
+    Assert.assertTrue("Should identify the catalog by name",
+        exc.getMessage().contains("invalid"));
+    Assert.assertTrue("Should identify the class",
+        exc.getMessage().contains(invalidClassName));
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <E extends Exception> E intercept(Class<E> expected, 
Callable<?> callable) {
+    try {
+      callable.call();
+      Assert.fail("No exception was thrown, expected: " +
+          expected.getName());
+    } catch (Exception actual) {
+      try {
+        Assert.assertEquals(expected, actual.getClass());
+        return (E) actual;
+      } catch (AssertionError e) {
+        e.addSuppressed(actual);
+        throw e;
+      }
+    }
+    // Compiler doesn't catch that Assert.fail will always throw an exception.
+    throw new UnsupportedOperationException("[BUG] Should not reach this 
statement");
+  }
+}
+
+class TestCatalogPlugin implements CatalogPlugin {
+  String name = null;
+  CaseInsensitiveStringMap options = null;
+
+  TestCatalogPlugin() {
+  }
+
+  @Override
+  public void initialize(String name, CaseInsensitiveStringMap options) {
+    this.name = name;
+    this.options = options;
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+}
+
+class ConstructorFailureCatalogPlugin implements CatalogPlugin { // fails in 
its constructor
+  ConstructorFailureCatalogPlugin() {
+    throw new RuntimeException("Expected failure.");
+  }
+
+  @Override
+  public void initialize(String name, CaseInsensitiveStringMap options) {
+  }
+
+  @Override
+  public String name() {
+    return null;
+  }
+}
+
+class AccessErrorCatalogPlugin implements CatalogPlugin { // no public 
constructor
+  private AccessErrorCatalogPlugin() {
+  }
+
+  @Override
+  public void initialize(String name, CaseInsensitiveStringMap options) {
+  }
+
+  @Override
+  public String name() {
+    return null;
+  }
+}
+
+class InvalidCatalogPlugin { // doesn't implement CatalogPlugin
+  public void initialize(CaseInsensitiveStringMap options) {
+  }
+}
diff --git 
a/sql/catalyst/src/test/java/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.java
 
b/sql/catalyst/src/test/java/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.java
new file mode 100644
index 0000000..7639277
--- /dev/null
+++ 
b/sql/catalyst/src/test/java/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class CaseInsensitiveStringMapSuite {
+  @Test
+  public void testPutAndGet() {
+    CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
+    options.put("kEy", "valUE");
+
+    Assert.assertEquals("Should return correct value for lower-case key",
+        "valUE", options.get("key"));
+    Assert.assertEquals("Should return correct value for upper-case key",
+        "valUE", options.get("KEY"));
+  }
+
+  @Test
+  public void testKeySet() {
+    CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
+    options.put("kEy", "valUE");
+
+    Set<String> expectedKeySet = new HashSet<>();
+    expectedKeySet.add("key");
+
+    Assert.assertEquals("Should return lower-case key set", expectedKeySet, 
options.keySet());
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index f6fab76..5208b9a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit._
 import java.util.concurrent.atomic.AtomicReference
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.reflect.runtime.universe.TypeTag
 import scala.util.control.NonFatal
 
@@ -32,6 +33,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.catalog.Catalog
+import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Catalogs}
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.encoders._
@@ -620,6 +622,12 @@ class SparkSession private(
    */
   @transient lazy val catalog: Catalog = new CatalogImpl(self)
 
+  @transient private lazy val catalogs = new mutable.HashMap[String, 
CatalogPlugin]()
+
+  private[sql] def catalog(name: String): CatalogPlugin = synchronized {
+    catalogs.getOrElseUpdate(name, Catalogs.load(name, sessionState.conf))
+  }
+
   /**
    * Returns the specified table/view as a `DataFrame`.
    *


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to