This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.8
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 508b260fd4694e6af6f102b5b6e7d95b53890bed
Author: Zouxxyy <[email protected]>
AuthorDate: Mon May 13 15:41:45 2024 +0800

    [spark] Add check for provider when creating table with SparkCatalog (#3326)
---
 .../main/java/org/apache/paimon/spark/SparkCatalog.java | 17 +++++++++++------
 .../org/apache/paimon/spark/SparkGenericCatalog.java    |  4 ----
 .../apache/paimon/spark/catalog/SparkBaseCatalog.java   |  5 +++++
 .../scala/org/apache/paimon/spark/sql/DDLTestBase.scala | 13 +++++++++++++
 4 files changed, 29 insertions(+), 10 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 8f98ea91e..4e8d8eaf7 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -27,7 +27,6 @@ import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.spark.catalog.SparkBaseCatalog;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.utils.Preconditions;
 
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
@@ -56,6 +55,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.paimon.spark.SparkCatalogOptions.DEFAULT_DATABASE;
 import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Spark {@link TableCatalog} for paimon. */
 public class SparkCatalog extends SparkBaseCatalog {
@@ -99,7 +99,7 @@ public class SparkCatalog extends SparkBaseCatalog {
     @Override
     public void createNamespace(String[] namespace, Map<String, String> metadata)
             throws NamespaceAlreadyExistsException {
-        Preconditions.checkArgument(
+        checkArgument(
                 isValidateNamespace(namespace),
                 "Namespace %s is not valid",
                 Arrays.toString(namespace));
@@ -137,7 +137,7 @@ public class SparkCatalog extends SparkBaseCatalog {
     @Override
     public Map<String, String> loadNamespaceMetadata(String[] namespace)
             throws NoSuchNamespaceException {
-        Preconditions.checkArgument(
+        checkArgument(
                 isValidateNamespace(namespace),
                 "Namespace %s is not valid",
                 Arrays.toString(namespace));
@@ -178,7 +178,7 @@ public class SparkCatalog extends SparkBaseCatalog {
      */
     public boolean dropNamespace(String[] namespace, boolean cascade)
             throws NoSuchNamespaceException {
-        Preconditions.checkArgument(
+        checkArgument(
                 isValidateNamespace(namespace),
                 "Namespace %s is not valid",
                 Arrays.toString(namespace));
@@ -195,7 +195,7 @@ public class SparkCatalog extends SparkBaseCatalog {
 
     @Override
     public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
-        Preconditions.checkArgument(
+        checkArgument(
                 isValidateNamespace(namespace),
                 "Missing database in namespace: %s",
                 Arrays.toString(namespace));
@@ -284,6 +284,11 @@ public class SparkCatalog extends SparkBaseCatalog {
             Map<String, String> properties)
             throws TableAlreadyExistsException, NoSuchNamespaceException {
         try {
+            String provider = properties.get(TableCatalog.PROP_PROVIDER);
+            checkArgument(
+                    usePaimon(provider),
+                    "SparkCatalog can only create paimon table, but current provider is %s",
+                    provider);
             catalog.createTable(
                     toIdentifier(ident), toInitialSchema(schema, partitions, properties), false);
             return loadTable(ident);
@@ -377,7 +382,7 @@ public class SparkCatalog extends SparkBaseCatalog {
 
     private Schema toInitialSchema(
             StructType schema, Transform[] partitions, Map<String, String> properties) {
-        Preconditions.checkArgument(
+        checkArgument(
                 Arrays.stream(partitions)
                         .allMatch(
                                 partition -> {
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
index f8a6b203c..b947523fe 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
@@ -324,10 +324,6 @@ public class SparkGenericCatalog extends SparkBaseCatalog implements CatalogExte
         }
     }
 
-    private boolean usePaimon(String provider) {
-        return provider == null || SparkSource.NAME().equalsIgnoreCase(provider);
-    }
-
     private TableCatalog asTableCatalog() {
         return (TableCatalog) sessionCatalog;
     }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java
index 2f5267029..55670a594 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java
@@ -20,6 +20,7 @@ package org.apache.paimon.spark.catalog;
 
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.spark.SparkProcedures;
+import org.apache.paimon.spark.SparkSource;
 import org.apache.paimon.spark.analysis.NoSuchProcedureException;
 import org.apache.paimon.spark.procedure.Procedure;
 import org.apache.paimon.spark.procedure.ProcedureBuilder;
@@ -49,4 +50,8 @@ public abstract class SparkBaseCatalog
         }
         throw new NoSuchProcedureException(identifier);
     }
+
+    public boolean usePaimon(String provider) {
+        return provider == null || SparkSource.NAME().equalsIgnoreCase(provider);
+    }
 }
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
index 9ddb83118..6fda6b26c 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
@@ -71,4 +71,17 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
         }
     }
   }
+
+  test("Paimon DDL: create other table with paimon SparkCatalog") {
+    withTable("paimon_tbl1", "paimon_tbl2", "parquet_tbl") {
+      spark.sql(s"CREATE TABLE paimon_tbl1 (id int) USING paimon")
+      spark.sql(s"CREATE TABLE paimon_tbl2 (id int)")
+      val error = intercept[Exception] {
+        spark.sql(s"CREATE TABLE parquet_tbl (id int) USING parquet")
+      }.getMessage
+      assert(
+        error.contains(
+          "SparkCatalog can only create paimon table, but current provider is parquet"))
+    }
+  }
 }

Reply via email to