This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6a7c8a6f3f [core][spark] Support history-preserving replace table
(#7860)
6a7c8a6f3f is described below
commit 6a7c8a6f3fb0c9153e6fbde4d8798d825e7a7f73
Author: Zouxxyy <[email protected]>
AuthorDate: Wed May 20 19:30:39 2026 +0800
[core][spark] Support history-preserving replace table (#7860)
Support Spark replace-table flows for Paimon tables while preserving
table history where possible.
This change adds staged replace support for `REPLACE TABLE` and `CREATE
OR REPLACE TABLE ... AS SELECT`, using truncate plus schema replacement
for compatible FileStore tables so old snapshots remain available for
time travel. It also falls back to drop-and-create when the source or
target is not a Paimon table, handles SparkGenericCatalog provider
checks, and documents the Spark SQL behavior.
---
docs/content/spark/sql-ddl.md | 56 +++++++
.../main/java/org/apache/paimon/rest/RESTApi.java | 19 +++
.../java/org/apache/paimon/rest/ResourcePaths.java | 11 ++
.../paimon/rest/requests/ReplaceTableRequest.java | 47 ++++++
.../org/apache/paimon/catalog/AbstractCatalog.java | 90 +++++++++++
.../org/apache/paimon/catalog/CachingCatalog.java | 8 +
.../java/org/apache/paimon/catalog/Catalog.java | 19 +++
.../org/apache/paimon/catalog/DelegateCatalog.java | 6 +
.../apache/paimon/catalog/FileSystemCatalog.java | 14 ++
.../java/org/apache/paimon/rest/RESTCatalog.java | 22 +++
.../org/apache/paimon/catalog/CatalogTestBase.java | 94 +++++++++++
.../org/apache/paimon/jdbc/JdbcCatalogTest.java | 6 +
.../org/apache/paimon/rest/RESTCatalogServer.java | 39 +++++
.../org/apache/paimon/rest/RESTCatalogTest.java | 5 +
.../java/org/apache/paimon/hive/HiveCatalog.java | 35 +++++
.../shim/PaimonCreateTableAsSelectStrategy.scala | 76 ++++-----
.../shim/PaimonReplaceTableAsSelectStrategy.scala} | 26 ++--
.../shim/PaimonCreateTableAsSelectStrategy.scala | 63 +++-----
.../shim/PaimonReplaceTableAsSelectStrategy.scala} | 26 ++--
.../shim/PaimonCreateTableAsSelectStrategy.scala | 63 +++-----
.../shim/PaimonReplaceTableAsSelectStrategy.scala | 168 ++++++++++++++++++++
.../shim/PaimonCreateTableAsSelectStrategy.scala | 90 -----------
.../apache/spark/sql/paimon/shims/Spark4Shim.scala | 105 ++++++++++++-
.../apache/paimon/spark/RollbackStagedTable.java | 173 +++++++++++++++++++++
.../java/org/apache/paimon/spark/SparkCatalog.java | 100 ++++++++++++
.../apache/paimon/spark/SparkGenericCatalog.java | 49 ++++++
.../paimon/spark/catalog/SparkBaseCatalog.java | 7 +-
.../paimon/spark/execution/PaimonStrategy.scala | 10 +-
.../spark/sql/execution/PaimonStrategyHelper.scala | 25 ++-
.../shim/PaimonCreateTableAsSelectStrategy.scala | 61 +++-----
.../shim/PaimonReplaceTableAsSelectStrategy.scala | 151 ++++++++++++++++++
.../apache/spark/sql/paimon/shims/SparkShim.scala | 50 +++++-
.../org/apache/paimon/spark/sql/DDLTestBase.scala | 157 +++++++++++++++++++
.../spark/sql/DDLWithHiveCatalogTestBase.scala | 135 ++++++++++++++++
.../paimon/spark/sql/DataFrameWriteTestBase.scala | 86 ++++++++++
.../paimon/spark/sql/FormatTableTestBase.scala | 44 ++++++
.../sql/ReplaceTableWithRestCatalogTest.scala | 81 ++++++++++
.../apache/spark/sql/paimon/shims/Spark3Shim.scala | 107 ++++++++++++-
.../apache/spark/sql/paimon/shims/Spark4Shim.scala | 105 ++++++++++++-
39 files changed, 2124 insertions(+), 305 deletions(-)
diff --git a/docs/content/spark/sql-ddl.md b/docs/content/spark/sql-ddl.md
index 6ca8831c15..42b101896f 100644
--- a/docs/content/spark/sql-ddl.md
+++ b/docs/content/spark/sql-ddl.md
@@ -285,6 +285,62 @@ CREATE TABLE my_table_all (
CREATE TABLE my_table_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key'
= 'dt,hh') AS SELECT * FROM my_table_all;
```
+### Replace Table
+
+Paimon supports preserving snapshot history for Spark `REPLACE TABLE` from
**Spark 3.4**.
+
+```sql
+CREATE TABLE my_table (
+ user_id BIGINT,
+ item_id BIGINT,
+ behavior STRING
+) TBLPROPERTIES (
+ 'primary-key' = 'user_id',
+ 'bucket' = '2'
+);
+
+INSERT INTO my_table VALUES (1, 10, 'pv');
+
+REPLACE TABLE my_table (
+ user_id BIGINT,
+ item_id BIGINT,
+ category STRING
+) TBLPROPERTIES (
+ 'primary-key' = 'user_id',
+ 'bucket' = '4'
+);
+```
+
+In Paimon, this is not an atomic replacement. Instead of Spark's
drop+create replace path, Paimon
+truncates the current table and commits a new schema, while preserving the table
location and snapshot
+history. The current table becomes empty and uses the new schema, but old
snapshots can still be
+queried by time travel.
+
+```sql
+SELECT * FROM my_table;
+
+SELECT * FROM my_table VERSION AS OF 1;
+```
+
+`REPLACE TABLE` requires the table to exist. If the table does not exist, use
+`CREATE OR REPLACE TABLE` instead.
+
+`REPLACE TABLE` does not accept `AS SELECT`. To replace a table and populate
it with query results,
+use `CREATE OR REPLACE TABLE ... AS SELECT`.
+
+```sql
+CREATE OR REPLACE TABLE my_table
+TBLPROPERTIES (
+ 'primary-key' = 'user_id',
+ 'bucket' = '4'
+)
+AS SELECT user_id, item_id, behavior FROM source_table;
+```
+
+When the existing table and the target table use different table types,
+Paimon falls back to drop+create behavior instead of the snapshot-preserving
+replace behavior.
+
### Create Table Like
A new table can be created from an existing source table. Available from
**Spark 3.4**.
diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
index 88761ca5f0..7433a30388 100644
--- a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
+++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
@@ -51,6 +51,7 @@ import
org.apache.paimon.rest.requests.MarkDonePartitionsRequest;
import org.apache.paimon.rest.requests.RegisterTableRequest;
import org.apache.paimon.rest.requests.RenameBranchRequest;
import org.apache.paimon.rest.requests.RenameTableRequest;
+import org.apache.paimon.rest.requests.ReplaceTableRequest;
import org.apache.paimon.rest.requests.ResetConsumerRequest;
import org.apache.paimon.rest.requests.RollbackSchemaRequest;
import org.apache.paimon.rest.requests.RollbackTableRequest;
@@ -778,6 +779,24 @@ public class RESTApi {
restAuthFunction);
}
+ /**
+ * Replace table.
+ *
+ * @param identifier database name and table name.
+ * @param schema schema to replace table
+ * @throws NoSuchResourceException thrown on HTTP 404, meaning the
table does not exist
+ * @throws ForbiddenException thrown on HTTP 403, meaning the caller does not
have permission for
+ * this table
+ */
+ public void replaceTable(Identifier identifier, Schema schema) {
+ ReplaceTableRequest request = new ReplaceTableRequest(schema);
+ client.post(
+ resourcePaths.replaceTable(
+ identifier.getDatabaseName(),
identifier.getObjectName()),
+ request,
+ restAuthFunction);
+ }
+
/**
* Auth table query.
*
diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
b/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
index 6d1d16c82f..66a1653232 100644
--- a/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
+++ b/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
@@ -98,6 +98,17 @@ public class ResourcePaths {
return SLASH.join(V1, prefix, TABLES, "rename");
}
+ public String replaceTable(String databaseName, String objectName) {
+ return SLASH.join(
+ V1,
+ prefix,
+ DATABASES,
+ encodeString(databaseName),
+ TABLES,
+ encodeString(objectName),
+ "replace");
+ }
+
public String commitTable(String databaseName, String objectName) {
return SLASH.join(
V1,
diff --git
a/paimon-api/src/main/java/org/apache/paimon/rest/requests/ReplaceTableRequest.java
b/paimon-api/src/main/java/org/apache/paimon/rest/requests/ReplaceTableRequest.java
new file mode 100644
index 0000000000..526724a313
--- /dev/null
+++
b/paimon-api/src/main/java/org/apache/paimon/rest/requests/ReplaceTableRequest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.requests;
+
+import org.apache.paimon.rest.RESTRequest;
+import org.apache.paimon.schema.Schema;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+/** Request for replacing table. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ReplaceTableRequest implements RESTRequest {
+
+ private static final String FIELD_SCHEMA = "schema";
+
+ @JsonProperty(FIELD_SCHEMA)
+ private final Schema schema;
+
+ @JsonCreator
+ public ReplaceTableRequest(@JsonProperty(FIELD_SCHEMA) Schema schema) {
+ this.schema = schema;
+ }
+
+ @JsonGetter(FIELD_SCHEMA)
+ public Schema getSchema() {
+ return schema;
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 9512e2f767..4a8a8b1b91 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -21,6 +21,7 @@ package org.apache.paimon.catalog;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.PagedList;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.TableType;
import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
@@ -42,6 +43,7 @@ import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.Instant;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableSnapshot;
+import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.SnapshotNotExistException;
@@ -500,6 +502,94 @@ public abstract class AbstractCatalog implements Catalog {
protected abstract void alterTableImpl(Identifier identifier,
List<SchemaChange> changes)
throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException;
+ @Override
+ public void replaceTable(Identifier identifier, Schema newSchema, boolean
ignoreIfNotExists)
+ throws TableNotExistException {
+ checkNotBranch(identifier, "replaceTable");
+ checkNotSystemTable(identifier, "replaceTable");
+ validateCreateTable(newSchema, false);
+ validateCustomTablePath(newSchema.options());
+ copyTableDefaultOptions(newSchema.options());
+
+ Table existing;
+ try {
+ existing = getTable(identifier);
+ } catch (TableNotExistException e) {
+ if (ignoreIfNotExists) {
+ return;
+ }
+ throw e;
+ }
+
+ TableType targetTableType =
Options.fromMap(newSchema.options()).get(TYPE);
+ if (!(existing instanceof FileStoreTable) ||
!targetTableType.equals(TableType.TABLE)) {
+ dropAndCreateTable(identifier, newSchema);
+ return;
+ }
+
+ // TODO: support replacing a table whose partition keys change
+ List<String> oldPartitionKeys = ((FileStoreTable)
existing).schema().partitionKeys();
+ List<String> newPartitionKeys = newSchema.partitionKeys();
+ if (!Objects.equals(oldPartitionKeys, newPartitionKeys)) {
+ throw new UnsupportedOperationException(
+ "replaceTable does not support changing partition keys
(old="
+ + oldPartitionKeys
+ + ", new="
+ + newPartitionKeys
+ + "). Drop and re-create the table instead.");
+ }
+
+ replaceTableImpl(identifier, (FileStoreTable) existing, newSchema);
+ }
+
+ private void dropAndCreateTable(Identifier identifier, Schema newSchema)
+ throws TableNotExistException {
+ dropTable(identifier, false);
+ try {
+ createTable(identifier, newSchema, false);
+ } catch (TableAlreadyExistException | DatabaseNotExistException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** Truncate visible data first, then append the new schema. Non-atomic on
failure. */
+ protected void replaceTableImpl(
+ Identifier identifier, FileStoreTable existingTable, Schema
newSchema)
+ throws TableNotExistException {
+ truncateTable(existingTable);
+ try {
+ appendNewSchema(existingTable, newSchema);
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** Append a new schema (id = latest + 1) via atomic CAS. Returns the new
schema id. */
+ protected long appendNewSchema(FileStoreTable existingTable, Schema
newSchema)
+ throws Exception {
+ SchemaManager sm = existingTable.schemaManager();
+ while (true) {
+ TableSchema latest = sm.latestOrThrow("Cannot replace: schema
chain is empty.");
+ TableSchema staged = TableSchema.create(latest.id() + 1,
newSchema);
+ if (sm.commit(staged)) {
+ return staged.id();
+ }
+ }
+ }
+
+ protected void truncateTable(FileStoreTable existingTable) {
+ try (TableCommitImpl commit =
+ existingTable.newCommit("replace-table-" +
java.util.UUID.randomUUID())) {
+ commit.truncateTable();
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@Override
public Table getTable(Identifier identifier) throws TableNotExistException
{
return CatalogUtils.loadTable(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index c779db7d20..9fc7b124f9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -24,6 +24,7 @@ import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.partition.PartitionStatistics;
+import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
@@ -224,6 +225,13 @@ public class CachingCatalog extends DelegateCatalog {
invalidateTable(identifier);
}
+ @Override
+ public void replaceTable(Identifier identifier, Schema newSchema, boolean
ignoreIfNotExists)
+ throws TableNotExistException {
+ super.replaceTable(identifier, newSchema, ignoreIfNotExists);
+ invalidateTable(identifier);
+ }
+
@Override
public Table getTable(Identifier identifier) throws TableNotExistException
{
// For system table, do not cache it directly. Instead, cache the
origin table and then wrap
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index ab66f937ea..57fa040a2a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -358,6 +358,25 @@ public interface Catalog extends AutoCloseable {
alterTable(identifier, Collections.singletonList(change),
ignoreIfNotExists);
}
+ /**
+ * Replace an existing table with a new {@link Schema}.
+ *
+ * <p>For compatible FileStore tables, it truncates visible data and
appends a new schema to the
+ * schema chain, preserving old schemas and snapshots for time travel.
Other table kinds may
+ * fall back to drop-and-create behavior.
+ *
+ * @param identifier path of the table to be replaced
+ * @param newSchema the new {@link Schema}
+ * @param ignoreIfNotExists if true, do nothing when the table does not
exist
+ * @throws TableNotExistException if the table does not exist and {@code
ignoreIfNotExists} is
+ * false
+ */
+ default void replaceTable(Identifier identifier, Schema newSchema, boolean
ignoreIfNotExists)
+ throws TableNotExistException {
+ throw new UnsupportedOperationException(
+ "Catalog " + getClass().getName() + " does not support
replaceTable.");
+ }
+
// ======================= partition methods
===============================
/**
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index ec5138a8cb..0f18f7d045 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -171,6 +171,12 @@ public abstract class DelegateCatalog implements Catalog {
wrapped.alterTable(identifier, changes, ignoreIfNotExists);
}
+ @Override
+ public void replaceTable(Identifier identifier, Schema newSchema, boolean
ignoreIfNotExists)
+ throws TableNotExistException {
+ wrapped.replaceTable(identifier, newSchema, ignoreIfNotExists);
+ }
+
@Override
public void registerTable(Identifier identifier, String path)
throws TableAlreadyExistException {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index cea266f703..b0b8b1ac7f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -25,6 +25,7 @@ import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -179,6 +180,19 @@ public class FileSystemCatalog extends AbstractCatalog {
}
}
+ @Override
+ protected void replaceTableImpl(
+ Identifier identifier, FileStoreTable existingTable, Schema
newSchema) {
+ truncateTable(existingTable);
+ try {
+ runWithLock(identifier, () -> appendNewSchema(existingTable,
newSchema));
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
protected static <T> T uncheck(Callable<T> callable) {
try {
return callable.call();
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 94442d5c30..7e2f6cfd27 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -589,6 +589,28 @@ public class RESTCatalog implements Catalog {
}
}
+ @Override
+ public void replaceTable(Identifier identifier, Schema newSchema, boolean
ignoreIfNotExists)
+ throws TableNotExistException {
+ checkNotBranch(identifier, "replaceTable");
+ checkNotSystemTable(identifier, "replaceTable");
+ validateCreateTable(newSchema, dataTokenEnabled);
+ try {
+ tableDefaultOptions.forEach(newSchema.options()::putIfAbsent);
+ api.replaceTable(identifier, newSchema);
+ } catch (NoSuchResourceException e) {
+ if (!ignoreIfNotExists) {
+ throw new TableNotExistException(identifier);
+ }
+ } catch (NotImplementedException e) {
+ throw new UnsupportedOperationException(e.getMessage());
+ } catch (ForbiddenException e) {
+ throw new TableNoPermissionException(identifier, e);
+ } catch (BadRequestException e) {
+ throw new IllegalArgumentException(e.getMessage());
+ }
+ }
+
@Override
public TableQueryAuthResult authTableQuery(Identifier identifier,
@Nullable List<String> select)
throws TableNotExistException {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 4f4aed63fa..07539d914f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -1171,6 +1171,96 @@ public abstract class CatalogTestBase {
baseAlterTable(initOptions);
}
+ @Test
+ public void testReplaceTable() throws Exception {
+ if (!supportsReplaceTable()) {
+ return;
+ }
+ catalog.createDatabase("replace_db", true);
+ Identifier identifier = Identifier.create("replace_db", "t");
+
+ Schema initialSchema =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("data", DataTypes.STRING())
+ .column("pt", DataTypes.STRING())
+ .partitionKeys("pt")
+ .primaryKey("id", "pt")
+ .option("bucket", "2")
+ .build();
+ catalog.createTable(identifier, initialSchema, false);
+
+ Table created = catalog.getTable(identifier);
+ String oldLocation = ((FileStoreTable) created).location().toString();
+ BatchWriteBuilder writeBuilder = created.newBatchWriteBuilder();
+ try (BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ write.write(
+ GenericRow.of(1, BinaryString.fromString("old"),
BinaryString.fromString("a")));
+ commit.commit(write.prepareCommit());
+ }
+
+ long oldSnapshotId =
+ ((FileStoreTable) catalog.getTable(identifier))
+ .snapshotManager()
+ .latestSnapshotId();
+
+ // Replace with new PK + bucket (partition keys unchanged)
+ Schema newSchema =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column("pt", DataTypes.STRING())
+ .partitionKeys("pt")
+ .primaryKey("id", "pt")
+ .option("bucket", "4")
+ .build();
+ catalog.replaceTable(identifier, newSchema, false);
+
+ FileStoreTable replaced = (FileStoreTable)
catalog.getTable(identifier);
+ assertThat(replaced.partitionKeys()).containsExactly("pt");
+ assertThat(replaced.primaryKeys()).containsExactly("id", "pt");
+ assertThat(replaced.options().get("bucket")).isEqualTo("4");
+ assertThat(replaced.location().toString()).isEqualTo(oldLocation);
+ assertThat(read(replaced, null, null, null, null)).isEmpty();
+
+ // Time-travel to old snapshot still returns old data with old schema
+ FileStoreTable oldView =
+ replaced.copy(Collections.singletonMap("scan.snapshot-id", ""
+ oldSnapshotId));
+ assertThat(oldView.schema().fieldNames()).containsExactly("id",
"data", "pt");
+ assertThat(read(oldView, null, null, null, null)).hasSize(1);
+
+ // Changing partition keys is rejected
+ Schema changePartitionKeys =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column("pt", DataTypes.STRING())
+ .primaryKey("id")
+ .option("bucket", "4")
+ .build();
+ assertThatExceptionOfType(UnsupportedOperationException.class)
+ .isThrownBy(() -> catalog.replaceTable(identifier,
changePartitionKeys, false))
+ .withMessageContaining("partition keys");
+
+ // ignoreIfNotExists = true: missing table is silently skipped
+ Identifier missing = Identifier.create("replace_db", "missing");
+ catalog.replaceTable(missing, newSchema, true);
+
+ // ignoreIfNotExists = false: missing table throws
+ assertThatExceptionOfType(Catalog.TableNotExistException.class)
+ .isThrownBy(() -> catalog.replaceTable(missing, newSchema,
false));
+
+ // System table is rejected
+ assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(
+ () ->
+ catalog.replaceTable(
+ Identifier.create("replace_db",
"$system_table"),
+ newSchema,
+ false));
+ }
+
@Test
public void testView() throws Exception {
if (!supportsView()) {
@@ -1658,6 +1748,10 @@ public abstract class CatalogTestBase {
return true;
}
+ protected boolean supportsReplaceTable() {
+ return true;
+ }
+
protected void checkPartition(Partition expected, Partition actual) {
assertThat(actual).isEqualTo(expected);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
index 2b121dd4cd..fd3c6fdc59 100644
--- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
@@ -140,6 +140,12 @@ public class JdbcCatalogTest extends CatalogTestBase {
return true;
}
+ @Override
+ protected boolean supportsReplaceTable() {
+ // jdbc lock interferes with the test data commit; replace path itself
works at runtime
+ return false;
+ }
+
@Test
public void testRepairTableNotExist() throws Exception {
String databaseName = "repair_db";
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 51d2f0cf13..74e4f4e465 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -63,6 +63,7 @@ import
org.apache.paimon.rest.requests.ListPartitionsByNamesRequest;
import org.apache.paimon.rest.requests.MarkDonePartitionsRequest;
import org.apache.paimon.rest.requests.RenameBranchRequest;
import org.apache.paimon.rest.requests.RenameTableRequest;
+import org.apache.paimon.rest.requests.ReplaceTableRequest;
import org.apache.paimon.rest.requests.ResetConsumerRequest;
import org.apache.paimon.rest.requests.RollbackSchemaRequest;
import org.apache.paimon.rest.requests.RollbackTableRequest;
@@ -429,6 +430,10 @@ public class RESTCatalogServer {
resources.length == 4
&&
ResourcePaths.TABLES.equals(resources[1])
&&
"rollback-schema".equals(resources[3]);
+ boolean isReplaceTable =
+ resources.length == 4
+ &&
ResourcePaths.TABLES.equals(resources[1])
+ && "replace".equals(resources[3]);
boolean isPartitions =
resources.length == 4
&&
ResourcePaths.TABLES.equals(resources[1])
@@ -552,6 +557,8 @@ public class RESTCatalogServer {
}
} else if (isRollbackSchema) {
return rollbackSchemaHandle(identifier,
restAuthParameter.data());
+ } else if (isReplaceTable) {
+ return replaceTableHandle(identifier,
restAuthParameter.data());
} else if (isTable) {
return tableHandle(
restAuthParameter.method(),
@@ -1754,6 +1761,38 @@ public class RESTCatalogServer {
}
}
+ private MockResponse replaceTableHandle(Identifier identifier, String
data) throws Exception {
+ ReplaceTableRequest requestBody = RESTApi.fromJson(data,
ReplaceTableRequest.class);
+ Schema newSchema = requestBody.getSchema();
+ if (!tableMetadataStore.containsKey(identifier.getFullName())) {
+ throw new Catalog.TableNotExistException(identifier);
+ }
+ TableMetadata tableMetadata =
tableMetadataStore.get(identifier.getFullName());
+ if (isFormatTable(tableMetadata.schema().toSchema()) ||
isFormatTable(newSchema)) {
+ throw new UnsupportedOperationException("replaceTable does not
support format tables.");
+ }
+ catalog.replaceTable(identifier, newSchema, false);
+ TableSchema replacedSchema = catalog.loadTableSchema(identifier);
+ TableMetadata newTableMetadata =
+ createTableMetadata(
+ identifier,
+ replacedSchema.id(),
+ replacedSchema.toSchema(),
+ tableMetadata.uuid(),
+ tableMetadata.isExternal());
+ tableMetadataStore.put(identifier.getFullName(), newTableMetadata);
+ FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+ Snapshot truncateSnapshot = table.snapshotManager().latestSnapshot();
+ if (truncateSnapshot != null) {
+ tableLatestSnapshotStore.put(
+ identifier.getFullName(), new
TableSnapshot(truncateSnapshot, 0L, 0L, 0L, 0L));
+ } else {
+ tableLatestSnapshotStore.remove(identifier.getFullName());
+ }
+ tablePartitionsStore.remove(identifier.getFullName());
+ return new MockResponse().setResponseCode(200);
+ }
+
private MockResponse renameTableHandle(String data) throws Exception {
RenameTableRequest requestBody = RESTApi.fromJson(data,
RenameTableRequest.class);
Identifier fromTable = requestBody.getSource();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index a07c6eaed2..8cefa2ec7b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -3173,6 +3173,11 @@ public abstract class RESTCatalogTest extends
CatalogTestBase {
return true;
}
+ @Override
+ protected boolean supportsReplaceTable() {
+ return true;
+ }
+
// TODO implement this
@Override
@Test
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index ac0706b40b..d32ec08648 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -1296,6 +1296,41 @@ public class HiveCatalog extends AbstractCatalog {
clients().execute(client -> HiveAlterTableUtils.alterTable(client,
identifier, table));
}
+ @Override
+ protected void replaceTableImpl(
+ Identifier identifier, FileStoreTable existingTable, Schema
newSchema)
+ throws TableNotExistException {
+ Table hmsTable = getHmsTable(identifier);
+ if (!isPaimonTable(hmsTable)) {
+ throw new UnsupportedOperationException("Only data table support
replaceTable.");
+ }
+
+ truncateTable(existingTable);
+
+ SchemaManager schemaManager = existingTable.schemaManager();
+ long newSchemaId;
+ try {
+ newSchemaId = runWithLock(identifier, () ->
appendNewSchema(existingTable, newSchema));
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to replaceTable " +
identifier.getFullName(), e);
+ }
+
+ // currently only changes to main branch affect metastore
+ if (!DEFAULT_MAIN_BRANCH.equals(identifier.getBranchNameOrDefault())) {
+ return;
+ }
+
+ try {
+ TableSchema newTableSchema = schemaManager.schema(newSchemaId);
+ alterTableToHms(hmsTable, identifier, newTableSchema,
Collections.emptySet());
+ } catch (Exception te) {
+ schemaManager.deleteSchema(newSchemaId);
+ throw new RuntimeException(te);
+ }
+ }
+
@Override
public boolean caseSensitive() {
return options.getOptional(CASE_SENSITIVE).orElse(false);
diff --git
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
index cc6258e6eb..96f194668f 100644
---
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
+++
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
@@ -18,14 +18,13 @@
package org.apache.spark.sql.execution.shim
-import org.apache.paimon.CoreOptions
-import org.apache.paimon.iceberg.IcebergOptions
+import org.apache.paimon.spark.SparkCatalog
import org.apache.paimon.spark.catalog.FormatTableCatalog
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect,
LogicalPlan}
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util,
StagingTableCatalog}
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.connector.catalog.CatalogV2Util
+import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan}
import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -34,47 +33,40 @@ import scala.collection.JavaConverters._
case class PaimonCreateTableAsSelectStrategy(spark: SparkSession) extends
Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case CreateTableAsSelect(catalog, ident, parts, query, props, options,
ifNotExists) =>
- catalog match {
- case _: StagingTableCatalog =>
- throw new RuntimeException("Paimon can't extend StagingTableCatalog
for now.")
- case _ =>
- val coreOptionKeys =
CoreOptions.getOptions.asScala.map(_.key()).toSeq
+ case CreateTableAsSelect(
+ catalog: SparkCatalog,
+ ident,
+ parts,
+ query,
+ props,
+ options,
+ ifNotExists) =>
+ val (tableOptions, writeOptions) =
PaimonStrategyHelper.splitTableAndWriteOptions(options)
+ val newProps = CatalogV2Util.withDefaultOwnership(props) ++ tableOptions
- // Include Iceberg compatibility options in table properties (fix for DataFrame writer options)
- val icebergOptionKeys =
IcebergOptions.getOptions.asScala.map(_.key()).toSeq
-
- val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys
-
- val (tableOptions, writeOptions) = options.partition {
- case (key, _) => allTableOptionKeys.contains(key)
- }
- val newProps = CatalogV2Util.withDefaultOwnership(props) ++
tableOptions
-
- val isPartitionedFormatTable = {
- catalog match {
- case catalog: FormatTableCatalog =>
- catalog.isFormatTable(newProps.get("provider").orNull) &&
parts.nonEmpty
- case _ => false
- }
- }
-
- if (isPartitionedFormatTable) {
- throw new UnsupportedOperationException(
- "Using CTAS with partitioned format table is not supported yet.")
- }
+ val isPartitionedFormatTable = {
+ catalog match {
+ case formatCatalog: FormatTableCatalog =>
+ formatCatalog.isFormatTable(newProps.get("provider").orNull) &&
parts.nonEmpty
+ case _ => false
+ }
+ }
- CreateTableAsSelectExec(
- catalog,
- ident,
- parts,
- query,
- planLater(query),
- newProps,
- new CaseInsensitiveStringMap(writeOptions.asJava),
- ifNotExists
- ) :: Nil
+ if (isPartitionedFormatTable) {
+ throw new UnsupportedOperationException(
+ "Using CTAS with partitioned format table is not supported yet.")
}
+
+ CreateTableAsSelectExec(
+ catalog,
+ ident,
+ parts,
+ query,
+ planLater(query),
+ newProps,
+ new CaseInsensitiveStringMap(writeOptions.asJava),
+ ifNotExists
+ ) :: Nil
case _ => Nil
}
}
diff --git
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala
similarity index 53%
copy from
paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala
copy to
paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala
index 9fb3a7b54a..d637c006da 100644
---
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala
+++
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala
@@ -16,26 +16,18 @@
* limitations under the License.
*/
-package org.apache.spark.sql.execution
+package org.apache.spark.sql.execution.shim
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.CatalogUtils
-import org.apache.spark.sql.catalyst.plans.logical.TableSpec
-import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
+import org.apache.spark.sql.{SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
-trait PaimonStrategyHelper {
+case class PaimonReplaceTableAsSelectStrategy(spark: SparkSession) extends
Strategy {
- def spark: SparkSession
-
- protected def makeQualifiedDBObjectPath(location: String): String = {
- CatalogUtils.makeQualifiedDBObjectPath(
- spark.sharedState.conf.get(WAREHOUSE_PATH),
- location,
- spark.sharedState.hadoopConf)
- }
+ override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
+}
- protected def qualifyLocInTableSpec(tableSpec: TableSpec): TableSpec = {
- tableSpec.copy(location =
tableSpec.location.map(makeQualifiedDBObjectPath(_)))
- }
+case class PaimonReplaceTableStrategy(spark: SparkSession) extends Strategy {
+ override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}
diff --git
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
index a09996f153..42d6fe2816 100644
---
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
+++
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
@@ -18,15 +18,12 @@
package org.apache.spark.sql.execution.shim
-import org.apache.paimon.CoreOptions
-import org.apache.paimon.iceberg.IcebergOptions
import org.apache.paimon.spark.SparkCatalog
import org.apache.paimon.spark.catalog.FormatTableCatalog
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.analysis.ResolvedDBObjectName
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect,
LogicalPlan, TableSpec}
-import org.apache.spark.sql.connector.catalog.StagingTableCatalog
import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan}
import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -47,46 +44,32 @@ case class PaimonCreateTableAsSelectStrategy(spark:
SparkSession)
tableSpec: TableSpec,
options,
ifNotExists) =>
- catalog match {
- case _: StagingTableCatalog =>
- throw new RuntimeException("Paimon can't extend StagingTableCatalog
for now.")
- case _ =>
- val coreOptionKeys =
CoreOptions.getOptions.asScala.map(_.key()).toSeq
+ val (tableOptions, writeOptions) =
PaimonStrategyHelper.splitTableAndWriteOptions(options)
+ val qualifiedSpec = qualifyTableSpec(tableSpec, tableOptions)
- // Include Iceberg compatibility options in table properties (fix for DataFrame writer options)
- val icebergOptionKeys =
IcebergOptions.getOptions.asScala.map(_.key()).toSeq
-
- val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys
-
- val (tableOptions, writeOptions) = options.partition {
- case (key, _) => allTableOptionKeys.contains(key)
- }
- val newTableSpec = tableSpec.copy(properties = tableSpec.properties
++ tableOptions)
-
- val isPartitionedFormatTable = {
- catalog match {
- case catalog: FormatTableCatalog =>
- catalog.isFormatTable(newTableSpec.provider.orNull) &&
parts.nonEmpty
- case _ => false
- }
- }
-
- if (isPartitionedFormatTable) {
- throw new UnsupportedOperationException(
- "Using CTAS with partitioned format table is not supported yet.")
- }
+ val isPartitionedFormatTable = {
+ catalog match {
+ case formatCatalog: FormatTableCatalog =>
+ formatCatalog.isFormatTable(qualifiedSpec.provider.orNull) &&
parts.nonEmpty
+ case _ => false
+ }
+ }
- CreateTableAsSelectExec(
- catalog.asTableCatalog,
- ident.asIdentifier,
- parts,
- query,
- planLater(query),
- qualifyLocInTableSpec(newTableSpec),
- new CaseInsensitiveStringMap(writeOptions.asJava),
- ifNotExists
- ) :: Nil
+ if (isPartitionedFormatTable) {
+ throw new UnsupportedOperationException(
+ "Using CTAS with partitioned format table is not supported yet.")
}
+
+ CreateTableAsSelectExec(
+ catalog.asTableCatalog,
+ ident.asIdentifier,
+ parts,
+ query,
+ planLater(query),
+ qualifiedSpec,
+ new CaseInsensitiveStringMap(writeOptions.asJava),
+ ifNotExists
+ ) :: Nil
case _ => Nil
}
}
diff --git
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala
similarity index 53%
rename from
paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala
rename to
paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala
index 9fb3a7b54a..d637c006da 100644
---
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala
+++
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala
@@ -16,26 +16,18 @@
* limitations under the License.
*/
-package org.apache.spark.sql.execution
+package org.apache.spark.sql.execution.shim
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.CatalogUtils
-import org.apache.spark.sql.catalyst.plans.logical.TableSpec
-import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
+import org.apache.spark.sql.{SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
-trait PaimonStrategyHelper {
+case class PaimonReplaceTableAsSelectStrategy(spark: SparkSession) extends
Strategy {
- def spark: SparkSession
-
- protected def makeQualifiedDBObjectPath(location: String): String = {
- CatalogUtils.makeQualifiedDBObjectPath(
- spark.sharedState.conf.get(WAREHOUSE_PATH),
- location,
- spark.sharedState.hadoopConf)
- }
+ override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
+}
- protected def qualifyLocInTableSpec(tableSpec: TableSpec): TableSpec = {
- tableSpec.copy(location =
tableSpec.location.map(makeQualifiedDBObjectPath(_)))
- }
+case class PaimonReplaceTableStrategy(spark: SparkSession) extends Strategy {
+ override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}
diff --git
a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
index 4a82f35188..14f540f3e1 100644
---
a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
+++
b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
@@ -18,15 +18,12 @@
package org.apache.spark.sql.execution.shim
-import org.apache.paimon.CoreOptions
-import org.apache.paimon.iceberg.IcebergOptions
import org.apache.paimon.spark.SparkCatalog
import org.apache.paimon.spark.catalog.FormatTableCatalog
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect,
LogicalPlan, TableSpec}
-import org.apache.spark.sql.connector.catalog.StagingTableCatalog
import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan}
import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -49,46 +46,32 @@ case class PaimonCreateTableAsSelectStrategy(spark:
SparkSession)
ifNotExists,
analyzedQuery) =>
assert(analyzedQuery.isDefined)
- catalog match {
- case _: StagingTableCatalog =>
- throw new RuntimeException("Paimon can't extend StagingTableCatalog
for now.")
- case _ =>
- val coreOptionKeys =
CoreOptions.getOptions.asScala.map(_.key()).toSeq
+ val (tableOptions, writeOptions) =
PaimonStrategyHelper.splitTableAndWriteOptions(options)
+ val qualifiedSpec = qualifyTableSpec(tableSpec, tableOptions)
- // Include Iceberg compatibility options in table properties (fix for DataFrame writer options)
- val icebergOptionKeys =
IcebergOptions.getOptions.asScala.map(_.key()).toSeq
-
- val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys
-
- val (tableOptions, writeOptions) = options.partition {
- case (key, _) => allTableOptionKeys.contains(key)
- }
- val newTableSpec = tableSpec.copy(properties = tableSpec.properties
++ tableOptions)
-
- val isPartitionedFormatTable = {
- catalog match {
- case catalog: FormatTableCatalog =>
- catalog.isFormatTable(newTableSpec.provider.orNull) &&
parts.nonEmpty
- case _ => false
- }
- }
-
- if (isPartitionedFormatTable) {
- throw new UnsupportedOperationException(
- "Using CTAS with partitioned format table is not supported yet.")
- }
+ val isPartitionedFormatTable = {
+ catalog match {
+ case formatCatalog: FormatTableCatalog =>
+ formatCatalog.isFormatTable(qualifiedSpec.provider.orNull) &&
parts.nonEmpty
+ case _ => false
+ }
+ }
- CreateTableAsSelectExec(
- catalog.asTableCatalog,
- ident,
- parts,
- analyzedQuery.get,
- planLater(query),
- qualifyLocInTableSpec(newTableSpec),
- new CaseInsensitiveStringMap(writeOptions.asJava),
- ifNotExists
- ) :: Nil
+ if (isPartitionedFormatTable) {
+ throw new UnsupportedOperationException(
+ "Using CTAS with partitioned format table is not supported yet.")
}
+
+ CreateTableAsSelectExec(
+ catalog.asTableCatalog,
+ ident,
+ parts,
+ analyzedQuery.get,
+ planLater(query),
+ qualifiedSpec,
+ new CaseInsensitiveStringMap(writeOptions.asJava),
+ ifNotExists
+ ) :: Nil
case _ => Nil
}
}
diff --git
a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala
b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala
new file mode 100644
index 0000000000..b741627cdb
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.shim
+
+import org.apache.paimon.CoreOptions.TYPE
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.{SparkCatalog, SparkGenericCatalog,
SparkSource, SparkTable}
+import org.apache.paimon.spark.catalog.SparkBaseCatalog
+
+import org.apache.spark.sql.{SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException,
ResolvedIdentifier}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReplaceTable,
ReplaceTableAsSelect, TableSpec}
+import org.apache.spark.sql.connector.catalog.{Identifier,
StagingTableCatalog, Table, TableCatalog}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan}
+import
org.apache.spark.sql.execution.datasources.v2.{AtomicReplaceTableAsSelectExec,
ReplaceTableAsSelectExec}
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+
+case class PaimonReplaceTableAsSelectStrategy(spark: SparkSession)
+ extends Strategy
+ with PaimonStrategyHelper {
+
+ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+ override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case ReplaceTableAsSelect(
+ ResolvedIdentifier(catalog: SparkBaseCatalog, ident),
+ parts,
+ query,
+ tableSpec: TableSpec,
+ options,
+ orCreate,
+ analyzedQuery) if
PaimonReplaceTableStrategyHelper.supportsCatalog(catalog, tableSpec) =>
+ assert(analyzedQuery.isDefined)
+ val (tableOptions, writeOptions) =
PaimonStrategyHelper.splitTableAndWriteOptions(options)
+ val qualifiedSpec = qualifyTableSpec(tableSpec, tableOptions)
+ val writeOpts = new CaseInsensitiveStringMap(writeOptions.asJava)
+ if (PaimonReplaceTableStrategyHelper.canAtomicReplace(catalog, ident,
qualifiedSpec, parts)) {
+ AtomicReplaceTableAsSelectExec(
+ catalog.asInstanceOf[StagingTableCatalog],
+ ident,
+ parts,
+ analyzedQuery.get,
+ planLater(query),
+ qualifiedSpec,
+ writeOpts,
+ orCreate = orCreate,
+ invalidateCache
+ ) :: Nil
+ } else {
+ ReplaceTableAsSelectExec(
+ catalog,
+ ident,
+ parts,
+ analyzedQuery.get,
+ planLater(query),
+ qualifiedSpec,
+ writeOpts,
+ orCreate = orCreate,
+ invalidateCache
+ ) :: Nil
+ }
+ case _ => Nil
+ }
+
+ private def invalidateCache(tableCatalog: TableCatalog, table: Table, ident:
Identifier): Unit = {
+ tableCatalog.invalidateTable(ident)
+ }
+}
+
+case class PaimonReplaceTableStrategy(spark: SparkSession)
+ extends Strategy
+ with PaimonStrategyHelper {
+
+ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+ override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case replace @ ReplaceTable(
+ ResolvedIdentifier(catalog: SparkBaseCatalog, ident),
+ schemaOrColumns,
+ parts,
+ tableSpec: TableSpec,
+ orCreate) if
PaimonReplaceTableStrategyHelper.supportsCatalog(catalog, tableSpec) =>
+ val columns =
+ SparkShimLoader.shim.toReplaceTableColumns(
+ replace.tableSchema,
+ schemaOrColumns,
+ catalog,
+ ident)
+ val qualifiedSpec = qualifyTableSpec(tableSpec, Map.empty)
+ if (PaimonReplaceTableStrategyHelper.canAtomicReplace(catalog, ident,
qualifiedSpec, parts)) {
+ SparkShimLoader.shim.createAtomicReplaceTableExec(
+ catalog.asInstanceOf[StagingTableCatalog],
+ ident,
+ columns,
+ parts,
+ qualifiedSpec,
+ orCreate = orCreate) :: Nil
+ } else {
+ SparkShimLoader.shim.createReplaceTableExec(
+ catalog,
+ ident,
+ columns,
+ parts,
+ qualifiedSpec,
+ orCreate = orCreate) :: Nil
+ }
+ case _ => Nil
+ }
+}
+
+private[shim] object PaimonReplaceTableStrategyHelper {
+
+ def supportsCatalog(catalog: SparkBaseCatalog, tableSpec: TableSpec):
Boolean = catalog match {
+ case _: SparkCatalog => true
+ case _: SparkGenericCatalog =>
+ tableSpec.provider.exists(_.equalsIgnoreCase(SparkSource.NAME))
+ case _ => false
+ }
+
+ /**
+ * Whether replace can use Spark's staged replace path. Paimon's replaceTable is not a
+ * rollbackable atomic replace; it swaps the current schema and truncates current data while
+ * preserving old snapshots. Return false for cases replaceTable would reject so Spark falls back
+ * to drop+create.
+ */
+ def canAtomicReplace(
+ catalog: SparkBaseCatalog,
+ ident: Identifier,
+ tableSpec: TableSpec,
+ parts: Seq[Transform]): Boolean = {
+ try {
+ val existing = catalog.loadTable(ident)
+ if (!existing.isInstanceOf[SparkTable]) return false
+ val existingProvider =
+
Option(existing.properties().get(TableCatalog.PROP_PROVIDER)).getOrElse(SparkSource.NAME)
+ val targetProvider = tableSpec.provider.getOrElse(SparkSource.NAME)
+ if (!existingProvider.equalsIgnoreCase(targetProvider)) return false
+ val existingType = Options.fromMap(existing.properties()).get(TYPE)
+ val targetType = Options.fromMap(tableSpec.properties.asJava).get(TYPE)
+ if (existingType != targetType) return false
+ val existingParts = existing.partitioning().toSeq
+ existingParts.size == parts.size &&
+ existingParts.zip(parts).forall { case (a, b) => a.toString ==
b.toString }
+ } catch {
+ case _: NoSuchTableException => true
+ }
+ }
+}
diff --git
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
deleted file mode 100644
index 61e25b7c16..0000000000
---
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.shim
-
-import org.apache.paimon.CoreOptions
-import org.apache.paimon.iceberg.IcebergOptions
-import org.apache.paimon.spark.SparkCatalog
-import org.apache.paimon.spark.catalog.FormatTableCatalog
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect,
LogicalPlan, TableSpec}
-import org.apache.spark.sql.connector.catalog.StagingTableCatalog
-import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan,
SparkStrategy}
-import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec
-
-import scala.collection.JavaConverters._
-
-case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
- extends SparkStrategy
- with PaimonStrategyHelper {
-
- import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
-
- override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case CreateTableAsSelect(
- ResolvedIdentifier(catalog: SparkCatalog, ident),
- parts,
- query,
- tableSpec: TableSpec,
- options,
- ifNotExists,
- true) =>
- catalog match {
- case _: StagingTableCatalog =>
- throw new RuntimeException("Paimon can't extend StagingTableCatalog
for now.")
- case _ =>
- val coreOptionKeys =
CoreOptions.getOptions.asScala.map(_.key()).toSeq
-
- // Include Iceberg compatibility options in table properties (fix for DataFrame writer options)
- val icebergOptionKeys =
IcebergOptions.getOptions.asScala.map(_.key()).toSeq
-
- val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys
-
- val (tableOptions, writeOptions) = options.partition {
- case (key, _) => allTableOptionKeys.contains(key)
- }
- val newTableSpec = tableSpec.copy(properties = tableSpec.properties
++ tableOptions)
-
- val isPartitionedFormatTable = {
- catalog match {
- case catalog: FormatTableCatalog =>
- catalog.isFormatTable(newTableSpec.provider.orNull) &&
parts.nonEmpty
- case _ => false
- }
- }
-
- if (isPartitionedFormatTable) {
- throw new UnsupportedOperationException(
- "Using CTAS with partitioned format table is not supported yet.")
- }
-
- CreateTableAsSelectExec(
- catalog.asTableCatalog,
- ident,
- parts,
- query,
- qualifyLocInTableSpec(newTableSpec),
- writeOptions,
- ifNotExists) :: Nil
- }
- case _ => Nil
- }
-}
diff --git
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
index 11fdfbd579..e08c87d4d3 100644
---
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
+++
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
@@ -32,14 +32,15 @@ import
org.apache.spark.sql.catalyst.analysis.{CTESubstitution, SubstituteUnreso
import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Expression}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment,
CTERelationRef, InsertAction, LogicalPlan, MergeAction, MergeIntoTable,
MergeRows, SubqueryAlias, UnresolvedWith, UpdateAction}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment,
ColumnDefinition, CTERelationRef, InsertAction, LogicalPlan, MergeAction,
MergeIntoTable, MergeRows, SubqueryAlias, TableSpec, UnresolvedWith,
UpdateAction}
import org.apache.spark.sql.catalyst.plans.logical.MergeRows.Keep
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.ArrayData
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier,
Table, TableCatalog}
+import org.apache.spark.sql.catalyst.util.{ArrayData, GeneratedColumn,
IdentityColumn, ResolveDefaultColumns}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column,
Identifier, StagingTableCatalog, Table, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.execution.SparkFormatTable
+import org.apache.spark.sql.execution.{SparkFormatTable, SparkPlan}
import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex,
PartitionSpec}
+import
org.apache.spark.sql.execution.datasources.v2.{AtomicReplaceTableAsSelectExec,
AtomicReplaceTableExec, ReplaceTableAsSelectExec, ReplaceTableExec}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.streaming.{FileStreamSink,
MetadataLogFileIndex}
import org.apache.spark.sql.internal.SQLConf
@@ -102,6 +103,102 @@ class Spark4Shim extends SparkShim {
tableCatalog.createTable(ident, columns, partitions, properties)
}
+ override def createReplaceTableAsSelectExec(
+ catalog: TableCatalog,
+ ident: Identifier,
+ partitioning: Seq[Transform],
+ query: LogicalPlan,
+ tableSpec: TableSpec,
+ writeOptions: Map[String, String],
+ orCreate: Boolean): SparkPlan = {
+ ReplaceTableAsSelectExec(
+ catalog,
+ ident,
+ partitioning,
+ query,
+ tableSpec,
+ writeOptions,
+ orCreate = orCreate,
+ invalidateCache)
+ }
+
+ override def createAtomicReplaceTableAsSelectExec(
+ catalog: StagingTableCatalog,
+ ident: Identifier,
+ partitioning: Seq[Transform],
+ query: LogicalPlan,
+ tableSpec: TableSpec,
+ writeOptions: Map[String, String],
+ orCreate: Boolean): SparkPlan = {
+ AtomicReplaceTableAsSelectExec(
+ catalog,
+ ident,
+ partitioning,
+ query,
+ tableSpec,
+ writeOptions,
+ orCreate = orCreate,
+ invalidateCache)
+ }
+
+ override def createReplaceTableExec(
+ catalog: TableCatalog,
+ ident: Identifier,
+ columns: Array[Column],
+ partitioning: Seq[Transform],
+ tableSpec: TableSpec,
+ orCreate: Boolean): SparkPlan = {
+ ReplaceTableExec(
+ catalog,
+ ident,
+ columns,
+ partitioning,
+ tableSpec,
+ orCreate = orCreate,
+ invalidateCache)
+ }
+
+ override def createAtomicReplaceTableExec(
+ catalog: StagingTableCatalog,
+ ident: Identifier,
+ columns: Array[Column],
+ partitioning: Seq[Transform],
+ tableSpec: TableSpec,
+ orCreate: Boolean): SparkPlan = {
+ AtomicReplaceTableExec(
+ catalog,
+ ident,
+ columns,
+ partitioning,
+ tableSpec,
+ orCreate = orCreate,
+ invalidateCache)
+ }
+
+ override def toReplaceTableColumns(
+ tableSchema: StructType,
+ schemaOrColumns: Any,
+ catalog: TableCatalog,
+ ident: Identifier): Array[Column] = {
+ val statementType = "REPLACE TABLE"
+ val columns = schemaOrColumns.asInstanceOf[Seq[ColumnDefinition]]
+ ResolveDefaultColumns.validateCatalogForDefaultValue(columns, catalog,
ident)
+ GeneratedColumn.validateGeneratedColumns(tableSchema, catalog, ident,
statementType)
+ IdentityColumn.validateIdentityColumn(tableSchema, catalog, ident)
+ columns.map(_.toV2Column(statementType)).toArray
+ }
+
+ override def copyTableSpec(
+ tableSpec: TableSpec,
+ additionalProperties: Map[String, String],
+ location: Option[String]): TableSpec = {
+ tableSpec.copy(properties = tableSpec.properties ++ additionalProperties,
location = location)
+ }
+
+ private def invalidateCache(tableCatalog: TableCatalog, table: Table, ident:
Identifier): Unit = {
+ tableCatalog.invalidateTable(ident)
+ }
+
override def createCTERelationRef(
cteId: Long,
resolved: Boolean,
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/RollbackStagedTable.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/RollbackStagedTable.java
new file mode 100644
index 0000000000..2f90545b60
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/RollbackStagedTable.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark;
+
+import org.apache.spark.sql.connector.catalog.StagedTable;
+import org.apache.spark.sql.connector.catalog.SupportsDelete;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.SupportsWrite;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/** A staged table that rolls back by invoking the provided abort action. */
+class RollbackStagedTable implements StagedTable, SupportsRead, SupportsWrite,
SupportsDelete {
+
+ @FunctionalInterface
+ interface Action {
+ void run() throws Exception;
+ }
+
+ @FunctionalInterface
+ interface StagedAction {
+ void run(RollbackStagedTable stagedTable) throws Exception;
+ }
+
+ private static final Action DO_NOTHING = () -> {};
+
+ private final Table table;
+ private final StagedAction commitAction;
+ private final Action abortAction;
+ private boolean committed;
+ private boolean aborted;
+ private boolean writeStarted;
+
+ RollbackStagedTable(Table table, Action abortAction) {
+ this(table, DO_NOTHING, abortAction);
+ }
+
+ RollbackStagedTable(Table table, Action commitAction, Action abortAction) {
+ this(table, ignored -> commitAction.run(), abortAction);
+ }
+
+ RollbackStagedTable(Table table, StagedAction commitAction, Action
abortAction) {
+ this.table = table;
+ this.commitAction = commitAction;
+ this.abortAction = abortAction;
+ }
+
+ @Override
+ public String name() {
+ return table.name();
+ }
+
+ @Override
+ public StructType schema() {
+ return table.schema();
+ }
+
+ @Override
+ public Transform[] partitioning() {
+ return table.partitioning();
+ }
+
+ @Override
+ public Map<String, String> properties() {
+ return table.properties();
+ }
+
+ @Override
+ public Set<TableCapability> capabilities() {
+ return table.capabilities();
+ }
+
+ @Override
+ public void deleteWhere(Filter[] filters) {
+ call(SupportsDelete.class, t -> t.deleteWhere(filters));
+ }
+
+ @Override
+ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+ return callReturning(SupportsRead.class, t ->
t.newScanBuilder(options));
+ }
+
+ @Override
+ public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
+ writeStarted = true;
+ return callReturning(SupportsWrite.class, t ->
t.newWriteBuilder(info));
+ }
+
+ boolean hasWriteStarted() {
+ return writeStarted;
+ }
+
+ @Override
+ public void commitStagedChanges() {
+ if (committed || aborted) {
+ return;
+ }
+
+ try {
+ commitAction.run(this);
+ committed = true;
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void abortStagedChanges() {
+ if (committed || aborted) {
+ return;
+ }
+
+ try {
+ abortAction.run();
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ aborted = true;
+ }
+ }
+
+ private <T> void call(Class<? extends T> requiredClass, Consumer<T> task) {
+ callReturning(
+ requiredClass,
+ instance -> {
+ task.accept(instance);
+ return null;
+ });
+ }
+
+ private <T, R> R callReturning(Class<? extends T> requiredClass,
Function<T, R> task) {
+ if (requiredClass.isInstance(table)) {
+ return task.apply(requiredClass.cast(table));
+ }
+
+ throw new UnsupportedOperationException(
+ String.format(
+ "Table does not implement %s: %s (%s)",
+ requiredClass.getSimpleName(), table.name(),
table.getClass().getName()));
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 159aa98e37..451a5fae26 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -61,6 +61,7 @@ import
org.apache.spark.sql.catalyst.parser.extensions.UnResolvedPaimonV1Functio
import org.apache.spark.sql.connector.catalog.FunctionCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.NamespaceChange;
+import org.apache.spark.sql.connector.catalog.StagedTable;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.catalog.TableChange;
@@ -389,6 +390,87 @@ public class SparkCatalog extends SparkBaseCatalog
}
}
+    /** Stages a table creation; the work is done by {@code stageCreateDirectly}. */
+    @Override
+    public StagedTable stageCreate(
+            Identifier ident,
+            StructType schema,
+            Transform[] partitions,
+            Map<String, String> properties)
+            throws TableAlreadyExistsException, NoSuchNamespaceException {
+        StagedTable staged = stageCreateDirectly(ident, schema, partitions, properties);
+        return staged;
+    }
+
+    /** Stages a replace of an existing table; the work is done by {@code stageReplaceInternal}. */
+    @Override
+    public StagedTable stageReplace(
+            Identifier ident,
+            StructType schema,
+            Transform[] partitions,
+            Map<String, String> properties)
+            throws NoSuchNamespaceException, NoSuchTableException {
+        StagedTable staged = stageReplaceInternal(ident, schema, partitions, properties);
+        return staged;
+    }
+
+    /**
+     * Tries to replace the table first and creates it when it does not exist. A
+     * {@code TableAlreadyExistsException} raised by the create step is rethrown wrapped in a
+     * {@code RuntimeException}.
+     */
+    @Override
+    public StagedTable stageCreateOrReplace(
+            Identifier ident,
+            StructType schema,
+            Transform[] partitions,
+            Map<String, String> properties)
+            throws NoSuchNamespaceException {
+        try {
+            return stageReplaceInternal(ident, schema, partitions, properties);
+        } catch (NoSuchTableException missing) {
+            // Nothing to replace: continue below and create the table instead.
+        }
+
+        try {
+            return stageCreate(ident, schema, partitions, properties);
+        } catch (TableAlreadyExistsException conflict) {
+            throw new RuntimeException(conflict);
+        }
+    }
+
+    /**
+     * Replaces an existing table through the Paimon catalog and returns the reloaded table as a
+     * staged table whose abort action is a no-op.
+     *
+     * <p>When the catalog rejects {@code replaceTable} with an
+     * {@link UnsupportedOperationException}, the replace degrades to drop+create, which loses
+     * snapshot history.
+     *
+     * @throws NoSuchTableException if the catalog reports that the table does not exist
+     */
+    private StagedTable stageReplaceInternal(
+            Identifier ident,
+            StructType schema,
+            Transform[] partitions,
+            Map<String, String> properties)
+            throws NoSuchNamespaceException, NoSuchTableException {
+        org.apache.paimon.catalog.Identifier tableIdent = toIdentifier(ident, catalogName);
+        Schema targetSchema = toInitialSchema(schema, partitions, properties);
+
+        // Only replaceTable is guarded. An UnsupportedOperationException raised later, while
+        // loading or wrapping the already-replaced table, must not be mistaken for "replace is
+        // unsupported": that would drop the table that was just replaced.
+        try {
+            catalog.replaceTable(tableIdent, targetSchema, false);
+        } catch (Catalog.TableNotExistException e) {
+            throw new NoSuchTableException(ident);
+        } catch (UnsupportedOperationException e) {
+            // Catalog cannot replace in-place; fall back to drop+create, losing snapshot history.
+            LOG.warn(
+                    "Catalog {} does not support replaceTable, falling back to drop+create for {}.",
+                    catalog.getClass().getName(),
+                    tableIdent.getFullName(),
+                    e);
+            return stageReplaceByDropAndCreate(ident, tableIdent, targetSchema);
+        }
+
+        return new RollbackStagedTable(loadTable(ident), () -> {});
+    }
+
+    /**
+     * Replace fallback: drops the existing table, creates it again from {@code targetSchema},
+     * and returns the reloaded table as a staged table whose abort action is a no-op.
+     *
+     * @throws NoSuchTableException if the table to drop does not exist
+     * @throws NoSuchNamespaceException if the database is missing when re-creating the table
+     */
+    private StagedTable stageReplaceByDropAndCreate(
+            Identifier ident,
+            org.apache.paimon.catalog.Identifier tableIdent,
+            Schema targetSchema)
+            throws NoSuchTableException, NoSuchNamespaceException {
+        // Step 1: remove the current table.
+        try {
+            catalog.dropTable(tableIdent, false);
+        } catch (Catalog.TableNotExistException missing) {
+            throw new NoSuchTableException(ident);
+        }
+
+        // Step 2: create the replacement under the same identifier.
+        try {
+            catalog.createTable(tableIdent, targetSchema, false);
+        } catch (Catalog.DatabaseNotExistException noDatabase) {
+            throw new NoSuchNamespaceException(ident.namespace());
+        } catch (Catalog.TableAlreadyExistException conflict) {
+            throw new RuntimeException(conflict);
+        }
+
+        StagedTable staged = new RollbackStagedTable(loadTable(ident), () -> {});
+        return staged;
+    }
+
private SchemaChange toSchemaChange(TableChange change) {
if (change instanceof TableChange.SetProperty) {
TableChange.SetProperty set = (TableChange.SetProperty) change;
@@ -457,6 +539,24 @@ public class SparkCatalog extends SparkBaseCatalog
return move;
}
+    /**
+     * Creates the table immediately and returns it as a staged table whose abort action drops
+     * it again. When {@code createTable} returns {@code null}, the table is loaded back by
+     * identifier.
+     */
+    private StagedTable stageCreateDirectly(
+            Identifier ident,
+            StructType schema,
+            Transform[] partitions,
+            Map<String, String> properties)
+            throws TableAlreadyExistsException, NoSuchNamespaceException {
+        org.apache.spark.sql.connector.catalog.Table created =
+                createTable(ident, schema, partitions, properties);
+        if (created == null) {
+            try {
+                created = loadTable(ident);
+            } catch (NoSuchTableException vanished) {
+                throw new RuntimeException(vanished);
+            }
+        }
+
+        StagedTable staged = new RollbackStagedTable(created, () -> dropTable(ident));
+        return staged;
+    }
+
private Schema toInitialSchema(
StructType schema, Transform[] partitions, Map<String, String>
properties) {
Map<String, String> normalizedProperties = new HashMap<>(properties);
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
index e79af9a0b4..b89eaf0925 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
@@ -43,6 +43,8 @@ import org.apache.spark.sql.connector.catalog.FunctionCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.NamespaceChange;
import org.apache.spark.sql.connector.catalog.PaimonCatalogUtils;
+import org.apache.spark.sql.connector.catalog.StagedTable;
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -245,6 +247,49 @@ public class SparkGenericCatalog extends SparkBaseCatalog
implements CatalogExte
}
}
+    /**
+     * Routes a staged create by provider: Paimon providers go to the wrapped Paimon catalog,
+     * everything else to the delegate catalog.
+     */
+    @Override
+    public StagedTable stageCreate(
+            Identifier ident,
+            StructType schema,
+            Transform[] partitions,
+            Map<String, String> properties)
+            throws TableAlreadyExistsException, NoSuchNamespaceException {
+        String provider = properties.get(TableCatalog.PROP_PROVIDER);
+        if (!usePaimon(provider)) {
+            return asStagingTableCatalog().stageCreate(ident, schema, partitions, properties);
+        }
+        return sparkCatalog.stageCreate(ident, schema, partitions, properties);
+    }
+
+    /**
+     * Routes a staged replace by provider: Paimon providers go to the wrapped Paimon catalog,
+     * everything else to the delegate catalog.
+     */
+    @Override
+    public StagedTable stageReplace(
+            Identifier ident,
+            StructType schema,
+            Transform[] partitions,
+            Map<String, String> properties)
+            throws NoSuchNamespaceException, NoSuchTableException {
+        String provider = properties.get(TableCatalog.PROP_PROVIDER);
+        if (!usePaimon(provider)) {
+            return asStagingTableCatalog().stageReplace(ident, schema, partitions, properties);
+        }
+        return sparkCatalog.stageReplace(ident, schema, partitions, properties);
+    }
+
+    /**
+     * Routes a staged create-or-replace by provider: Paimon providers go to the wrapped Paimon
+     * catalog, everything else to the delegate catalog.
+     */
+    @Override
+    public StagedTable stageCreateOrReplace(
+            Identifier ident,
+            StructType schema,
+            Transform[] partitions,
+            Map<String, String> properties)
+            throws NoSuchNamespaceException {
+        String provider = properties.get(TableCatalog.PROP_PROVIDER);
+        if (!usePaimon(provider)) {
+            return asStagingTableCatalog()
+                    .stageCreateOrReplace(ident, schema, partitions, properties);
+        }
+        return sparkCatalog.stageCreateOrReplace(ident, schema, partitions, properties);
+    }
+
@Override
public final void initialize(String name, CaseInsensitiveStringMap
options) {
SparkSession sparkSession = PaimonSparkSession$.MODULE$.active();
@@ -362,6 +407,10 @@ public class SparkGenericCatalog extends SparkBaseCatalog
implements CatalogExte
return (FunctionCatalog) getDelegateCatalog();
}
+    /**
+     * Returns the delegate catalog viewed as a {@link StagingTableCatalog}.
+     *
+     * @throws UnsupportedOperationException if the delegate catalog does not implement
+     *     {@link StagingTableCatalog}, so staged operations on non-Paimon tables cannot be
+     *     forwarded to it
+     */
+    private StagingTableCatalog asStagingTableCatalog() {
+        Object delegate = getDelegateCatalog();
+        if (delegate instanceof StagingTableCatalog) {
+            return (StagingTableCatalog) delegate;
+        }
+        // Fail with an actionable message instead of an opaque ClassCastException.
+        throw new UnsupportedOperationException(
+                String.format(
+                        "Delegate catalog %s does not implement StagingTableCatalog; "
+                                + "staged create/replace cannot be forwarded for non-Paimon tables.",
+                        delegate == null ? "null" : delegate.getClass().getName()));
+    }
+
// ======================= Function methods ===============================
@Override
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java
index ac6736e2e1..284b5d099f 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java
@@ -25,6 +25,7 @@ import org.apache.paimon.spark.procedure.Procedure;
import org.apache.paimon.spark.procedure.ProcedureBuilder;
import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.catalog.TableCatalogCapability;
@@ -39,7 +40,11 @@ import static
org.apache.spark.sql.connector.catalog.TableCatalogCapability.SUPP
/** Spark base catalog. */
public abstract class SparkBaseCatalog
- implements TableCatalog, SupportsNamespaces, ProcedureCatalog,
WithPaimonCatalog {
+ implements TableCatalog,
+ SupportsNamespaces,
+ ProcedureCatalog,
+ WithPaimonCatalog,
+ StagingTableCatalog {
protected String catalogName;
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
index 3be8b5a74e..63c61a16e8 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
@@ -29,11 +29,11 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace,
ResolvedTable}
import org.apache.spark.sql.catalyst.expressions.{Expression,
GenericInternalRow, PredicateHelper}
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect,
DescribeRelation, LogicalPlan, ShowCreateTable}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect,
DescribeRelation, LogicalPlan, ReplaceTable, ReplaceTableAsSelect,
ShowCreateTable}
import org.apache.spark.sql.connector.catalog.{Identifier,
PaimonLookupCatalog, TableCatalog}
import org.apache.spark.sql.execution.{PaimonDescribeTableExec, SparkPlan,
SparkStrategy}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits,
DataSourceV2Relation}
-import org.apache.spark.sql.execution.shim.PaimonCreateTableAsSelectStrategy
+import org.apache.spark.sql.execution.shim.{PaimonCreateTableAsSelectStrategy,
PaimonReplaceTableAsSelectStrategy, PaimonReplaceTableStrategy}
import org.apache.spark.sql.paimon.shims.SparkShimLoader
import scala.collection.JavaConverters._
@@ -51,6 +51,12 @@ case class PaimonStrategy(spark: SparkSession)
case ctas: CreateTableAsSelect =>
PaimonCreateTableAsSelectStrategy(spark)(ctas)
+ case rtas: ReplaceTableAsSelect =>
+ PaimonReplaceTableAsSelectStrategy(spark)(rtas)
+
+ case rt: ReplaceTable =>
+ PaimonReplaceTableStrategy(spark)(rt)
+
case c @ PaimonCallCommand(procedure, args) =>
val input = buildInternalRow(args)
PaimonCallExec(c.output, procedure, input) :: Nil
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala
index 9fb3a7b54a..2ee33a1829 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala
@@ -18,10 +18,16 @@
package org.apache.spark.sql.execution
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.iceberg.IcebergOptions
+
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogUtils
import org.apache.spark.sql.catalyst.plans.logical.TableSpec
import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
+
+import scala.collection.JavaConverters._
trait PaimonStrategyHelper {
@@ -34,8 +40,23 @@ trait PaimonStrategyHelper {
spark.sharedState.hadoopConf)
}
- protected def qualifyLocInTableSpec(tableSpec: TableSpec): TableSpec = {
- tableSpec.copy(location =
tableSpec.location.map(makeQualifiedDBObjectPath(_)))
+  /**
+   * Copies `tableSpec` through the version shim, passing `tableOptions` as additional
+   * properties and the location (when present) run through `makeQualifiedDBObjectPath`.
+   */
+  protected def qualifyTableSpec(
+      tableSpec: TableSpec,
+      tableOptions: Map[String, String]): TableSpec = {
+    val shim = SparkShimLoader.shim
+    val qualifiedLocation = tableSpec.location.map(loc => makeQualifiedDBObjectPath(loc))
+    shim.copyTableSpec(tableSpec, tableOptions, qualifiedLocation)
   }
+}
+object PaimonStrategyHelper {
+
+  /** Keys of every option returned by `CoreOptions.getOptions` and `IcebergOptions.getOptions`. */
+  private val tableOptionKeys: Set[String] = {
+    val coreKeys = CoreOptions.getOptions.asScala.map(_.key())
+    val icebergKeys = IcebergOptions.getOptions.asScala.map(_.key())
+    (coreKeys ++ icebergKeys).toSet
+  }
+
+  /**
+   * Splits `options` in two: first the entries whose key is a known table option, then all
+   * remaining entries (treated as write options by the callers).
+   */
+  def splitTableAndWriteOptions(
+      options: Map[String, String]): (Map[String, String], Map[String, String]) = {
+    options.partition(entry => tableOptionKeys.contains(entry._1))
+  }
 }
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
index 61e25b7c16..cc0c12960c 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
@@ -18,20 +18,15 @@
package org.apache.spark.sql.execution.shim
-import org.apache.paimon.CoreOptions
-import org.apache.paimon.iceberg.IcebergOptions
import org.apache.paimon.spark.SparkCatalog
import org.apache.paimon.spark.catalog.FormatTableCatalog
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect,
LogicalPlan, TableSpec}
-import org.apache.spark.sql.connector.catalog.StagingTableCatalog
import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan,
SparkStrategy}
import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec
-import scala.collection.JavaConverters._
-
case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
extends SparkStrategy
with PaimonStrategyHelper {
@@ -47,44 +42,30 @@ case class PaimonCreateTableAsSelectStrategy(spark:
SparkSession)
options,
ifNotExists,
true) =>
- catalog match {
- case _: StagingTableCatalog =>
- throw new RuntimeException("Paimon can't extend StagingTableCatalog
for now.")
- case _ =>
- val coreOptionKeys =
CoreOptions.getOptions.asScala.map(_.key()).toSeq
-
- // Include Iceberg compatibility options in table properties (fix
for DataFrame writer options)
- val icebergOptionKeys =
IcebergOptions.getOptions.asScala.map(_.key()).toSeq
-
- val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys
-
- val (tableOptions, writeOptions) = options.partition {
- case (key, _) => allTableOptionKeys.contains(key)
- }
- val newTableSpec = tableSpec.copy(properties = tableSpec.properties
++ tableOptions)
+ val (tableOptions, writeOptions) =
PaimonStrategyHelper.splitTableAndWriteOptions(options)
+ val qualifiedSpec = qualifyTableSpec(tableSpec, tableOptions)
- val isPartitionedFormatTable = {
- catalog match {
- case catalog: FormatTableCatalog =>
- catalog.isFormatTable(newTableSpec.provider.orNull) &&
parts.nonEmpty
- case _ => false
- }
- }
-
- if (isPartitionedFormatTable) {
- throw new UnsupportedOperationException(
- "Using CTAS with partitioned format table is not supported yet.")
- }
+ val isPartitionedFormatTable = {
+ catalog match {
+ case formatCatalog: FormatTableCatalog =>
+ formatCatalog.isFormatTable(qualifiedSpec.provider.orNull) &&
parts.nonEmpty
+ case _ => false
+ }
+ }
- CreateTableAsSelectExec(
- catalog.asTableCatalog,
- ident,
- parts,
- query,
- qualifyLocInTableSpec(newTableSpec),
- writeOptions,
- ifNotExists) :: Nil
+ if (isPartitionedFormatTable) {
+ throw new UnsupportedOperationException(
+ "Using CTAS with partitioned format table is not supported yet.")
}
+
+ CreateTableAsSelectExec(
+ catalog.asTableCatalog,
+ ident,
+ parts,
+ query,
+ qualifiedSpec,
+ writeOptions,
+ ifNotExists) :: Nil
case _ => Nil
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala
new file mode 100644
index 0000000000..60410b631e
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.shim
+
+import org.apache.paimon.CoreOptions.TYPE
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.{SparkCatalog, SparkGenericCatalog,
SparkSource, SparkTable}
+import org.apache.paimon.spark.catalog.SparkBaseCatalog
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException,
ResolvedIdentifier}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReplaceTable,
ReplaceTableAsSelect, TableSpec}
+import org.apache.spark.sql.connector.catalog.{Identifier,
StagingTableCatalog, TableCatalog}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan,
SparkStrategy}
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
+
+import scala.collection.JavaConverters._
+
+/**
+ * Plans `REPLACE TABLE ... AS SELECT` (and the `CREATE OR REPLACE` form) for catalogs accepted by
+ * `PaimonReplaceTableStrategyHelper.supportsCatalog`. Options that are known table options are
+ * merged into the table spec; the rest are passed on as write options. The staged exec is used
+ * when `canAtomicReplace` allows it, otherwise the non-staged replace exec.
+ */
+case class PaimonReplaceTableAsSelectStrategy(spark: SparkSession)
+  extends SparkStrategy
+  with PaimonStrategyHelper {
+
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case ReplaceTableAsSelect(
+          ResolvedIdentifier(catalog: SparkBaseCatalog, ident),
+          parts,
+          query,
+          tableSpec: TableSpec,
+          options,
+          orCreate,
+          true) if PaimonReplaceTableStrategyHelper.supportsCatalog(catalog, tableSpec) =>
+      val (tableOptions, writeOptions) = PaimonStrategyHelper.splitTableAndWriteOptions(options)
+      val qualifiedSpec = qualifyTableSpec(tableSpec, tableOptions)
+      if (PaimonReplaceTableStrategyHelper.canAtomicReplace(catalog, ident, qualifiedSpec, parts)) {
+        // SparkBaseCatalog implements StagingTableCatalog, so a typed binding replaces the
+        // former unchecked asInstanceOf and is verified by the compiler.
+        val stagingCatalog: StagingTableCatalog = catalog
+        SparkShimLoader.shim.createAtomicReplaceTableAsSelectExec(
+          stagingCatalog,
+          ident,
+          parts,
+          query,
+          qualifiedSpec,
+          writeOptions,
+          orCreate = orCreate) :: Nil
+      } else {
+        SparkShimLoader.shim.createReplaceTableAsSelectExec(
+          catalog,
+          ident,
+          parts,
+          query,
+          qualifiedSpec,
+          writeOptions,
+          orCreate = orCreate) :: Nil
+      }
+    case _ => Nil
+  }
+}
+
+/**
+ * Plans `REPLACE TABLE` (and the `CREATE OR REPLACE` form) without a query for catalogs accepted
+ * by `PaimonReplaceTableStrategyHelper.supportsCatalog`. The staged exec is used when
+ * `canAtomicReplace` allows it, otherwise the non-staged replace exec.
+ */
+case class PaimonReplaceTableStrategy(spark: SparkSession)
+  extends SparkStrategy
+  with PaimonStrategyHelper {
+
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case replace @ ReplaceTable(
+          ResolvedIdentifier(catalog: SparkBaseCatalog, ident),
+          schemaOrColumns,
+          parts,
+          tableSpec: TableSpec,
+          orCreate) if PaimonReplaceTableStrategyHelper.supportsCatalog(catalog, tableSpec) =>
+      // The shim turns the version-specific schema/columns argument into an Array[Column].
+      val columns =
+        SparkShimLoader.shim.toReplaceTableColumns(
+          replace.tableSchema,
+          schemaOrColumns,
+          catalog,
+          ident)
+      val qualifiedSpec = qualifyTableSpec(tableSpec, Map.empty)
+      if (PaimonReplaceTableStrategyHelper.canAtomicReplace(catalog, ident, qualifiedSpec, parts)) {
+        // SparkBaseCatalog implements StagingTableCatalog, so a typed binding replaces the
+        // former unchecked asInstanceOf and is verified by the compiler.
+        val stagingCatalog: StagingTableCatalog = catalog
+        SparkShimLoader.shim.createAtomicReplaceTableExec(
+          stagingCatalog,
+          ident,
+          columns,
+          parts,
+          qualifiedSpec,
+          orCreate = orCreate) :: Nil
+      } else {
+        SparkShimLoader.shim.createReplaceTableExec(
+          catalog,
+          ident,
+          columns,
+          parts,
+          qualifiedSpec,
+          orCreate = orCreate) :: Nil
+      }
+    case _ => Nil
+  }
+}
+
+private[shim] object PaimonReplaceTableStrategyHelper {
+
+  /**
+   * Whether the Paimon replace strategies handle a replace issued against `catalog`: always for
+   * [[SparkCatalog]], only with an explicit Paimon provider for [[SparkGenericCatalog]], and
+   * never for any other catalog.
+   */
+  def supportsCatalog(catalog: SparkBaseCatalog, tableSpec: TableSpec): Boolean = catalog match {
+    case _: SparkCatalog => true
+    case _: SparkGenericCatalog =>
+      tableSpec.provider.exists(_.equalsIgnoreCase(SparkSource.NAME))
+    case _ => false
+  }
+
+  /**
+   * Whether replace can use Spark's staged replace path. Paimon's replaceTable is not a
+   * rollbackable atomic replace; it swaps the current schema and truncates current data while
+   * preserving old snapshots. Return false for cases replaceTable would reject so Spark falls back
+   * to drop+create.
+   *
+   * The result is true when the table does not exist yet, and otherwise only when the existing
+   * table is a [[SparkTable]] whose provider, table type and partitioning all match the target.
+   * The checks short-circuit in that order.
+   */
+  def canAtomicReplace(
+      catalog: SparkBaseCatalog,
+      ident: Identifier,
+      tableSpec: TableSpec,
+      parts: Seq[Transform]): Boolean = {
+    try {
+      val existing = catalog.loadTable(ident)
+
+      // Providers default to Paimon when unset on either side.
+      def sameProvider: Boolean = {
+        val existingProvider =
+          Option(existing.properties().get(TableCatalog.PROP_PROVIDER)).getOrElse(SparkSource.NAME)
+        val targetProvider = tableSpec.provider.getOrElse(SparkSource.NAME)
+        existingProvider.equalsIgnoreCase(targetProvider)
+      }
+
+      def sameTableType: Boolean = {
+        val existingType = Options.fromMap(existing.properties()).get(TYPE)
+        val targetType = Options.fromMap(tableSpec.properties.asJava).get(TYPE)
+        existingType == targetType
+      }
+
+      // Transforms are compared pairwise by their string form.
+      def samePartitioning: Boolean = {
+        val existingParts = existing.partitioning().toSeq
+        existingParts.size == parts.size &&
+        existingParts.zip(parts).forall { case (a, b) => a.toString == b.toString }
+      }
+
+      existing match {
+        case _: SparkTable => sameProvider && sameTableType && samePartitioning
+        case _ => false
+      }
+    } catch {
+      case _: NoSuchTableException => true
+    }
+  }
+
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
index 3ceb494396..38efd8c006 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
@@ -27,11 +27,12 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{Assignment,
CTERelationRef, InsertAction, LogicalPlan, MergeAction, MergeIntoTable,
SubqueryAlias, UnresolvedWith, UpdateAction}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment,
CTERelationRef, InsertAction, LogicalPlan, MergeAction, MergeIntoTable,
SubqueryAlias, TableSpec, UnresolvedWith, UpdateAction}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.ArrayData
-import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{Column, Identifier,
StagingTableCatalog, Table, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.StructType
@@ -66,6 +67,51 @@ trait SparkShim {
partitions: Array[Transform],
properties: JMap[String, String]): Table
+ def createReplaceTableAsSelectExec( // builds the RTAS plan used when canAtomicReplace is false
+ catalog: TableCatalog,
+ ident: Identifier,
+ partitioning: Seq[Transform],
+ query: LogicalPlan,
+ tableSpec: TableSpec,
+ writeOptions: Map[String, String],
+ orCreate: Boolean): SparkPlan // NOTE(review): presumably wraps Spark's ReplaceTableAsSelectExec per version — confirm in the shim impls
+
+ def createAtomicReplaceTableAsSelectExec( // builds the staged RTAS plan used when canAtomicReplace is true
+ catalog: StagingTableCatalog,
+ ident: Identifier,
+ partitioning: Seq[Transform],
+ query: LogicalPlan,
+ tableSpec: TableSpec,
+ writeOptions: Map[String, String],
+ orCreate: Boolean): SparkPlan // NOTE(review): presumably wraps Spark's AtomicReplaceTableAsSelectExec per version — confirm in the shim impls
+
+ def createReplaceTableExec( // builds the REPLACE TABLE plan used when canAtomicReplace is false
+ catalog: TableCatalog,
+ ident: Identifier,
+ columns: Array[Column], // produced by toReplaceTableColumns in the calling strategy
+ partitioning: Seq[Transform],
+ tableSpec: TableSpec,
+ orCreate: Boolean): SparkPlan
+
+ def createAtomicReplaceTableExec( // builds the staged REPLACE TABLE plan used when canAtomicReplace is true
+ catalog: StagingTableCatalog,
+ ident: Identifier,
+ columns: Array[Column], // produced by toReplaceTableColumns in the calling strategy
+ partitioning: Seq[Transform],
+ tableSpec: TableSpec,
+ orCreate: Boolean): SparkPlan
+
+ def toReplaceTableColumns( // converts a ReplaceTable plan's schema information into connector columns
+ tableSchema: StructType, // callers pass ReplaceTable.tableSchema
+ schemaOrColumns: Any, // untyped: presumably a StructType or a column list depending on the Spark version — TODO confirm
+ catalog: TableCatalog,
+ ident: Identifier): Array[Column]
+
+ def copyTableSpec( // returns a TableSpec derived from tableSpec; called by PaimonStrategyHelper.qualifyTableSpec
+ tableSpec: TableSpec,
+ additionalProperties: Map[String, String], // presumably merged into the spec's properties — confirm in the shim impls
+ location: Option[String]): TableSpec // callers pass the already-qualified location
+
def createCTERelationRef(
cteId: Long,
resolved: Boolean,
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
index 3f2c0e8996..47a280868d 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
@@ -276,6 +276,163 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
}
}
+ test("Paimon DDL: REPLACE TABLE replaces in-place and preserves old
snapshots") { // REPLACE TABLE keeps the table path and old snapshots while swapping the schema
+ assume(gteqSpark3_4) // presumably skips the test on Spark < 3.4 — confirm against gteqSpark3_4
+ withTable("t") {
+ sql("""
+ |CREATE TABLE t (id BIGINT, data STRING)
+ |USING paimon
+ |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2')
+ |""".stripMargin)
+ sql("INSERT INTO t VALUES (1, 'old')")
+ val oldLocation = loadTable("t").location().toString
+ val oldSnapshotId = loadTable("t").snapshotManager().latestSnapshotId() // latest snapshot before the replace
+
+ sql("""
+ |REPLACE TABLE t (id BIGINT, name STRING)
+ |USING paimon
+ |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '4')
+ |""".stripMargin)
+
+ val replaced = loadTable("t")
+ Assertions.assertEquals(oldLocation, replaced.location().toString) // same location: replaced in place
+ Assertions.assertEquals("4", replaced.options().get("bucket")) // new table options are in effect
+ Assertions.assertEquals(Seq("id", "name"),
spark.table("t").schema.fieldNames.toSeq)
+ checkAnswer(sql("SELECT * FROM t"), Seq.empty[Row]) // current data is empty after the replace
+
+ checkAnswer( // the pre-replace snapshot is still readable with the old schema
+ sql(s"SELECT id, data FROM t VERSION AS OF $oldSnapshotId"),
+ Seq((1L, "old")).toDF())
+ }
+ }
+
+ test("Paimon DDL: REPLACE TABLE without SELECT fails if table is missing") { // plain REPLACE (no OR CREATE) on a missing table must fail
+ assume(gteqSpark3_4) // presumably skips the test on Spark < 3.4 — confirm against gteqSpark3_4
+ withTable("missing") {
+ val error = intercept[AnalysisException] {
+ sql("""
+ |REPLACE TABLE missing (id BIGINT, data STRING)
+ |USING paimon
+ |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2')
+ |""".stripMargin)
+ }.getMessage
+
+ Assertions.assertTrue( // accepts any of three message variants for "table not found"
+ error.contains("TABLE_OR_VIEW_NOT_FOUND") ||
+ error.contains("cannot be found") ||
+ error.contains("not found"))
+ }
+ }
+
+ test("Paimon DDL: CREATE TABLE fails when table exists") { // plain CREATE on an existing table must still fail
+ withTable("t") {
+ sql("CREATE TABLE t (id BIGINT, data STRING) USING paimon")
+
+ val error = intercept[AnalysisException] { // second CREATE with the same name must be rejected
+ sql("CREATE TABLE t (id BIGINT, name STRING) USING paimon")
+ }.getMessage
+
+ Assertions.assertTrue( // accepts either message variant for "already exists"
+ error.contains("TABLE_OR_VIEW_ALREADY_EXISTS") ||
error.contains("already exists"))
+ }
+ }
+
+ test("Paimon DDL: CREATE OR REPLACE TABLE AS SELECT on partitioned table") { // RTAS with unchanged partitioning keeps the location and swaps options and data
+ assume(gteqSpark3_4) // presumably skips the test on Spark < 3.4 — confirm against gteqSpark3_4
+ withTable("t") {
+ sql("""
+ |CREATE TABLE t (id BIGINT, data STRING, pt STRING)
+ |USING paimon
+ |PARTITIONED BY (pt)
+ |TBLPROPERTIES ('primary-key' = 'id,pt', 'bucket' = '2')
+ |""".stripMargin)
+ sql("INSERT INTO t VALUES (1, 'old', 'p0')")
+ val oldLocation = loadTable("t").location().toString
+ Seq((2L, "x2", "p1"), (3L, "x3", "p2")) // replacement rows, exposed as temp view "source"
+ .toDF("id", "data", "pt")
+ .createOrReplaceTempView("source")
+
+ sql("""
+ |CREATE OR REPLACE TABLE t
+ |USING paimon
+ |PARTITIONED BY (pt)
+ |TBLPROPERTIES ('primary-key' = 'id,pt', 'bucket' = '3')
+ |AS SELECT * FROM source
+ |""".stripMargin)
+
+ val replaced = loadTable("t")
+ Assertions.assertEquals(oldLocation, replaced.location().toString) // same location: replaced in place
+ Assertions.assertEquals("3", replaced.options().get("bucket")) // new table options are in effect
+ checkAnswer( // only the rows from "source" remain; the old row is gone
+ sql("SELECT * FROM t ORDER BY id"),
+ Seq((2L, "x2", "p1"), (3L, "x3", "p2")).toDF())
+ }
+ }
+
+ test("Paimon DDL: CREATE OR REPLACE TABLE AS SELECT supports incompatible
schema") { // RTAS may change column names and types and drop the primary key
+ assume(gteqSpark3_4) // presumably skips the test on Spark < 3.4 — confirm against gteqSpark3_4
+ withTable("t") {
+ sql("""
+ |CREATE TABLE t (id BIGINT, data STRING)
+ |USING paimon
+ |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2')
+ |""".stripMargin)
+ sql("INSERT INTO t VALUES (1, 'old')")
+ val oldLocation = loadTable("t").location().toString
+ val oldSnapshotId = loadTable("t").snapshotManager().latestSnapshotId() // latest snapshot before the replace
+ Seq(("2", 20), ("3", 30)).toDF("id",
"amount").createOrReplaceTempView("source") // id is a string here, unlike the BIGINT id of the old table
+
+ sql("""
+ |CREATE OR REPLACE TABLE t
+ |USING paimon
+ |TBLPROPERTIES ('bucket' = '-1')
+ |AS SELECT * FROM source
+ |""".stripMargin)
+
+ val replaced = loadTable("t")
+ Assertions.assertEquals(oldLocation, replaced.location().toString) // same location: replaced in place
+ Assertions.assertEquals("-1", replaced.options().get("bucket"))
+ Assertions.assertEquals(Seq("id", "amount"),
spark.table("t").schema.fieldNames.toSeq)
+ Assertions.assertEquals("string",
spark.table("t").schema("id").dataType.typeName)
+ Assertions.assertEquals("integer",
spark.table("t").schema("amount").dataType.typeName)
+ checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(("2", 20), ("3",
30)).toDF())
+ checkAnswer( // the pre-replace snapshot is still readable with the old schema
+ sql(s"SELECT id, data FROM t VERSION AS OF $oldSnapshotId"),
+ Seq((1L, "old")).toDF())
+ }
+ }
+
+ test("Paimon DDL: REPLACE TABLE supports incompatible schema and preserves
old snapshots") { // REPLACE may change column names and types and drop the primary key
+ assume(gteqSpark3_4) // presumably skips the test on Spark < 3.4 — confirm against gteqSpark3_4
+ withTable("t") {
+ sql("""
+ |CREATE TABLE t (id BIGINT, data STRING)
+ |USING paimon
+ |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2')
+ |""".stripMargin)
+ sql("INSERT INTO t VALUES (1, 'old')")
+ val oldLocation = loadTable("t").location().toString
+ val oldSnapshotId = loadTable("t").snapshotManager().latestSnapshotId() // latest snapshot before the replace
+
+ sql("""
+ |REPLACE TABLE t (id STRING, amount INT)
+ |USING paimon
+ |TBLPROPERTIES ('bucket' = '-1')
+ |""".stripMargin)
+
+ val replaced = loadTable("t")
+ Assertions.assertEquals(oldLocation, replaced.location().toString) // same location: replaced in place
+ Assertions.assertEquals("-1", replaced.options().get("bucket"))
+ Assertions.assertEquals(Seq("id", "amount"),
spark.table("t").schema.fieldNames.toSeq)
+ Assertions.assertEquals("string",
spark.table("t").schema("id").dataType.typeName)
+ Assertions.assertEquals("integer",
spark.table("t").schema("amount").dataType.typeName)
+ checkAnswer(sql("SELECT * FROM t"), Seq.empty[Row]) // current data is empty after the replace
+ checkAnswer( // the pre-replace snapshot is still readable with the old schema
+ sql(s"SELECT id, data FROM t VERSION AS OF $oldSnapshotId"),
+ Seq((1L, "old")).toDF())
+ }
+ }
+
fileFormats.foreach {
format =>
test(s"Paimon DDL: create table with char/varchar/string, file.format:
$format") {
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
index d395a16692..feae8be0da 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
@@ -726,6 +726,141 @@ abstract class DDLWithHiveCatalogTestBase extends
PaimonHiveTestBase {
}
}
+ test("Paimon DDL with hive catalog: SparkGenericCatalog explicit Paimon
replace") {
+ assume(gteqSpark3_4) // Scenario: REPLACE TABLE and CREATE OR REPLACE TABLE AS SELECT with an explicit USING paimon; cancelled unless gteqSpark3_4 holds.
+ spark.sql(s"USE $sparkCatalogName") // presumably selects the SparkGenericCatalog named in the test title — confirm
+ withDatabase("paimon_db") {
+ spark.sql("CREATE DATABASE paimon_db")
+ spark.sql("USE paimon_db")
+
+ withTable("rt", "rtas", "missing") {
+ spark.sql("""
+ |CREATE TABLE rt (id BIGINT, data STRING)
+ |USING paimon
+ |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2')
+ |""".stripMargin)
+ spark.sql("INSERT INTO rt VALUES (1, 'old')")
+ val oldSnapshotId = loadTable("paimon_db",
"rt").snapshotManager().latestSnapshotId()
+
+ spark.sql("""
+ |REPLACE TABLE rt (id BIGINT, name STRING)
+ |USING paimon
+ |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '4')
+ |""".stripMargin)
+
+ val replaced = loadTable("paimon_db", "rt") // reload to observe the post-replace metadata
+ Assertions.assertEquals("4", replaced.options().get("bucket"))
+ Assertions.assertEquals(Seq("id", "name"),
spark.table("rt").schema.fieldNames.toSeq)
+ checkAnswer(spark.sql("SELECT * FROM rt"), Seq.empty[Row]) // a plain read of the replaced table returns no rows
+ checkAnswer( // the snapshot taken before the replace is still queryable with its old columns
+ spark.sql(s"SELECT id, data FROM rt VERSION AS OF $oldSnapshotId"),
+ Row(1L, "old") :: Nil)
+
+ val error = intercept[AnalysisException] { // the table named missing is never created here, so REPLACE TABLE without OR CREATE must be rejected
+ spark.sql("""
+ |REPLACE TABLE missing (id BIGINT, data STRING)
+ |USING paimon
+ |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2')
+ |""".stripMargin)
+ }.getMessage
+ Assertions.assertTrue( // any one of the three message fragments is accepted; presumably wording differs across Spark versions — confirm
+ error.contains("TABLE_OR_VIEW_NOT_FOUND") ||
+ error.contains("cannot be found") ||
+ error.contains("not found"))
+
+ Seq((2L, "new")).toDF("id", "data").createOrReplaceTempView("source") // NOTE(review): this temp view is not dropped inside the test — confirm cleanup happens elsewhere
+ spark.sql("""
+ |CREATE TABLE rtas (id BIGINT, data STRING)
+ |USING paimon
+ |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2')
+ |""".stripMargin)
+ spark.sql("INSERT INTO rtas VALUES (1, 'old')")
+ val oldLocation = loadTable("paimon_db", "rtas").location().toString // captured before the replace for the comparison further down
+ val oldRtasSnapshotId =
+ loadTable("paimon_db", "rtas").snapshotManager().latestSnapshotId()
+ spark.sql("""
+ |CREATE OR REPLACE TABLE rtas
+ |USING paimon
+ |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '3')
+ |AS SELECT * FROM source
+ |""".stripMargin)
+
+ val replacedAsSelect = loadTable("paimon_db", "rtas") // reload to observe the post-replace metadata
+ Assertions.assertEquals(oldLocation,
replacedAsSelect.location().toString)
+ Assertions.assertEquals("3", replacedAsSelect.options().get("bucket"))
+ checkAnswer(spark.sql("SELECT * FROM rtas"), Row(2L, "new") :: Nil) // only the row selected from the source view is visible
+ checkAnswer( // the pre-replace snapshot still returns the old row
+ spark.sql(s"SELECT id, data FROM rtas VERSION AS OF
$oldRtasSnapshotId"),
+ Row(1L, "old") :: Nil)
+ }
+ }
+ }
+
+ test("Paimon DDL with hive catalog: SparkGenericCatalog explicit Paimon
replace fallback") {
+ assume(gteqSpark3_4) // Scenario: tables first created with USING csv are replaced by USING paimon tables; cancelled unless gteqSpark3_4 holds.
+ spark.sql(s"USE $sparkCatalogName") // presumably selects the SparkGenericCatalog named in the test title — confirm
+ withDatabase("paimon_db") {
+ spark.sql("CREATE DATABASE paimon_db")
+ spark.sql("USE paimon_db")
+
+ withTable("csv_to_paimon", "rtas_csv_to_paimon") {
+ spark.sql("CREATE TABLE csv_to_paimon (id BIGINT, data STRING) USING
csv")
+ spark.sql("INSERT INTO csv_to_paimon VALUES (1, 'csv')")
+
+ spark.sql("""
+ |REPLACE TABLE csv_to_paimon (id BIGINT, name STRING)
+ |USING paimon
+ |TBLPROPERTIES ('bucket' = '-1')
+ |""".stripMargin)
+
+ val paimonTable = loadTable("paimon_db", "csv_to_paimon") // presumably loadTable only succeeds for a Paimon table — confirm
+ Assertions.assertEquals("-1", paimonTable.options().get("bucket"))
+ Assertions.assertEquals(
+ Seq("id", "name"),
+ spark.table("csv_to_paimon").schema.fieldNames.toSeq)
+ checkAnswer(spark.sql("SELECT * FROM csv_to_paimon"), Seq.empty[Row]) // the csv row inserted earlier is not visible after the replace
+
+ Seq((2L, "new")).toDF("id",
"data").createOrReplaceTempView("provider_source")
+ spark.sql("""
+ |CREATE TABLE rtas_csv_to_paimon (id BIGINT, data STRING)
+ |USING csv
+ |""".stripMargin)
+
+ spark.sql("""
+ |CREATE OR REPLACE TABLE rtas_csv_to_paimon
+ |USING paimon
+ |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '3')
+ |AS SELECT * FROM provider_source
+ |""".stripMargin)
+
+ val rtasPaimonTable = loadTable("paimon_db", "rtas_csv_to_paimon") // reload to observe the table created by the replace
+ Assertions.assertEquals("3", rtasPaimonTable.options().get("bucket"))
+ checkAnswer(spark.sql("SELECT * FROM rtas_csv_to_paimon"), Row(2L,
"new") :: Nil)
+ }
+ }
+ }
+
+ test("Paimon DDL with hive catalog: SparkGenericCatalog CTAS with non-Paimon
provider") {
+ assume(gteqSpark3_4) // Scenario: CREATE TABLE ... USING csv AS SELECT must yield a csv table; cancelled unless gteqSpark3_4 holds.
+ spark.sql(s"USE $sparkCatalogName") // presumably selects the SparkGenericCatalog named in the test title — confirm
+ withDatabase("paimon_db") {
+ spark.sql("CREATE DATABASE paimon_db")
+ spark.sql("USE paimon_db")
+
+ withTable("csv_ctas") {
+ Seq((1L, "x1"), (2L, "x2")).toDF("id",
"data").createOrReplaceTempView("source")
+ spark.sql("CREATE TABLE csv_ctas USING csv AS SELECT * FROM source")
+
+ val csvTable = spark.sessionState.catalog.getTableMetadata( // metadata is read from the Spark session catalog rather than via loadTable
+ TableIdentifier("csv_ctas", Some("paimon_db")))
+ Assertions.assertTrue(csvTable.provider.contains("csv")) // provider is an Option; this checks it holds exactly csv
+ checkAnswer(
+ spark.sql("SELECT * FROM csv_ctas ORDER BY id"),
+ Row(1L, "x1") :: Row(2L, "x2") :: Nil)
+ }
+ }
+ }
+
test("Paimon DDL with hive catalog: Create Table As Select") {
Seq("paimon", sparkCatalogName, paimonHiveCatalogName).foreach {
catalogName =>
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTestBase.scala
index b25e41a3fb..01def579ef 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTestBase.scala
@@ -183,6 +183,92 @@ abstract class DataFrameWriteTestBase extends
PaimonSparkTestBase {
}
}
+ test("Paimon dataframe: writer v2 replace") { // Scenario: writeTo(...).replace() on an existing Paimon table.
+ assume(gteqSpark3_4) // the test is cancelled unless gteqSpark3_4 holds
+ withTable("t") {
+ sql("""
+ |CREATE TABLE t (id BIGINT, data STRING)
+ |USING paimon
+ |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2')
+ |""".stripMargin)
+ sql("INSERT INTO t VALUES (1, 'old')")
+
+ val oldLocation = loadTable("t").location().toString // captured before the replace for the comparison further down
+ val oldSnapshotId = loadTable("t").snapshotManager().latestSnapshotId() // latest snapshot id at this point, i.e. after the INSERT
+
+ spark
+ .range(2, 4) // ids 2 and 3
+ .selectExpr("id", "concat('v', cast(id as string)) AS data")
+ .writeTo("t")
+ .using("paimon")
+ .tableProperty("primary-key", "id")
+ .tableProperty("bucket", "3")
+ .replace()
+
+ val table = loadTable("t") // reload to observe the post-replace metadata
+ Assertions.assertEquals("3", table.options().get("bucket"))
+ Assertions.assertEquals(oldLocation, table.location().toString) // the table location must be unchanged
+ checkAnswer(sql("SELECT * FROM t ORDER BY id"), Row(2L, "v2") :: Row(3L,
"v3") :: Nil)
+ checkAnswer(sql(s"SELECT * FROM t VERSION AS OF $oldSnapshotId"),
Row(1L, "old") :: Nil)
+ }
+ }
+
+ test("Paimon dataframe: writer v2 create fails when table exists") { // Scenario: writeTo(...).create() must throw when t already exists.
+ assume(gteqSpark3_4) // the test is cancelled unless gteqSpark3_4 holds
+ withTable("t") {
+ sql("CREATE TABLE t (id BIGINT, data STRING) USING paimon")
+
+ val error = intercept[Exception] { // any Exception subtype is accepted; only the message text is checked below
+ spark
+ .range(2)
+ .selectExpr("id", "concat('v', cast(id as string)) AS data")
+ .writeTo("t")
+ .using("paimon")
+ .create()
+ }.getMessage
+
+ Assertions.assertTrue( // either message fragment is accepted
+ error.contains("TABLE_OR_VIEW_ALREADY_EXISTS") ||
error.contains("already exists"))
+ }
+ }
+
+ test("Paimon dataframe: writer v2 create or replace") { // Scenario: createOrReplace() is called twice, first with no table and then over the existing one.
+ assume(gteqSpark3_4) // the test is cancelled unless gteqSpark3_4 holds
+ withTable("t") {
+ spark // first call: t has not been created earlier in this test
+ .range(2)
+ .selectExpr("id", "concat('v', cast(id as string)) AS data")
+ .writeTo("t")
+ .using("paimon")
+ .tableProperty("primary-key", "id")
+ .tableProperty("bucket", "2")
+ .createOrReplace()
+
+ val createdLocation = loadTable("t").location().toString // location after the first call, compared again after the second
+ checkAnswer(sql("SELECT * FROM t ORDER BY id"), Row(0L, "v0") :: Row(1L,
"v1") :: Nil)
+
+ spark // second call: t now exists; the frame adds a third column and a different bucket value
+ .range(3, 5)
+ .selectExpr(
+ "id",
+ "concat('v', cast(id as string)) AS data",
+ "concat('n', cast(id as string)) AS note")
+ .writeTo("t")
+ .using("paimon")
+ .tableProperty("primary-key", "id")
+ .tableProperty("bucket", "4")
+ .createOrReplace()
+
+ val table = loadTable("t") // reload to observe the metadata after the second call
+ Assertions.assertEquals(Seq("id", "data", "note"),
spark.table("t").schema.fieldNames.toSeq)
+ Assertions.assertEquals("4", table.options().get("bucket"))
+ Assertions.assertEquals(createdLocation, table.location().toString) // the table location must be unchanged
+ checkAnswer( // only the rows written by the second call are visible
+ sql("SELECT * FROM t ORDER BY id"),
+ Row(3L, "v3", "n3") :: Row(4L, "v4", "n4") :: Nil)
+ }
+ }
+
test("Paimon: DataFrameWrite partition table") {
withTable("t") {
spark.sql(s"""
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
index d031f7e60c..503005f247 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
@@ -35,6 +35,8 @@ import
org.apache.spark.sql.execution.datasources.v2.BatchScanExec
abstract class FormatTableTestBase extends PaimonHiveTestBase with
AdaptiveSparkPlanHelper {
+ import testImplicits._
+
override protected def beforeEach(): Unit = {
sql(s"USE $paimonHiveCatalogName")
sql(s"USE $hiveDbName")
@@ -158,6 +160,48 @@ abstract class FormatTableTestBase extends
PaimonHiveTestBase with AdaptiveSpark
}
}
+ test("Format table: create or replace as select supports table type change")
{
+ assume(gteqSpark3_4) // Scenario: a USING paimon table is replaced via CREATE OR REPLACE ... USING csv AS SELECT; cancelled unless gteqSpark3_4 holds.
+ withTable("t") {
+ sql("""
+ |CREATE TABLE t (id BIGINT, data STRING)
+ |USING paimon
+ |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2')
+ |""".stripMargin)
+ sql("INSERT INTO t VALUES (1, 'old')")
+ Seq((2L, "new")).toDF("id", "data").createOrReplaceTempView("source") // NOTE(review): this temp view is not dropped inside the test — confirm cleanup happens elsewhere
+
+ sql("""
+ |CREATE OR REPLACE TABLE t
+ |USING csv
+ |AS SELECT * FROM source
+ |""".stripMargin)
+
+ assert(paimonCatalog.getTable(Identifier.create(hiveDbName, // after the replace the catalog must return a FormatTable instance
"t")).isInstanceOf[FormatTable])
+ checkAnswer(sql("SELECT * FROM t"), Seq(Row(2L, "new"))) // only the row selected from the source view is visible
+ }
+ }
+
+ test("Format table: replace table supports table type change") { // Scenario: a USING paimon table is replaced by REPLACE TABLE ... USING csv.
+ assume(gteqSpark3_4) // the test is cancelled unless gteqSpark3_4 holds
+ withTable("t") {
+ sql("""
+ |CREATE TABLE t (id BIGINT, data STRING)
+ |USING paimon
+ |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2')
+ |""".stripMargin)
+ sql("INSERT INTO t VALUES (1, 'old')")
+
+ sql("""
+ |REPLACE TABLE t (id BIGINT, data STRING)
+ |USING csv
+ |""".stripMargin)
+
+ assert(paimonCatalog.getTable(Identifier.create(hiveDbName, // after the replace the catalog must return a FormatTable instance
"t")).isInstanceOf[FormatTable])
+ checkAnswer(sql("SELECT * FROM t"), Seq.empty[Row]) // the row inserted before the replace is not visible
+ }
+ }
+
test("Format table: read compressed files") {
for (format <- Seq("csv", "json")) {
withTable("compress_t") {
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/ReplaceTableWithRestCatalogTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/ReplaceTableWithRestCatalogTest.scala
new file mode 100644
index 0000000000..d535e7b8c1
--- /dev/null
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/ReplaceTableWithRestCatalogTest.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestWithRestCatalogBase
+
+import org.apache.spark.sql.Row
+import org.junit.jupiter.api.Assertions
+
+class ReplaceTableWithRestCatalogTest extends
PaimonSparkTestWithRestCatalogBase {
+
+ import testImplicits._ // brings toDF into scope for the expected-result frames below
+
+ test("Replace table with REST catalog: REPLACE TABLE preserves snapshots") { // Scenario: REPLACE TABLE keeps location and old snapshot; note the DDL has no USING clause.
+ assume(gteqSpark3_4) // the test is cancelled unless gteqSpark3_4 holds
+ withTable("t") {
+ sql("""
+ |CREATE TABLE t (id BIGINT, data STRING)
+ |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2')
+ |""".stripMargin)
+ sql("INSERT INTO t VALUES (1, 'old')")
+ val oldLocation = loadTable("t").location().toString // captured before the replace for the comparison further down
+ val oldSnapshotId = loadTable("t").snapshotManager().latestSnapshotId() // latest snapshot id at this point, i.e. after the INSERT
+
+ sql("""
+ |REPLACE TABLE t (id BIGINT, name STRING)
+ |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '4')
+ |""".stripMargin)
+
+ val replaced = loadTable("t") // reload to observe the post-replace metadata
+ Assertions.assertEquals(oldLocation, replaced.location().toString) // the table location must be unchanged
+ Assertions.assertEquals("4", replaced.options().get("bucket"))
+ Assertions.assertEquals(Seq("id", "name"),
spark.table("t").schema.fieldNames.toSeq)
+ checkAnswer(sql("SELECT * FROM t"), Seq.empty[Row]) // a plain read of the replaced table returns no rows
+
+ checkAnswer( // reading the pre-replace snapshot id must still return the old row with the old columns
+ sql(s"SELECT id, data FROM t VERSION AS OF $oldSnapshotId"),
+ Seq((1L, "old")).toDF())
+ }
+ }
+
+ test("Replace table with REST catalog: CREATE OR REPLACE TABLE AS SELECT") { // Scenario: CREATE OR REPLACE ... AS SELECT over an existing table keeps its location.
+ assume(gteqSpark3_4) // the test is cancelled unless gteqSpark3_4 holds
+ withTable("t") {
+ sql("""
+ |CREATE TABLE t (id BIGINT, data STRING)
+ |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2')
+ |""".stripMargin)
+ sql("INSERT INTO t VALUES (1, 'old')")
+ val oldLocation = loadTable("t").location().toString // captured before the replace for the comparison further down
+ Seq((2L, "x2"), (3L, "x3")).toDF("id",
"data").createOrReplaceTempView("source")
+
+ sql("""
+ |CREATE OR REPLACE TABLE t
+ |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '3')
+ |AS SELECT * FROM source
+ |""".stripMargin)
+
+ val replaced = loadTable("t") // reload to observe the post-replace metadata
+ Assertions.assertEquals(oldLocation, replaced.location().toString) // the table location must be unchanged
+ Assertions.assertEquals("3", replaced.options().get("bucket"))
+ checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq((2L, "x2"), (3L,
"x3")).toDF())
+ }
+ }
+}
diff --git
a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
index c71b3df923..1798b12410 100644
---
a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
+++
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
@@ -32,17 +32,19 @@ import
org.apache.spark.sql.catalyst.analysis.{CTESubstitution, SubstituteUnreso
import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Expression}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment,
CTERelationRef, InsertAction, LogicalPlan, MergeAction, MergeIntoTable,
SubqueryAlias, UnresolvedWith, UpdateAction}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment,
CTERelationRef, InsertAction, LogicalPlan, MergeAction, MergeIntoTable,
SubqueryAlias, TableSpec, UnresolvedWith, UpdateAction}
// NOTE: `MergeRows` / `MergeRows.Keep` were introduced in Spark 3.4. We
access them only via
// reflection inside the `mergeRowsKeep*` method bodies so that loading
`Spark3Shim` does not fail
// on Spark 3.2 / 3.3 runtimes that still ship `paimon-spark3-common` (the
module targets 3.5.8 at
// compile time but must also run on 3.2 / 3.3).
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.ArrayData
-import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
+import org.apache.spark.sql.catalyst.util.{ArrayData, GeneratedColumn,
ResolveDefaultColumns}
+import org.apache.spark.sql.connector.catalog.{Column, Identifier,
StagingTableCatalog, Table, TableCatalog}
+import
org.apache.spark.sql.connector.catalog.CatalogV2Util.structTypeToV2Columns
import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.execution.SparkFormatTable
+import org.apache.spark.sql.execution.{SparkFormatTable, SparkPlan}
import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex,
PartitionSpec}
+import
org.apache.spark.sql.execution.datasources.v2.{AtomicReplaceTableAsSelectExec,
AtomicReplaceTableExec, ReplaceTableAsSelectExec, ReplaceTableExec}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.streaming.{FileStreamSink,
MetadataLogFileIndex}
import org.apache.spark.sql.internal.SQLConf
@@ -86,6 +88,103 @@ class Spark3Shim extends SparkShim {
tableCatalog.createTable(ident, schema, partitions, properties)
}
+ override def createReplaceTableAsSelectExec( // Wraps the arguments in a ReplaceTableAsSelectExec and returns it as a SparkPlan.
+ catalog: TableCatalog,
+ ident: Identifier,
+ partitioning: Seq[Transform],
+ query: LogicalPlan,
+ tableSpec: TableSpec,
+ writeOptions: Map[String, String],
+ orCreate: Boolean): SparkPlan = {
+ ReplaceTableAsSelectExec(
+ catalog,
+ ident,
+ partitioning,
+ query,
+ tableSpec,
+ writeOptions,
+ orCreate = orCreate,
+ invalidateCache) // this shim's private invalidateCache method, passed as the final constructor argument
+ }
+
+ override def createAtomicReplaceTableAsSelectExec( // Wraps the arguments in an AtomicReplaceTableAsSelectExec and returns it as a SparkPlan.
+ catalog: StagingTableCatalog, // this variant takes a StagingTableCatalog rather than a plain TableCatalog
+ ident: Identifier,
+ partitioning: Seq[Transform],
+ query: LogicalPlan,
+ tableSpec: TableSpec,
+ writeOptions: Map[String, String],
+ orCreate: Boolean): SparkPlan = {
+ AtomicReplaceTableAsSelectExec(
+ catalog,
+ ident,
+ partitioning,
+ query,
+ tableSpec,
+ writeOptions,
+ orCreate = orCreate,
+ invalidateCache) // this shim's private invalidateCache method, passed as the final constructor argument
+ }
+
+ override def createReplaceTableExec( // Wraps the arguments in a ReplaceTableExec and returns it as a SparkPlan.
+ catalog: TableCatalog,
+ ident: Identifier,
+ columns: Array[Column],
+ partitioning: Seq[Transform],
+ tableSpec: TableSpec,
+ orCreate: Boolean): SparkPlan = {
+ ReplaceTableExec(
+ catalog,
+ ident,
+ columns,
+ partitioning,
+ tableSpec,
+ orCreate = orCreate,
+ invalidateCache) // this shim's private invalidateCache method, passed as the final constructor argument
+ }
+
+ override def createAtomicReplaceTableExec( // Wraps the arguments in an AtomicReplaceTableExec and returns it as a SparkPlan.
+ catalog: StagingTableCatalog, // this variant takes a StagingTableCatalog rather than a plain TableCatalog
+ ident: Identifier,
+ columns: Array[Column],
+ partitioning: Seq[Transform],
+ tableSpec: TableSpec,
+ orCreate: Boolean): SparkPlan = {
+ AtomicReplaceTableExec(
+ catalog,
+ ident,
+ columns,
+ partitioning,
+ tableSpec,
+ orCreate = orCreate,
+ invalidateCache) // this shim's private invalidateCache method, passed as the final constructor argument
+ }
+
+ override def toReplaceTableColumns( // Validates the declared schema against the catalog and converts it to an array of v2 Column objects.
+ tableSchema: StructType, // not read by this implementation
+ schemaOrColumns: Any, // cast to StructType below; any other runtime type fails that cast
+ catalog: TableCatalog,
+ ident: Identifier): Array[Column] = {
+ val statementType = "CREATE TABLE" // NOTE(review): statement label handed to the helpers below; presumably mirrors what Spark 3.x uses when planning a replace — confirm
+ val schema = schemaOrColumns.asInstanceOf[StructType]
+ ResolveDefaultColumns.validateCatalogForDefaultValue(schema, catalog,
ident)
+ val newSchema = // result of the default-value folding helper; this is what gets validated and converted
+ ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(schema,
statementType)
+ GeneratedColumn.validateGeneratedColumns(newSchema, catalog, ident,
statementType)
+ structTypeToV2Columns(newSchema)
+ }
+
+ override def copyTableSpec( // Returns a copy of tableSpec with additionalProperties merged into its properties and its location replaced.
+ tableSpec: TableSpec,
+ additionalProperties: Map[String, String], // appended with ++, so these entries win when a key is present on both sides
+ location: Option[String]): TableSpec = {
+ tableSpec.copy(properties = tableSpec.properties ++ additionalProperties,
location = location)
+ }
+
+ private def invalidateCache(tableCatalog: TableCatalog, table: Table, ident:
Identifier): Unit = {
+ tableCatalog.invalidateTable(ident) // only ident is used; the table argument is ignored
+ }
+
override def createCTERelationRef(
cteId: Long,
resolved: Boolean,
diff --git
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
index b66b896927..516c02a09f 100644
---
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
+++
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
@@ -32,14 +32,15 @@ import
org.apache.spark.sql.catalyst.analysis.CTESubstitution
import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Expression}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment,
CTERelationRef, InsertAction, LogicalPlan, MergeAction, MergeIntoTable,
MergeRows, SubqueryAlias, UnresolvedWith, UpdateAction}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment,
ColumnDefinition, CTERelationRef, InsertAction, LogicalPlan, MergeAction,
MergeIntoTable, MergeRows, SubqueryAlias, TableSpec, UnresolvedWith,
UpdateAction}
import org.apache.spark.sql.catalyst.plans.logical.MergeRows.{Copy, Insert,
Keep, Update}
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.ArrayData
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier,
Table, TableCatalog}
+import org.apache.spark.sql.catalyst.util.{ArrayData, GeneratedColumn,
IdentityColumn, ResolveDefaultColumns}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column,
Identifier, StagingTableCatalog, Table, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.execution.SparkFormatTable
+import org.apache.spark.sql.execution.{SparkFormatTable, SparkPlan}
import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex,
PartitionSpec}
+import
org.apache.spark.sql.execution.datasources.v2.{AtomicReplaceTableAsSelectExec,
AtomicReplaceTableExec, ReplaceTableAsSelectExec, ReplaceTableExec}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.streaming.runtime.MetadataLogFileIndex
import org.apache.spark.sql.execution.streaming.sinks.FileStreamSink
@@ -86,6 +87,102 @@ class Spark4Shim extends SparkShim {
tableCatalog.createTable(ident, columns, partitions, properties)
}
+ override def createReplaceTableAsSelectExec( // Wraps the arguments in a ReplaceTableAsSelectExec and returns it as a SparkPlan.
+ catalog: TableCatalog,
+ ident: Identifier,
+ partitioning: Seq[Transform],
+ query: LogicalPlan,
+ tableSpec: TableSpec,
+ writeOptions: Map[String, String],
+ orCreate: Boolean): SparkPlan = {
+ ReplaceTableAsSelectExec(
+ catalog,
+ ident,
+ partitioning,
+ query,
+ tableSpec,
+ writeOptions,
+ orCreate = orCreate,
+ invalidateCache) // this shim's private invalidateCache method, passed as the final constructor argument
+ }
+
+ override def createAtomicReplaceTableAsSelectExec( // Wraps the arguments in an AtomicReplaceTableAsSelectExec and returns it as a SparkPlan.
+ catalog: StagingTableCatalog, // this variant takes a StagingTableCatalog rather than a plain TableCatalog
+ ident: Identifier,
+ partitioning: Seq[Transform],
+ query: LogicalPlan,
+ tableSpec: TableSpec,
+ writeOptions: Map[String, String],
+ orCreate: Boolean): SparkPlan = {
+ AtomicReplaceTableAsSelectExec(
+ catalog,
+ ident,
+ partitioning,
+ query,
+ tableSpec,
+ writeOptions,
+ orCreate = orCreate,
+ invalidateCache) // this shim's private invalidateCache method, passed as the final constructor argument
+ }
+
+ override def createReplaceTableExec( // Wraps the arguments in a ReplaceTableExec and returns it as a SparkPlan.
+ catalog: TableCatalog,
+ ident: Identifier,
+ columns: Array[Column],
+ partitioning: Seq[Transform],
+ tableSpec: TableSpec,
+ orCreate: Boolean): SparkPlan = {
+ ReplaceTableExec(
+ catalog,
+ ident,
+ columns,
+ partitioning,
+ tableSpec,
+ orCreate = orCreate,
+ invalidateCache) // this shim's private invalidateCache method, passed as the final constructor argument
+ }
+
+ override def createAtomicReplaceTableExec( // Wraps the arguments in an AtomicReplaceTableExec and returns it as a SparkPlan.
+ catalog: StagingTableCatalog, // this variant takes a StagingTableCatalog rather than a plain TableCatalog
+ ident: Identifier,
+ columns: Array[Column],
+ partitioning: Seq[Transform],
+ tableSpec: TableSpec,
+ orCreate: Boolean): SparkPlan = {
+ AtomicReplaceTableExec(
+ catalog,
+ ident,
+ columns,
+ partitioning,
+ tableSpec,
+ orCreate = orCreate,
+ invalidateCache) // this shim's private invalidateCache method, passed as the final constructor argument
+ }
+
+ override def toReplaceTableColumns( // Validates the column definitions against the catalog and converts them to an array of v2 Column objects.
+ tableSchema: StructType, // used for the generated-column and identity-column validation below
+ schemaOrColumns: Any, // cast to Seq[ColumnDefinition] below
+ catalog: TableCatalog,
+ ident: Identifier): Array[Column] = {
+ val statementType = "REPLACE TABLE" // statement label handed to the validation and conversion helpers
+ val columns = schemaOrColumns.asInstanceOf[Seq[ColumnDefinition]] // unchecked cast: the element type is erased, so only Seq-ness is verified at this point
+ ResolveDefaultColumns.validateCatalogForDefaultValue(columns, catalog,
ident)
+ GeneratedColumn.validateGeneratedColumns(tableSchema, catalog, ident,
statementType)
+ IdentityColumn.validateIdentityColumn(tableSchema, catalog, ident)
+ columns.map(_.toV2Column(statementType)).toArray
+ }
+
+ override def copyTableSpec( // Returns a copy of tableSpec with additionalProperties merged into its properties and its location replaced.
+ tableSpec: TableSpec,
+ additionalProperties: Map[String, String], // appended with ++, so these entries win when a key is present on both sides
+ location: Option[String]): TableSpec = {
+ tableSpec.copy(properties = tableSpec.properties ++ additionalProperties,
location = location)
+ }
+
+ private def invalidateCache(tableCatalog: TableCatalog, ident: Identifier):
Unit = {
+ tableCatalog.invalidateTable(ident) // forwards to the catalog so it can discard any cached state for ident
+ }
+
override def createCTERelationRef(
cteId: Long,
resolved: Boolean,