This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c6387a669d [hive][spark] Support creating external table without schema when the table already exists (#4638)
c6387a669d is described below
commit c6387a669d6cd0611c352dbfe4d241fea6a6484c
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Dec 10 14:41:37 2024 +0800
    [hive][spark] Support creating external table without schema when the table already exists (#4638)
---
docs/content/spark/sql-ddl.md | 29 ++++++++-
docs/content/spark/sql-write.md | 31 +++++----
.../org/apache/paimon/schema/SchemaManager.java | 54 ++++++++++++----
.../java/org/apache/paimon/hive/HiveCatalog.java | 10 ++-
.../java/org/apache/paimon/spark/SparkCatalog.java | 30 ++-------
.../org/apache/paimon/spark/SparkSource.scala | 2 +-
.../org/apache/paimon/spark/sql/DDLTestBase.scala | 2 +-
.../spark/sql/DDLWithHiveCatalogTestBase.scala | 75 ++++++++++++++++------
8 files changed, 157 insertions(+), 76 deletions(-)
diff --git a/docs/content/spark/sql-ddl.md b/docs/content/spark/sql-ddl.md
index 638a21a704..cfe105f6ac 100644
--- a/docs/content/spark/sql-ddl.md
+++ b/docs/content/spark/sql-ddl.md
@@ -156,6 +156,33 @@ CREATE TABLE my_table (
);
```
+### Create External Table
+
+When the catalog's `metastore` type is `hive`, if the `location` is specified when creating a table, that table will be considered an external table; otherwise, it will be a managed table.
+
+When you drop an external table, only the metadata in Hive will be removed, and the actual data files will not be deleted; whereas dropping a managed table will also delete the data.
+
+```sql
+CREATE TABLE my_table (
+ user_id BIGINT,
+ item_id BIGINT,
+ behavior STRING,
+ dt STRING,
+ hh STRING
+) PARTITIONED BY (dt, hh) TBLPROPERTIES (
+ 'primary-key' = 'dt,hh,user_id'
+) LOCATION '/path/to/table';
+```
+
+Furthermore, if there is already data stored in the specified location, you can create the table without explicitly specifying the fields, partitions, props, or other information.
+In this case, the new table will inherit them all from the existing table's metadata.
+
+However, if you manually specify them, you need to ensure that they are consistent with those of the existing table (props can be a subset). Therefore, it is strongly recommended not to specify them.
+
+```sql
+CREATE TABLE my_table LOCATION '/path/to/table';
+```
+
### Create Table As Select
Table can be created and populated by the results of a query, for example, we have a sql like this: `CREATE TABLE table_b AS SELECT id, name FROM table_a`,
@@ -241,7 +268,7 @@ DROP VIEW v1;
```
## Tag
-### Create or Replace Tag
+### Create Or Replace Tag
Create or replace a tag with the following options.
- Create a tag with or without the snapshot id and time retention.
- Creating an existing tag does not fail when using `IF NOT EXISTS` syntax.
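
For illustration, the new external-table rules can be exercised end to end; a minimal Spark SQL sketch (the path and table names are hypothetical, mirroring the new tests later in this diff):

```sql
-- assumes a catalog whose 'metastore' type is 'hive'; the path is hypothetical
CREATE TABLE t1 (id INT, pt INT) PARTITIONED BY (pt) TBLPROPERTIES (
    'primary-key' = 'id', 'k1' = 'v1'
) LOCATION '/tmp/paimon/t1';

-- no schema given: fields, partitions and props are inherited from the existing metadata
CREATE TABLE t2 LOCATION '/tmp/paimon/t1';

-- succeeds: ('k1' = 'v1') is a subset of the existing table's props
CREATE TABLE t3 TBLPROPERTIES ('k1' = 'v1') LOCATION '/tmp/paimon/t1';

-- fails: 'k2' is not among the existing table's props
CREATE TABLE t4 TBLPROPERTIES ('k2' = 'v2') LOCATION '/tmp/paimon/t1';

-- dropping an external table removes only the Hive metadata;
-- the files at the location survive, so t1 and t3 still read the same data
DROP TABLE t2;
```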
diff --git a/docs/content/spark/sql-write.md b/docs/content/spark/sql-write.md
index 5f4fa2dabc..c3afcd3754 100644
--- a/docs/content/spark/sql-write.md
+++ b/docs/content/spark/sql-write.md
@@ -120,7 +120,17 @@ TRUNCATE TABLE my_table;
## Update Table
-spark supports update PrimitiveType and StructType, for example:
+Updates the column values for the rows that match a predicate. When no predicate is provided, updates the column values for all rows.
+
+Note:
+
+{{< hint info >}}
+
+Updating primary key columns is not supported when the target table is a primary key table.
+
+{{< /hint >}}
+
+Spark supports updating PrimitiveType and StructType, for example:
```sql
-- Syntax
@@ -142,17 +152,22 @@ UPDATE t SET s.c2 = 'a_new' WHERE s.c1 = 1;
## Delete From Table
+Deletes the rows that match a predicate. When no predicate is provided, deletes all rows.
+
```sql
DELETE FROM my_table WHERE currency = 'UNKNOWN';
```
## Merge Into Table
-Paimon currently supports Merge Into syntax in Spark 3+, which allow a set of updates, insertions and deletions based on a source table in a single commit.
+Merges a set of updates, insertions and deletions based on a source table into a target table.
+
+Note:
+
+{{< hint info >}}
+
+In the update clause, updating primary key columns is not supported when the target table is a primary key table.
-{{< hint into >}}
-1. In update clause, to update primary key columns is not supported.
-2. `WHEN NOT MATCHED BY SOURCE` syntax is not supported.
{{< /hint >}}
**Example: One**
@@ -160,7 +175,6 @@ Paimon currently supports Merge Into syntax in Spark 3+, which allow a set of up
This is a simple demo: if a row exists in the target table, update it; otherwise, insert it.
```sql
-
-- Here both source and target tables have the same schema: (a INT, b INT, c STRING), and a is a primary key.
MERGE INTO target
@@ -170,7 +184,6 @@ WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
-
```
**Example: Two**
@@ -178,7 +191,6 @@ THEN INSERT *
This is a demo with multiple, conditional clauses.
```sql
-
-- Here both source and target tables have the same schema: (a INT, b INT, c STRING), and a is a primary key.
MERGE INTO target
@@ -194,15 +206,12 @@ WHEN NOT MATCHED AND c > 'c9' THEN
    INSERT (a, b, c) VALUES (a, b * 1.1, c) -- when not matched but meet the condition 3, then transform and insert this row;
WHEN NOT MATCHED THEN
INSERT * -- when not matched, insert this row without any transformation;
-
```
## Streaming Write
{{< hint info >}}
-Paimon currently supports Spark 3+ for streaming write.
-
Paimon Structured Streaming only supports the two `append` and `complete` modes.
{{< /hint >}}
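
A small illustration of the primary-key restriction called out in the hints above; a hedged sketch assuming `t` was created with 'primary-key' = 'id' and a non-key column `name`:

```sql
UPDATE t SET name = 'new_name' WHERE id = 1;  -- supported: non-key column
UPDATE t SET id = 2 WHERE id = 1;             -- not supported: id is a primary key column
```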
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 83ddbccfef..2139dca4a9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -208,24 +208,18 @@ public class SchemaManager implements Serializable {
return createTable(schema, false);
}
-    public TableSchema createTable(Schema schema, boolean ignoreIfExistsSame) throws Exception {
+    public TableSchema createTable(Schema schema, boolean externalTable) throws Exception {
while (true) {
Optional<TableSchema> latest = latest();
if (latest.isPresent()) {
-                TableSchema oldSchema = latest.get();
-                boolean isSame =
-                        Objects.equals(oldSchema.fields(), schema.fields())
-                                && Objects.equals(oldSchema.partitionKeys(), schema.partitionKeys())
-                                && Objects.equals(oldSchema.primaryKeys(), schema.primaryKeys())
-                                && Objects.equals(oldSchema.options(), schema.options());
-                if (ignoreIfExistsSame && isSame) {
-                    return oldSchema;
+                TableSchema latestSchema = latest.get();
+                if (externalTable) {
+                    checkSchemaForExternalTable(latestSchema, schema);
+                    return latestSchema;
+                } else {
+                    throw new IllegalStateException(
+                            "Schema in filesystem exists, creation is not allowed.");
                 }
-
-                throw new IllegalStateException(
-                        "Schema in filesystem exists, please use updating,"
-                                + " latest schema is: "
-                                + oldSchema);
             }
List<DataField> fields = schema.fields();
@@ -254,6 +248,38 @@ public class SchemaManager implements Serializable {
}
}
+    private void checkSchemaForExternalTable(TableSchema existsSchema, Schema newSchema) {
+        // When creating an external table, if the table already exists in the location, we can
+        // choose not to specify the fields.
+        if (newSchema.fields().isEmpty()
+                // When the fields are explicitly specified, we need to check for consistency.
+                || (Objects.equals(existsSchema.fields(), newSchema.fields())
+                        && Objects.equals(existsSchema.partitionKeys(), newSchema.partitionKeys())
+                        && Objects.equals(existsSchema.primaryKeys(), newSchema.primaryKeys()))) {
+            // check for options
+            Map<String, String> existsOptions = existsSchema.options();
+            Map<String, String> newOptions = newSchema.options();
+            newOptions.forEach(
+                    (key, value) -> {
+                        if (!key.equals(Catalog.OWNER_PROP)
+                                && (!existsOptions.containsKey(key)
+                                        || !existsOptions.get(key).equals(value))) {
+                            throw new RuntimeException(
+                                    "New schema's options are not equal to the existing schema's, new schema: "
+                                            + newOptions
+                                            + ", existing schema: "
+                                            + existsOptions);
+                        }
+                    });
+        } else {
+            throw new RuntimeException(
+                    "New schema is not equal to the existing schema, new schema: "
+                            + newSchema
+                            + ", existing schema: "
+                            + existsSchema);
+        }
+    }
+
/** Update {@link SchemaChange}s. */
    public TableSchema commitChanges(SchemaChange... changes) throws Exception {
return commitChanges(Arrays.asList(changes));
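
Net effect of the rewritten branch: the old `ignoreIfExistsSame` escape hatch is gone. For a managed table, any pre-existing schema in the filesystem now fails creation outright, while an external table goes through `checkSchemaForExternalTable` instead. A hedged SQL sketch of the two paths (path and table names hypothetical):

```sql
-- external: LOCATION points at existing data, so the schema is checked
-- against (or inherited from) the existing metadata
CREATE TABLE ext_tbl USING paimon LOCATION '/tmp/paimon/t1';

-- managed: no LOCATION; if a schema already exists at the table's default
-- path, this now throws "Schema in filesystem exists, creation is not allowed."
CREATE TABLE managed_tbl (id INT, pt INT) USING paimon;
```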
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 151e2b4d2c..c74ede9815 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -720,11 +720,7 @@ public class HiveCatalog extends AbstractCatalog {
try {
            tableSchema = schemaManager(identifier, location).createTable(schema, externalTable);
} catch (Exception e) {
- throw new RuntimeException(
- "Failed to commit changes of table "
- + identifier.getFullName()
- + " to underlying files.",
- e);
+            throw new RuntimeException("Failed to create table " + identifier.getFullName(), e);
}
try {
@@ -735,7 +731,9 @@ public class HiveCatalog extends AbstractCatalog {
                                    identifier, tableSchema, location, externalTable)));
} catch (Exception e) {
try {
- fileIO.deleteDirectoryQuietly(location);
+ if (!externalTable) {
+ fileIO.deleteDirectoryQuietly(location);
+ }
} catch (Exception ee) {
                LOG.error("Delete directory[{}] fail for table {}", location, identifier, ee);
}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 5ad1b13b7c..d6318c723f 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -75,7 +75,6 @@ import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
import static org.apache.paimon.spark.util.OptionUtils.copyWithSQLConf;
import static org.apache.paimon.spark.utils.CatalogUtils.checkNamespace;
import static org.apache.paimon.spark.utils.CatalogUtils.toIdentifier;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Spark {@link TableCatalog} for paimon. */
public class SparkCatalog extends SparkBaseCatalog implements SupportFunction, SupportView {
@@ -298,26 +297,8 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction, S
Map<String, String> properties)
throws TableAlreadyExistsException, NoSuchNamespaceException {
try {
-            String provider = properties.get(TableCatalog.PROP_PROVIDER);
-            if ((!usePaimon(provider))
-                    && SparkSource.FORMAT_NAMES().contains(provider.toLowerCase())) {
-                Map<String, String> newProperties = new HashMap<>(properties);
-                newProperties.put(TYPE.key(), FORMAT_TABLE.toString());
-                newProperties.put(FILE_FORMAT.key(), provider.toLowerCase());
-                catalog.createTable(
-                        toIdentifier(ident),
-                        toInitialSchema(schema, partitions, newProperties),
-                        false);
-            } else {
-                checkArgument(
-                        usePaimon(provider),
-                        "SparkCatalog can only create paimon table, but current provider is %s",
-                        provider);
-                catalog.createTable(
-                        toIdentifier(ident),
-                        toInitialSchema(schema, partitions, properties),
-                        false);
-            }
+            catalog.createTable(
+                    toIdentifier(ident), toInitialSchema(schema, partitions, properties), false);
return loadTable(ident);
} catch (Catalog.TableAlreadyExistException e) {
throw new TableAlreadyExistsException(ident);
@@ -406,9 +387,12 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction, S
    private Schema toInitialSchema(
            StructType schema, Transform[] partitions, Map<String, String> properties) {
        Map<String, String> normalizedProperties = new HashMap<>(properties);
-        if (!normalizedProperties.containsKey(TableCatalog.PROP_PROVIDER)) {
-            normalizedProperties.put(TableCatalog.PROP_PROVIDER, SparkSource.NAME());
+        String provider = properties.get(TableCatalog.PROP_PROVIDER);
+        if (!usePaimon(provider) && SparkSource.FORMAT_NAMES().contains(provider.toLowerCase())) {
+            normalizedProperties.put(TYPE.key(), FORMAT_TABLE.toString());
+            normalizedProperties.put(FILE_FORMAT.key(), provider.toLowerCase());
}
+ normalizedProperties.remove(TableCatalog.PROP_PROVIDER);
normalizedProperties.remove(PRIMARY_KEY_IDENTIFIER);
normalizedProperties.remove(TableCatalog.PROP_COMMENT);
if (normalizedProperties.containsKey(TableCatalog.PROP_LOCATION)) {
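
The format-table special case now lives in `toInitialSchema` rather than `createTable`: a non-Paimon provider from `FORMAT_NAMES` is rewritten into `'type' = 'format-table'` plus `'file.format'`, and the provider key itself is dropped before the schema reaches the catalog. A hedged SQL sketch of the visible effect:

```sql
-- 'csv' is in SparkSource.FORMAT_NAMES, so this becomes a Paimon format table
-- with options 'type' = 'format-table' and 'file.format' = 'csv'
CREATE TABLE fmt_tbl (id INT) USING csv;

-- a plain Paimon table no longer stores a 'provider' option
-- (see the updated assertion in DDLTestBase below)
CREATE TABLE paimon_tbl (id INT);
```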
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
index 0170a29f68..d80d7350a6 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
@@ -118,7 +118,7 @@ object SparkSource {
val NAME = "paimon"
- val FORMAT_NAMES = Seq("csv", "orc", "parquet")
+ val FORMAT_NAMES: Seq[String] = Seq("csv", "orc", "parquet")
  def toBaseRelation(table: FileStoreTable, _sqlContext: SQLContext): BaseRelation = {
new BaseRelation {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
index 6ad5274496..3ed2c98306 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
@@ -161,7 +161,7 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
test("Paimon DDL: create table without using paimon") {
withTable("paimon_tbl") {
sql("CREATE TABLE paimon_tbl (id int)")
-      assert(loadTable("paimon_tbl").options().get("provider").equals("paimon"))
+ assert(!loadTable("paimon_tbl").options().containsKey("provider"))
}
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
index e99e4434ef..1189f1f290 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
@@ -326,13 +326,7 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
spark.sql(
              s"CREATE TABLE external_tbl (id INT) USING paimon LOCATION '$expertTbLocation'")
checkAnswer(spark.sql("SELECT * FROM external_tbl"), Row(1))
- assert(
- loadTable("paimon_db", "external_tbl")
- .location()
- .toString
- .split(':')
- .apply(1)
- .equals(expertTbLocation))
+            assert(getActualTableLocation("paimon_db", "external_tbl").equals(expertTbLocation))
// create managed table
spark.sql(s"CREATE TABLE managed_tbl (id INT) USING paimon")
@@ -373,12 +367,8 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
            spark.sql("ALTER TABLE external_tbl RENAME TO external_tbl_renamed")
            checkAnswer(spark.sql("SELECT * FROM external_tbl_renamed"), Row(1))
assert(
- loadTable("paimon_db", "external_tbl_renamed")
- .location()
- .toString
- .split(':')
- .apply(1)
- .equals(expertTbLocation))
+              getActualTableLocation("paimon_db", "external_tbl_renamed").equals(
+                expertTbLocation))
// create managed table
spark.sql(s"CREATE TABLE managed_tbl (id INT) USING paimon")
@@ -389,12 +379,55 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
            spark.sql("ALTER TABLE managed_tbl RENAME TO managed_tbl_renamed")
            checkAnswer(spark.sql("SELECT * FROM managed_tbl_renamed"), Row(1))
assert(
- !loadTable("paimon_db", "managed_tbl_renamed")
- .location()
- .toString
- .split(':')
- .apply(1)
- .equals(managedTbLocation.toString))
+              !getActualTableLocation("paimon_db", "managed_tbl_renamed").equals(
+                managedTbLocation.toString))
+ }
+ }
+ }
+ }
+ }
+
+ test("Paimon DDL with hive catalog: create external table without schema") {
+ Seq(sparkCatalogName, paimonHiveCatalogName).foreach {
+ catalogName =>
+ spark.sql(s"USE $catalogName")
+ withTempDir {
+ tbLocation =>
+ withDatabase("paimon_db") {
+ spark.sql(s"CREATE DATABASE IF NOT EXISTS paimon_db")
+ spark.sql(s"USE paimon_db")
+ withTable("t1", "t2", "t3", "t4", "t5") {
+ val expertTbLocation = tbLocation.getCanonicalPath
+ spark.sql(s"""
+ |CREATE TABLE t1 (id INT, pt INT) USING paimon
+ |PARTITIONED BY (pt)
+ |TBLPROPERTIES('primary-key' = 'id', 'k1' = 'v1')
+ |LOCATION '$expertTbLocation'
+ |""".stripMargin)
+ spark.sql("INSERT INTO t1 VALUES (1, 1)")
+
+ // create table without schema
+            spark.sql(s"CREATE TABLE t2 USING paimon LOCATION '$expertTbLocation'")
+ checkAnswer(spark.sql("SELECT * FROM t2"), Row(1, 1))
+            assert(getActualTableLocation("paimon_db", "t2").equals(expertTbLocation))
+
+ // create table with wrong schema
+ intercept[Exception] {
+ spark.sql(
+                s"CREATE TABLE t3 (fake_col INT) USING paimon LOCATION '$expertTbLocation'")
+ }
+
+ // create table with exists props
+ spark.sql(
+              s"CREATE TABLE t4 USING paimon TBLPROPERTIES ('k1' = 'v1') LOCATION '$expertTbLocation'")
+ checkAnswer(spark.sql("SELECT * FROM t4"), Row(1, 1))
+            assert(getActualTableLocation("paimon_db", "t4").equals(expertTbLocation))
+
+ // create table with new props
+ intercept[Exception] {
+ spark.sql(
+                s"CREATE TABLE t5 USING paimon TBLPROPERTIES ('k2' = 'v2') LOCATION '$expertTbLocation'")
+ }
}
}
}
@@ -445,4 +478,8 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
.toMap
tableProps("path").split(":")(1)
}
+
+ def getActualTableLocation(dbName: String, tblName: String): String = {
+ loadTable(dbName, tblName).location().toString.split(':').apply(1)
+ }
}