This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a7c8a6f3f [core][spark] Support history-preserving replace table 
(#7860)
6a7c8a6f3f is described below

commit 6a7c8a6f3fb0c9153e6fbde4d8798d825e7a7f73
Author: Zouxxyy <[email protected]>
AuthorDate: Wed May 20 19:30:39 2026 +0800

    [core][spark] Support history-preserving replace table (#7860)
    
    Support Spark replace-table flows for Paimon tables while preserving
    table history where possible.
    
    This change adds staged replace support for `REPLACE TABLE` and `CREATE
    OR REPLACE TABLE ... AS SELECT`, using truncate plus schema replacement
    for compatible FileStore tables so old snapshots remain available for
    time travel. It also falls back to drop-and-create when the source or
    target is not a Paimon table, handles SparkGenericCatalog provider
    checks, and documents the Spark SQL behavior.
---
 docs/content/spark/sql-ddl.md                      |  56 +++++++
 .../main/java/org/apache/paimon/rest/RESTApi.java  |  19 +++
 .../java/org/apache/paimon/rest/ResourcePaths.java |  11 ++
 .../paimon/rest/requests/ReplaceTableRequest.java  |  47 ++++++
 .../org/apache/paimon/catalog/AbstractCatalog.java |  90 +++++++++++
 .../org/apache/paimon/catalog/CachingCatalog.java  |   8 +
 .../java/org/apache/paimon/catalog/Catalog.java    |  19 +++
 .../org/apache/paimon/catalog/DelegateCatalog.java |   6 +
 .../apache/paimon/catalog/FileSystemCatalog.java   |  14 ++
 .../java/org/apache/paimon/rest/RESTCatalog.java   |  22 +++
 .../org/apache/paimon/catalog/CatalogTestBase.java |  94 +++++++++++
 .../org/apache/paimon/jdbc/JdbcCatalogTest.java    |   6 +
 .../org/apache/paimon/rest/RESTCatalogServer.java  |  39 +++++
 .../org/apache/paimon/rest/RESTCatalogTest.java    |   5 +
 .../java/org/apache/paimon/hive/HiveCatalog.java   |  35 +++++
 .../shim/PaimonCreateTableAsSelectStrategy.scala   |  76 ++++-----
 .../shim/PaimonReplaceTableAsSelectStrategy.scala} |  26 ++--
 .../shim/PaimonCreateTableAsSelectStrategy.scala   |  63 +++-----
 .../shim/PaimonReplaceTableAsSelectStrategy.scala} |  26 ++--
 .../shim/PaimonCreateTableAsSelectStrategy.scala   |  63 +++-----
 .../shim/PaimonReplaceTableAsSelectStrategy.scala  | 168 ++++++++++++++++++++
 .../shim/PaimonCreateTableAsSelectStrategy.scala   |  90 -----------
 .../apache/spark/sql/paimon/shims/Spark4Shim.scala | 105 ++++++++++++-
 .../apache/paimon/spark/RollbackStagedTable.java   | 173 +++++++++++++++++++++
 .../java/org/apache/paimon/spark/SparkCatalog.java | 100 ++++++++++++
 .../apache/paimon/spark/SparkGenericCatalog.java   |  49 ++++++
 .../paimon/spark/catalog/SparkBaseCatalog.java     |   7 +-
 .../paimon/spark/execution/PaimonStrategy.scala    |  10 +-
 .../spark/sql/execution/PaimonStrategyHelper.scala |  25 ++-
 .../shim/PaimonCreateTableAsSelectStrategy.scala   |  61 +++-----
 .../shim/PaimonReplaceTableAsSelectStrategy.scala  | 151 ++++++++++++++++++
 .../apache/spark/sql/paimon/shims/SparkShim.scala  |  50 +++++-
 .../org/apache/paimon/spark/sql/DDLTestBase.scala  | 157 +++++++++++++++++++
 .../spark/sql/DDLWithHiveCatalogTestBase.scala     | 135 ++++++++++++++++
 .../paimon/spark/sql/DataFrameWriteTestBase.scala  |  86 ++++++++++
 .../paimon/spark/sql/FormatTableTestBase.scala     |  44 ++++++
 .../sql/ReplaceTableWithRestCatalogTest.scala      |  81 ++++++++++
 .../apache/spark/sql/paimon/shims/Spark3Shim.scala | 107 ++++++++++++-
 .../apache/spark/sql/paimon/shims/Spark4Shim.scala | 105 ++++++++++++-
 39 files changed, 2124 insertions(+), 305 deletions(-)

