This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a75c02dd [spark] use the internal session catalog in spark generic 
catalog (#2959)
6a75c02dd is described below

commit 6a75c02ddedce08dd46e5955c4859b42d60e1f88
Author: Yann Byron <[email protected]>
AuthorDate: Fri Mar 8 16:44:45 2024 +0800

    [spark] use the internal session catalog in spark generic catalog (#2959)
---
 .../generated/spark_connector_configuration.html   |   6 ++
 .../java/org/apache/paimon/spark/SparkCatalog.java |   8 +-
 .../apache/paimon/spark/SparkConnectorOptions.java |   7 ++
 .../apache/paimon/spark/SparkGenericCatalog.java   | 102 +++++++++++----------
 .../paimon/spark/catalog/SparkBaseCatalog.java     |   8 ++
 .../spark/sql/connector/catalog/CatalogUtils.scala |  58 ++++++++++++
 .../apache/paimon/spark/PaimonHiveTestBase.scala   |   2 +-
 7 files changed, 136 insertions(+), 55 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/spark_connector_configuration.html 
b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
index 00ca2ba17..09d363aff 100644
--- a/docs/layouts/shortcodes/generated/spark_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
@@ -74,5 +74,11 @@ under the License.
             <td>Boolean</td>
             <td>If true, allow to merge data types if the two types meet the 
rules for explicit casting.</td>
         </tr>
+        <tr>
+            <td><h5>catalog.create-underlying-session-catalog</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If true, create and use an underlying session catalog instead 
of default session catalog when use SparkGenericCatalog.</td>
+        </tr>
     </tbody>
 </table>
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 359f04ff2..7e77d55a9 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -63,12 +63,11 @@ public class SparkCatalog extends SparkBaseCatalog {
 
     private static final String PRIMARY_KEY_IDENTIFIER = "primary-key";
 
-    private String name = null;
     protected Catalog catalog = null;
 
     @Override
     public void initialize(String name, CaseInsensitiveStringMap options) {
-        this.name = name;
+        this.catalogName = name;
         CatalogContext catalogContext =
                 CatalogContext.create(
                         Options.fromMap(options),
@@ -87,11 +86,6 @@ public class SparkCatalog extends SparkBaseCatalog {
         return catalog;
     }
 
-    @Override
-    public String name() {
-        return name;
-    }
-
     @Override
     public String[] defaultNamespace() {
         return new String[] {Catalog.DEFAULT_DATABASE};
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
index d2be72706..e553fb507 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
@@ -24,6 +24,14 @@ import static org.apache.paimon.options.ConfigOptions.key;
 
 /** Options for spark connector. */
 public class SparkConnectorOptions {
+
+    public static final ConfigOption<Boolean> CREATE_UNDERLYING_SESSION_CATALOG =
+            key("catalog.create-underlying-session-catalog")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "If true, create and use an underlying session catalog instead of default session catalog when use SparkGenericCatalog.");
+
     public static final ConfigOption<Boolean> MERGE_SCHEMA =
             key("write.merge-schema")
                     .booleanType()
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
index 7fe9d7a68..1e280596f 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
@@ -25,6 +25,7 @@ import org.apache.paimon.spark.catalog.SparkBaseCatalog;
 import org.apache.paimon.utils.Preconditions;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.SparkConf;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
@@ -32,9 +33,12 @@ import 
org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException;
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalog;
 import org.apache.spark.sql.catalyst.catalog.InMemoryCatalog;
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
 import org.apache.spark.sql.connector.catalog.CatalogExtension;
 import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.CatalogUtils;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.NamespaceChange;
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
@@ -43,7 +47,9 @@ import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.catalog.TableChange;
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
 import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog;
 import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.internal.SessionState;
 import org.apache.spark.sql.internal.StaticSQLConf;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -56,7 +62,6 @@ import java.util.concurrent.Callable;
 
 import static org.apache.paimon.options.CatalogOptions.METASTORE;
 import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
-import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /* This file is based on source code from the Iceberg Project 
(http://iceberg.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
@@ -66,19 +71,18 @@ import static 
org.apache.paimon.utils.Preconditions.checkNotNull;
  * A Spark catalog that can also load non-Paimon tables.
  *
  * <p>Most of the content of this class is referenced from Iceberg's 
SparkSessionCatalog.
- *
- * @param <T> CatalogPlugin class to avoid casting to TableCatalog and 
SupportsNamespaces.
  */
-public class SparkGenericCatalog<T extends TableCatalog & SupportsNamespaces>
-        extends SparkBaseCatalog implements CatalogExtension {
+public class SparkGenericCatalog extends SparkBaseCatalog implements 
CatalogExtension {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(SparkGenericCatalog.class);
 
     private static final String[] DEFAULT_NAMESPACE = new String[] {"default"};
 
-    private String catalogName = null;
     private SparkCatalog sparkCatalog = null;
-    private T sessionCatalog = null;
+
+    private boolean underlyingSessionCatalogEnabled = false;
+
+    private CatalogPlugin sessionCatalog = null;
 
     @Override
     public Catalog paimonCatalog() {
@@ -92,47 +96,47 @@ public class SparkGenericCatalog<T extends TableCatalog & 
SupportsNamespaces>
 
     @Override
     public String[][] listNamespaces() throws NoSuchNamespaceException {
-        return getSessionCatalog().listNamespaces();
+        return asNamespaceCatalog().listNamespaces();
     }
 
     @Override
     public String[][] listNamespaces(String[] namespace) throws 
NoSuchNamespaceException {
-        return getSessionCatalog().listNamespaces(namespace);
+        return asNamespaceCatalog().listNamespaces(namespace);
     }
 
     @Override
     public boolean namespaceExists(String[] namespace) {
-        return getSessionCatalog().namespaceExists(namespace);
+        return asNamespaceCatalog().namespaceExists(namespace);
     }
 
     @Override
     public Map<String, String> loadNamespaceMetadata(String[] namespace)
             throws NoSuchNamespaceException {
-        return getSessionCatalog().loadNamespaceMetadata(namespace);
+        return asNamespaceCatalog().loadNamespaceMetadata(namespace);
     }
 
     @Override
     public void createNamespace(String[] namespace, Map<String, String> 
metadata)
             throws NamespaceAlreadyExistsException {
-        getSessionCatalog().createNamespace(namespace, metadata);
+        asNamespaceCatalog().createNamespace(namespace, metadata);
     }
 
     @Override
     public void alterNamespace(String[] namespace, NamespaceChange... changes)
             throws NoSuchNamespaceException {
-        getSessionCatalog().alterNamespace(namespace, changes);
+        asNamespaceCatalog().alterNamespace(namespace, changes);
     }
 
     @Override
     public boolean dropNamespace(String[] namespace, boolean cascade)
             throws NoSuchNamespaceException, NonEmptyNamespaceException {
-        return getSessionCatalog().dropNamespace(namespace, cascade);
+        return asNamespaceCatalog().dropNamespace(namespace, cascade);
     }
 
     @Override
     public Identifier[] listTables(String[] namespace) throws 
NoSuchNamespaceException {
         // delegate to the session catalog because all tables share the same 
namespace
-        return getSessionCatalog().listTables(namespace);
+        return asTableCatalog().listTables(namespace);
     }
 
     @Override
@@ -140,7 +144,7 @@ public class SparkGenericCatalog<T extends TableCatalog & 
SupportsNamespaces>
         try {
             return sparkCatalog.loadTable(ident);
         } catch (NoSuchTableException e) {
-            return throwsOldIfExceptionHappens(() -> 
getSessionCatalog().loadTable(ident), e);
+            return throwsOldIfExceptionHappens(() -> 
asTableCatalog().loadTable(ident), e);
         }
     }
 
@@ -149,8 +153,7 @@ public class SparkGenericCatalog<T extends TableCatalog & 
SupportsNamespaces>
         try {
             return sparkCatalog.loadTable(ident, version);
         } catch (NoSuchTableException e) {
-            return throwsOldIfExceptionHappens(
-                    () -> getSessionCatalog().loadTable(ident, version), e);
+            return throwsOldIfExceptionHappens(() -> 
asTableCatalog().loadTable(ident, version), e);
         }
     }
 
@@ -160,7 +163,7 @@ public class SparkGenericCatalog<T extends TableCatalog & 
SupportsNamespaces>
             return sparkCatalog.loadTable(ident, timestamp);
         } catch (NoSuchTableException e) {
             return throwsOldIfExceptionHappens(
-                    () -> getSessionCatalog().loadTable(ident, timestamp), e);
+                    () -> asTableCatalog().loadTable(ident, timestamp), e);
         }
     }
 
@@ -169,7 +172,7 @@ public class SparkGenericCatalog<T extends TableCatalog & 
SupportsNamespaces>
         // We do not need to check whether the table exists and whether
         // it is an Paimon table to reduce remote service requests.
         sparkCatalog.invalidateTable(ident);
-        getSessionCatalog().invalidateTable(ident);
+        asTableCatalog().invalidateTable(ident);
     }
 
     @Override
@@ -184,7 +187,7 @@ public class SparkGenericCatalog<T extends TableCatalog & 
SupportsNamespaces>
             return sparkCatalog.createTable(ident, schema, partitions, 
properties);
         } else {
             // delegate to the session catalog
-            return getSessionCatalog().createTable(ident, schema, partitions, 
properties);
+            return asTableCatalog().createTable(ident, schema, partitions, 
properties);
         }
     }
 
@@ -193,18 +196,18 @@ public class SparkGenericCatalog<T extends TableCatalog & 
SupportsNamespaces>
         if (sparkCatalog.tableExists(ident)) {
             return sparkCatalog.alterTable(ident, changes);
         } else {
-            return getSessionCatalog().alterTable(ident, changes);
+            return asTableCatalog().alterTable(ident, changes);
         }
     }
 
     @Override
     public boolean dropTable(Identifier ident) {
-        return sparkCatalog.dropTable(ident) || 
getSessionCatalog().dropTable(ident);
+        return sparkCatalog.dropTable(ident) || 
asTableCatalog().dropTable(ident);
     }
 
     @Override
     public boolean purgeTable(Identifier ident) {
-        return sparkCatalog.purgeTable(ident) || 
getSessionCatalog().purgeTable(ident);
+        return sparkCatalog.purgeTable(ident) || 
asTableCatalog().purgeTable(ident);
     }
 
     @Override
