This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.8 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 508b260fd4694e6af6f102b5b6e7d95b53890bed Author: Zouxxyy <[email protected]> AuthorDate: Mon May 13 15:41:45 2024 +0800 [spark] Add check for provider when creating table with SparkCatalog (#3326) --- .../main/java/org/apache/paimon/spark/SparkCatalog.java | 17 +++++++++++------ .../org/apache/paimon/spark/SparkGenericCatalog.java | 4 ---- .../apache/paimon/spark/catalog/SparkBaseCatalog.java | 5 +++++ .../scala/org/apache/paimon/spark/sql/DDLTestBase.scala | 13 +++++++++++++ 4 files changed, 29 insertions(+), 10 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index 8f98ea91e..4e8d8eaf7 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -27,7 +27,6 @@ import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.spark.catalog.SparkBaseCatalog; import org.apache.paimon.table.Table; -import org.apache.paimon.utils.Preconditions; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; @@ -56,6 +55,7 @@ import java.util.stream.Collectors; import static org.apache.paimon.spark.SparkCatalogOptions.DEFAULT_DATABASE; import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType; +import static org.apache.paimon.utils.Preconditions.checkArgument; /** Spark {@link TableCatalog} for paimon. */ public class SparkCatalog extends SparkBaseCatalog { @@ -99,7 +99,7 @@ public class SparkCatalog extends SparkBaseCatalog { @Override public void createNamespace(String[] namespace, Map<String, String> metadata) throws NamespaceAlreadyExistsException { - Preconditions.checkArgument( + checkArgument( isValidateNamespace(namespace), "Namespace %s is not valid", Arrays.toString(namespace)); @@ -137,7 +137,7 @@ public class SparkCatalog extends SparkBaseCatalog { @Override public Map<String, String> loadNamespaceMetadata(String[] namespace) throws NoSuchNamespaceException { - Preconditions.checkArgument( + checkArgument( isValidateNamespace(namespace), "Namespace %s is not valid", Arrays.toString(namespace)); @@ -178,7 +178,7 @@ public class SparkCatalog extends SparkBaseCatalog { */ public boolean dropNamespace(String[] namespace, boolean cascade) throws NoSuchNamespaceException { - Preconditions.checkArgument( + checkArgument( isValidateNamespace(namespace), "Namespace %s is not valid", Arrays.toString(namespace)); @@ -195,7 +195,7 @@ public class SparkCatalog extends SparkBaseCatalog { @Override public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException { - Preconditions.checkArgument( + checkArgument( isValidateNamespace(namespace), "Missing database in namespace: %s", Arrays.toString(namespace)); @@ -284,6 +284,11 @@ public class SparkCatalog extends SparkBaseCatalog { Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException { try { + String provider = properties.get(TableCatalog.PROP_PROVIDER); + checkArgument( + usePaimon(provider), + "SparkCatalog can only create paimon table, but current provider is %s", + provider); catalog.createTable( toIdentifier(ident), toInitialSchema(schema, partitions, properties), false); return loadTable(ident); @@ -377,7 +382,7 @@ public class SparkCatalog extends SparkBaseCatalog { private Schema toInitialSchema( StructType schema, Transform[] partitions, Map<String, String> properties) { - Preconditions.checkArgument( + checkArgument( Arrays.stream(partitions) .allMatch( partition -> { diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java index f8a6b203c..b947523fe 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java @@ -324,10 +324,6 @@ public class SparkGenericCatalog extends SparkBaseCatalog implements CatalogExte } } - private boolean usePaimon(String provider) { - return provider == null || SparkSource.NAME().equalsIgnoreCase(provider); - } - private TableCatalog asTableCatalog() { return (TableCatalog) sessionCatalog; } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java index 2f5267029..55670a594 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java @@ -20,6 +20,7 @@ package org.apache.paimon.spark.catalog; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.spark.SparkProcedures; +import org.apache.paimon.spark.SparkSource; import org.apache.paimon.spark.analysis.NoSuchProcedureException; import org.apache.paimon.spark.procedure.Procedure; import org.apache.paimon.spark.procedure.ProcedureBuilder; @@ -49,4 +50,8 @@ public abstract class SparkBaseCatalog } throw new NoSuchProcedureException(identifier); } + + public boolean usePaimon(String provider) { + return provider == null || SparkSource.NAME().equalsIgnoreCase(provider); + } } diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala index 9ddb83118..6fda6b26c 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala @@ -71,4 +71,17 @@ abstract class DDLTestBase extends PaimonSparkTestBase { } } } + + test("Paimon DDL: create other table with paimon SparkCatalog") { + withTable("paimon_tbl1", "paimon_tbl2", "parquet_tbl") { + spark.sql(s"CREATE TABLE paimon_tbl1 (id int) USING paimon") + spark.sql(s"CREATE TABLE paimon_tbl2 (id int)") + val error = intercept[Exception] { + spark.sql(s"CREATE TABLE parquet_tbl (id int) USING parquet") + }.getMessage + assert( + error.contains( + "SparkCatalog can only create paimon table, but current provider is parquet")) + } + } }
