This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6170e40 [SPARK-24252][SQL] Add v2 catalog plugin system
6170e40 is described below
commit 6170e40c152a9fa9cca32318dc6bcf9d298723b7
Author: Ryan Blue <[email protected]>
AuthorDate: Fri Mar 8 19:31:49 2019 +0800
[SPARK-24252][SQL] Add v2 catalog plugin system
## What changes were proposed in this pull request?
This adds a v2 API for adding new catalog plugins to Spark.
* Catalog implementations extend `CatalogPlugin` and are loaded via
reflection, similar to data sources
* `Catalogs` loads and initializes catalogs using configuration from a
`SQLConf`
* `CaseInsensitiveStringMap` is used to pass configuration to
`CatalogPlugin` via `initialize`
Catalogs are configured by adding config properties starting with
`spark.sql.catalog.(name)`. The name property must specify a class that
implements `CatalogPlugin`. Other properties under the namespace
(`spark.sql.catalog.(name).(prop)`) are passed to the provider during
initialization along with the catalog name.
This replaces #21306, which will be implemented in two multiple parts: the
catalog plugin system (this commit) and specific catalog APIs, like
`TableCatalog`.
## How was this patch tested?
Added test suites for `CaseInsensitiveStringMap` and for catalog loading.
Closes #23915 from rdblue/SPARK-24252-add-v2-catalog-plugins.
Authored-by: Ryan Blue <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../apache/spark/sql/catalog/v2/CatalogPlugin.java | 61 ++++++
.../org/apache/spark/sql/catalog/v2/Catalogs.java | 109 +++++++++++
.../spark/sql/util/CaseInsensitiveStringMap.java | 110 +++++++++++
.../spark/sql/catalog/v2/CatalogLoadingSuite.java | 208 +++++++++++++++++++++
.../sql/util/CaseInsensitiveStringMapSuite.java | 48 +++++
.../scala/org/apache/spark/sql/SparkSession.scala | 8 +
6 files changed, 544 insertions(+)
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogPlugin.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogPlugin.java
new file mode 100644
index 0000000..5d4995a
--- /dev/null
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogPlugin.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * A marker interface to provide a catalog implementation for Spark.
+ * <p>
+ * Implementations can provide catalog functions by implementing additional
interfaces for tables,
+ * views, and functions.
+ * <p>
+ * Catalog implementations must implement this marker interface to be loaded by
+ * {@link Catalogs#load(String, SQLConf)}. The loader will instantiate catalog
classes using the
+ * required public no-arg constructor. After creating an instance, it will be
configured by calling
+ * {@link #initialize(String, CaseInsensitiveStringMap)}.
+ * <p>
+ * Catalog implementations are registered to a name by adding a configuration
option to Spark:
+ * {@code spark.sql.catalog.catalog-name=com.example.YourCatalogClass}. All
configuration properties
+ * in the Spark configuration that share the catalog name prefix,
+ * {@code spark.sql.catalog.catalog-name.(key)=(value)} will be passed in the
case insensitive
+ * string map of options in initialization with the prefix removed.
+ * {@code name}, is also passed and is the catalog's name; in this case,
"catalog-name".
+ */
+@Experimental
+public interface CatalogPlugin {
+ /**
+ * Called to initialize configuration.
+ * <p>
+ * This method is called once, just after the provider is instantiated.
+ *
+ * @param name the name used to identify and load this catalog
+ * @param options a case-insensitive string map of configuration
+ */
+ void initialize(String name, CaseInsensitiveStringMap options);
+
+ /**
+ * Called to get this catalog's name.
+ * <p>
+ * This method is only called after {@link #initialize(String,
CaseInsensitiveStringMap)} is
+ * called to pass the catalog's name.
+ */
+ String name();
+}
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java
new file mode 100644
index 0000000..efae266
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.SparkException;
+import org.apache.spark.annotation.Private;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.apache.spark.util.Utils;
+
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static scala.collection.JavaConverters.mapAsJavaMapConverter;
+
+@Private
+public class Catalogs {
+ private Catalogs() {
+ }
+
+ /**
+ * Load and configure a catalog by name.
+ * <p>
+ * This loads, instantiates, and initializes the catalog plugin for each
call; it does not cache
+ * or reuse instances.
+ *
+ * @param name a String catalog name
+ * @param conf a SQLConf
+ * @return an initialized CatalogPlugin
+ * @throws SparkException If the plugin class cannot be found or instantiated
+ */
+ public static CatalogPlugin load(String name, SQLConf conf) throws
SparkException {
+ String pluginClassName = conf.getConfString("spark.sql.catalog." + name,
null);
+ if (pluginClassName == null) {
+ throw new SparkException(String.format(
+ "Catalog '%s' plugin class not found: spark.sql.catalog.%s is not
defined", name, name));
+ }
+
+ ClassLoader loader = Utils.getContextOrSparkClassLoader();
+
+ try {
+ Class<?> pluginClass = loader.loadClass(pluginClassName);
+
+ if (!CatalogPlugin.class.isAssignableFrom(pluginClass)) {
+ throw new SparkException(String.format(
+ "Plugin class for catalog '%s' does not implement CatalogPlugin:
%s",
+ name, pluginClassName));
+ }
+
+ CatalogPlugin plugin =
CatalogPlugin.class.cast(pluginClass.newInstance());
+
+ plugin.initialize(name, catalogOptions(name, conf));
+
+ return plugin;
+
+ } catch (ClassNotFoundException e) {
+ throw new SparkException(String.format(
+ "Cannot find catalog plugin class for catalog '%s': %s", name,
pluginClassName));
+
+ } catch (IllegalAccessException e) {
+ throw new SparkException(String.format(
+ "Failed to call public no-arg constructor for catalog '%s': %s",
name, pluginClassName),
+ e);
+
+ } catch (InstantiationException e) {
+ throw new SparkException(String.format(
+ "Failed while instantiating plugin for catalog '%s': %s", name,
pluginClassName),
+ e.getCause());
+ }
+ }
+
+ /**
+ * Extracts a named catalog's configuration from a SQLConf.
+ *
+ * @param name a catalog name
+ * @param conf a SQLConf
+ * @return a case insensitive string map of options starting with
spark.sql.catalog.(name).
+ */
+ private static CaseInsensitiveStringMap catalogOptions(String name, SQLConf
conf) {
+ Map<String, String> allConfs =
mapAsJavaMapConverter(conf.getAllConfs()).asJava();
+ Pattern prefix = Pattern.compile("^spark\\.sql\\.catalog\\." + name +
"\\.(.+)");
+
+ CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
+ for (Map.Entry<String, String> entry : allConfs.entrySet()) {
+ Matcher matcher = prefix.matcher(entry.getKey());
+ if (matcher.matches() && matcher.groupCount() > 0) {
+ options.put(matcher.group(1), entry.getValue());
+ }
+ }
+
+ return options;
+ }
+}
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
new file mode 100644
index 0000000..8c5a6c6
--- /dev/null
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util;
+
+import org.apache.spark.annotation.Experimental;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Case-insensitive map of string keys to string values.
+ * <p>
+ * This is used to pass options to v2 implementations to ensure consistent
case insensitivity.
+ * <p>
+ * Methods that return keys in this map, like {@link #entrySet()} and {@link
#keySet()}, return
+ * keys converted to lower case.
+ */
+@Experimental
+public class CaseInsensitiveStringMap implements Map<String, String> {
+
+ public static CaseInsensitiveStringMap empty() {
+ return new CaseInsensitiveStringMap();
+ }
+
+ private final Map<String, String> delegate;
+
+ private CaseInsensitiveStringMap() {
+ this.delegate = new HashMap<>();
+ }
+
+ @Override
+ public int size() {
+ return delegate.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return delegate.isEmpty();
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ return delegate.containsKey(key.toString().toLowerCase(Locale.ROOT));
+ }
+
+ @Override
+ public boolean containsValue(Object value) {
+ return delegate.containsValue(value);
+ }
+
+ @Override
+ public String get(Object key) {
+ return delegate.get(key.toString().toLowerCase(Locale.ROOT));
+ }
+
+ @Override
+ public String put(String key, String value) {
+ return delegate.put(key.toLowerCase(Locale.ROOT), value);
+ }
+
+ @Override
+ public String remove(Object key) {
+ return delegate.remove(key.toString().toLowerCase(Locale.ROOT));
+ }
+
+ @Override
+ public void putAll(Map<? extends String, ? extends String> m) {
+ for (Map.Entry<? extends String, ? extends String> entry : m.entrySet()) {
+ put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ @Override
+ public void clear() {
+ delegate.clear();
+ }
+
+ @Override
+ public Set<String> keySet() {
+ return delegate.keySet();
+ }
+
+ @Override
+ public Collection<String> values() {
+ return delegate.values();
+ }
+
+ @Override
+ public Set<Map.Entry<String, String>> entrySet() {
+ return delegate.entrySet();
+ }
+}
diff --git
a/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogLoadingSuite.java
b/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogLoadingSuite.java
new file mode 100644
index 0000000..2f55da8
--- /dev/null
+++
b/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogLoadingSuite.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.SparkException;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.Callable;
+
+public class CatalogLoadingSuite {
+ @Test
+ public void testLoad() throws SparkException {
+ SQLConf conf = new SQLConf();
+ conf.setConfString("spark.sql.catalog.test-name",
TestCatalogPlugin.class.getCanonicalName());
+
+ CatalogPlugin plugin = Catalogs.load("test-name", conf);
+ Assert.assertNotNull("Should instantiate a non-null plugin", plugin);
+ Assert.assertEquals("Plugin should have correct implementation",
+ TestCatalogPlugin.class, plugin.getClass());
+
+ TestCatalogPlugin testPlugin = (TestCatalogPlugin) plugin;
+ Assert.assertEquals("Options should contain no keys", 0,
testPlugin.options.size());
+ Assert.assertEquals("Catalog should have correct name", "test-name",
testPlugin.name());
+ }
+
+ @Test
+ public void testInitializationOptions() throws SparkException {
+ SQLConf conf = new SQLConf();
+ conf.setConfString("spark.sql.catalog.test-name",
TestCatalogPlugin.class.getCanonicalName());
+ conf.setConfString("spark.sql.catalog.test-name.name", "not-catalog-name");
+ conf.setConfString("spark.sql.catalog.test-name.kEy", "valUE");
+
+ CatalogPlugin plugin = Catalogs.load("test-name", conf);
+ Assert.assertNotNull("Should instantiate a non-null plugin", plugin);
+ Assert.assertEquals("Plugin should have correct implementation",
+ TestCatalogPlugin.class, plugin.getClass());
+
+ TestCatalogPlugin testPlugin = (TestCatalogPlugin) plugin;
+
+ Assert.assertEquals("Options should contain only two keys", 2,
testPlugin.options.size());
+ Assert.assertEquals("Options should contain correct value for name (not
overwritten)",
+ "not-catalog-name", testPlugin.options.get("name"));
+ Assert.assertEquals("Options should contain correct value for key",
+ "valUE", testPlugin.options.get("key"));
+ }
+
+ @Test
+ public void testLoadWithoutConfig() {
+ SQLConf conf = new SQLConf();
+
+ SparkException exc = intercept(SparkException.class, () ->
Catalogs.load("missing", conf));
+
+ Assert.assertTrue("Should complain that implementation is not configured",
+ exc.getMessage()
+ .contains("plugin class not found: spark.sql.catalog.missing is
not defined"));
+ Assert.assertTrue("Should identify the catalog by name",
+ exc.getMessage().contains("missing"));
+ }
+
+ @Test
+ public void testLoadMissingClass() {
+ SQLConf conf = new SQLConf();
+ conf.setConfString("spark.sql.catalog.missing",
"com.example.NoSuchCatalogPlugin");
+
+ SparkException exc = intercept(SparkException.class, () ->
Catalogs.load("missing", conf));
+
+ Assert.assertTrue("Should complain that the class is not found",
+ exc.getMessage().contains("Cannot find catalog plugin class"));
+ Assert.assertTrue("Should identify the catalog by name",
+ exc.getMessage().contains("missing"));
+ Assert.assertTrue("Should identify the missing class",
+ exc.getMessage().contains("com.example.NoSuchCatalogPlugin"));
+ }
+
+ @Test
+ public void testLoadNonCatalogPlugin() {
+ SQLConf conf = new SQLConf();
+ String invalidClassName = InvalidCatalogPlugin.class.getCanonicalName();
+ conf.setConfString("spark.sql.catalog.invalid", invalidClassName);
+
+ SparkException exc = intercept(SparkException.class, () ->
Catalogs.load("invalid", conf));
+
+ Assert.assertTrue("Should complain that class does not implement
CatalogPlugin",
+ exc.getMessage().contains("does not implement CatalogPlugin"));
+ Assert.assertTrue("Should identify the catalog by name",
+ exc.getMessage().contains("invalid"));
+ Assert.assertTrue("Should identify the class",
+ exc.getMessage().contains(invalidClassName));
+ }
+
+ @Test
+ public void testLoadConstructorFailureCatalogPlugin() {
+ SQLConf conf = new SQLConf();
+ String invalidClassName =
ConstructorFailureCatalogPlugin.class.getCanonicalName();
+ conf.setConfString("spark.sql.catalog.invalid", invalidClassName);
+
+ RuntimeException exc = intercept(RuntimeException.class, () ->
Catalogs.load("invalid", conf));
+
+ Assert.assertTrue("Should have expected error message",
+ exc.getMessage().contains("Expected failure"));
+ }
+
+ @Test
+ public void testLoadAccessErrorCatalogPlugin() {
+ SQLConf conf = new SQLConf();
+ String invalidClassName =
AccessErrorCatalogPlugin.class.getCanonicalName();
+ conf.setConfString("spark.sql.catalog.invalid", invalidClassName);
+
+ SparkException exc = intercept(SparkException.class, () ->
Catalogs.load("invalid", conf));
+
+ Assert.assertTrue("Should complain that no public constructor is provided",
+ exc.getMessage().contains("Failed to call public no-arg constructor
for catalog"));
+ Assert.assertTrue("Should identify the catalog by name",
+ exc.getMessage().contains("invalid"));
+ Assert.assertTrue("Should identify the class",
+ exc.getMessage().contains(invalidClassName));
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <E extends Exception> E intercept(Class<E> expected,
Callable<?> callable) {
+ try {
+ callable.call();
+ Assert.fail("No exception was thrown, expected: " +
+ expected.getName());
+ } catch (Exception actual) {
+ try {
+ Assert.assertEquals(expected, actual.getClass());
+ return (E) actual;
+ } catch (AssertionError e) {
+ e.addSuppressed(actual);
+ throw e;
+ }
+ }
+ // Compiler doesn't catch that Assert.fail will always throw an exception.
+ throw new UnsupportedOperationException("[BUG] Should not reach this
statement");
+ }
+}
+
+class TestCatalogPlugin implements CatalogPlugin {
+ String name = null;
+ CaseInsensitiveStringMap options = null;
+
+ TestCatalogPlugin() {
+ }
+
+ @Override
+ public void initialize(String name, CaseInsensitiveStringMap options) {
+ this.name = name;
+ this.options = options;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+}
+
+class ConstructorFailureCatalogPlugin implements CatalogPlugin { // fails in
its constructor
+ ConstructorFailureCatalogPlugin() {
+ throw new RuntimeException("Expected failure.");
+ }
+
+ @Override
+ public void initialize(String name, CaseInsensitiveStringMap options) {
+ }
+
+ @Override
+ public String name() {
+ return null;
+ }
+}
+
+class AccessErrorCatalogPlugin implements CatalogPlugin { // no public
constructor
+ private AccessErrorCatalogPlugin() {
+ }
+
+ @Override
+ public void initialize(String name, CaseInsensitiveStringMap options) {
+ }
+
+ @Override
+ public String name() {
+ return null;
+ }
+}
+
+class InvalidCatalogPlugin { // doesn't implement CatalogPlugin
+ public void initialize(CaseInsensitiveStringMap options) {
+ }
+}
diff --git
a/sql/catalyst/src/test/java/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.java
b/sql/catalyst/src/test/java/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.java
new file mode 100644
index 0000000..7639277
--- /dev/null
+++
b/sql/catalyst/src/test/java/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class CaseInsensitiveStringMapSuite {
+ @Test
+ public void testPutAndGet() {
+ CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
+ options.put("kEy", "valUE");
+
+ Assert.assertEquals("Should return correct value for lower-case key",
+ "valUE", options.get("key"));
+ Assert.assertEquals("Should return correct value for upper-case key",
+ "valUE", options.get("KEY"));
+ }
+
+ @Test
+ public void testKeySet() {
+ CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
+ options.put("kEy", "valUE");
+
+ Set<String> expectedKeySet = new HashSet<>();
+ expectedKeySet.add("key");
+
+ Assert.assertEquals("Should return lower-case key set", expectedKeySet,
options.keySet());
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index f6fab76..5208b9a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit._
import java.util.concurrent.atomic.AtomicReference
import scala.collection.JavaConverters._
+import scala.collection.mutable
import scala.reflect.runtime.universe.TypeTag
import scala.util.control.NonFatal
@@ -32,6 +33,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.catalog.Catalog
+import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Catalogs}
import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.encoders._
@@ -620,6 +622,12 @@ class SparkSession private(
*/
@transient lazy val catalog: Catalog = new CatalogImpl(self)
+ @transient private lazy val catalogs = new mutable.HashMap[String,
CatalogPlugin]()
+
+ private[sql] def catalog(name: String): CatalogPlugin = synchronized {
+ catalogs.getOrElseUpdate(name, Catalogs.load(name, sessionState.conf))
+ }
+
/**
* Returns the specified table/view as a `DataFrame`.
*
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]