diff --git a/docs/content/spark/sql-ddl.md b/docs/content/spark/sql-ddl.md
index 6ca8831c15..42b101896f 100644
--- a/docs/content/spark/sql-ddl.md
+++ b/docs/content/spark/sql-ddl.md
@@ -285,6 +285,62 @@ CREATE TABLE my_table_all (
 CREATE TABLE my_table_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' 
= 'dt,hh') AS SELECT * FROM my_table_all;
 ```
 
+### Replace Table
+
+Paimon supports preserving snapshot history for Spark `REPLACE TABLE` from 
**Spark 3.4**.
+
+```sql
+CREATE TABLE my_table (
+    user_id BIGINT,
+    item_id BIGINT,
+    behavior STRING
+) TBLPROPERTIES (
+    'primary-key' = 'user_id',
+    'bucket' = '2'
+);
+
+INSERT INTO my_table VALUES (1, 10, 'pv');
+
+REPLACE TABLE my_table (
+    user_id BIGINT,
+    item_id BIGINT,
+    category STRING
+) TBLPROPERTIES (
+    'primary-key' = 'user_id',
+    'bucket' = '4'
+);
+```
+
+In Paimon, this is not an atomic replacement. Paimon changes Spark's 
drop+create replace path to
+truncate the current table and commit a new schema, while preserving the table 
location and snapshot
+history. The current table becomes empty and uses the new schema, but old 
snapshots can still be
+queried by time travel.
+
+```sql
+SELECT * FROM my_table;
+
+SELECT * FROM my_table VERSION AS OF 1;
+```
+
+`REPLACE TABLE` requires the table to exist. If the table does not exist, use
+`CREATE OR REPLACE TABLE` instead.
+
+`REPLACE TABLE` does not accept `AS SELECT`. To replace a table and populate 
it with query results,
+use `CREATE OR REPLACE TABLE ... AS SELECT`.
+
+```sql
+CREATE OR REPLACE TABLE my_table
+TBLPROPERTIES (
+    'primary-key' = 'user_id',
+    'bucket' = '4'
+)
+AS SELECT user_id, item_id, behavior FROM source_table;
+```
+
+When the existing table and target table use different table types,
+uses its fallback drop+create behavior instead of snapshot-preserving replace
+behavior.
+
 ### Create Table Like
 
 A new table can be created from an existing source table. Available from 
**Spark 3.4**.
diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java 
b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
index 88761ca5f0..7433a30388 100644
--- a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
+++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
@@ -51,6 +51,7 @@ import 
org.apache.paimon.rest.requests.MarkDonePartitionsRequest;
 import org.apache.paimon.rest.requests.RegisterTableRequest;
 import org.apache.paimon.rest.requests.RenameBranchRequest;
 import org.apache.paimon.rest.requests.RenameTableRequest;
+import org.apache.paimon.rest.requests.ReplaceTableRequest;
 import org.apache.paimon.rest.requests.ResetConsumerRequest;
 import org.apache.paimon.rest.requests.RollbackSchemaRequest;
 import org.apache.paimon.rest.requests.RollbackTableRequest;
@@ -778,6 +779,24 @@ public class RESTApi {
                 restAuthFunction);
     }
 
+    /**
+     * Replace table.
+     *
+     * @param identifier database name and table name.
+     * @param schema schema to replace table
+     * @throws NoSuchResourceException Exception thrown on HTTP 404 means the 
table not exists
+     * @throws ForbiddenException Exception thrown on HTTP 403 means don't 
have the permission for
+     *     this table
+     */
+    public void replaceTable(Identifier identifier, Schema schema) {
+        ReplaceTableRequest request = new ReplaceTableRequest(schema);
+        client.post(
+                resourcePaths.replaceTable(
+                        identifier.getDatabaseName(), 
identifier.getObjectName()),
+                request,
+                restAuthFunction);
+    }
+
     /**
      * Auth table query.
      *
diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java 
b/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
index 6d1d16c82f..66a1653232 100644
--- a/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
+++ b/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
@@ -98,6 +98,17 @@ public class ResourcePaths {
         return SLASH.join(V1, prefix, TABLES, "rename");
     }
 
+    public String replaceTable(String databaseName, String objectName) {
+        return SLASH.join(
+                V1,
+                prefix,
+                DATABASES,
+                encodeString(databaseName),
+                TABLES,
+                encodeString(objectName),
+                "replace");
+    }
+
     public String commitTable(String databaseName, String objectName) {
         return SLASH.join(
                 V1,
diff --git 
a/paimon-api/src/main/java/org/apache/paimon/rest/requests/ReplaceTableRequest.java
 
b/paimon-api/src/main/java/org/apache/paimon/rest/requests/ReplaceTableRequest.java
new file mode 100644
index 0000000000..526724a313
--- /dev/null
+++ 
b/paimon-api/src/main/java/org/apache/paimon/rest/requests/ReplaceTableRequest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.requests;
+
+import org.apache.paimon.rest.RESTRequest;
+import org.apache.paimon.schema.Schema;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+/** Request for replacing table. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ReplaceTableRequest implements RESTRequest {
+
+    private static final String FIELD_SCHEMA = "schema";
+
+    @JsonProperty(FIELD_SCHEMA)
+    private final Schema schema;
+
+    @JsonCreator
+    public ReplaceTableRequest(@JsonProperty(FIELD_SCHEMA) Schema schema) {
+        this.schema = schema;
+    }
+
+    @JsonGetter(FIELD_SCHEMA)
+    public Schema getSchema() {
+        return schema;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 9512e2f767..4a8a8b1b91 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -21,6 +21,7 @@ package org.apache.paimon.catalog;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.PagedList;
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.TableType;
 import org.apache.paimon.factories.FactoryUtil;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
@@ -42,6 +43,7 @@ import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.table.Instant;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableSnapshot;
+import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.system.SystemTableLoader;
 import org.apache.paimon.utils.SnapshotNotExistException;
 
@@ -500,6 +502,94 @@ public abstract class AbstractCatalog implements Catalog {
     protected abstract void alterTableImpl(Identifier identifier, 
List<SchemaChange> changes)
             throws TableNotExistException, ColumnAlreadyExistException, 
ColumnNotExistException;
 
+    @Override
+    public void replaceTable(Identifier identifier, Schema newSchema, boolean 
ignoreIfNotExists)
+            throws TableNotExistException {
+        checkNotBranch(identifier, "replaceTable");
+        checkNotSystemTable(identifier, "replaceTable");
+        validateCreateTable(newSchema, false);
+        validateCustomTablePath(newSchema.options());
+        copyTableDefaultOptions(newSchema.options());
+
+        Table existing;
+        try {
+            existing = getTable(identifier);
+        } catch (TableNotExistException e) {
+            if (ignoreIfNotExists) {
+                return;
+            }
+            throw e;
+        }
+
+        TableType targetTableType = 
Options.fromMap(newSchema.options()).get(TYPE);
+        if (!(existing instanceof FileStoreTable) || 
!targetTableType.equals(TableType.TABLE)) {
+            dropAndCreateTable(identifier, newSchema);
+            return;
+        }
+
+        // todo: support this
+        List<String> oldPartitionKeys = ((FileStoreTable) 
existing).schema().partitionKeys();
+        List<String> newPartitionKeys = newSchema.partitionKeys();
+        if (!Objects.equals(oldPartitionKeys, newPartitionKeys)) {
+            throw new UnsupportedOperationException(
+                    "replaceTable does not support changing partition keys 
(old="
+                            + oldPartitionKeys
+                            + ", new="
+                            + newPartitionKeys
+                            + "). Drop and re-create the table instead.");
+        }
+
+        replaceTableImpl(identifier, (FileStoreTable) existing, newSchema);
+    }
+
+    private void dropAndCreateTable(Identifier identifier, Schema newSchema)
+            throws TableNotExistException {
+        dropTable(identifier, false);
+        try {
+            createTable(identifier, newSchema, false);
+        } catch (TableAlreadyExistException | DatabaseNotExistException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Truncate visible data first, then append the new schema. Non-atomic on 
failure. */
+    protected void replaceTableImpl(
+            Identifier identifier, FileStoreTable existingTable, Schema 
newSchema)
+            throws TableNotExistException {
+        truncateTable(existingTable);
+        try {
+            appendNewSchema(existingTable, newSchema);
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Append a new schema (id = latest + 1) via atomic CAS. Returns the new 
schema id. */
+    protected long appendNewSchema(FileStoreTable existingTable, Schema 
newSchema)
+            throws Exception {
+        SchemaManager sm = existingTable.schemaManager();
+        while (true) {
+            TableSchema latest = sm.latestOrThrow("Cannot replace: schema 
chain is empty.");
+            TableSchema staged = TableSchema.create(latest.id() + 1, 
newSchema);
+            if (sm.commit(staged)) {
+                return staged.id();
+            }
+        }
+    }
+
+    protected void truncateTable(FileStoreTable existingTable) {
+        try (TableCommitImpl commit =
+                existingTable.newCommit("replace-table-" + 
java.util.UUID.randomUUID())) {
+            commit.truncateTable();
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     @Override
     public Table getTable(Identifier identifier) throws TableNotExistException 
{
         return CatalogUtils.loadTable(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index c779db7d20..9fc7b124f9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -24,6 +24,7 @@ import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.partition.PartitionStatistics;
+import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
@@ -224,6 +225,13 @@ public class CachingCatalog extends DelegateCatalog {
         invalidateTable(identifier);
     }
 
+    @Override
+    public void replaceTable(Identifier identifier, Schema newSchema, boolean 
ignoreIfNotExists)
+            throws TableNotExistException {
+        super.replaceTable(identifier, newSchema, ignoreIfNotExists);
+        invalidateTable(identifier);
+    }
+
     @Override
     public Table getTable(Identifier identifier) throws TableNotExistException 
{
         // For system table, do not cache it directly. Instead, cache the 
origin table and then wrap
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index ab66f937ea..57fa040a2a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -358,6 +358,25 @@ public interface Catalog extends AutoCloseable {
         alterTable(identifier, Collections.singletonList(change), 
ignoreIfNotExists);
     }
 
+    /**
+     * Replace an existing table with a new {@link Schema}.
+     *
+     * <p>For compatible FileStore tables, it truncates visible data and 
appends a new schema to the
+     * schema chain, preserving old schemas and snapshots for time travel. 
Other table kinds may
+     * fall back to drop-and-create behavior.
+     *
+     * @param identifier path of the table to be replaced
+     * @param newSchema the new {@link Schema}
+     * @param ignoreIfNotExists if true, do nothing when the table does not 
exist
+     * @throws TableNotExistException if the table does not exist and {@code 
ignoreIfNotExists} is
+     *     false
+     */
+    default void replaceTable(Identifier identifier, Schema newSchema, boolean 
ignoreIfNotExists)
+            throws TableNotExistException {
+        throw new UnsupportedOperationException(
+                "Catalog " + getClass().getName() + " does not support 
replaceTable.");
+    }
+
     // ======================= partition methods 
===============================
 
     /**
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index ec5138a8cb..0f18f7d045 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -171,6 +171,12 @@ public abstract class DelegateCatalog implements Catalog {
         wrapped.alterTable(identifier, changes, ignoreIfNotExists);
     }
 
+    @Override
+    public void replaceTable(Identifier identifier, Schema newSchema, boolean 
ignoreIfNotExists)
+            throws TableNotExistException {
+        wrapped.replaceTable(identifier, newSchema, ignoreIfNotExists);
+    }
+
     @Override
     public void registerTable(Identifier identifier, String path)
             throws TableAlreadyExistException {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index cea266f703..b0b8b1ac7f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -25,6 +25,7 @@ import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -179,6 +180,19 @@ public class FileSystemCatalog extends AbstractCatalog {
         }
     }
 
+    @Override
+    protected void replaceTableImpl(
+            Identifier identifier, FileStoreTable existingTable, Schema 
newSchema) {
+        truncateTable(existingTable);
+        try {
+            runWithLock(identifier, () -> appendNewSchema(existingTable, 
newSchema));
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     protected static <T> T uncheck(Callable<T> callable) {
         try {
             return callable.call();
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 94442d5c30..7e2f6cfd27 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -589,6 +589,28 @@ public class RESTCatalog implements Catalog {
         }
     }
 
+    @Override
+    public void replaceTable(Identifier identifier, Schema newSchema, boolean 
ignoreIfNotExists)
+            throws TableNotExistException {
+        checkNotBranch(identifier, "replaceTable");
+        checkNotSystemTable(identifier, "replaceTable");
+        validateCreateTable(newSchema, dataTokenEnabled);
+        try {
+            tableDefaultOptions.forEach(newSchema.options()::putIfAbsent);
+            api.replaceTable(identifier, newSchema);
+        } catch (NoSuchResourceException e) {
+            if (!ignoreIfNotExists) {
+                throw new TableNotExistException(identifier);
+            }
+        } catch (NotImplementedException e) {
+            throw new UnsupportedOperationException(e.getMessage());
+        } catch (ForbiddenException e) {
+            throw new TableNoPermissionException(identifier, e);
+        } catch (BadRequestException e) {
+            throw new IllegalArgumentException(e.getMessage());
+        }
+    }
+
     @Override
     public TableQueryAuthResult authTableQuery(Identifier identifier, 
@Nullable List<String> select)
             throws TableNotExistException {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 4f4aed63fa..07539d914f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -1171,6 +1171,96 @@ public abstract class CatalogTestBase {
         baseAlterTable(initOptions);
     }
 
+    @Test
+    public void testReplaceTable() throws Exception {
+        if (!supportsReplaceTable()) {
+            return;
+        }
+        catalog.createDatabase("replace_db", true);
+        Identifier identifier = Identifier.create("replace_db", "t");
+
+        Schema initialSchema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("data", DataTypes.STRING())
+                        .column("pt", DataTypes.STRING())
+                        .partitionKeys("pt")
+                        .primaryKey("id", "pt")
+                        .option("bucket", "2")
+                        .build();
+        catalog.createTable(identifier, initialSchema, false);
+
+        Table created = catalog.getTable(identifier);
+        String oldLocation = ((FileStoreTable) created).location().toString();
+        BatchWriteBuilder writeBuilder = created.newBatchWriteBuilder();
+        try (BatchTableWrite write = writeBuilder.newWrite();
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            write.write(
+                    GenericRow.of(1, BinaryString.fromString("old"), 
BinaryString.fromString("a")));
+            commit.commit(write.prepareCommit());
+        }
+
+        long oldSnapshotId =
+                ((FileStoreTable) catalog.getTable(identifier))
+                        .snapshotManager()
+                        .latestSnapshotId();
+
+        // Replace with new PK + bucket (partition keys unchanged)
+        Schema newSchema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("name", DataTypes.STRING())
+                        .column("pt", DataTypes.STRING())
+                        .partitionKeys("pt")
+                        .primaryKey("id", "pt")
+                        .option("bucket", "4")
+                        .build();
+        catalog.replaceTable(identifier, newSchema, false);
+
+        FileStoreTable replaced = (FileStoreTable) 
catalog.getTable(identifier);
+        assertThat(replaced.partitionKeys()).containsExactly("pt");
+        assertThat(replaced.primaryKeys()).containsExactly("id", "pt");
+        assertThat(replaced.options().get("bucket")).isEqualTo("4");
+        assertThat(replaced.location().toString()).isEqualTo(oldLocation);
+        assertThat(read(replaced, null, null, null, null)).isEmpty();
+
+        // Time-travel to old snapshot still returns old data with old schema
+        FileStoreTable oldView =
+                replaced.copy(Collections.singletonMap("scan.snapshot-id", "" 
+ oldSnapshotId));
+        assertThat(oldView.schema().fieldNames()).containsExactly("id", 
"data", "pt");
+        assertThat(read(oldView, null, null, null, null)).hasSize(1);
+
+        // Changing partition keys is rejected
+        Schema changePartitionKeys =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("name", DataTypes.STRING())
+                        .column("pt", DataTypes.STRING())
+                        .primaryKey("id")
+                        .option("bucket", "4")
+                        .build();
+        assertThatExceptionOfType(UnsupportedOperationException.class)
+                .isThrownBy(() -> catalog.replaceTable(identifier, 
changePartitionKeys, false))
+                .withMessageContaining("partition keys");
+
+        // ignoreIfNotExists = true: missing table is silently skipped
+        Identifier missing = Identifier.create("replace_db", "missing");
+        catalog.replaceTable(missing, newSchema, true);
+
+        // ignoreIfNotExists = false: missing table throws
+        assertThatExceptionOfType(Catalog.TableNotExistException.class)
+                .isThrownBy(() -> catalog.replaceTable(missing, newSchema, 
false));
+
+        // System table is rejected
+        assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(
+                        () ->
+                                catalog.replaceTable(
+                                        Identifier.create("replace_db", 
"$system_table"),
+                                        newSchema,
+                                        false));
+    }
+
     @Test
     public void testView() throws Exception {
         if (!supportsView()) {
@@ -1658,6 +1748,10 @@ public abstract class CatalogTestBase {
         return true;
     }
 
+    protected boolean supportsReplaceTable() {
+        return true;
+    }
+
     protected void checkPartition(Partition expected, Partition actual) {
         assertThat(actual).isEqualTo(expected);
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
index 2b121dd4cd..fd3c6fdc59 100644
--- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
@@ -140,6 +140,12 @@ public class JdbcCatalogTest extends CatalogTestBase {
         return true;
     }
 
+    @Override
+    protected boolean supportsReplaceTable() {
+        // jdbc lock interferes with the test data commit; replace path itself 
works at runtime
+        return false;
+    }
+
     @Test
     public void testRepairTableNotExist() throws Exception {
         String databaseName = "repair_db";
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 51d2f0cf13..74e4f4e465 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -63,6 +63,7 @@ import 
org.apache.paimon.rest.requests.ListPartitionsByNamesRequest;
 import org.apache.paimon.rest.requests.MarkDonePartitionsRequest;
 import org.apache.paimon.rest.requests.RenameBranchRequest;
 import org.apache.paimon.rest.requests.RenameTableRequest;
+import org.apache.paimon.rest.requests.ReplaceTableRequest;
 import org.apache.paimon.rest.requests.ResetConsumerRequest;
 import org.apache.paimon.rest.requests.RollbackSchemaRequest;
 import org.apache.paimon.rest.requests.RollbackTableRequest;
@@ -429,6 +430,10 @@ public class RESTCatalogServer {
                                 resources.length == 4
                                         && 
ResourcePaths.TABLES.equals(resources[1])
                                         && 
"rollback-schema".equals(resources[3]);
+                        boolean isReplaceTable =
+                                resources.length == 4
+                                        && 
ResourcePaths.TABLES.equals(resources[1])
+                                        && "replace".equals(resources[3]);
                         boolean isPartitions =
                                 resources.length == 4
                                         && 
ResourcePaths.TABLES.equals(resources[1])
@@ -552,6 +557,8 @@ public class RESTCatalogServer {
                             }
                         } else if (isRollbackSchema) {
                             return rollbackSchemaHandle(identifier, 
restAuthParameter.data());
+                        } else if (isReplaceTable) {
+                            return replaceTableHandle(identifier, 
restAuthParameter.data());
                         } else if (isTable) {
                             return tableHandle(
                                     restAuthParameter.method(),
@@ -1754,6 +1761,38 @@ public class RESTCatalogServer {
         }
     }
 
+    private MockResponse replaceTableHandle(Identifier identifier, String 
data) throws Exception {
+        ReplaceTableRequest requestBody = RESTApi.fromJson(data, 
ReplaceTableRequest.class);
+        Schema newSchema = requestBody.getSchema();
+        if (!tableMetadataStore.containsKey(identifier.getFullName())) {
+            throw new Catalog.TableNotExistException(identifier);
+        }
+        TableMetadata tableMetadata = 
tableMetadataStore.get(identifier.getFullName());
+        if (isFormatTable(tableMetadata.schema().toSchema()) || 
isFormatTable(newSchema)) {
+            throw new UnsupportedOperationException("replaceTable does not 
support format tables.");
+        }
+        catalog.replaceTable(identifier, newSchema, false);
+        TableSchema replacedSchema = catalog.loadTableSchema(identifier);
+        TableMetadata newTableMetadata =
+                createTableMetadata(
+                        identifier,
+                        replacedSchema.id(),
+                        replacedSchema.toSchema(),
+                        tableMetadata.uuid(),
+                        tableMetadata.isExternal());
+        tableMetadataStore.put(identifier.getFullName(), newTableMetadata);
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+        Snapshot truncateSnapshot = table.snapshotManager().latestSnapshot();
+        if (truncateSnapshot != null) {
+            tableLatestSnapshotStore.put(
+                    identifier.getFullName(), new 
TableSnapshot(truncateSnapshot, 0L, 0L, 0L, 0L));
+        } else {
+            tableLatestSnapshotStore.remove(identifier.getFullName());
+        }
+        tablePartitionsStore.remove(identifier.getFullName());
+        return new MockResponse().setResponseCode(200);
+    }
+
     private MockResponse renameTableHandle(String data) throws Exception {
         RenameTableRequest requestBody = RESTApi.fromJson(data, 
RenameTableRequest.class);
         Identifier fromTable = requestBody.getSource();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index a07c6eaed2..8cefa2ec7b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -3173,6 +3173,11 @@ public abstract class RESTCatalogTest extends 
CatalogTestBase {
         return true;
     }
 
+    @Override
+    protected boolean supportsReplaceTable() {
+        return true;
+    }
+
     // TODO implement this
     @Override
     @Test
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index ac0706b40b..d32ec08648 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -1296,6 +1296,41 @@ public class HiveCatalog extends AbstractCatalog {
         clients().execute(client -> HiveAlterTableUtils.alterTable(client, 
identifier, table));
     }
 
+    @Override
+    protected void replaceTableImpl(
+            Identifier identifier, FileStoreTable existingTable, Schema 
newSchema)
+            throws TableNotExistException {
+        Table hmsTable = getHmsTable(identifier);
+        if (!isPaimonTable(hmsTable)) {
+            throw new UnsupportedOperationException("Only data table support 
replaceTable.");
+        }
+
+        truncateTable(existingTable);
+
+        SchemaManager schemaManager = existingTable.schemaManager();
+        long newSchemaId;
+        try {
+            newSchemaId = runWithLock(identifier, () -> 
appendNewSchema(existingTable, newSchema));
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to replaceTable " + 
identifier.getFullName(), e);
+        }
+
+        // currently only changes to main branch affect metastore
+        if (!DEFAULT_MAIN_BRANCH.equals(identifier.getBranchNameOrDefault())) {
+            return;
+        }
+
+        try {
+            TableSchema newTableSchema = schemaManager.schema(newSchemaId);
+            alterTableToHms(hmsTable, identifier, newTableSchema, 
Collections.emptySet());
+        } catch (Exception te) {
+            schemaManager.deleteSchema(newSchemaId);
+            throw new RuntimeException(te);
+        }
+    }
+
     @Override
     public boolean caseSensitive() {
         return options.getOptional(CASE_SENSITIVE).orElse(false);
diff --git 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
index cc6258e6eb..96f194668f 100644
--- 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
+++ 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
@@ -18,14 +18,13 @@
 
 package org.apache.spark.sql.execution.shim
 
-import org.apache.paimon.CoreOptions
-import org.apache.paimon.iceberg.IcebergOptions
+import org.apache.paimon.spark.SparkCatalog
 import org.apache.paimon.spark.catalog.FormatTableCatalog
 
 import org.apache.spark.sql.{SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, 
LogicalPlan}
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, 
StagingTableCatalog}
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.connector.catalog.CatalogV2Util
+import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan}
 import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -34,47 +33,40 @@ import scala.collection.JavaConverters._
 case class PaimonCreateTableAsSelectStrategy(spark: SparkSession) extends 
Strategy {
 
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case CreateTableAsSelect(catalog, ident, parts, query, props, options, 
ifNotExists) =>
-      catalog match {
-        case _: StagingTableCatalog =>
-          throw new RuntimeException("Paimon can't extend StagingTableCatalog 
for now.")
-        case _ =>
-          val coreOptionKeys = 
CoreOptions.getOptions.asScala.map(_.key()).toSeq
+    case CreateTableAsSelect(
+          catalog: SparkCatalog,
+          ident,
+          parts,
+          query,
+          props,
+          options,
+          ifNotExists) =>
+      val (tableOptions, writeOptions) = 
PaimonStrategyHelper.splitTableAndWriteOptions(options)
+      val newProps = CatalogV2Util.withDefaultOwnership(props) ++ tableOptions
 
-          // Include Iceberg compatibility options in table properties (fix 
for DataFrame writer options)
-          val icebergOptionKeys = 
IcebergOptions.getOptions.asScala.map(_.key()).toSeq
-
-          val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys
-
-          val (tableOptions, writeOptions) = options.partition {
-            case (key, _) => allTableOptionKeys.contains(key)
-          }
-          val newProps = CatalogV2Util.withDefaultOwnership(props) ++ 
tableOptions
-
-          val isPartitionedFormatTable = {
-            catalog match {
-              case catalog: FormatTableCatalog =>
-                catalog.isFormatTable(newProps.get("provider").orNull) && 
parts.nonEmpty
-              case _ => false
-            }
-          }
-
-          if (isPartitionedFormatTable) {
-            throw new UnsupportedOperationException(
-              "Using CTAS with partitioned format table is not supported yet.")
-          }
+      val isPartitionedFormatTable = {
+        catalog match {
+          case formatCatalog: FormatTableCatalog =>
+            formatCatalog.isFormatTable(newProps.get("provider").orNull) && 
parts.nonEmpty
+          case _ => false
+        }
+      }
 
-          CreateTableAsSelectExec(
-            catalog,
-            ident,
-            parts,
-            query,
-            planLater(query),
-            newProps,
-            new CaseInsensitiveStringMap(writeOptions.asJava),
-            ifNotExists
-          ) :: Nil
+      if (isPartitionedFormatTable) {
+        throw new UnsupportedOperationException(
+          "Using CTAS with partitioned format table is not supported yet.")
       }
+
+      CreateTableAsSelectExec(
+        catalog,
+        ident,
+        parts,
+        query,
+        planLater(query),
+        newProps,
+        new CaseInsensitiveStringMap(writeOptions.asJava),
+        ifNotExists
+      ) :: Nil
     case _ => Nil
   }
 }
diff --git 
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala
 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala
similarity index 53%
copy from 
paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala
copy to 
paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala
index 9fb3a7b54a..d637c006da 100644
--- 
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala
+++ 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala
@@ -16,26 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution
+package org.apache.spark.sql.execution.shim
 
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.CatalogUtils
-import org.apache.spark.sql.catalyst.plans.logical.TableSpec
-import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
+import org.apache.spark.sql.{SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
 
-trait PaimonStrategyHelper {
+case class PaimonReplaceTableAsSelectStrategy(spark: SparkSession) extends 
Strategy {
 
-  def spark: SparkSession
-
-  protected def makeQualifiedDBObjectPath(location: String): String = {
-    CatalogUtils.makeQualifiedDBObjectPath(
-      spark.sharedState.conf.get(WAREHOUSE_PATH),
-      location,
-      spark.sharedState.hadoopConf)
-  }
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
+}
 
-  protected def qualifyLocInTableSpec(tableSpec: TableSpec): TableSpec = {
-    tableSpec.copy(location = 
tableSpec.location.map(makeQualifiedDBObjectPath(_)))
-  }
+case class PaimonReplaceTableStrategy(spark: SparkSession) extends Strategy {
 
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
 }
diff --git 
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
 
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
index a09996f153..42d6fe2816 100644
--- 
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
+++ 
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
@@ -18,15 +18,12 @@
 
 package org.apache.spark.sql.execution.shim
 
-import org.apache.paimon.CoreOptions
-import org.apache.paimon.iceberg.IcebergOptions
 import org.apache.paimon.spark.SparkCatalog
 import org.apache.paimon.spark.catalog.FormatTableCatalog
 
 import org.apache.spark.sql.{SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.analysis.ResolvedDBObjectName
 import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, 
LogicalPlan, TableSpec}
-import org.apache.spark.sql.connector.catalog.StagingTableCatalog
 import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan}
 import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -47,46 +44,32 @@ case class PaimonCreateTableAsSelectStrategy(spark: 
SparkSession)
           tableSpec: TableSpec,
           options,
           ifNotExists) =>
-      catalog match {
-        case _: StagingTableCatalog =>
-          throw new RuntimeException("Paimon can't extend StagingTableCatalog 
for now.")
-        case _ =>
-          val coreOptionKeys = 
CoreOptions.getOptions.asScala.map(_.key()).toSeq
+      val (tableOptions, writeOptions) = 
PaimonStrategyHelper.splitTableAndWriteOptions(options)
+      val qualifiedSpec = qualifyTableSpec(tableSpec, tableOptions)
 
-          // Include Iceberg compatibility options in table properties (fix 
for DataFrame writer options)
-          val icebergOptionKeys = 
IcebergOptions.getOptions.asScala.map(_.key()).toSeq
-
-          val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys
-
-          val (tableOptions, writeOptions) = options.partition {
-            case (key, _) => allTableOptionKeys.contains(key)
-          }
-          val newTableSpec = tableSpec.copy(properties = tableSpec.properties 
++ tableOptions)
-
-          val isPartitionedFormatTable = {
-            catalog match {
-              case catalog: FormatTableCatalog =>
-                catalog.isFormatTable(newTableSpec.provider.orNull) && 
parts.nonEmpty
-              case _ => false
-            }
-          }
-
-          if (isPartitionedFormatTable) {
-            throw new UnsupportedOperationException(
-              "Using CTAS with partitioned format table is not supported yet.")
-          }
+      val isPartitionedFormatTable = {
+        catalog match {
+          case formatCatalog: FormatTableCatalog =>
+            formatCatalog.isFormatTable(qualifiedSpec.provider.orNull) && 
parts.nonEmpty
+          case _ => false
+        }
+      }
 
-          CreateTableAsSelectExec(
-            catalog.asTableCatalog,
-            ident.asIdentifier,
-            parts,
-            query,
-            planLater(query),
-            qualifyLocInTableSpec(newTableSpec),
-            new CaseInsensitiveStringMap(writeOptions.asJava),
-            ifNotExists
-          ) :: Nil
+      if (isPartitionedFormatTable) {
+        throw new UnsupportedOperationException(
+          "Using CTAS with partitioned format table is not supported yet.")
       }
+
+      CreateTableAsSelectExec(
+        catalog.asTableCatalog,
+        ident.asIdentifier,
+        parts,
+        query,
+        planLater(query),
+        qualifiedSpec,
+        new CaseInsensitiveStringMap(writeOptions.asJava),
+        ifNotExists
+      ) :: Nil
     case _ => Nil
   }
 }
diff --git 
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala
 
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala
similarity index 53%
rename from 
paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala
rename to 
paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala
index 9fb3a7b54a..d637c006da 100644
--- 
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala
+++ 
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala
@@ -16,26 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution
+package org.apache.spark.sql.execution.shim
 
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.CatalogUtils
-import org.apache.spark.sql.catalyst.plans.logical.TableSpec
-import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
+import org.apache.spark.sql.{SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
 
-trait PaimonStrategyHelper {
+case class PaimonReplaceTableAsSelectStrategy(spark: SparkSession) extends 
Strategy {
 
-  def spark: SparkSession
-
-  protected def makeQualifiedDBObjectPath(location: String): String = {
-    CatalogUtils.makeQualifiedDBObjectPath(
-      spark.sharedState.conf.get(WAREHOUSE_PATH),
-      location,
-      spark.sharedState.hadoopConf)
-  }
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
+}
 
-  protected def qualifyLocInTableSpec(tableSpec: TableSpec): TableSpec = {
-    tableSpec.copy(location = 
tableSpec.location.map(makeQualifiedDBObjectPath(_)))
-  }
+case class PaimonReplaceTableStrategy(spark: SparkSession) extends Strategy {
 
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
 }
diff --git 
a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
 
b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
index 4a82f35188..14f540f3e1 100644
--- 
a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
+++ 
b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
@@ -18,15 +18,12 @@
 
 package org.apache.spark.sql.execution.shim
 
-import org.apache.paimon.CoreOptions
-import org.apache.paimon.iceberg.IcebergOptions
 import org.apache.paimon.spark.SparkCatalog
 import org.apache.paimon.spark.catalog.FormatTableCatalog
 
 import org.apache.spark.sql.{SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, 
LogicalPlan, TableSpec}
-import org.apache.spark.sql.connector.catalog.StagingTableCatalog
 import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan}
 import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -49,46 +46,32 @@ case class PaimonCreateTableAsSelectStrategy(spark: 
SparkSession)
           ifNotExists,
           analyzedQuery) =>
       assert(analyzedQuery.isDefined)
-      catalog match {
-        case _: StagingTableCatalog =>
-          throw new RuntimeException("Paimon can't extend StagingTableCatalog 
for now.")
-        case _ =>
-          val coreOptionKeys = 
CoreOptions.getOptions.asScala.map(_.key()).toSeq
+      val (tableOptions, writeOptions) = 
PaimonStrategyHelper.splitTableAndWriteOptions(options)
+      val qualifiedSpec = qualifyTableSpec(tableSpec, tableOptions)
 
-          // Include Iceberg compatibility options in table properties (fix 
for DataFrame writer options)
-          val icebergOptionKeys = 
IcebergOptions.getOptions.asScala.map(_.key()).toSeq
-
-          val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys
-
-          val (tableOptions, writeOptions) = options.partition {
-            case (key, _) => allTableOptionKeys.contains(key)
-          }
-          val newTableSpec = tableSpec.copy(properties = tableSpec.properties 
++ tableOptions)
-
-          val isPartitionedFormatTable = {
-            catalog match {
-              case catalog: FormatTableCatalog =>
-                catalog.isFormatTable(newTableSpec.provider.orNull) && 
parts.nonEmpty
-              case _ => false
-            }
-          }
-
-          if (isPartitionedFormatTable) {
-            throw new UnsupportedOperationException(
-              "Using CTAS with partitioned format table is not supported yet.")
-          }
+      val isPartitionedFormatTable = {
+        catalog match {
+          case formatCatalog: FormatTableCatalog =>
+            formatCatalog.isFormatTable(qualifiedSpec.provider.orNull) && 
parts.nonEmpty
+          case _ => false
+        }
+      }
 
-          CreateTableAsSelectExec(
-            catalog.asTableCatalog,
-            ident,
-            parts,
-            analyzedQuery.get,
-            planLater(query),
-            qualifyLocInTableSpec(newTableSpec),
-            new CaseInsensitiveStringMap(writeOptions.asJava),
-            ifNotExists
-          ) :: Nil
+      if (isPartitionedFormatTable) {
+        throw new UnsupportedOperationException(
+          "Using CTAS with partitioned format table is not supported yet.")
       }
+
+      CreateTableAsSelectExec(
+        catalog.asTableCatalog,
+        ident,
+        parts,
+        analyzedQuery.get,
+        planLater(query),
+        qualifiedSpec,
+        new CaseInsensitiveStringMap(writeOptions.asJava),
+        ifNotExists
+      ) :: Nil
     case _ => Nil
   }
 }
diff --git 
a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala
 
b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala
new file mode 100644
index 0000000000..b741627cdb
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.shim
+
+import org.apache.paimon.CoreOptions.TYPE
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.{SparkCatalog, SparkGenericCatalog, 
SparkSource, SparkTable}
+import org.apache.paimon.spark.catalog.SparkBaseCatalog
+
+import org.apache.spark.sql.{SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, 
ResolvedIdentifier}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReplaceTable, 
ReplaceTableAsSelect, TableSpec}
+import org.apache.spark.sql.connector.catalog.{Identifier, 
StagingTableCatalog, Table, TableCatalog}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan}
+import 
org.apache.spark.sql.execution.datasources.v2.{AtomicReplaceTableAsSelectExec, 
ReplaceTableAsSelectExec}
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+
+case class PaimonReplaceTableAsSelectStrategy(spark: SparkSession)
+  extends Strategy
+  with PaimonStrategyHelper {
+
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case ReplaceTableAsSelect(
+          ResolvedIdentifier(catalog: SparkBaseCatalog, ident),
+          parts,
+          query,
+          tableSpec: TableSpec,
+          options,
+          orCreate,
+          analyzedQuery) if 
PaimonReplaceTableStrategyHelper.supportsCatalog(catalog, tableSpec) =>
+      assert(analyzedQuery.isDefined)
+      val (tableOptions, writeOptions) = 
PaimonStrategyHelper.splitTableAndWriteOptions(options)
+      val qualifiedSpec = qualifyTableSpec(tableSpec, tableOptions)
+      val writeOpts = new CaseInsensitiveStringMap(writeOptions.asJava)
+      if (PaimonReplaceTableStrategyHelper.canAtomicReplace(catalog, ident, 
qualifiedSpec, parts)) {
+        AtomicReplaceTableAsSelectExec(
+          catalog.asInstanceOf[StagingTableCatalog],
+          ident,
+          parts,
+          analyzedQuery.get,
+          planLater(query),
+          qualifiedSpec,
+          writeOpts,
+          orCreate = orCreate,
+          invalidateCache
+        ) :: Nil
+      } else {
+        ReplaceTableAsSelectExec(
+          catalog,
+          ident,
+          parts,
+          analyzedQuery.get,
+          planLater(query),
+          qualifiedSpec,
+          writeOpts,
+          orCreate = orCreate,
+          invalidateCache
+        ) :: Nil
+      }
+    case _ => Nil
+  }
+
+  private def invalidateCache(tableCatalog: TableCatalog, table: Table, ident: 
Identifier): Unit = {
+    tableCatalog.invalidateTable(ident)
+  }
+}
+
+case class PaimonReplaceTableStrategy(spark: SparkSession)
+  extends Strategy
+  with PaimonStrategyHelper {
+
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case replace @ ReplaceTable(
+          ResolvedIdentifier(catalog: SparkBaseCatalog, ident),
+          schemaOrColumns,
+          parts,
+          tableSpec: TableSpec,
+          orCreate) if 
PaimonReplaceTableStrategyHelper.supportsCatalog(catalog, tableSpec) =>
+      val columns =
+        SparkShimLoader.shim.toReplaceTableColumns(
+          replace.tableSchema,
+          schemaOrColumns,
+          catalog,
+          ident)
+      val qualifiedSpec = qualifyTableSpec(tableSpec, Map.empty)
+      if (PaimonReplaceTableStrategyHelper.canAtomicReplace(catalog, ident, 
qualifiedSpec, parts)) {
+        SparkShimLoader.shim.createAtomicReplaceTableExec(
+          catalog.asInstanceOf[StagingTableCatalog],
+          ident,
+          columns,
+          parts,
+          qualifiedSpec,
+          orCreate = orCreate) :: Nil
+      } else {
+        SparkShimLoader.shim.createReplaceTableExec(
+          catalog,
+          ident,
+          columns,
+          parts,
+          qualifiedSpec,
+          orCreate = orCreate) :: Nil
+      }
+    case _ => Nil
+  }
+}
+
+private[shim] object PaimonReplaceTableStrategyHelper {
+
+  def supportsCatalog(catalog: SparkBaseCatalog, tableSpec: TableSpec): 
Boolean = catalog match {
+    case _: SparkCatalog => true
+    case _: SparkGenericCatalog =>
+      tableSpec.provider.exists(_.equalsIgnoreCase(SparkSource.NAME))
+    case _ => false
+  }
+
+  /**
+   * Whether replace can use Spark's staged replace path. Paimon's 
replaceTable is not a
+   * rollbackable atomic replace; it swaps the current schema and truncates 
current data while
+   * preserving old snapshots. Return false for cases replaceTable would 
reject so Spark falls back
+   * to drop+create.
+   */
+  def canAtomicReplace(
+      catalog: SparkBaseCatalog,
+      ident: Identifier,
+      tableSpec: TableSpec,
+      parts: Seq[Transform]): Boolean = {
+    try {
+      val existing = catalog.loadTable(ident)
+      if (!existing.isInstanceOf[SparkTable]) return false
+      val existingProvider =
+        
Option(existing.properties().get(TableCatalog.PROP_PROVIDER)).getOrElse(SparkSource.NAME)
+      val targetProvider = tableSpec.provider.getOrElse(SparkSource.NAME)
+      if (!existingProvider.equalsIgnoreCase(targetProvider)) return false
+      val existingType = Options.fromMap(existing.properties()).get(TYPE)
+      val targetType = Options.fromMap(tableSpec.properties.asJava).get(TYPE)
+      if (existingType != targetType) return false
+      val existingParts = existing.partitioning().toSeq
+      existingParts.size == parts.size &&
+      existingParts.zip(parts).forall { case (a, b) => a.toString == 
b.toString }
+    } catch {
+      case _: NoSuchTableException => true
+    }
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
 
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
deleted file mode 100644
index 61e25b7c16..0000000000
--- 
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.shim
-
-import org.apache.paimon.CoreOptions
-import org.apache.paimon.iceberg.IcebergOptions
-import org.apache.paimon.spark.SparkCatalog
-import org.apache.paimon.spark.catalog.FormatTableCatalog
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, 
LogicalPlan, TableSpec}
-import org.apache.spark.sql.connector.catalog.StagingTableCatalog
-import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan, 
SparkStrategy}
-import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec
-
-import scala.collection.JavaConverters._
-
-case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
-  extends SparkStrategy
-  with PaimonStrategyHelper {
-
-  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
-
-  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case CreateTableAsSelect(
-          ResolvedIdentifier(catalog: SparkCatalog, ident),
-          parts,
-          query,
-          tableSpec: TableSpec,
-          options,
-          ifNotExists,
-          true) =>
-      catalog match {
-        case _: StagingTableCatalog =>
-          throw new RuntimeException("Paimon can't extend StagingTableCatalog 
for now.")
-        case _ =>
-          val coreOptionKeys = 
CoreOptions.getOptions.asScala.map(_.key()).toSeq
-
-          // Include Iceberg compatibility options in table properties (fix 
for DataFrame writer options)
-          val icebergOptionKeys = 
IcebergOptions.getOptions.asScala.map(_.key()).toSeq
-
-          val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys
-
-          val (tableOptions, writeOptions) = options.partition {
-            case (key, _) => allTableOptionKeys.contains(key)
-          }
-          val newTableSpec = tableSpec.copy(properties = tableSpec.properties 
++ tableOptions)
-
-          val isPartitionedFormatTable = {
-            catalog match {
-              case catalog: FormatTableCatalog =>
-                catalog.isFormatTable(newTableSpec.provider.orNull) && 
parts.nonEmpty
-              case _ => false
-            }
-          }
-
-          if (isPartitionedFormatTable) {
-            throw new UnsupportedOperationException(
-              "Using CTAS with partitioned format table is not supported yet.")
-          }
-
-          CreateTableAsSelectExec(
-            catalog.asTableCatalog,
-            ident,
-            parts,
-            query,
-            qualifyLocInTableSpec(newTableSpec),
-            writeOptions,
-            ifNotExists) :: Nil
-      }
-    case _ => Nil
-  }
-}
diff --git 
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
 
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
index 11fdfbd579..e08c87d4d3 100644
--- 
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
+++ 
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
@@ -32,14 +32,15 @@ import 
org.apache.spark.sql.catalyst.analysis.{CTESubstitution, SubstituteUnreso
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment, 
CTERelationRef, InsertAction, LogicalPlan, MergeAction, MergeIntoTable, 
MergeRows, SubqueryAlias, UnresolvedWith, UpdateAction}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment, 
ColumnDefinition, CTERelationRef, InsertAction, LogicalPlan, MergeAction, 
MergeIntoTable, MergeRows, SubqueryAlias, TableSpec, UnresolvedWith, 
UpdateAction}
 import org.apache.spark.sql.catalyst.plans.logical.MergeRows.Keep
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.ArrayData
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, 
Table, TableCatalog}
+import org.apache.spark.sql.catalyst.util.{ArrayData, GeneratedColumn, 
IdentityColumn, ResolveDefaultColumns}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, 
Identifier, StagingTableCatalog, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.execution.SparkFormatTable
+import org.apache.spark.sql.execution.{SparkFormatTable, SparkPlan}
 import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex, 
PartitionSpec}
+import 
org.apache.spark.sql.execution.datasources.v2.{AtomicReplaceTableAsSelectExec, 
AtomicReplaceTableExec, ReplaceTableAsSelectExec, ReplaceTableExec}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.streaming.{FileStreamSink, 
MetadataLogFileIndex}
 import org.apache.spark.sql.internal.SQLConf
@@ -102,6 +103,102 @@ class Spark4Shim extends SparkShim {
     tableCatalog.createTable(ident, columns, partitions, properties)
   }
 
+  override def createReplaceTableAsSelectExec(
+      catalog: TableCatalog,
+      ident: Identifier,
+      partitioning: Seq[Transform],
+      query: LogicalPlan,
+      tableSpec: TableSpec,
+      writeOptions: Map[String, String],
+      orCreate: Boolean): SparkPlan = {
+    ReplaceTableAsSelectExec(
+      catalog,
+      ident,
+      partitioning,
+      query,
+      tableSpec,
+      writeOptions,
+      orCreate = orCreate,
+      invalidateCache)
+  }
+
+  override def createAtomicReplaceTableAsSelectExec(
+      catalog: StagingTableCatalog,
+      ident: Identifier,
+      partitioning: Seq[Transform],
+      query: LogicalPlan,
+      tableSpec: TableSpec,
+      writeOptions: Map[String, String],
+      orCreate: Boolean): SparkPlan = {
+    AtomicReplaceTableAsSelectExec(
+      catalog,
+      ident,
+      partitioning,
+      query,
+      tableSpec,
+      writeOptions,
+      orCreate = orCreate,
+      invalidateCache)
+  }
+
+  override def createReplaceTableExec(
+      catalog: TableCatalog,
+      ident: Identifier,
+      columns: Array[Column],
+      partitioning: Seq[Transform],
+      tableSpec: TableSpec,
+      orCreate: Boolean): SparkPlan = {
+    ReplaceTableExec(
+      catalog,
+      ident,
+      columns,
+      partitioning,
+      tableSpec,
+      orCreate = orCreate,
+      invalidateCache)
+  }
+
+  override def createAtomicReplaceTableExec(
+      catalog: StagingTableCatalog,
+      ident: Identifier,
+      columns: Array[Column],
+      partitioning: Seq[Transform],
+      tableSpec: TableSpec,
+      orCreate: Boolean): SparkPlan = {
+    AtomicReplaceTableExec(
+      catalog,
+      ident,
+      columns,
+      partitioning,
+      tableSpec,
+      orCreate = orCreate,
+      invalidateCache)
+  }
+
+  override def toReplaceTableColumns(
+      tableSchema: StructType,
+      schemaOrColumns: Any,
+      catalog: TableCatalog,
+      ident: Identifier): Array[Column] = {
+    val statementType = "REPLACE TABLE"
+    val columns = schemaOrColumns.asInstanceOf[Seq[ColumnDefinition]]
+    ResolveDefaultColumns.validateCatalogForDefaultValue(columns, catalog, 
ident)
+    GeneratedColumn.validateGeneratedColumns(tableSchema, catalog, ident, 
statementType)
+    IdentityColumn.validateIdentityColumn(tableSchema, catalog, ident)
+    columns.map(_.toV2Column(statementType)).toArray
+  }
+
+  override def copyTableSpec(
+      tableSpec: TableSpec,
+      additionalProperties: Map[String, String],
+      location: Option[String]): TableSpec = {
+    tableSpec.copy(properties = tableSpec.properties ++ additionalProperties, 
location = location)
+  }
+
+  private def invalidateCache(tableCatalog: TableCatalog, table: Table, ident: 
Identifier): Unit = {
+    tableCatalog.invalidateTable(ident)
+  }
+
   override def createCTERelationRef(
       cteId: Long,
       resolved: Boolean,
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/RollbackStagedTable.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/RollbackStagedTable.java
new file mode 100644
index 0000000000..2f90545b60
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/RollbackStagedTable.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark;
+
+import org.apache.spark.sql.connector.catalog.StagedTable;
+import org.apache.spark.sql.connector.catalog.SupportsDelete;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.SupportsWrite;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/** A staged table that rolls back by invoking the provided abort action. */
+class RollbackStagedTable implements StagedTable, SupportsRead, SupportsWrite, 
SupportsDelete {
+
+    @FunctionalInterface
+    interface Action {
+        void run() throws Exception;
+    }
+
+    @FunctionalInterface
+    interface StagedAction {
+        void run(RollbackStagedTable stagedTable) throws Exception;
+    }
+
+    private static final Action DO_NOTHING = () -> {};
+
+    private final Table table;
+    private final StagedAction commitAction;
+    private final Action abortAction;
+    private boolean committed;
+    private boolean aborted;
+    private boolean writeStarted;
+
+    RollbackStagedTable(Table table, Action abortAction) {
+        this(table, DO_NOTHING, abortAction);
+    }
+
+    RollbackStagedTable(Table table, Action commitAction, Action abortAction) {
+        this(table, ignored -> commitAction.run(), abortAction);
+    }
+
+    RollbackStagedTable(Table table, StagedAction commitAction, Action 
abortAction) {
+        this.table = table;
+        this.commitAction = commitAction;
+        this.abortAction = abortAction;
+    }
+
+    @Override
+    public String name() {
+        return table.name();
+    }
+
+    @Override
+    public StructType schema() {
+        return table.schema();
+    }
+
+    @Override
+    public Transform[] partitioning() {
+        return table.partitioning();
+    }
+
+    @Override
+    public Map<String, String> properties() {
+        return table.properties();
+    }
+
+    @Override
+    public Set<TableCapability> capabilities() {
+        return table.capabilities();
+    }
+
+    @Override
+    public void deleteWhere(Filter[] filters) {
+        call(SupportsDelete.class, t -> t.deleteWhere(filters));
+    }
+
+    @Override
+    public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+        return callReturning(SupportsRead.class, t -> 
t.newScanBuilder(options));
+    }
+
+    @Override
+    public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
+        writeStarted = true;
+        return callReturning(SupportsWrite.class, t -> 
t.newWriteBuilder(info));
+    }
+
+    boolean hasWriteStarted() {
+        return writeStarted;
+    }
+
+    @Override
+    public void commitStagedChanges() {
+        if (committed || aborted) {
+            return;
+        }
+
+        try {
+            commitAction.run(this);
+            committed = true;
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void abortStagedChanges() {
+        if (committed || aborted) {
+            return;
+        }
+
+        try {
+            abortAction.run();
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            aborted = true;
+        }
+    }
+
+    private <T> void call(Class<? extends T> requiredClass, Consumer<T> task) {
+        callReturning(
+                requiredClass,
+                instance -> {
+                    task.accept(instance);
+                    return null;
+                });
+    }
+
+    private <T, R> R callReturning(Class<? extends T> requiredClass, 
Function<T, R> task) {
+        if (requiredClass.isInstance(table)) {
+            return task.apply(requiredClass.cast(table));
+        }
+
+        throw new UnsupportedOperationException(
+                String.format(
+                        "Table does not implement %s: %s (%s)",
+                        requiredClass.getSimpleName(), table.name(), 
table.getClass().getName()));
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 159aa98e37..451a5fae26 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -61,6 +61,7 @@ import 
org.apache.spark.sql.catalyst.parser.extensions.UnResolvedPaimonV1Functio
 import org.apache.spark.sql.connector.catalog.FunctionCatalog;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.NamespaceChange;
+import org.apache.spark.sql.connector.catalog.StagedTable;
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.catalog.TableChange;
@@ -389,6 +390,87 @@ public class SparkCatalog extends SparkBaseCatalog
         }
     }
 
+    @Override
+    public StagedTable stageCreate(
+            Identifier ident,
+            StructType schema,
+            Transform[] partitions,
+            Map<String, String> properties)
+            throws TableAlreadyExistsException, NoSuchNamespaceException {
+        return stageCreateDirectly(ident, schema, partitions, properties);
+    }
+
+    @Override
+    public StagedTable stageReplace(
+            Identifier ident,
+            StructType schema,
+            Transform[] partitions,
+            Map<String, String> properties)
+            throws NoSuchNamespaceException, NoSuchTableException {
+        return stageReplaceInternal(ident, schema, partitions, properties);
+    }
+
+    @Override
+    public StagedTable stageCreateOrReplace(
+            Identifier ident,
+            StructType schema,
+            Transform[] partitions,
+            Map<String, String> properties)
+            throws NoSuchNamespaceException {
+        try {
+            return stageReplaceInternal(ident, schema, partitions, properties);
+        } catch (NoSuchTableException e) {
+            try {
+                return stageCreate(ident, schema, partitions, properties);
+            } catch (TableAlreadyExistsException ex) {
+                throw new RuntimeException(ex);
+            }
+        }
+    }
+
+    private StagedTable stageReplaceInternal(
+            Identifier ident,
+            StructType schema,
+            Transform[] partitions,
+            Map<String, String> properties)
+            throws NoSuchNamespaceException, NoSuchTableException {
+        org.apache.paimon.catalog.Identifier tableIdent = toIdentifier(ident, 
catalogName);
+        Schema targetSchema = toInitialSchema(schema, partitions, properties);
+
+        try {
+            catalog.replaceTable(tableIdent, targetSchema, false);
+            return new RollbackStagedTable(loadTable(ident), () -> {});
+        } catch (Catalog.TableNotExistException e) {
+            throw new NoSuchTableException(ident);
+        } catch (UnsupportedOperationException e) {
+            // Catalog cannot replace in-place; fall back to drop+create, 
losing snapshot history.
+            LOG.warn(
+                    "Catalog {} does not support replaceTable, falling back to 
drop+create for {}.",
+                    catalog.getClass().getName(),
+                    tableIdent.getFullName(),
+                    e);
+            return stageReplaceByDropAndCreate(ident, tableIdent, 
targetSchema);
+        }
+    }
+
+    private StagedTable stageReplaceByDropAndCreate(
+            Identifier ident, org.apache.paimon.catalog.Identifier tableIdent, 
Schema targetSchema)
+            throws NoSuchTableException, NoSuchNamespaceException {
+        try {
+            catalog.dropTable(tableIdent, false);
+        } catch (Catalog.TableNotExistException e) {
+            throw new NoSuchTableException(ident);
+        }
+        try {
+            catalog.createTable(tableIdent, targetSchema, false);
+        } catch (Catalog.TableAlreadyExistException e) {
+            throw new RuntimeException(e);
+        } catch (Catalog.DatabaseNotExistException e) {
+            throw new NoSuchNamespaceException(ident.namespace());
+        }
+        return new RollbackStagedTable(loadTable(ident), () -> {});
+    }
+
     private SchemaChange toSchemaChange(TableChange change) {
         if (change instanceof TableChange.SetProperty) {
             TableChange.SetProperty set = (TableChange.SetProperty) change;
@@ -457,6 +539,24 @@ public class SparkCatalog extends SparkBaseCatalog
         return move;
     }
 
+    private StagedTable stageCreateDirectly(
+            Identifier ident,
+            StructType schema,
+            Transform[] partitions,
+            Map<String, String> properties)
+            throws TableAlreadyExistsException, NoSuchNamespaceException {
+        org.apache.spark.sql.connector.catalog.Table table =
+                createTable(ident, schema, partitions, properties);
+        if (table == null) {
+            try {
+                table = loadTable(ident);
+            } catch (NoSuchTableException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        return new RollbackStagedTable(table, () -> dropTable(ident));
+    }
+
     private Schema toInitialSchema(
             StructType schema, Transform[] partitions, Map<String, String> 
properties) {
         Map<String, String> normalizedProperties = new HashMap<>(properties);
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
index e79af9a0b4..b89eaf0925 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
@@ -43,6 +43,8 @@ import org.apache.spark.sql.connector.catalog.FunctionCatalog;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.NamespaceChange;
 import org.apache.spark.sql.connector.catalog.PaimonCatalogUtils;
+import org.apache.spark.sql.connector.catalog.StagedTable;
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -245,6 +247,49 @@ public class SparkGenericCatalog extends SparkBaseCatalog 
implements CatalogExte
         }
     }
 
+    @Override
+    public StagedTable stageCreate(
+            Identifier ident,
+            StructType schema,
+            Transform[] partitions,
+            Map<String, String> properties)
+            throws TableAlreadyExistsException, NoSuchNamespaceException {
+        if (usePaimon(properties.get(TableCatalog.PROP_PROVIDER))) {
+            return sparkCatalog.stageCreate(ident, schema, partitions, 
properties);
+        } else {
+            return asStagingTableCatalog().stageCreate(ident, schema, 
partitions, properties);
+        }
+    }
+
+    @Override
+    public StagedTable stageReplace(
+            Identifier ident,
+            StructType schema,
+            Transform[] partitions,
+            Map<String, String> properties)
+            throws NoSuchNamespaceException, NoSuchTableException {
+        if (usePaimon(properties.get(TableCatalog.PROP_PROVIDER))) {
+            return sparkCatalog.stageReplace(ident, schema, partitions, 
properties);
+        } else {
+            return asStagingTableCatalog().stageReplace(ident, schema, 
partitions, properties);
+        }
+    }
+
+    @Override
+    public StagedTable stageCreateOrReplace(
+            Identifier ident,
+            StructType schema,
+            Transform[] partitions,
+            Map<String, String> properties)
+            throws NoSuchNamespaceException {
+        if (usePaimon(properties.get(TableCatalog.PROP_PROVIDER))) {
+            return sparkCatalog.stageCreateOrReplace(ident, schema, 
partitions, properties);
+        } else {
+            return asStagingTableCatalog()
+                    .stageCreateOrReplace(ident, schema, partitions, 
properties);
+        }
+    }
+
     @Override
     public final void initialize(String name, CaseInsensitiveStringMap 
options) {
         SparkSession sparkSession = PaimonSparkSession$.MODULE$.active();
@@ -362,6 +407,10 @@ public class SparkGenericCatalog extends SparkBaseCatalog 
implements CatalogExte
         return (FunctionCatalog) getDelegateCatalog();
     }
 
+    private StagingTableCatalog asStagingTableCatalog() {
+        return (StagingTableCatalog) getDelegateCatalog();
+    }
+
     // ======================= Function methods ===============================
 
     @Override
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java
index ac6736e2e1..284b5d099f 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java
@@ -25,6 +25,7 @@ import org.apache.paimon.spark.procedure.Procedure;
 import org.apache.paimon.spark.procedure.ProcedureBuilder;
 
 import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.catalog.TableCatalogCapability;
@@ -39,7 +40,11 @@ import static 
org.apache.spark.sql.connector.catalog.TableCatalogCapability.SUPP
 
 /** Spark base catalog. */
 public abstract class SparkBaseCatalog
-        implements TableCatalog, SupportsNamespaces, ProcedureCatalog, 
WithPaimonCatalog {
+        implements TableCatalog,
+                SupportsNamespaces,
+                ProcedureCatalog,
+                WithPaimonCatalog,
+                StagingTableCatalog {
 
     protected String catalogName;
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
index 3be8b5a74e..63c61a16e8 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
@@ -29,11 +29,11 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, 
ResolvedTable}
 import org.apache.spark.sql.catalyst.expressions.{Expression, 
GenericInternalRow, PredicateHelper}
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, 
DescribeRelation, LogicalPlan, ShowCreateTable}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, 
DescribeRelation, LogicalPlan, ReplaceTable, ReplaceTableAsSelect, 
ShowCreateTable}
 import org.apache.spark.sql.connector.catalog.{Identifier, 
PaimonLookupCatalog, TableCatalog}
 import org.apache.spark.sql.execution.{PaimonDescribeTableExec, SparkPlan, 
SparkStrategy}
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, 
DataSourceV2Relation}
-import org.apache.spark.sql.execution.shim.PaimonCreateTableAsSelectStrategy
+import org.apache.spark.sql.execution.shim.{PaimonCreateTableAsSelectStrategy, 
PaimonReplaceTableAsSelectStrategy, PaimonReplaceTableStrategy}
 import org.apache.spark.sql.paimon.shims.SparkShimLoader
 
 import scala.collection.JavaConverters._
@@ -51,6 +51,12 @@ case class PaimonStrategy(spark: SparkSession)
     case ctas: CreateTableAsSelect =>
       PaimonCreateTableAsSelectStrategy(spark)(ctas)
 
+    case rtas: ReplaceTableAsSelect =>
+      PaimonReplaceTableAsSelectStrategy(spark)(rtas)
+
+    case rt: ReplaceTable =>
+      PaimonReplaceTableStrategy(spark)(rt)
+
     case c @ PaimonCallCommand(procedure, args) =>
       val input = buildInternalRow(args)
       PaimonCallExec(c.output, procedure, input) :: Nil
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala
index 9fb3a7b54a..2ee33a1829 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala
@@ -18,10 +18,16 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.iceberg.IcebergOptions
+
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.CatalogUtils
 import org.apache.spark.sql.catalyst.plans.logical.TableSpec
 import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
+
+import scala.collection.JavaConverters._
 
 trait PaimonStrategyHelper {
 
@@ -34,8 +40,23 @@ trait PaimonStrategyHelper {
       spark.sharedState.hadoopConf)
   }
 
-  protected def qualifyLocInTableSpec(tableSpec: TableSpec): TableSpec = {
-    tableSpec.copy(location = 
tableSpec.location.map(makeQualifiedDBObjectPath(_)))
+  protected def qualifyTableSpec(
+      tableSpec: TableSpec,
+      tableOptions: Map[String, String]): TableSpec = {
+    SparkShimLoader.shim.copyTableSpec(
+      tableSpec,
+      tableOptions,
+      tableSpec.location.map(makeQualifiedDBObjectPath))
   }
+}
 
+object PaimonStrategyHelper {
+  private val tableOptionKeys: Set[String] =
+    (CoreOptions.getOptions.asScala.map(_.key()) ++ 
IcebergOptions.getOptions.asScala.map(
+      _.key())).toSet
+
+  def splitTableAndWriteOptions(
+      options: Map[String, String]): (Map[String, String], Map[String, 
String]) = {
+    options.partition { case (key, _) => tableOptionKeys.contains(key) }
+  }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
index 61e25b7c16..cc0c12960c 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
@@ -18,20 +18,15 @@
 
 package org.apache.spark.sql.execution.shim
 
-import org.apache.paimon.CoreOptions
-import org.apache.paimon.iceberg.IcebergOptions
 import org.apache.paimon.spark.SparkCatalog
 import org.apache.paimon.spark.catalog.FormatTableCatalog
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, 
LogicalPlan, TableSpec}
-import org.apache.spark.sql.connector.catalog.StagingTableCatalog
 import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan, 
SparkStrategy}
 import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec
 
-import scala.collection.JavaConverters._
-
 case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
   extends SparkStrategy
   with PaimonStrategyHelper {
@@ -47,44 +42,30 @@ case class PaimonCreateTableAsSelectStrategy(spark: 
SparkSession)
           options,
           ifNotExists,
           true) =>
-      catalog match {
-        case _: StagingTableCatalog =>
-          throw new RuntimeException("Paimon can't extend StagingTableCatalog 
for now.")
-        case _ =>
-          val coreOptionKeys = 
CoreOptions.getOptions.asScala.map(_.key()).toSeq
-
-          // Include Iceberg compatibility options in table properties (fix 
for DataFrame writer options)
-          val icebergOptionKeys = 
IcebergOptions.getOptions.asScala.map(_.key()).toSeq
-
-          val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys
-
-          val (tableOptions, writeOptions) = options.partition {
-            case (key, _) => allTableOptionKeys.contains(key)
-          }
-          val newTableSpec = tableSpec.copy(properties = tableSpec.properties 
++ tableOptions)
+      val (tableOptions, writeOptions) = 
PaimonStrategyHelper.splitTableAndWriteOptions(options)
+      val qualifiedSpec = qualifyTableSpec(tableSpec, tableOptions)
 
-          val isPartitionedFormatTable = {
-            catalog match {
-              case catalog: FormatTableCatalog =>
-                catalog.isFormatTable(newTableSpec.provider.orNull) && 
parts.nonEmpty
-              case _ => false
-            }
-          }
-
-          if (isPartitionedFormatTable) {
-            throw new UnsupportedOperationException(
-              "Using CTAS with partitioned format table is not supported yet.")
-          }
+      val isPartitionedFormatTable = {
+        catalog match {
+          case formatCatalog: FormatTableCatalog =>
+            formatCatalog.isFormatTable(qualifiedSpec.provider.orNull) && 
parts.nonEmpty
+          case _ => false
+        }
+      }
 
-          CreateTableAsSelectExec(
-            catalog.asTableCatalog,
-            ident,
-            parts,
-            query,
-            qualifyLocInTableSpec(newTableSpec),
-            writeOptions,
-            ifNotExists) :: Nil
+      if (isPartitionedFormatTable) {
+        throw new UnsupportedOperationException(
+          "Using CTAS with partitioned format table is not supported yet.")
       }
+
+      CreateTableAsSelectExec(
+        catalog.asTableCatalog,
+        ident,
+        parts,
+        query,
+        qualifiedSpec,
+        writeOptions,
+        ifNotExists) :: Nil
     case _ => Nil
   }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala
new file mode 100644
index 0000000000..60410b631e
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonReplaceTableAsSelectStrategy.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.shim
+
+import org.apache.paimon.CoreOptions.TYPE
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.{SparkCatalog, SparkGenericCatalog, 
SparkSource, SparkTable}
+import org.apache.paimon.spark.catalog.SparkBaseCatalog
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, 
ResolvedIdentifier}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReplaceTable, 
ReplaceTableAsSelect, TableSpec}
+import org.apache.spark.sql.connector.catalog.{Identifier, 
StagingTableCatalog, TableCatalog}
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan, 
SparkStrategy}
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
+
+import scala.collection.JavaConverters._
+
+case class PaimonReplaceTableAsSelectStrategy(spark: SparkSession)
+  extends SparkStrategy
+  with PaimonStrategyHelper {
+
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case ReplaceTableAsSelect(
+          ResolvedIdentifier(catalog: SparkBaseCatalog, ident),
+          parts,
+          query,
+          tableSpec: TableSpec,
+          options,
+          orCreate,
+          true) if PaimonReplaceTableStrategyHelper.supportsCatalog(catalog, 
tableSpec) =>
+      val (tableOptions, writeOptions) = 
PaimonStrategyHelper.splitTableAndWriteOptions(options)
+      val qualifiedSpec = qualifyTableSpec(tableSpec, tableOptions)
+      if (PaimonReplaceTableStrategyHelper.canAtomicReplace(catalog, ident, 
qualifiedSpec, parts)) {
+        SparkShimLoader.shim.createAtomicReplaceTableAsSelectExec(
+          catalog.asInstanceOf[StagingTableCatalog],
+          ident,
+          parts,
+          query,
+          qualifiedSpec,
+          writeOptions,
+          orCreate = orCreate) :: Nil
+      } else {
+        SparkShimLoader.shim.createReplaceTableAsSelectExec(
+          catalog,
+          ident,
+          parts,
+          query,
+          qualifiedSpec,
+          writeOptions,
+          orCreate = orCreate) :: Nil
+      }
+    case _ => Nil
+  }
+}
+
+case class PaimonReplaceTableStrategy(spark: SparkSession)
+  extends SparkStrategy
+  with PaimonStrategyHelper {
+
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case replace @ ReplaceTable(
+          ResolvedIdentifier(catalog: SparkBaseCatalog, ident),
+          schemaOrColumns,
+          parts,
+          tableSpec: TableSpec,
+          orCreate) if 
PaimonReplaceTableStrategyHelper.supportsCatalog(catalog, tableSpec) =>
+      val columns =
+        SparkShimLoader.shim.toReplaceTableColumns(
+          replace.tableSchema,
+          schemaOrColumns,
+          catalog,
+          ident)
+      val qualifiedSpec = qualifyTableSpec(tableSpec, Map.empty)
+      if (PaimonReplaceTableStrategyHelper.canAtomicReplace(catalog, ident, 
qualifiedSpec, parts)) {
+        SparkShimLoader.shim.createAtomicReplaceTableExec(
+          catalog.asInstanceOf[StagingTableCatalog],
+          ident,
+          columns,
+          parts,
+          qualifiedSpec,
+          orCreate = orCreate) :: Nil
+      } else {
+        SparkShimLoader.shim.createReplaceTableExec(
+          catalog,
+          ident,
+          columns,
+          parts,
+          qualifiedSpec,
+          orCreate = orCreate) :: Nil
+      }
+    case _ => Nil
+  }
+}
+
+private[shim] object PaimonReplaceTableStrategyHelper {
+
+  def supportsCatalog(catalog: SparkBaseCatalog, tableSpec: TableSpec): 
Boolean = catalog match {
+    case _: SparkCatalog => true
+    case _: SparkGenericCatalog =>
+      tableSpec.provider.exists(_.equalsIgnoreCase(SparkSource.NAME))
+    case _ => false
+  }
+
+  /**
+   * Whether replace can use Spark's staged replace path. Paimon's 
replaceTable is not a
+   * rollbackable atomic replace; it swaps the current schema and truncates 
current data while
+   * preserving old snapshots. Return false for cases replaceTable would 
reject so Spark falls back
+   * to drop+create.
+   */
+  def canAtomicReplace(
+      catalog: SparkBaseCatalog,
+      ident: Identifier,
+      tableSpec: TableSpec,
+      parts: Seq[Transform]): Boolean = {
+    try {
+      val existing = catalog.loadTable(ident)
+      if (!existing.isInstanceOf[SparkTable]) return false
+      val existingProvider =
+        
Option(existing.properties().get(TableCatalog.PROP_PROVIDER)).getOrElse(SparkSource.NAME)
+      val targetProvider = tableSpec.provider.getOrElse(SparkSource.NAME)
+      if (!existingProvider.equalsIgnoreCase(targetProvider)) return false
+      val existingType = Options.fromMap(existing.properties()).get(TYPE)
+      val targetType = Options.fromMap(tableSpec.properties.asJava).get(TYPE)
+      if (existingType != targetType) return false
+      val existingParts = existing.partitioning().toSeq
+      existingParts.size == parts.size &&
+      existingParts.zip(parts).forall { case (a, b) => a.toString == 
b.toString }
+    } catch {
+      case _: NoSuchTableException => true
+    }
+  }
+
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
index 3ceb494396..38efd8c006 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
@@ -27,11 +27,12 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{Assignment, 
CTERelationRef, InsertAction, LogicalPlan, MergeAction, MergeIntoTable, 
SubqueryAlias, UnresolvedWith, UpdateAction}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, 
CTERelationRef, InsertAction, LogicalPlan, MergeAction, MergeIntoTable, 
SubqueryAlias, TableSpec, UnresolvedWith, UpdateAction}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.ArrayData
-import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{Column, Identifier, 
StagingTableCatalog, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.types.StructType
 
@@ -66,6 +67,51 @@ trait SparkShim {
       partitions: Array[Transform],
       properties: JMap[String, String]): Table
 
+  def createReplaceTableAsSelectExec(
+      catalog: TableCatalog,
+      ident: Identifier,
+      partitioning: Seq[Transform],
+      query: LogicalPlan,
+      tableSpec: TableSpec,
+      writeOptions: Map[String, String],
+      orCreate: Boolean): SparkPlan
+
+  def createAtomicReplaceTableAsSelectExec(
+      catalog: StagingTableCatalog,
+      ident: Identifier,
+      partitioning: Seq[Transform],
+      query: LogicalPlan,
+      tableSpec: TableSpec,
+      writeOptions: Map[String, String],
+      orCreate: Boolean): SparkPlan
+
+  def createReplaceTableExec(
+      catalog: TableCatalog,
+      ident: Identifier,
+      columns: Array[Column],
+      partitioning: Seq[Transform],
+      tableSpec: TableSpec,
+      orCreate: Boolean): SparkPlan
+
+  def createAtomicReplaceTableExec(
+      catalog: StagingTableCatalog,
+      ident: Identifier,
+      columns: Array[Column],
+      partitioning: Seq[Transform],
+      tableSpec: TableSpec,
+      orCreate: Boolean): SparkPlan
+
+  def toReplaceTableColumns(
+      tableSchema: StructType,
+      schemaOrColumns: Any,
+      catalog: TableCatalog,
+      ident: Identifier): Array[Column]
+
+  def copyTableSpec(
+      tableSpec: TableSpec,
+      additionalProperties: Map[String, String],
+      location: Option[String]): TableSpec
+
   def createCTERelationRef(
       cteId: Long,
       resolved: Boolean,
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
index 3f2c0e8996..47a280868d 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
@@ -276,6 +276,163 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
     }
   }
 
+  test("Paimon DDL: REPLACE TABLE replaces in-place and preserves old 
snapshots") {
+    assume(gteqSpark3_4)
+    withTable("t") {
+      sql("""
+            |CREATE TABLE t (id BIGINT, data STRING)
+            |USING paimon
+            |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2')
+            |""".stripMargin)
+      sql("INSERT INTO t VALUES (1, 'old')")
+      val oldLocation = loadTable("t").location().toString
+      val oldSnapshotId = loadTable("t").snapshotManager().latestSnapshotId()
+
+      sql("""
+            |REPLACE TABLE t (id BIGINT, name STRING)
+            |USING paimon
+            |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '4')
+            |""".stripMargin)
+
+      val replaced = loadTable("t")
+      Assertions.assertEquals(oldLocation, replaced.location().toString)
+      Assertions.assertEquals("4", replaced.options().get("bucket"))
+      Assertions.assertEquals(Seq("id", "name"), 
spark.table("t").schema.fieldNames.toSeq)
+      checkAnswer(sql("SELECT * FROM t"), Seq.empty[Row])
+
+      checkAnswer(
+        sql(s"SELECT id, data FROM t VERSION AS OF $oldSnapshotId"),
+        Seq((1L, "old")).toDF())
+    }
+  }
+
+  test("Paimon DDL: REPLACE TABLE without SELECT fails if table is missing") {
+    assume(gteqSpark3_4)
+    withTable("missing") {
+      val error = intercept[AnalysisException] {
+        sql("""
+              |REPLACE TABLE missing (id BIGINT, data STRING)
+              |USING paimon
+              |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2')
+              |""".stripMargin)
+      }.getMessage
+
+      Assertions.assertTrue(
+        error.contains("TABLE_OR_VIEW_NOT_FOUND") ||
+          error.contains("cannot be found") ||
+          error.contains("not found"))
+    }
+  }
+
+  test("Paimon DDL: CREATE TABLE fails when table exists") {
+    withTable("t") {
+      sql("CREATE TABLE t (id BIGINT, data STRING) USING paimon")
+
+      val error = intercept[AnalysisException] {
+        sql("CREATE TABLE t (id BIGINT, name STRING) USING paimon")
+      }.getMessage
+
+      Assertions.assertTrue(
+        error.contains("TABLE_OR_VIEW_ALREADY_EXISTS") || 
error.contains("already exists"))
+    }
+  }
+
+  test("Paimon DDL: CREATE OR REPLACE TABLE AS SELECT on partitioned table") {
+    assume(gteqSpark3_4)
+    withTable("t") {
+      sql("""
+            |CREATE TABLE t (id BIGINT, data STRING, pt STRING)
+            |USING paimon
+            |PARTITIONED BY (pt)
+            |TBLPROPERTIES ('primary-key' = 'id,pt', 'bucket' = '2')
+            |""".stripMargin)
+      sql("INSERT INTO t VALUES (1, 'old', 'p0')")
+      val oldLocation = loadTable("t").location().toString
+      Seq((2L, "x2", "p1"), (3L, "x3", "p2"))
+        .toDF("id", "data", "pt")
+        .createOrReplaceTempView("source")
+
+      sql("""
+            |CREATE OR REPLACE TABLE t
+            |USING paimon
+            |PARTITIONED BY (pt)
+            |TBLPROPERTIES ('primary-key' = 'id,pt', 'bucket' = '3')
+            |AS SELECT * FROM source
+            |""".stripMargin)
+
+      val replaced = loadTable("t")
+      Assertions.assertEquals(oldLocation, replaced.location().toString)
+      Assertions.assertEquals("3", replaced.options().get("bucket"))
+      checkAnswer(
+        sql("SELECT * FROM t ORDER BY id"),
+        Seq((2L, "x2", "p1"), (3L, "x3", "p2")).toDF())
+    }
+  }
+
+  test("Paimon DDL: CREATE OR REPLACE TABLE AS SELECT supports incompatible 
schema") {
+    assume(gteqSpark3_4)
+    withTable("t") {
+      sql("""
+            |CREATE TABLE t (id BIGINT, data STRING)
+            |USING paimon
+            |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2')
+            |""".stripMargin)
+      sql("INSERT INTO t VALUES (1, 'old')")
+      val oldLocation = loadTable("t").location().toString
+      val oldSnapshotId = loadTable("t").snapshotManager().latestSnapshotId()
+      Seq(("2", 20), ("3", 30)).toDF("id", 
"amount").createOrReplaceTempView("source")
+
+      sql("""
+            |CREATE OR REPLACE TABLE t
+            |USING paimon
+            |TBLPROPERTIES ('bucket' = '-1')
+            |AS SELECT * FROM source
+            |""".stripMargin)
+
+      val replaced = loadTable("t")
+      Assertions.assertEquals(oldLocation, replaced.location().toString)
+      Assertions.assertEquals("-1", replaced.options().get("bucket"))
+      Assertions.assertEquals(Seq("id", "amount"), 
spark.table("t").schema.fieldNames.toSeq)
+      Assertions.assertEquals("string", 
spark.table("t").schema("id").dataType.typeName)
+      Assertions.assertEquals("integer", 
spark.table("t").schema("amount").dataType.typeName)
+      checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(("2", 20), ("3", 
30)).toDF())
+      checkAnswer(
+        sql(s"SELECT id, data FROM t VERSION AS OF $oldSnapshotId"),
+        Seq((1L, "old")).toDF())
+    }
+  }
+
+  test("Paimon DDL: REPLACE TABLE supports incompatible schema and preserves 
old snapshots") {
+    assume(gteqSpark3_4)
+    withTable("t") {
+      sql("""
+            |CREATE TABLE t (id BIGINT, data STRING)
+            |USING paimon
+            |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2')
+            |""".stripMargin)
+      sql("INSERT INTO t VALUES (1, 'old')")
+      val oldLocation = loadTable("t").location().toString
+      val oldSnapshotId = loadTable("t").snapshotManager().latestSnapshotId()
+
+      sql("""
+            |REPLACE TABLE t (id STRING, amount INT)
+            |USING paimon
+            |TBLPROPERTIES ('bucket' = '-1')
+            |""".stripMargin)
+
+      val replaced = loadTable("t")
+      Assertions.assertEquals(oldLocation, replaced.location().toString)
+      Assertions.assertEquals("-1", replaced.options().get("bucket"))
+      Assertions.assertEquals(Seq("id", "amount"), 
spark.table("t").schema.fieldNames.toSeq)
+      Assertions.assertEquals("string", 
spark.table("t").schema("id").dataType.typeName)
+      Assertions.assertEquals("integer", 
spark.table("t").schema("amount").dataType.typeName)
+      checkAnswer(sql("SELECT * FROM t"), Seq.empty[Row])
+      checkAnswer(
+        sql(s"SELECT id, data FROM t VERSION AS OF $oldSnapshotId"),
+        Seq((1L, "old")).toDF())
+    }
+  }
+
   fileFormats.foreach {
     format =>
       test(s"Paimon DDL: create table with char/varchar/string, file.format: 
$format") {
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
index d395a16692..feae8be0da 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
@@ -726,6 +726,141 @@ abstract class DDLWithHiveCatalogTestBase extends 
PaimonHiveTestBase {
     }
   }
 
+  test("Paimon DDL with hive catalog: SparkGenericCatalog explicit Paimon 
replace") {
+    assume(gteqSpark3_4)
+    spark.sql(s"USE $sparkCatalogName")
+    withDatabase("paimon_db") {
+      spark.sql("CREATE DATABASE paimon_db")
+      spark.sql("USE paimon_db")
+
+      withTable("rt", "rtas", "missing") {
+        spark.sql("""
+                    |CREATE TABLE rt (id BIGINT, data STRING)
+                    |USING paimon
+                    |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2')
+                    |""".stripMargin)
+        spark.sql("INSERT INTO rt VALUES (1, 'old')")
+        val oldSnapshotId = loadTable("paimon_db", 
"rt").snapshotManager().latestSnapshotId()
+
+        spark.sql("""
+                    |REPLACE TABLE rt (id BIGINT, name STRING)
+                    |USING paimon
+                    |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '4')
+                    |""".stripMargin)
+
+        val replaced = loadTable("paimon_db", "rt")
+        Assertions.assertEquals("4", replaced.options().get("bucket"))
+        Assertions.assertEquals(Seq("id", "name"), 
spark.table("rt").schema.fieldNames.toSeq)
+        checkAnswer(spark.sql("SELECT * FROM rt"), Seq.empty[Row])
+        checkAnswer(
+          spark.sql(s"SELECT id, data FROM rt VERSION AS OF $oldSnapshotId"),
+          Row(1L, "old") :: Nil)
+
+        val error = intercept[AnalysisException] {
+          spark.sql("""
+                      |REPLACE TABLE missing (id BIGINT, data STRING)
+                      |USING paimon
+                      |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2')
+                      |""".stripMargin)
+        }.getMessage
+        Assertions.assertTrue(
+          error.contains("TABLE_OR_VIEW_NOT_FOUND") ||
+            error.contains("cannot be found") ||
+            error.contains("not found"))
+
+        Seq((2L, "new")).toDF("id", "data").createOrReplaceTempView("source")
+        spark.sql("""
+                    |CREATE TABLE rtas (id BIGINT, data STRING)
+                    |USING paimon
+                    |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2')
+                    |""".stripMargin)
+        spark.sql("INSERT INTO rtas VALUES (1, 'old')")
+        val oldLocation = loadTable("paimon_db", "rtas").location().toString
+        val oldRtasSnapshotId =
+          loadTable("paimon_db", "rtas").snapshotManager().latestSnapshotId()
+        spark.sql("""
+                    |CREATE OR REPLACE TABLE rtas
+                    |USING paimon
+                    |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '3')
+                    |AS SELECT * FROM source
+                    |""".stripMargin)
+
+        val replacedAsSelect = loadTable("paimon_db", "rtas")
+        Assertions.assertEquals(oldLocation, 
replacedAsSelect.location().toString)
+        Assertions.assertEquals("3", replacedAsSelect.options().get("bucket"))
+        checkAnswer(spark.sql("SELECT * FROM rtas"), Row(2L, "new") :: Nil)
+        checkAnswer(
+          spark.sql(s"SELECT id, data FROM rtas VERSION AS OF 
$oldRtasSnapshotId"),
+          Row(1L, "old") :: Nil)
+      }
+    }
+  }
+
+  test("Paimon DDL with hive catalog: SparkGenericCatalog explicit Paimon 
replace fallback") {
+    assume(gteqSpark3_4)
+    spark.sql(s"USE $sparkCatalogName")
+    withDatabase("paimon_db") {
+      spark.sql("CREATE DATABASE paimon_db")
+      spark.sql("USE paimon_db")
+
+      withTable("csv_to_paimon", "rtas_csv_to_paimon") {
+        spark.sql("CREATE TABLE csv_to_paimon (id BIGINT, data STRING) USING 
csv")
+        spark.sql("INSERT INTO csv_to_paimon VALUES (1, 'csv')")
+
+        spark.sql("""
+                    |REPLACE TABLE csv_to_paimon (id BIGINT, name STRING)
+                    |USING paimon
+                    |TBLPROPERTIES ('bucket' = '-1')
+                    |""".stripMargin)
+
+        val paimonTable = loadTable("paimon_db", "csv_to_paimon")
+        Assertions.assertEquals("-1", paimonTable.options().get("bucket"))
+        Assertions.assertEquals(
+          Seq("id", "name"),
+          spark.table("csv_to_paimon").schema.fieldNames.toSeq)
+        checkAnswer(spark.sql("SELECT * FROM csv_to_paimon"), Seq.empty[Row])
+
+        Seq((2L, "new")).toDF("id", 
"data").createOrReplaceTempView("provider_source")
+        spark.sql("""
+                    |CREATE TABLE rtas_csv_to_paimon (id BIGINT, data STRING)
+                    |USING csv
+                    |""".stripMargin)
+
+        spark.sql("""
+                    |CREATE OR REPLACE TABLE rtas_csv_to_paimon
+                    |USING paimon
+                    |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '3')
+                    |AS SELECT * FROM provider_source
+                    |""".stripMargin)
+
+        val rtasPaimonTable = loadTable("paimon_db", "rtas_csv_to_paimon")
+        Assertions.assertEquals("3", rtasPaimonTable.options().get("bucket"))
+        checkAnswer(spark.sql("SELECT * FROM rtas_csv_to_paimon"), Row(2L, 
"new") :: Nil)
+      }
+    }
+  }
+
+  test("Paimon DDL with hive catalog: SparkGenericCatalog CTAS with non-Paimon 
provider") {
+    assume(gteqSpark3_4)
+    spark.sql(s"USE $sparkCatalogName")
+    withDatabase("paimon_db") {
+      spark.sql("CREATE DATABASE paimon_db")
+      spark.sql("USE paimon_db")
+
+      withTable("csv_ctas") {
+        Seq((1L, "x1"), (2L, "x2")).toDF("id", 
"data").createOrReplaceTempView("source")
+        spark.sql("CREATE TABLE csv_ctas USING csv AS SELECT * FROM source")
+
+        val csvTable = spark.sessionState.catalog.getTableMetadata(
+          TableIdentifier("csv_ctas", Some("paimon_db")))
+        Assertions.assertTrue(csvTable.provider.contains("csv"))
+        checkAnswer(
+          spark.sql("SELECT * FROM csv_ctas ORDER BY id"),
+          Row(1L, "x1") :: Row(2L, "x2") :: Nil)
+      }
+    }
+  }
+
   test("Paimon DDL with hive catalog: Create Table As Select") {
     Seq("paimon", sparkCatalogName, paimonHiveCatalogName).foreach {
       catalogName =>
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTestBase.scala
index b25e41a3fb..01def579ef 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTestBase.scala
@@ -183,6 +183,92 @@ abstract class DataFrameWriteTestBase extends 
PaimonSparkTestBase {
     }
   }
 
+  test("Paimon dataframe: writer v2 replace") {
+    assume(gteqSpark3_4)
+    withTable("t") {
+      sql("""
+            |CREATE TABLE t (id BIGINT, data STRING)
+            |USING paimon
+            |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2')
+            |""".stripMargin)
+      sql("INSERT INTO t VALUES (1, 'old')")
+
+      val oldLocation = loadTable("t").location().toString
+      val oldSnapshotId = loadTable("t").snapshotManager().latestSnapshotId()
+
+      spark
+        .range(2, 4)
+        .selectExpr("id", "concat('v', cast(id as string)) AS data")
+        .writeTo("t")
+        .using("paimon")
+        .tableProperty("primary-key", "id")
+        .tableProperty("bucket", "3")
+        .replace()
+
+      val table = loadTable("t")
+      Assertions.assertEquals("3", table.options().get("bucket"))
+      Assertions.assertEquals(oldLocation, table.location().toString)
+      checkAnswer(sql("SELECT * FROM t ORDER BY id"), Row(2L, "v2") :: Row(3L, 
"v3") :: Nil)
+      checkAnswer(sql(s"SELECT * FROM t VERSION AS OF $oldSnapshotId"), 
Row(1L, "old") :: Nil)
+    }
+  }
+
+  test("Paimon dataframe: writer v2 create fails when table exists") {
+    assume(gteqSpark3_4)
+    withTable("t") {
+      sql("CREATE TABLE t (id BIGINT, data STRING) USING paimon")
+
+      val error = intercept[Exception] {
+        spark
+          .range(2)
+          .selectExpr("id", "concat('v', cast(id as string)) AS data")
+          .writeTo("t")
+          .using("paimon")
+          .create()
+      }.getMessage
+
+      Assertions.assertTrue(
+        error.contains("TABLE_OR_VIEW_ALREADY_EXISTS") || 
error.contains("already exists"))
+    }
+  }
+
+  test("Paimon dataframe: writer v2 create or replace") {
+    assume(gteqSpark3_4)
+    withTable("t") {
+      spark
+        .range(2)
+        .selectExpr("id", "concat('v', cast(id as string)) AS data")
+        .writeTo("t")
+        .using("paimon")
+        .tableProperty("primary-key", "id")
+        .tableProperty("bucket", "2")
+        .createOrReplace()
+
+      val createdLocation = loadTable("t").location().toString
+      checkAnswer(sql("SELECT * FROM t ORDER BY id"), Row(0L, "v0") :: Row(1L, 
"v1") :: Nil)
+
+      spark
+        .range(3, 5)
+        .selectExpr(
+          "id",
+          "concat('v', cast(id as string)) AS data",
+          "concat('n', cast(id as string)) AS note")
+        .writeTo("t")
+        .using("paimon")
+        .tableProperty("primary-key", "id")
+        .tableProperty("bucket", "4")
+        .createOrReplace()
+
+      val table = loadTable("t")
+      Assertions.assertEquals(Seq("id", "data", "note"), 
spark.table("t").schema.fieldNames.toSeq)
+      Assertions.assertEquals("4", table.options().get("bucket"))
+      Assertions.assertEquals(createdLocation, table.location().toString)
+      checkAnswer(
+        sql("SELECT * FROM t ORDER BY id"),
+        Row(3L, "v3", "n3") :: Row(4L, "v4", "n4") :: Nil)
+    }
+  }
+
   test("Paimon: DataFrameWrite partition table") {
     withTable("t") {
       spark.sql(s"""
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
index d031f7e60c..503005f247 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
@@ -35,6 +35,8 @@ import 
org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 
 abstract class FormatTableTestBase extends PaimonHiveTestBase with 
AdaptiveSparkPlanHelper {
 
+  import testImplicits._
+
   override protected def beforeEach(): Unit = {
     sql(s"USE $paimonHiveCatalogName")
     sql(s"USE $hiveDbName")
@@ -158,6 +160,48 @@ abstract class FormatTableTestBase extends 
PaimonHiveTestBase with AdaptiveSpark
     }
   }
 
+  test("Format table: create or replace as select supports table type change") 
{
+    assume(gteqSpark3_4)
+    withTable("t") {
+      sql("""
+            |CREATE TABLE t (id BIGINT, data STRING)
+            |USING paimon
+            |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2')
+            |""".stripMargin)
+      sql("INSERT INTO t VALUES (1, 'old')")
+      Seq((2L, "new")).toDF("id", "data").createOrReplaceTempView("source")
+
+      sql("""
+            |CREATE OR REPLACE TABLE t
+            |USING csv
+            |AS SELECT * FROM source
+            |""".stripMargin)
+
+      assert(paimonCatalog.getTable(Identifier.create(hiveDbName, 
"t")).isInstanceOf[FormatTable])
+      checkAnswer(sql("SELECT * FROM t"), Seq(Row(2L, "new")))
+    }
+  }
+
+  test("Format table: replace table supports table type change") {
+    assume(gteqSpark3_4)
+    withTable("t") {
+      sql("""
+            |CREATE TABLE t (id BIGINT, data STRING)
+            |USING paimon
+            |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2')
+            |""".stripMargin)
+      sql("INSERT INTO t VALUES (1, 'old')")
+
+      sql("""
+            |REPLACE TABLE t (id BIGINT, data STRING)
+            |USING csv
+            |""".stripMargin)
+
+      assert(paimonCatalog.getTable(Identifier.create(hiveDbName, 
"t")).isInstanceOf[FormatTable])
+      checkAnswer(sql("SELECT * FROM t"), Seq.empty[Row])
+    }
+  }
+
   test("Format table: read compressed files") {
     for (format <- Seq("csv", "json")) {
       withTable("compress_t") {
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/ReplaceTableWithRestCatalogTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/ReplaceTableWithRestCatalogTest.scala
new file mode 100644
index 0000000000..d535e7b8c1
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/ReplaceTableWithRestCatalogTest.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestWithRestCatalogBase
+
+import org.apache.spark.sql.Row
+import org.junit.jupiter.api.Assertions
+
+class ReplaceTableWithRestCatalogTest extends 
PaimonSparkTestWithRestCatalogBase {
+
+  import testImplicits._
+
+  test("Replace table with REST catalog: REPLACE TABLE preserves snapshots") {
+    assume(gteqSpark3_4)
+    withTable("t") {
+      sql("""
+            |CREATE TABLE t (id BIGINT, data STRING)
+            |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2')
+            |""".stripMargin)
+      sql("INSERT INTO t VALUES (1, 'old')")
+      val oldLocation = loadTable("t").location().toString
+      val oldSnapshotId = loadTable("t").snapshotManager().latestSnapshotId()
+
+      sql("""
+            |REPLACE TABLE t (id BIGINT, name STRING)
+            |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '4')
+            |""".stripMargin)
+
+      val replaced = loadTable("t")
+      Assertions.assertEquals(oldLocation, replaced.location().toString)
+      Assertions.assertEquals("4", replaced.options().get("bucket"))
+      Assertions.assertEquals(Seq("id", "name"), 
spark.table("t").schema.fieldNames.toSeq)
+      checkAnswer(sql("SELECT * FROM t"), Seq.empty[Row])
+
+      checkAnswer(
+        sql(s"SELECT id, data FROM t VERSION AS OF $oldSnapshotId"),
+        Seq((1L, "old")).toDF())
+    }
+  }
+
+  test("Replace table with REST catalog: CREATE OR REPLACE TABLE AS SELECT") {
+    assume(gteqSpark3_4)
+    withTable("t") {
+      sql("""
+            |CREATE TABLE t (id BIGINT, data STRING)
+            |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2')
+            |""".stripMargin)
+      sql("INSERT INTO t VALUES (1, 'old')")
+      val oldLocation = loadTable("t").location().toString
+      Seq((2L, "x2"), (3L, "x3")).toDF("id", 
"data").createOrReplaceTempView("source")
+
+      sql("""
+            |CREATE OR REPLACE TABLE t
+            |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '3')
+            |AS SELECT * FROM source
+            |""".stripMargin)
+
+      val replaced = loadTable("t")
+      Assertions.assertEquals(oldLocation, replaced.location().toString)
+      Assertions.assertEquals("3", replaced.options().get("bucket"))
+      checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq((2L, "x2"), (3L, 
"x3")).toDF())
+    }
+  }
+}
diff --git 
a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
 
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
index c71b3df923..1798b12410 100644
--- 
a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
+++ 
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
@@ -32,17 +32,19 @@ import 
org.apache.spark.sql.catalyst.analysis.{CTESubstitution, SubstituteUnreso
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment, 
CTERelationRef, InsertAction, LogicalPlan, MergeAction, MergeIntoTable, 
SubqueryAlias, UnresolvedWith, UpdateAction}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment, 
CTERelationRef, InsertAction, LogicalPlan, MergeAction, MergeIntoTable, 
SubqueryAlias, TableSpec, UnresolvedWith, UpdateAction}
 // NOTE: `MergeRows` / `MergeRows.Keep` were introduced in Spark 3.4. We 
access them only via
 // reflection inside the `mergeRowsKeep*` method bodies so that loading 
`Spark3Shim` does not fail
 // on Spark 3.2 / 3.3 runtimes that still ship `paimon-spark3-common` (the 
module targets 3.5.8 at
 // compile time but must also run on 3.2 / 3.3).
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.ArrayData
-import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
+import org.apache.spark.sql.catalyst.util.{ArrayData, GeneratedColumn, 
ResolveDefaultColumns}
+import org.apache.spark.sql.connector.catalog.{Column, Identifier, 
StagingTableCatalog, Table, TableCatalog}
+import 
org.apache.spark.sql.connector.catalog.CatalogV2Util.structTypeToV2Columns
 import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.execution.SparkFormatTable
+import org.apache.spark.sql.execution.{SparkFormatTable, SparkPlan}
 import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex, 
PartitionSpec}
+import 
org.apache.spark.sql.execution.datasources.v2.{AtomicReplaceTableAsSelectExec, 
AtomicReplaceTableExec, ReplaceTableAsSelectExec, ReplaceTableExec}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.streaming.{FileStreamSink, 
MetadataLogFileIndex}
 import org.apache.spark.sql.internal.SQLConf
@@ -86,6 +88,103 @@ class Spark3Shim extends SparkShim {
     tableCatalog.createTable(ident, schema, partitions, properties)
   }
 
+  override def createReplaceTableAsSelectExec(
+      catalog: TableCatalog,
+      ident: Identifier,
+      partitioning: Seq[Transform],
+      query: LogicalPlan,
+      tableSpec: TableSpec,
+      writeOptions: Map[String, String],
+      orCreate: Boolean): SparkPlan = {
+    ReplaceTableAsSelectExec(
+      catalog,
+      ident,
+      partitioning,
+      query,
+      tableSpec,
+      writeOptions,
+      orCreate = orCreate,
+      invalidateCache)
+  }
+
+  override def createAtomicReplaceTableAsSelectExec(
+      catalog: StagingTableCatalog,
+      ident: Identifier,
+      partitioning: Seq[Transform],
+      query: LogicalPlan,
+      tableSpec: TableSpec,
+      writeOptions: Map[String, String],
+      orCreate: Boolean): SparkPlan = {
+    AtomicReplaceTableAsSelectExec(
+      catalog,
+      ident,
+      partitioning,
+      query,
+      tableSpec,
+      writeOptions,
+      orCreate = orCreate,
+      invalidateCache)
+  }
+
+  override def createReplaceTableExec(
+      catalog: TableCatalog,
+      ident: Identifier,
+      columns: Array[Column],
+      partitioning: Seq[Transform],
+      tableSpec: TableSpec,
+      orCreate: Boolean): SparkPlan = {
+    ReplaceTableExec(
+      catalog,
+      ident,
+      columns,
+      partitioning,
+      tableSpec,
+      orCreate = orCreate,
+      invalidateCache)
+  }
+
+  override def createAtomicReplaceTableExec(
+      catalog: StagingTableCatalog,
+      ident: Identifier,
+      columns: Array[Column],
+      partitioning: Seq[Transform],
+      tableSpec: TableSpec,
+      orCreate: Boolean): SparkPlan = {
+    AtomicReplaceTableExec(
+      catalog,
+      ident,
+      columns,
+      partitioning,
+      tableSpec,
+      orCreate = orCreate,
+      invalidateCache)
+  }
+
+  override def toReplaceTableColumns(
+      tableSchema: StructType,
+      schemaOrColumns: Any,
+      catalog: TableCatalog,
+      ident: Identifier): Array[Column] = {
+    val statementType = "CREATE TABLE"
+    val schema = schemaOrColumns.asInstanceOf[StructType]
+    ResolveDefaultColumns.validateCatalogForDefaultValue(schema, catalog, 
ident)
+    val newSchema =
+      ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(schema, 
statementType)
+    GeneratedColumn.validateGeneratedColumns(newSchema, catalog, ident, 
statementType)
+    structTypeToV2Columns(newSchema)
+  }
+
+  override def copyTableSpec(
+      tableSpec: TableSpec,
+      additionalProperties: Map[String, String],
+      location: Option[String]): TableSpec = {
+    tableSpec.copy(properties = tableSpec.properties ++ additionalProperties, 
location = location)
+  }
+
+  private def invalidateCache(tableCatalog: TableCatalog, table: Table, ident: 
Identifier): Unit = {
+    tableCatalog.invalidateTable(ident)
+  }
+
   override def createCTERelationRef(
       cteId: Long,
       resolved: Boolean,
diff --git 
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
 
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
index b66b896927..516c02a09f 100644
--- 
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
+++ 
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
@@ -32,14 +32,15 @@ import 
org.apache.spark.sql.catalyst.analysis.CTESubstitution
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment, 
CTERelationRef, InsertAction, LogicalPlan, MergeAction, MergeIntoTable, 
MergeRows, SubqueryAlias, UnresolvedWith, UpdateAction}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment, 
ColumnDefinition, CTERelationRef, InsertAction, LogicalPlan, MergeAction, 
MergeIntoTable, MergeRows, SubqueryAlias, TableSpec, UnresolvedWith, 
UpdateAction}
 import org.apache.spark.sql.catalyst.plans.logical.MergeRows.{Copy, Insert, 
Keep, Update}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.ArrayData
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, 
Table, TableCatalog}
+import org.apache.spark.sql.catalyst.util.{ArrayData, GeneratedColumn, 
IdentityColumn, ResolveDefaultColumns}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, 
Identifier, StagingTableCatalog, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.execution.SparkFormatTable
+import org.apache.spark.sql.execution.{SparkFormatTable, SparkPlan}
 import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex, 
PartitionSpec}
+import 
org.apache.spark.sql.execution.datasources.v2.{AtomicReplaceTableAsSelectExec, 
AtomicReplaceTableExec, ReplaceTableAsSelectExec, ReplaceTableExec}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.streaming.runtime.MetadataLogFileIndex
 import org.apache.spark.sql.execution.streaming.sinks.FileStreamSink
@@ -86,6 +87,102 @@ class Spark4Shim extends SparkShim {
     tableCatalog.createTable(ident, columns, partitions, properties)
   }
 
+  override def createReplaceTableAsSelectExec(
+      catalog: TableCatalog,
+      ident: Identifier,
+      partitioning: Seq[Transform],
+      query: LogicalPlan,
+      tableSpec: TableSpec,
+      writeOptions: Map[String, String],
+      orCreate: Boolean): SparkPlan = {
+    ReplaceTableAsSelectExec(
+      catalog,
+      ident,
+      partitioning,
+      query,
+      tableSpec,
+      writeOptions,
+      orCreate = orCreate,
+      invalidateCache)
+  }
+
+  override def createAtomicReplaceTableAsSelectExec(
+      catalog: StagingTableCatalog,
+      ident: Identifier,
+      partitioning: Seq[Transform],
+      query: LogicalPlan,
+      tableSpec: TableSpec,
+      writeOptions: Map[String, String],
+      orCreate: Boolean): SparkPlan = {
+    AtomicReplaceTableAsSelectExec(
+      catalog,
+      ident,
+      partitioning,
+      query,
+      tableSpec,
+      writeOptions,
+      orCreate = orCreate,
+      invalidateCache)
+  }
+
+  override def createReplaceTableExec(
+      catalog: TableCatalog,
+      ident: Identifier,
+      columns: Array[Column],
+      partitioning: Seq[Transform],
+      tableSpec: TableSpec,
+      orCreate: Boolean): SparkPlan = {
+    ReplaceTableExec(
+      catalog,
+      ident,
+      columns,
+      partitioning,
+      tableSpec,
+      orCreate = orCreate,
+      invalidateCache)
+  }
+
+  override def createAtomicReplaceTableExec(
+      catalog: StagingTableCatalog,
+      ident: Identifier,
+      columns: Array[Column],
+      partitioning: Seq[Transform],
+      tableSpec: TableSpec,
+      orCreate: Boolean): SparkPlan = {
+    AtomicReplaceTableExec(
+      catalog,
+      ident,
+      columns,
+      partitioning,
+      tableSpec,
+      orCreate = orCreate,
+      invalidateCache)
+  }
+
+  override def toReplaceTableColumns(
+      tableSchema: StructType,
+      schemaOrColumns: Any,
+      catalog: TableCatalog,
+      ident: Identifier): Array[Column] = {
+    val statementType = "REPLACE TABLE"
+    val columns = schemaOrColumns.asInstanceOf[Seq[ColumnDefinition]]
+    ResolveDefaultColumns.validateCatalogForDefaultValue(columns, catalog, 
ident)
+    GeneratedColumn.validateGeneratedColumns(tableSchema, catalog, ident, 
statementType)
+    IdentityColumn.validateIdentityColumn(tableSchema, catalog, ident)
+    columns.map(_.toV2Column(statementType)).toArray
+  }
+
+  override def copyTableSpec(
+      tableSpec: TableSpec,
+      additionalProperties: Map[String, String],
+      location: Option[String]): TableSpec = {
+    tableSpec.copy(properties = tableSpec.properties ++ additionalProperties, 
location = location)
+  }
+
+  private def invalidateCache(tableCatalog: TableCatalog, ident: Identifier): 
Unit = {
+    tableCatalog.invalidateTable(ident)
+  }
+
   override def createCTERelationRef(
       cteId: Long,
       resolved: Boolean,

Reply via email to