This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cc9712bfe92 [FLINK-32518][table] Enable atomicity for [CREATE OR] 
REPLACE TABLE AS statement (#22995)
cc9712bfe92 is described below

commit cc9712bfe92de4a68e87baef8f6937dcdc8a8634
Author: zhangmang <[email protected]>
AuthorDate: Tue Jul 18 20:43:41 2023 +0800

    [FLINK-32518][table] Enable atomicity for [CREATE OR] REPLACE TABLE AS 
statement (#22995)
---
 .../generated/table_config_configuration.html      |   4 +-
 .../flink/table/api/config/TableConfigOptions.java |   8 +-
 .../table/api/internal/TableEnvironmentImpl.java   | 124 +++++++-----
 ...atusHook.java => StagingSinkJobStatusHook.java} |   9 +-
 .../table/operations/ReplaceTableAsOperation.java  |  20 ++
 .../operations/StagedSinkModifyOperation.java      |  15 +-
 .../apache/flink/table/catalog/StagedTable.java    |  15 +-
 .../connector/sink/abilities/SupportsStaging.java  |  17 +-
 .../runtime/batch/sql/AtomicRtasITCase.java        |  35 ++++
 .../runtime/stream/sql/AtomicRtasITCase.java       |  36 ++++
 .../runtime/utils/AtomicCtasITCaseBase.java        |  11 +-
 .../runtime/utils/AtomicRtasITCaseBase.java        | 225 +++++++++++++++++++++
 12 files changed, 438 insertions(+), 81 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/table_config_configuration.html 
b/docs/layouts/shortcodes/generated/table_config_configuration.html
index bc78ecb4c55..c67ddf6a92d 100644
--- a/docs/layouts/shortcodes/generated/table_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/table_config_configuration.html
@@ -21,10 +21,10 @@
             <td>The name of the default database in the initial catalog to be 
created when instantiating TableEnvironment.</td>
         </tr>
         <tr>
-            <td><h5>table.ctas.atomicity-enabled</h5><br> <span class="label 
label-primary">Batch</span> <span class="label 
label-primary">Streaming</span></td>
+            <td><h5>table.rtas-ctas.atomicity-enabled</h5><br> <span 
class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Boolean</td>
-            <td>Specifies if the CREATE TABLE AS SELECT statement is executed 
atomically. By default, the statement is non-atomic. The target table is 
created on the client side, and it will not be dropped even though the job 
fails or is canceled. If set this option to true and the underlying 
DynamicTableSink implements the SupportsStaging interface, the statement is 
expected to be executed atomically, the behavior of which depends on the actual 
DynamicTableSink.</td>
+            <td>Specifies if the CREATE TABLE/REPLACE TABLE/CREATE OR REPLACE 
AS SELECT statement is executed atomically. By default, the statement is 
non-atomic. The target table is created/replaced on the client side, and it 
will not be rolled back even though the job fails or is canceled. If set this 
option to true and the underlying DynamicTableSink implements the 
SupportsStaging interface, the statement is expected to be executed atomically, 
the behavior of which depends on the actu [...]
         </tr>
         <tr>
             <td><h5>table.display.max-column-width</h5><br> <span class="label 
label-primary">Batch</span> <span class="label 
label-primary">Streaming</span></td>
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java
index 2871be40be0..edfe1b63475 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java
@@ -198,13 +198,13 @@ public class TableConfigOptions {
                             "Local directory that is used by planner for 
storing downloaded resources.");
 
     @Documentation.TableOption(execMode = 
Documentation.ExecMode.BATCH_STREAMING)
-    public static final ConfigOption<Boolean> TABLE_CTAS_ATOMICITY_ENABLED =
-            key("table.ctas.atomicity-enabled")
+    public static final ConfigOption<Boolean> 
TABLE_RTAS_CTAS_ATOMICITY_ENABLED =
+            key("table.rtas-ctas.atomicity-enabled")
                     .booleanType()
                     .defaultValue(false)
                     .withDescription(
-                            "Specifies if the CREATE TABLE AS SELECT statement 
is executed atomically. By default, the statement is non-atomic. "
-                                    + "The target table is created on the 
client side, and it will not be dropped even though the job fails or is 
canceled. "
+                            "Specifies if the CREATE TABLE/REPLACE 
TABLE/CREATE OR REPLACE AS SELECT statement is executed atomically. By default, 
the statement is non-atomic. "
+                                    + "The target table is created/replaced on 
the client side, and it will not be rolled back even though the job fails or is 
canceled. "
                                     + "If set this option to true and the 
underlying DynamicTableSink implements the SupportsStaging interface, "
                                     + "the statement is expected to be 
executed atomically, the behavior of which depends on the actual 
DynamicTableSink.");
 
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 145b82fee78..4e6c91f6a51 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -68,7 +68,7 @@ import org.apache.flink.table.delegation.ExecutorFactory;
 import org.apache.flink.table.delegation.InternalPlan;
 import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.delegation.Planner;
-import org.apache.flink.table.execution.CtasJobStatusHook;
+import org.apache.flink.table.execution.StagingSinkJobStatusHook;
 import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.factories.FactoryUtil;
@@ -797,12 +797,11 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
         List<JobStatusHook> jobStatusHookList = new LinkedList<>();
         for (ModifyOperation modify : operations) {
             if (modify instanceof CreateTableASOperation) {
-                // execute CREATE TABLE first for CTAS statements
                 CreateTableASOperation ctasOperation = 
(CreateTableASOperation) modify;
                 mapOperations.add(getModifyOperation(ctasOperation, 
jobStatusHookList));
             } else if (modify instanceof ReplaceTableAsOperation) {
                 ReplaceTableAsOperation rtasOperation = 
(ReplaceTableAsOperation) modify;
-                mapOperations.add(getOperation(rtasOperation));
+                mapOperations.add(getModifyOperation(rtasOperation, 
jobStatusHookList));
             } else {
                 boolean isRowLevelModification = 
isRowLevelModification(modify);
                 if (isRowLevelModification) {
@@ -833,26 +832,46 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
         return executeInternal(transformations, sinkIdentifierNames, 
jobStatusHookList);
     }
 
-    private ModifyOperation getOperation(ReplaceTableAsOperation 
rtasOperation) {
-        // rtas drop table first, then create
+    private ModifyOperation getModifyOperation(
+            ReplaceTableAsOperation rtasOperation, List<JobStatusHook> 
jobStatusHookList) {
         CreateTableOperation createTableOperation = 
rtasOperation.getCreateTableOperation();
         ObjectIdentifier tableIdentifier = 
createTableOperation.getTableIdentifier();
-        try {
-            catalogManager.dropTable(tableIdentifier, 
rtasOperation.isCreateOrReplace());
-        } catch (ValidationException e) {
-            if (String.format(
-                            "Table with identifier '%s' does not exist.",
-                            tableIdentifier.asSummaryString())
-                    .equals(e.getMessage())) {
-                throw new TableException(
-                        String.format(
-                                "The table %s to be replaced doesn't exist. "
-                                        + "You can try to use CREATE TABLE AS 
statement or "
-                                        + "CREATE OR REPLACE TABLE AS 
statement.",
-                                tableIdentifier));
-            } else {
-                throw e;
-            }
+        // First check if the replacedTable exists
+        Optional<ContextResolvedTable> replacedTable = 
catalogManager.getTable(tableIdentifier);
+        if (!rtasOperation.isCreateOrReplace() && !replacedTable.isPresent()) {
+            throw new TableException(
+                    String.format(
+                            "The table %s to be replaced doesn't exist. "
+                                    + "You can try to use CREATE TABLE AS 
statement or "
+                                    + "CREATE OR REPLACE TABLE AS statement.",
+                            tableIdentifier));
+        }
+        Catalog catalog =
+                
catalogManager.getCatalogOrThrowException(tableIdentifier.getCatalogName());
+        ResolvedCatalogTable catalogTable =
+                
catalogManager.resolveCatalogTable(createTableOperation.getCatalogTable());
+        Optional<DynamicTableSink> stagingDynamicTableSink =
+                getSupportsStagingDynamicTableSink(createTableOperation, 
catalog, catalogTable);
+        if (stagingDynamicTableSink.isPresent()) {
+            // use atomic rtas
+            DynamicTableSink dynamicTableSink = stagingDynamicTableSink.get();
+            SupportsStaging.StagingPurpose stagingPurpose =
+                    rtasOperation.isCreateOrReplace()
+                            ? 
SupportsStaging.StagingPurpose.CREATE_OR_REPLACE_TABLE_AS
+                            : SupportsStaging.StagingPurpose.REPLACE_TABLE_AS;
+
+            StagedTable stagedTable =
+                    ((SupportsStaging) dynamicTableSink)
+                            .applyStaging(new 
SinkStagingContext(stagingPurpose));
+            StagingSinkJobStatusHook stagingSinkJobStatusHook =
+                    new StagingSinkJobStatusHook(stagedTable);
+            jobStatusHookList.add(stagingSinkJobStatusHook);
+            return rtasOperation.toStagedSinkModifyOperation(
+                    tableIdentifier, catalogTable, catalog, dynamicTableSink);
+        }
+        // non-atomic rtas drop table first if exists, then create
+        if (replacedTable.isPresent()) {
+            catalogManager.dropTable(tableIdentifier, false);
         }
         executeInternal(createTableOperation);
         return rtasOperation.toSinkModifyOperation(catalogManager);
@@ -861,51 +880,62 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
     private ModifyOperation getModifyOperation(
             CreateTableASOperation ctasOperation, List<JobStatusHook> 
jobStatusHookList) {
         CreateTableOperation createTableOperation = 
ctasOperation.getCreateTableOperation();
-        if (tableConfig.get(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED)) {
-            ObjectIdentifier tableIdentifier = 
createTableOperation.getTableIdentifier();
-            Catalog catalog =
-                    
catalogManager.getCatalog(tableIdentifier.getCatalogName()).orElse(null);
-            ResolvedCatalogTable catalogTable =
-                    
catalogManager.resolveCatalogTable(createTableOperation.getCatalogTable());
+        ObjectIdentifier tableIdentifier = 
createTableOperation.getTableIdentifier();
+        Catalog catalog =
+                
catalogManager.getCatalogOrThrowException(tableIdentifier.getCatalogName());
+        ResolvedCatalogTable catalogTable =
+                
catalogManager.resolveCatalogTable(createTableOperation.getCatalogTable());
+        Optional<DynamicTableSink> stagingDynamicTableSink =
+                getSupportsStagingDynamicTableSink(createTableOperation, 
catalog, catalogTable);
+        if (stagingDynamicTableSink.isPresent()) {
+            // use atomic ctas
+            DynamicTableSink dynamicTableSink = stagingDynamicTableSink.get();
+            SupportsStaging.StagingPurpose stagingPurpose =
+                    createTableOperation.isIgnoreIfExists()
+                            ? 
SupportsStaging.StagingPurpose.CREATE_TABLE_AS_IF_NOT_EXISTS
+                            : SupportsStaging.StagingPurpose.CREATE_TABLE_AS;
+            StagedTable stagedTable =
+                    ((SupportsStaging) dynamicTableSink)
+                            .applyStaging(new 
SinkStagingContext(stagingPurpose));
+            StagingSinkJobStatusHook stagingSinkJobStatusHook =
+                    new StagingSinkJobStatusHook(stagedTable);
+            jobStatusHookList.add(stagingSinkJobStatusHook);
+            return ctasOperation.toStagedSinkModifyOperation(
+                    tableIdentifier, catalogTable, catalog, dynamicTableSink);
+        }
+        // use non-atomic ctas, create table first
+        executeInternal(createTableOperation);
+        return ctasOperation.toSinkModifyOperation(catalogManager);
+    }
+
+    private Optional<DynamicTableSink> getSupportsStagingDynamicTableSink(
+            CreateTableOperation createTableOperation,
+            Catalog catalog,
+            ResolvedCatalogTable catalogTable) {
+        if 
(tableConfig.get(TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED)) {
             if (!TableFactoryUtil.isLegacyConnectorOptions(
                     catalog,
                     tableConfig,
                     isStreamingMode,
-                    tableIdentifier,
+                    createTableOperation.getTableIdentifier(),
                     catalogTable,
                     createTableOperation.isTemporary())) {
                 DynamicTableSink dynamicTableSink =
                         ExecutableOperationUtils.createDynamicTableSink(
                                 catalog,
                                 () -> 
moduleManager.getFactory((Module::getTableSinkFactory)),
-                                tableIdentifier,
+                                createTableOperation.getTableIdentifier(),
                                 catalogTable,
                                 Collections.emptyMap(),
                                 tableConfig,
                                 resourceManager.getUserClassLoader(),
                                 createTableOperation.isTemporary());
                 if (dynamicTableSink instanceof SupportsStaging) {
-                    // use atomic ctas
-                    SupportsStaging.StagingPurpose stagingPurpose =
-                            createTableOperation.isIgnoreIfExists()
-                                    ? 
SupportsStaging.StagingPurpose.CREATE_TABLE_AS_IF_NOT_EXISTS
-                                    : 
SupportsStaging.StagingPurpose.CREATE_TABLE_AS;
-                    StagedTable stagedTable =
-                            ((SupportsStaging) dynamicTableSink)
-                                    .applyStaging(new 
SinkStagingContext(stagingPurpose));
-                    CtasJobStatusHook ctasJobStatusHook = new 
CtasJobStatusHook(stagedTable);
-                    jobStatusHookList.add(ctasJobStatusHook);
-                    return ctasOperation.toStagedSinkModifyOperation(
-                            createTableOperation.getTableIdentifier(),
-                            catalogTable,
-                            catalog,
-                            dynamicTableSink);
+                    return Optional.of(dynamicTableSink);
                 }
             }
         }
-        // use non-atomic ctas, create table first
-        executeInternal(createTableOperation);
-        return ctasOperation.toSinkModifyOperation(catalogManager);
+        return Optional.empty();
     }
 
     private TableResultInternal executeInternal(
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/execution/CtasJobStatusHook.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/execution/StagingSinkJobStatusHook.java
similarity index 83%
rename from 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/execution/CtasJobStatusHook.java
rename to 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/execution/StagingSinkJobStatusHook.java
index 7a3ad40257a..61fee317b79 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/execution/CtasJobStatusHook.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/execution/StagingSinkJobStatusHook.java
@@ -23,14 +23,15 @@ import org.apache.flink.core.execution.JobStatusHook;
 import org.apache.flink.table.catalog.StagedTable;
 
 /**
- * This hook is used to implement atomic semantics for CTAS(CREATE TABLE AS 
SELECT) statement. It'll
- * call the corresponding interfaces of the inner {@link StagedTable} on job 
status changes.
+ * This hook is used to implement atomic semantics for CTAS(CREATE TABLE AS 
SELECT) or RTAS([CREATE
+ * OR] REPLACE TABLE AS SELECT) statement. It'll call the corresponding 
interfaces of the inner
+ * {@link StagedTable} on job status changes.
  */
-public class CtasJobStatusHook implements JobStatusHook {
+public class StagingSinkJobStatusHook implements JobStatusHook {
 
     private final StagedTable stagedTable;
 
-    public CtasJobStatusHook(StagedTable stagedTable) {
+    public StagingSinkJobStatusHook(StagedTable stagedTable) {
         this.stagedTable = stagedTable;
     }
 
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ReplaceTableAsOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ReplaceTableAsOperation.java
index 45f00b0f5de..dbddadde198 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ReplaceTableAsOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ReplaceTableAsOperation.java
@@ -19,7 +19,12 @@
 package org.apache.flink.table.operations;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
 
 import java.util.Collections;
@@ -71,6 +76,21 @@ public class ReplaceTableAsOperation implements 
ModifyOperation {
                 Collections.emptyMap());
     }
 
+    public StagedSinkModifyOperation toStagedSinkModifyOperation(
+            ObjectIdentifier tableIdentifier,
+            ResolvedCatalogTable catalogTable,
+            Catalog catalog,
+            DynamicTableSink dynamicTableSink) {
+        return new StagedSinkModifyOperation(
+                ContextResolvedTable.permanent(tableIdentifier, catalog, 
catalogTable),
+                sinkModifyQuery,
+                Collections.emptyMap(),
+                null, // targetColumns
+                false,
+                Collections.emptyMap(),
+                dynamicTableSink);
+    }
+
     @Override
     public String asSummaryString() {
         Map<String, Object> params = new LinkedHashMap<>();
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/StagedSinkModifyOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/StagedSinkModifyOperation.java
index c6459458283..057b7d9f1d7 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/StagedSinkModifyOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/StagedSinkModifyOperation.java
@@ -29,14 +29,15 @@ import java.util.Map;
 
 /**
  * DML operation that tells to write to a sink which implements {@link 
SupportsStaging}. Currently.
- * this operation is only for CTAS(CREATE TABLE AS SELECT) statement.
+ * this operation is for CTAS(CREATE TABLE AS SELECT) and RTAS([CREATE OR] 
REPLACE TABLE AS SELECT)
+ * statement.
  *
- * <p>StagedSinkModifyOperation is an extension of SinkModifyOperation in the 
atomic CTAS scenario.
- * Whiling checking whether the corresponding sink support atomic CTAS or not, 
we will need to get
- * DynamicTableSink firstly and check whether it implements {@link 
SupportsStaging} and then call
- * the method {@link SupportsStaging#applyStaging}. We maintain the 
DynamicTableSink in this
- * operation so that we can reuse this DynamicTableSink instead of creating a 
new DynamicTableSink
- * during translating the operation again which is error-prone.
+ * <p>StagedSinkModifyOperation is an extension of SinkModifyOperation in the 
atomic CTAS/RTAS
+ * scenario. Whiling checking whether the corresponding sink support atomic 
CTAS/RTAS or not, we
+ * will need to get DynamicTableSink firstly and check whether it implements 
{@link SupportsStaging}
+ * and then call the method {@link SupportsStaging#applyStaging}. We maintain 
the DynamicTableSink
+ * in this operation so that we can reuse this DynamicTableSink instead of 
creating a new
+ * DynamicTableSink during translating the operation again which is 
error-prone.
  */
 @Internal
 public class StagedSinkModifyOperation extends SinkModifyOperation {
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java
index 4da92b264c4..b8ff249e6ad 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java
@@ -26,9 +26,10 @@ import java.io.Serializable;
 
 /**
  * The {@link StagedTable} is designed to implement Flink's atomic semantic 
for CTAS(CREATE TABLE AS
- * SELECT) statement using a two-phase commit protocol. The {@link 
StagedTable} is supposed to be
- * returned via method {@link SupportsStaging#applyStaging} by the {@link 
DynamicTableSink} which
- * implements the {@link SupportsStaging} interface.
+ * SELECT) and RTAS([CREATE OR] REPLACE TABLE AS SELECT) statement using a 
two-phase commit
+ * protocol. The {@link StagedTable} is supposed to be returned via method 
{@link
+ * SupportsStaging#applyStaging} by the {@link DynamicTableSink} which 
implements the {@link
+ * SupportsStaging} interface.
  *
  * <p>When the Flink job for writing to a {@link DynamicTableSink} with atomic 
semantic supporting
  * is CREATED, the {@link StagedTable#begin()} will be called; when the Flink 
job is FINISHED, the
@@ -41,15 +42,15 @@ import java.io.Serializable;
 public interface StagedTable extends Serializable {
 
     /**
-     * This method will be called when the job is started. In Flink's atomic 
CTAS scenario, it is
-     * expected to do initialization work; For example, initializing the 
client of the underlying
+     * This method will be called when the job is started. In Flink's atomic 
CTAS/RTAS scenario, it
+     * is expected to do initialization work; For example, initializing the 
client of the underlying
      * service, the tmp path of the underlying storage, or even call the start 
transaction API of
      * the underlying service, etc.
      */
     void begin();
 
     /**
-     * This method will be called when the job succeeds. In Flink's atomic 
CTAS scenario, it is
+     * This method will be called when the job succeeds. In Flink's atomic 
CTAS/RTAS scenario, it is
      * expected to do some commit work. For example, moving the underlying 
data to the target
      * directory to make it visible, writing buffer data to the underlying 
storage service, or even
      * call the commit transaction API of the underlying service, etc.
@@ -57,7 +58,7 @@ public interface StagedTable extends Serializable {
     void commit();
 
     /**
-     * This method will be called when the job is failed or is canceled. In 
Flink's atomic CTAS
+     * This method will be called when the job is failed or is canceled. In 
Flink's atomic CTAS/RTAS
      * scenario, it is expected to do some cleaning work for writing; For 
example, delete the data
      * in the tmp directory, delete the temporary data in the underlying 
storage service, or even
      * call the rollback transaction API of the underlying service, etc.
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsStaging.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsStaging.java
index 9d79db183f7..eb862920382 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsStaging.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsStaging.java
@@ -24,11 +24,12 @@ import 
org.apache.flink.table.connector.sink.DynamicTableSink;
 
 /**
  * Interface for {@link DynamicTableSink}s that support atomic semantic for 
CTAS(CREATE TABLE AS
- * SELECT) statement using a two-phase commit protocol. The table sink is 
responsible for returning
- * a {@link StagedTable} to tell the Flink how to implement the atomicity 
semantics.
+ * SELECT) or RTAS([CREATE OR] REPLACE TABLE AS SELECT) statement using a 
two-phase commit protocol.
+ * The table sink is responsible for returning a {@link StagedTable} to tell 
the Flink how to
+ * implement the atomicity semantics.
  *
- * <p>If the user turns on {@link 
TableConfigOptions#TABLE_CTAS_ATOMICITY_ENABLED}, and the {@link
- * DynamicTableSink} implements {@link SupportsStaging}, the planner will call 
method {@link
+ * <p>If the user turns on {@link 
TableConfigOptions#TABLE_RTAS_CTAS_ATOMICITY_ENABLED}, and the
+ * {@link DynamicTableSink} implements {@link SupportsStaging}, the planner 
will call method {@link
  * #applyStaging(StagingContext)} to get the {@link StagedTable} returned by 
the sink, then the
  * {@link StagedTable} will be used by Flink to implement a two-phase commit 
with the actual
  * implementation of the {@link StagedTable}.
@@ -38,8 +39,8 @@ public interface SupportsStaging {
 
     /**
      * Provides a {@link StagingContext} for the sink modification and return 
a {@link StagedTable}.
-     * The {@link StagedTable} provides transaction abstraction to support 
atomicity for CTAS. Flink
-     * will call the relevant API of StagedTable when the Job status switches,
+     * The {@link StagedTable} provides transaction abstraction to support 
atomicity for CTAS/RTAS.
+     * Flink will call the relevant API of StagedTable when the Job status 
switches,
      *
      * <p>Note: This method will be called at the compile stage.
      *
@@ -74,6 +75,8 @@ public interface SupportsStaging {
     @PublicEvolving
     enum StagingPurpose {
         CREATE_TABLE_AS,
-        CREATE_TABLE_AS_IF_NOT_EXISTS
+        CREATE_TABLE_AS_IF_NOT_EXISTS,
+        REPLACE_TABLE_AS,
+        CREATE_OR_REPLACE_TABLE_AS
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/AtomicRtasITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/AtomicRtasITCase.java
new file mode 100644
index 00000000000..efe93520a30
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/AtomicRtasITCase.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.planner.runtime.utils.AtomicRtasITCaseBase;
+import org.apache.flink.table.planner.utils.TestingTableEnvironment;
+
+/** Tests atomic rtas in batch mode. */
+public class AtomicRtasITCase extends AtomicRtasITCaseBase {
+
+    @Override
+    protected TableEnvironment getTableEnvironment() {
+        EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inBatchMode().build();
+        return TestingTableEnvironment.create(settings, null, 
TableConfig.getDefault());
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/AtomicRtasITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/AtomicRtasITCase.java
new file mode 100644
index 00000000000..d86b9b807b6
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/AtomicRtasITCase.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.planner.runtime.utils.AtomicRtasITCaseBase;
+
+/** Tests atomic rtas in stream mode. */
+public class AtomicRtasITCase extends AtomicRtasITCaseBase {
+
+    @Override
+    protected TableEnvironment getTableEnvironment() {
+        EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inStreamingMode().build();
+        return StreamTableEnvironment.create(
+                StreamExecutionEnvironment.getExecutionEnvironment(), 
settings);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/AtomicCtasITCaseBase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/AtomicCtasITCaseBase.java
index ee8deaac591..341cc02d6c8 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/AtomicCtasITCaseBase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/AtomicCtasITCaseBase.java
@@ -27,6 +27,7 @@ import org.apache.flink.types.Row;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -57,6 +58,10 @@ public abstract class AtomicCtasITCaseBase extends 
TestLogger {
 
         String sourceDDL = "create table t1(a int, b varchar) with 
('connector' = 'COLLECTION')";
         tEnv.executeSql(sourceDDL);
+    }
+
+    @AfterEach
+    void clean() {
         // clean data
         TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS.clear();
         TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST.clear();
@@ -74,7 +79,7 @@ public abstract class AtomicCtasITCaseBase extends TestLogger 
{
 
     private void commonTestForAtomicCtas(String tableName, boolean 
ifNotExists, File tmpDataFolder)
             throws Exception {
-        tEnv.getConfig().set(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED, 
true);
+        
tEnv.getConfig().set(TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED, 
true);
         String dataDir = tmpDataFolder.getAbsolutePath();
         String sqlFragment = ifNotExists ? " if not exists " + tableName : 
tableName;
         tEnv.executeSql(
@@ -101,7 +106,7 @@ public abstract class AtomicCtasITCaseBase extends 
TestLogger {
 
     @Test
     void testAtomicCtasWithException(@TempDir Path temporaryFolder) throws 
Exception {
-        tEnv.getConfig().set(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED, 
true);
+        
tEnv.getConfig().set(TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED, 
true);
         String dataDir = temporaryFolder.toFile().getAbsolutePath();
         assertThatCode(
                         () ->
@@ -121,7 +126,7 @@ public abstract class AtomicCtasITCaseBase extends 
TestLogger {
 
     @Test
     void testWithoutAtomicCtas(@TempDir Path temporaryFolder) throws Exception 
{
-        tEnv.getConfig().set(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED, 
false);
+        
tEnv.getConfig().set(TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED, 
false);
         String dataDir = temporaryFolder.toFile().getAbsolutePath();
         tEnv.executeSql(
                         "create table atomic_ctas_table with ('connector' = 
'test-staging', 'data-dir' = '"
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/AtomicRtasITCaseBase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/AtomicRtasITCaseBase.java
new file mode 100644
index 00000000000..13b0e69125b
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/AtomicRtasITCaseBase.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.utils;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
+import 
org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory;
+import 
org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** The base case of atomic rtas ITCase. */
+public abstract class AtomicRtasITCaseBase extends TestLogger {
+
+    protected TableEnvironment tEnv;
+
+    protected abstract TableEnvironment getTableEnvironment();
+
+    @BeforeEach
+    void setup() {
+        tEnv = getTableEnvironment();
+        List<Row> sourceData = Collections.singletonList(Row.of(1, "ZM"));
+
+        TestCollectionTableFactory.reset();
+        TestCollectionTableFactory.initData(sourceData);
+
+        String sourceDDL = "create table t1(a int, b varchar) with 
('connector' = 'COLLECTION')";
+        tEnv.executeSql(sourceDDL);
+    }
+
+    @AfterEach
+    void clean() {
+        // clean data
+        TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS.clear();
+        TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST.clear();
+    }
+
+    @Test
+    void testAtomicReplaceTableAs(@TempDir Path temporaryFolder) throws 
Exception {
+        commonTestForAtomicReplaceTableAs(
+                "atomic_replace_table", false, true, temporaryFolder.toFile());
+    }
+
+    @Test
+    void testAtomicReplaceTableAsWithReplacedTableNotExists(@TempDir Path 
temporaryFolder)
+            throws Exception {
+        commonTestForAtomicReplaceTableAs(
+                "atomic_replace_table_not_exists", false, false, 
temporaryFolder.toFile());
+    }
+
+    @Test
+    void testAtomicCreateOrReplaceTableAs(@TempDir Path temporaryFolder) 
throws Exception {
+        commonTestForAtomicReplaceTableAs(
+                "atomic_create_or_replace_table", true, true, 
temporaryFolder.toFile());
+    }
+
+    @Test
+    void testAtomicCreateOrReplaceTableAsWithReplacedTableNotExists(@TempDir 
Path temporaryFolder)
+            throws Exception {
+        commonTestForAtomicReplaceTableAs(
+                "atomic_create_or_replace_table_not_exists", true, false, 
temporaryFolder.toFile());
+    }
+
+    private void commonTestForAtomicReplaceTableAs(
+            String tableName,
+            boolean isCreateOrReplace,
+            boolean isCreateReplacedTable,
+            File tmpDataFolder)
+            throws Exception {
+        if (isCreateReplacedTable) {
+            tEnv.executeSql("create table " + tableName + " (a int) with 
('connector' = 'PRINT')");
+        }
+
+        
tEnv.getConfig().set(TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED, 
true);
+        String dataDir = tmpDataFolder.getAbsolutePath();
+        String sqlFragment = getCreateOrReplaceSqlFragment(isCreateOrReplace, 
tableName);
+        String sql =
+                sqlFragment
+                        + " with ('connector' = 'test-staging', 'data-dir' = '"
+                        + dataDir
+                        + "') as select * from t1";
+        if (!isCreateOrReplace && !isCreateReplacedTable) {
+            assertThatThrownBy(() -> tEnv.executeSql(sql))
+                    .isInstanceOf(TableException.class)
+                    .hasMessage(
+                            "The table `default_catalog`.`default_database`.`"
+                                    + tableName
+                                    + "` to be replaced doesn't exist."
+                                    + " You can try to use CREATE TABLE AS 
statement or CREATE OR REPLACE TABLE AS statement.");
+        } else {
+            tEnv.executeSql(sql).await();
+            if (isCreateReplacedTable) {
+                assertThat(tEnv.listTables()).contains(tableName);
+            } else {
+                assertThat(tEnv.listTables()).doesNotContain(tableName);
+            }
+            verifyDataFile(dataDir, "data");
+            
assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS).hasSize(2);
+            
assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS)
+                    .contains("begin", "commit");
+            
assertThat(TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST).hasSize(1);
+            if (isCreateOrReplace) {
+                
assertThat(TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST)
+                        
.contains(SupportsStaging.StagingPurpose.CREATE_OR_REPLACE_TABLE_AS);
+            } else {
+                
assertThat(TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST)
+                        
.contains(SupportsStaging.StagingPurpose.REPLACE_TABLE_AS);
+            }
+        }
+    }
+
+    @Test
+    void testAtomicReplaceTableAsWithException(@TempDir Path temporaryFolder) {
+        commonTestForAtomicReplaceTableAsWithException(
+                "atomic_replace_table_fail", false, temporaryFolder.toFile());
+    }
+
+    @Test
+    void testAtomicCreateOrReplaceTableAsWithException(@TempDir Path 
temporaryFolder) {
+        commonTestForAtomicReplaceTableAsWithException(
+                "atomic_create_or_replace_table_fail", true, 
temporaryFolder.toFile());
+    }
+
+    private void commonTestForAtomicReplaceTableAsWithException(
+            String tableName, boolean isCreateOrReplace, File tmpDataFolder) {
+        tEnv.executeSql("create table " + tableName + " (a int) with 
('connector' = 'PRINT')");
+        
tEnv.getConfig().set(TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED, 
true);
+        String dataDir = tmpDataFolder.getAbsolutePath();
+        String sqlFragment = getCreateOrReplaceSqlFragment(isCreateOrReplace, 
tableName);
+        assertThatCode(
+                        () ->
+                                tEnv.executeSql(
+                                                sqlFragment
+                                                        + " with ('connector' 
= 'test-staging', 'data-dir' = '"
+                                                        + dataDir
+                                                        + "', 'sink-fail' = '"
+                                                        + true
+                                                        + "') as select * from 
t1")
+                                        .await())
+                .hasRootCauseMessage("Test StagedTable abort method.");
+
+        
assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS).hasSize(2);
+        assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS)
+                .contains("begin", "abort");
+    }
+
+    @Test
+    void testWithoutAtomicReplaceTableAs(@TempDir Path temporaryFolder) throws 
Exception {
+        commonTestForWithoutAtomicReplaceTableAs(
+                "non_atomic_replace_table", false, temporaryFolder.toFile());
+    }
+
+    @Test
+    void testWithoutAtomicCreateOrReplaceTableAs(@TempDir Path 
temporaryFolder) throws Exception {
+        commonTestForWithoutAtomicReplaceTableAs(
+                "non_atomic_create_or_replace_table", true, 
temporaryFolder.toFile());
+    }
+
+    private void commonTestForWithoutAtomicReplaceTableAs(
+            String tableName, boolean isCreateOrReplace, File tmpDataFolder) 
throws Exception {
+        tEnv.executeSql("create table " + tableName + " (a int) with 
('connector' = 'PRINT')");
+        
tEnv.getConfig().set(TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED, 
false);
+        String dataDir = tmpDataFolder.getAbsolutePath();
+        String sqlFragment = getCreateOrReplaceSqlFragment(isCreateOrReplace, 
tableName);
+
+        tEnv.executeSql(
+                        sqlFragment
+                                + " with ('connector' = 'test-staging', 
'data-dir' = '"
+                                + dataDir
+                                + "') as select * from t1")
+                .await();
+        assertThat(tEnv.listTables()).contains(tableName);
+        // Not using StagedTable, so need to read the hidden file
+        verifyDataFile(dataDir, "_data");
+        
assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS).hasSize(0);
+        
assertThat(TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST).hasSize(0);
+    }
+
+    private void verifyDataFile(String dataDir, String fileName) throws 
IOException {
+        File dataFile = new File(dataDir, fileName);
+        assertThat(dataFile).exists();
+        assertThat(dataFile).isFile();
+        assertThat(FileUtils.readFileUtf8(dataFile)).isEqualTo("1,ZM");
+    }
+
+    private String getCreateOrReplaceSqlFragment(boolean isCreateOrReplace, 
String tableName) {
+        return isCreateOrReplace
+                ? " create or replace table " + tableName
+                : " replace table " + tableName;
+    }
+}


Reply via email to