This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c6387a669d [hive][spark] Support creating external table without schema when the table already exists (#4638)
c6387a669d is described below

commit c6387a669d6cd0611c352dbfe4d241fea6a6484c
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Dec 10 14:41:37 2024 +0800

    [hive][spark] Support creating external table without schema when the table already exists (#4638)
---
 docs/content/spark/sql-ddl.md                      | 29 ++++++++-
 docs/content/spark/sql-write.md                    | 31 +++++----
 .../org/apache/paimon/schema/SchemaManager.java    | 54 ++++++++++++----
 .../java/org/apache/paimon/hive/HiveCatalog.java   | 10 ++-
 .../java/org/apache/paimon/spark/SparkCatalog.java | 30 ++-------
 .../org/apache/paimon/spark/SparkSource.scala      |  2 +-
 .../org/apache/paimon/spark/sql/DDLTestBase.scala  |  2 +-
 .../spark/sql/DDLWithHiveCatalogTestBase.scala     | 75 ++++++++++++++++------
 8 files changed, 157 insertions(+), 76 deletions(-)

diff --git a/docs/content/spark/sql-ddl.md b/docs/content/spark/sql-ddl.md
index 638a21a704..cfe105f6ac 100644
--- a/docs/content/spark/sql-ddl.md
+++ b/docs/content/spark/sql-ddl.md
@@ -156,6 +156,33 @@ CREATE TABLE my_table (
 );
 ```
 
+### Create External Table
+
+When the catalog's `metastore` type is `hive`, if the `location` is specified when creating a table, that table will be considered an external table; otherwise, it will be a managed table.
+
+When you drop an external table, only the metadata in Hive will be removed and the actual data files will not be deleted, whereas dropping a managed table will also delete the data.
+
+```sql
+CREATE TABLE my_table (
+    user_id BIGINT,
+    item_id BIGINT,
+    behavior STRING,
+    dt STRING,
+    hh STRING
+) PARTITIONED BY (dt, hh) TBLPROPERTIES (
+    'primary-key' = 'dt,hh,user_id'
+) LOCATION '/path/to/table';
+```
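+
+For example, dropping the two kinds of tables behaves differently (an illustrative sketch; `my_managed_table` stands for a hypothetical table created without an explicit `LOCATION`):
+
+```sql
+-- External table: only the Hive metadata is removed; files under '/path/to/table' are kept.
+DROP TABLE my_table;
+
+-- Managed table: both the metadata and the underlying data files are deleted.
+DROP TABLE my_managed_table;
+```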
+
+Furthermore, if there is already data stored in the specified location, you can create the table without explicitly specifying the fields, partitions, props, or other information.
+In this case, the new table will inherit them all from the existing table’s metadata.
+
+However, if you specify them manually, you must ensure that they are consistent with those of the existing table (props can be a subset). Therefore, it is strongly recommended not to specify them.
+
+```sql
+CREATE TABLE my_table LOCATION '/path/to/table';
+```
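+
+Conversely, if the schema you specify conflicts with the one already stored at the location, the creation fails (an illustrative sketch; `fake_col` is a hypothetical column that does not exist in the table):
+
+```sql
+-- Fails: 'fake_col' does not match the fields of the existing table.
+CREATE TABLE my_table (fake_col INT) LOCATION '/path/to/table';
+```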
+
 ### Create Table As Select
 
 Table can be created and populated by the results of a query, for example, we have a SQL statement like this: `CREATE TABLE table_b AS SELECT id, name FROM table_a`,
@@ -241,7 +268,7 @@ DROP VIEW v1;
 ```
 
 ## Tag
-### Create or Replace Tag
+### Create Or Replace Tag
 Create or replace a tag with the following options.
 - Create a tag with or without the snapshot id and time retention.
 - Creating an existing tag does not fail if using `IF NOT EXISTS` syntax.
diff --git a/docs/content/spark/sql-write.md b/docs/content/spark/sql-write.md
index 5f4fa2dabc..c3afcd3754 100644
--- a/docs/content/spark/sql-write.md
+++ b/docs/content/spark/sql-write.md
@@ -120,7 +120,17 @@ TRUNCATE TABLE my_table;
 
 ## Update Table
 
-spark supports update PrimitiveType and StructType, for example:
+Updates the column values for the rows that match a predicate. When no predicate is provided, updates the column values for all rows.
+
+Note:
+
+{{< hint info >}}
+
+Updating primary key columns is not supported when the target table is a primary key table.
+
+{{< /hint >}}
+
+Spark supports updating columns of PrimitiveType and StructType, for example:
 
 ```sql
 -- Syntax
@@ -142,17 +152,22 @@ UPDATE t SET s.c2 = 'a_new' WHERE s.c1 = 1;
 
 ## Delete From Table
 
+Deletes the rows that match a predicate. When no predicate is provided, deletes all rows.
+
 ```sql
 DELETE FROM my_table WHERE currency = 'UNKNOWN';
 ```
 
 ## Merge Into Table
 
-Paimon currently supports Merge Into syntax in Spark 3+, which allow a set of updates, insertions and deletions based on a source table in a single commit.
+Merges a set of updates, insertions and deletions based on a source table into a target table.
+
+Note:
+
+{{< hint info >}}
+
+In the update clause, updating primary key columns is not supported when the target table is a primary key table.
 
-{{< hint into >}}
-1. In update clause, to update primary key columns is not supported.
-2. `WHEN NOT MATCHED BY SOURCE` syntax is not supported.
 {{< /hint >}}
 
 **Example: One**
@@ -160,7 +175,6 @@ Paimon currently supports Merge Into syntax in Spark 3+, which allow a set of up
 This is a simple demo that, if a row exists in the target table update it, else insert it.
 
 ```sql
-
 -- Here both source and target tables have the same schema: (a INT, b INT, c STRING), and a is a primary key.
 
 MERGE INTO target
@@ -170,7 +184,6 @@ WHEN MATCHED THEN
 UPDATE SET *
 WHEN NOT MATCHED
 THEN INSERT *
-
 ```
 
 **Example: Two**
@@ -178,7 +191,6 @@ THEN INSERT *
 This is a demo with multiple, conditional clauses.
 
 ```sql
-
 -- Here both source and target tables have the same schema: (a INT, b INT, c STRING), and a is a primary key.
 
 MERGE INTO target
@@ -194,15 +206,12 @@ WHEN NOT MATCHED AND c > 'c9' THEN
   INSERT (a, b, c) VALUES (a, b * 1.1, c)      -- when not matched but meet the condition 3, then transform and insert this row;
 WHEN NOT MATCHED THEN
 INSERT *      -- when not matched, insert this row without any transformation;
-
 ```
 
 ## Streaming Write
 
 {{< hint info >}}
 
-Paimon currently supports Spark 3+ for streaming write.
-
 Paimon Structured Streaming only supports the two `append` and `complete` modes.
 
 {{< /hint >}}
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 83ddbccfef..2139dca4a9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -208,24 +208,18 @@ public class SchemaManager implements Serializable {
         return createTable(schema, false);
     }
 
-    public TableSchema createTable(Schema schema, boolean ignoreIfExistsSame) throws Exception {
+    public TableSchema createTable(Schema schema, boolean externalTable) throws Exception {
         while (true) {
             Optional<TableSchema> latest = latest();
             if (latest.isPresent()) {
-                TableSchema oldSchema = latest.get();
-                boolean isSame =
-                        Objects.equals(oldSchema.fields(), schema.fields())
-                                && Objects.equals(oldSchema.partitionKeys(), schema.partitionKeys())
-                                && Objects.equals(oldSchema.primaryKeys(), schema.primaryKeys())
-                                && Objects.equals(oldSchema.options(), schema.options());
-                if (ignoreIfExistsSame && isSame) {
-                    return oldSchema;
+                TableSchema latestSchema = latest.get();
+                if (externalTable) {
+                    checkSchemaForExternalTable(latestSchema, schema);
+                    return latestSchema;
+                } else {
+                    throw new IllegalStateException(
+                            "Schema in filesystem exists, creation is not 
allowed.");
                 }
-
-                throw new IllegalStateException(
-                        "Schema in filesystem exists, please use updating,"
-                                + " latest schema is: "
-                                + oldSchema);
             }
 
             List<DataField> fields = schema.fields();
@@ -254,6 +248,38 @@ public class SchemaManager implements Serializable {
         }
     }
 
+    private void checkSchemaForExternalTable(TableSchema existsSchema, Schema newSchema) {
+        // When creating an external table, if the table already exists in the location, we can
+        // choose not to specify the fields.
+        if (newSchema.fields().isEmpty()
+                // When the fields are explicitly specified, we need to check for consistency.
+                || (Objects.equals(existsSchema.fields(), newSchema.fields())
+                        && Objects.equals(existsSchema.partitionKeys(), newSchema.partitionKeys())
+                        && Objects.equals(existsSchema.primaryKeys(), newSchema.primaryKeys()))) {
+            // check for options
+            Map<String, String> existsOptions = existsSchema.options();
+            Map<String, String> newOptions = newSchema.options();
+            newOptions.forEach(
+                    (key, value) -> {
+                        if (!key.equals(Catalog.OWNER_PROP)
+                                && (!existsOptions.containsKey(key)
+                                        || !existsOptions.get(key).equals(value))) {
+                            throw new RuntimeException(
+                                    "New schema's options are not equal to the 
exists schema's, new schema: "
+                                            + newOptions
+                                            + ", exists schema: "
+                                            + existsOptions);
+                        }
+                    });
+        } else {
+            throw new RuntimeException(
+                    "New schema is not equal to exists schema, new schema: "
+                            + newSchema
+                            + ", exists schema: "
+                            + existsSchema);
+        }
+    }
+
     /** Update {@link SchemaChange}s. */
    public TableSchema commitChanges(SchemaChange... changes) throws Exception {
         return commitChanges(Arrays.asList(changes));
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 151e2b4d2c..c74ede9815 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -720,11 +720,7 @@ public class HiveCatalog extends AbstractCatalog {
         try {
            tableSchema = schemaManager(identifier, location).createTable(schema, externalTable);
         } catch (Exception e) {
-            throw new RuntimeException(
-                    "Failed to commit changes of table "
-                            + identifier.getFullName()
-                            + " to underlying files.",
-                    e);
+            throw new RuntimeException("Failed to create table " + 
identifier.getFullName(), e);
         }
 
         try {
@@ -735,7 +731,9 @@ public class HiveCatalog extends AbstractCatalog {
                                            identifier, tableSchema, location, externalTable)));
         } catch (Exception e) {
             try {
-                fileIO.deleteDirectoryQuietly(location);
+                if (!externalTable) {
+                    fileIO.deleteDirectoryQuietly(location);
+                }
             } catch (Exception ee) {
                LOG.error("Delete directory[{}] fail for table {}", location, identifier, ee);
             }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 5ad1b13b7c..d6318c723f 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -75,7 +75,6 @@ import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
 import static org.apache.paimon.spark.util.OptionUtils.copyWithSQLConf;
 import static org.apache.paimon.spark.utils.CatalogUtils.checkNamespace;
 import static org.apache.paimon.spark.utils.CatalogUtils.toIdentifier;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Spark {@link TableCatalog} for paimon. */
 public class SparkCatalog extends SparkBaseCatalog implements SupportFunction, SupportView {
@@ -298,26 +297,8 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction, S
             Map<String, String> properties)
             throws TableAlreadyExistsException, NoSuchNamespaceException {
         try {
-            String provider = properties.get(TableCatalog.PROP_PROVIDER);
-            if ((!usePaimon(provider))
-                    && SparkSource.FORMAT_NAMES().contains(provider.toLowerCase())) {
-                Map<String, String> newProperties = new HashMap<>(properties);
-                newProperties.put(TYPE.key(), FORMAT_TABLE.toString());
-                newProperties.put(FILE_FORMAT.key(), provider.toLowerCase());
-                catalog.createTable(
-                        toIdentifier(ident),
-                        toInitialSchema(schema, partitions, newProperties),
-                        false);
-            } else {
-                checkArgument(
-                        usePaimon(provider),
-                        "SparkCatalog can only create paimon table, but 
current provider is %s",
-                        provider);
-                catalog.createTable(
-                        toIdentifier(ident),
-                        toInitialSchema(schema, partitions, properties),
-            }
+            catalog.createTable(
+                    toIdentifier(ident), toInitialSchema(schema, partitions, properties), false);
             return loadTable(ident);
         } catch (Catalog.TableAlreadyExistException e) {
             throw new TableAlreadyExistsException(ident);
@@ -406,9 +387,12 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction, S
     private Schema toInitialSchema(
            StructType schema, Transform[] partitions, Map<String, String> properties) {
         Map<String, String> normalizedProperties = new HashMap<>(properties);
-        if (!normalizedProperties.containsKey(TableCatalog.PROP_PROVIDER)) {
-            normalizedProperties.put(TableCatalog.PROP_PROVIDER, SparkSource.NAME());
+        String provider = properties.get(TableCatalog.PROP_PROVIDER);
+        if (!usePaimon(provider) && SparkSource.FORMAT_NAMES().contains(provider.toLowerCase())) {
+            normalizedProperties.put(TYPE.key(), FORMAT_TABLE.toString());
+            normalizedProperties.put(FILE_FORMAT.key(), provider.toLowerCase());
         }
+        normalizedProperties.remove(TableCatalog.PROP_PROVIDER);
         normalizedProperties.remove(PRIMARY_KEY_IDENTIFIER);
         normalizedProperties.remove(TableCatalog.PROP_COMMENT);
         if (normalizedProperties.containsKey(TableCatalog.PROP_LOCATION)) {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
index 0170a29f68..d80d7350a6 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
@@ -118,7 +118,7 @@ object SparkSource {
 
   val NAME = "paimon"
 
-  val FORMAT_NAMES = Seq("csv", "orc", "parquet")
+  val FORMAT_NAMES: Seq[String] = Seq("csv", "orc", "parquet")
 
  def toBaseRelation(table: FileStoreTable, _sqlContext: SQLContext): BaseRelation = {
     new BaseRelation {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
index 6ad5274496..3ed2c98306 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
@@ -161,7 +161,7 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
   test("Paimon DDL: create table without using paimon") {
     withTable("paimon_tbl") {
       sql("CREATE TABLE paimon_tbl (id int)")
-      assert(loadTable("paimon_tbl").options().get("provider").equals("paimon"))
+      assert(!loadTable("paimon_tbl").options().containsKey("provider"))
     }
   }
 
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
index e99e4434ef..1189f1f290 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
@@ -326,13 +326,7 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
                 spark.sql(
                   s"CREATE TABLE external_tbl (id INT) USING paimon LOCATION 
'$expertTbLocation'")
                 checkAnswer(spark.sql("SELECT * FROM external_tbl"), Row(1))
-                assert(
-                  loadTable("paimon_db", "external_tbl")
-                    .location()
-                    .toString
-                    .split(':')
-                    .apply(1)
-                    .equals(expertTbLocation))
+                assert(getActualTableLocation("paimon_db", 
"external_tbl").equals(expertTbLocation))
 
                 // create managed table
                 spark.sql(s"CREATE TABLE managed_tbl (id INT) USING paimon")
@@ -373,12 +367,8 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
                 spark.sql("ALTER TABLE external_tbl RENAME TO 
external_tbl_renamed")
                 checkAnswer(spark.sql("SELECT * FROM external_tbl_renamed"), 
Row(1))
                 assert(
-                  loadTable("paimon_db", "external_tbl_renamed")
-                    .location()
-                    .toString
-                    .split(':')
-                    .apply(1)
-                    .equals(expertTbLocation))
+                  getActualTableLocation("paimon_db", 
"external_tbl_renamed").equals(
+                    expertTbLocation))
 
                 // create managed table
                 spark.sql(s"CREATE TABLE managed_tbl (id INT) USING paimon")
@@ -389,12 +379,55 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
                 spark.sql("ALTER TABLE managed_tbl RENAME TO 
managed_tbl_renamed")
                 checkAnswer(spark.sql("SELECT * FROM managed_tbl_renamed"), 
Row(1))
                 assert(
-                  !loadTable("paimon_db", "managed_tbl_renamed")
-                    .location()
-                    .toString
-                    .split(':')
-                    .apply(1)
-                    .equals(managedTbLocation.toString))
+                  !getActualTableLocation("paimon_db", "managed_tbl_renamed").equals(
+                    managedTbLocation.toString))
+              }
+            }
+        }
+    }
+  }
+
+  test("Paimon DDL with hive catalog: create external table without schema") {
+    Seq(sparkCatalogName, paimonHiveCatalogName).foreach {
+      catalogName =>
+        spark.sql(s"USE $catalogName")
+        withTempDir {
+          tbLocation =>
+            withDatabase("paimon_db") {
+              spark.sql(s"CREATE DATABASE IF NOT EXISTS paimon_db")
+              spark.sql(s"USE paimon_db")
+              withTable("t1", "t2", "t3", "t4", "t5") {
+                val expertTbLocation = tbLocation.getCanonicalPath
+                spark.sql(s"""
+                             |CREATE TABLE t1 (id INT, pt INT) USING paimon
+                             |PARTITIONED BY (pt)
+                             |TBLPROPERTIES('primary-key' = 'id', 'k1' = 'v1')
+                             |LOCATION '$expertTbLocation'
+                             |""".stripMargin)
+                spark.sql("INSERT INTO t1 VALUES (1, 1)")
+
+                // create table without schema
+                spark.sql(s"CREATE TABLE t2 USING paimon LOCATION 
'$expertTbLocation'")
+                checkAnswer(spark.sql("SELECT * FROM t2"), Row(1, 1))
+                assert(getActualTableLocation("paimon_db", 
"t2").equals(expertTbLocation))
+
+                // create table with wrong schema
+                intercept[Exception] {
+                  spark.sql(
+                    s"CREATE TABLE t3 (fake_col INT) USING paimon LOCATION 
'$expertTbLocation'")
+                }
+
+                // create table with exists props
+                spark.sql(
+                  s"CREATE TABLE t4 USING paimon TBLPROPERTIES ('k1' = 'v1') 
LOCATION '$expertTbLocation'")
+                checkAnswer(spark.sql("SELECT * FROM t4"), Row(1, 1))
+                assert(getActualTableLocation("paimon_db", 
"t4").equals(expertTbLocation))
+
+                // create table with new props
+                intercept[Exception] {
+                  spark.sql(
+                    s"CREATE TABLE t5 USING paimon TBLPROPERTIES ('k2' = 'v2') 
LOCATION '$expertTbLocation'")
+                }
               }
             }
         }
@@ -445,4 +478,8 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
       .toMap
     tableProps("path").split(":")(1)
   }
+
+  def getActualTableLocation(dbName: String, tblName: String): String = {
+    loadTable(dbName, tblName).location().toString.split(':').apply(1)
+  }
 }