@@ -213,13 +216,15 @@ public class SparkGenericCatalog<T extends TableCatalog & 
SupportsNamespaces>
         if (sparkCatalog.tableExists(from)) {
             sparkCatalog.renameTable(from, to);
         } else {
-            getSessionCatalog().renameTable(from, to);
+            asTableCatalog().renameTable(from, to);
         }
     }
 
     @Override
     public final void initialize(String name, CaseInsensitiveStringMap 
options) {
-        Configuration hadoopConf = 
SparkSession.active().sessionState().newHadoopConf();
+        SessionState sessionState = SparkSession.active().sessionState();
+        Configuration hadoopConf = sessionState.newHadoopConf();
+        SparkConf sparkConf = new SparkConf();
         if (options.containsKey(METASTORE.key())
                 && options.get(METASTORE.key()).equalsIgnoreCase("hive")) {
             String uri = options.get(CatalogOptions.URI.key());
@@ -242,10 +247,21 @@ public class SparkGenericCatalog<T extends TableCatalog & SupportsNamespaces>
         this.catalogName = name;
         this.sparkCatalog = new SparkCatalog();
 
-        this.sparkCatalog.initialize(
-                name,
-                autoFillConfigurations(
-                        options, SparkSession.active().sessionState().conf(), hadoopConf));
+        CaseInsensitiveStringMap newOptions =
+                autoFillConfigurations(options, sessionState.conf(), hadoopConf);
+        sparkCatalog.initialize(name, newOptions);
+
+        if (options.getBoolean(
+                SparkConnectorOptions.CREATE_UNDERLYING_SESSION_CATALOG.key(), false)) {
+            this.underlyingSessionCatalogEnabled = true; // makes setDelegateCatalog a no-op
+            for (Map.Entry<String, String> entry : options.entrySet()) {
+                sparkConf.set("spark.hadoop." + entry.getKey(), entry.getValue());
+                hadoopConf.set(entry.getKey(), entry.getValue());
+            }
+            ExternalCatalog externalCatalog =
+                    CatalogUtils.buildExternalCatalog(sparkConf, hadoopConf);
+            this.sessionCatalog = new V2SessionCatalog(new SessionCatalog(externalCatalog));
+        }
     }
 
     private CaseInsensitiveStringMap autoFillConfigurations(
 
     private CaseInsensitiveStringMap autoFillConfigurations(
@@ -282,30 +298,30 @@ public class SparkGenericCatalog<T extends TableCatalog & SupportsNamespaces>
 
     @Override
-    @SuppressWarnings("unchecked")
-    public void setDelegateCatalog(CatalogPlugin sparkSessionCatalog) {
-        if (sparkSessionCatalog instanceof TableCatalog
-                && sparkSessionCatalog instanceof SupportsNamespaces) {
-            this.sessionCatalog = (T) sparkSessionCatalog;
-        } else {
-            throw new IllegalArgumentException("Invalid session catalog: " + sparkSessionCatalog);
+    public void setDelegateCatalog(CatalogPlugin delegate) {
+        if (!underlyingSessionCatalogEnabled) {
+            this.sessionCatalog = delegate;
         }
     }
 
-    @Override
-    public String name() {
-        return catalogName;
-    }
-
     private boolean usePaimon(String provider) {
         return provider == null || SparkSource.NAME().equalsIgnoreCase(provider);
     }
 
-    private T getSessionCatalog() {
-        checkNotNull(
-                sessionCatalog,
-                "Delegated SessionCatalog is missing. "
-                        + "Please make sure your are replacing Spark's default catalog, named 'spark_catalog'.");
-        return sessionCatalog;
+    private TableCatalog asTableCatalog() {
+        return (TableCatalog) checkedSessionCatalog();
+    }
+
+    private SupportsNamespaces asNamespaceCatalog() {
+        return (SupportsNamespaces) checkedSessionCatalog();
+    }
+
+    /** Returns the session catalog, failing with a clear message if none was ever set. */
+    private CatalogPlugin checkedSessionCatalog() {
+        Preconditions.checkNotNull(
+                sessionCatalog,
+                "Delegated SessionCatalog is missing. "
+                        + "Please make sure you are replacing Spark's default catalog, named 'spark_catalog'.");
+        return sessionCatalog;
     }
 
     @Override
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java
index 3a18277de..2f5267029 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java
@@ -31,6 +31,15 @@ import org.apache.spark.sql.connector.catalog.TableCatalog;
 /** Spark base catalog. */
 public abstract class SparkBaseCatalog
         implements TableCatalog, SupportsNamespaces, ProcedureCatalog, WithPaimonCatalog {
+
+    /** Name returned by {@link #name()}; subclasses are expected to set it in initialize. */
+    protected String catalogName;
+
+    @Override
+    public String name() {
+        return catalogName;
+    }
+
     @Override
     public Procedure loadProcedure(Identifier identifier) throws NoSuchProcedureException {
         if (Catalog.SYSTEM_DATABASE_NAME.equals(identifier.namespace()[0])) {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogUtils.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogUtils.scala
new file mode 100644
index 000000000..283fbf254
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogUtils.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalog
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.util.Utils
+
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+/** Utilities for creating a standalone Spark [[ExternalCatalog]]. */
+object CatalogUtils {
+
+  private val HiveExternalCatalogClassName = "org.apache.spark.sql.hive.HiveExternalCatalog"
+  private val InMemoryCatalogClassName = "org.apache.spark.sql.catalyst.catalog.InMemoryCatalog"
+
+  /**
+   * Creates a new [[ExternalCatalog]] according to the active session's
+   * `spark.sql.catalogImplementation`: Hive-backed when it is `hive`, in-memory otherwise.
+   *
+   * @param conf Spark configuration passed to the catalog constructor
+   * @param hadoopConf Hadoop configuration passed to the catalog constructor
+   * @return a newly instantiated external catalog
+   * @throws IllegalArgumentException if the catalog class cannot be loaded or instantiated
+   */
+  def buildExternalCatalog(conf: SparkConf, hadoopConf: Configuration): ExternalCatalog = {
+    val externalCatalogClassName =
+      SparkSession.active.conf.get(CATALOG_IMPLEMENTATION.key) match {
+        case "hive" => HiveExternalCatalogClassName
+        case _ => InMemoryCatalogClassName
+      }
+    reflect[ExternalCatalog, SparkConf, Configuration](externalCatalogClassName, conf, hadoopConf)
+  }
+
+  /** Instantiates `className` via its (Arg1, Arg2) constructor and casts the result to `T`. */
+  private def reflect[T, Arg1 <: AnyRef, Arg2 <: AnyRef](
+      className: String,
+      ctorArg1: Arg1,
+      ctorArg2: Arg2)(implicit ctorArgTag1: ClassTag[Arg1], ctorArgTag2: ClassTag[Arg2]): T = {
+    try {
+      val clazz = Utils.classForName(className)
+      val ctor = clazz.getDeclaredConstructor(ctorArgTag1.runtimeClass, ctorArgTag2.runtimeClass)
+      val args = Array[AnyRef](ctorArg1, ctorArg2)
+      ctor.newInstance(args: _*).asInstanceOf[T]
+    } catch {
+      case NonFatal(e) =>
+        throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
+    }
+  }
+
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
index abc1ecd81..3b07573ef 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
@@ -43,7 +43,7 @@ class PaimonHiveTestBase extends PaimonSparkTestBase {
     super.sparkConf
       .set("spark.sql.warehouse.dir", tempHiveDBDir.getCanonicalPath)
       .set("spark.sql.catalogImplementation", "hive")
-      .set("spark.sql.catalog.spark_catalog", classOf[SparkGenericCatalog[_]].getName)
+      .set("spark.sql.catalog.spark_catalog", classOf[SparkGenericCatalog].getName)
       .set(s"spark.sql.catalog.$paimonHiveCatalogName", classOf[SparkCatalog].getName)
       .set(s"spark.sql.catalog.$paimonHiveCatalogName.metastore", "hive")
       .set(s"spark.sql.catalog.$paimonHiveCatalogName.warehouse", tempHiveDBDir.getCanonicalPath)

Reply via email to