luoyuxia commented on code in PR #22839:
URL: https://github.com/apache/flink/pull/22839#discussion_r1250008294


##########
docs/layouts/shortcodes/generated/table_config_configuration.html:
##########
@@ -20,6 +20,12 @@
             <td>String</td>
             <td>The name of the default database in the initial catalog to be created when instantiating TableEnvironment.</td>
         </tr>
+        <tr>
+            <td><h5>table.ctas.atomicity-enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Specifies if the CREATE TABLE AS SELECT statement is executed atomically. By default, the statement is non-atomic. The target table is created in client side, and it will not be dropped even though the job fails or is cancelled. If set this option to true and DynamicTableSink implements the SupportsStaging interface, the statement is expected to be executed atomically, the behavior of which depends on the actual DynamicTableSink.</td>

Review Comment:
   nit
   ```suggestion
               <td>Specifies if the CREATE TABLE AS SELECT statement is executed atomically. By default, the statement is non-atomic. The target table is created on the client side and will not be dropped even if the job fails or is canceled. If this option is set to true and the underlying DynamicTableSink implements the SupportsStaging interface, the statement is expected to be executed atomically, the behavior of which depends on the actual DynamicTableSink.</td>
   ```



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -816,7 +825,56 @@ public TableResultInternal executeInternal(List<ModifyOperation> operations) {
 
         List<Transformation<?>> transformations = translate(mapOperations);
         List<String> sinkIdentifierNames = extractSinkIdentifierNames(mapOperations);
-        return executeInternal(transformations, sinkIdentifierNames);
+        return executeInternal(transformations, sinkIdentifierNames, jobStatusHookList);
+    }
+
+    private ModifyOperation getOperation(
+            CreateTableASOperation ctasOperation, List<JobStatusHook> jobStatusHookList) {
+        CreateTableOperation createTableOperation = ctasOperation.getCreateTableOperation();
+        ObjectIdentifier tableIdentifier = createTableOperation.getTableIdentifier();
+        Catalog catalog = catalogManager.getCatalog(tableIdentifier.getCatalogName()).orElse(null);
+        ResolvedCatalogTable catalogTable =
+                catalogManager.resolveCatalogTable(createTableOperation.getCatalogTable());
+
+        if (!TableFactoryUtil.isLegacyConnectorOptions(
+                catalog,
+                tableConfig,
+                isStreamingMode,
+                tableIdentifier,
+                catalogTable,
+                createTableOperation.isTemporary())) {
+            DynamicTableSink dynamicTableSink =
+                    ExecutableOperationUtils.createDynamicTableSink(
+                            catalog,
+                            () -> moduleManager.getFactory((Module::getTableSinkFactory)),
+                            tableIdentifier,
+                            catalogTable,
+                            Collections.emptyMap(),
+                            tableConfig,
+                            resourceManager.getUserClassLoader(),
+                            createTableOperation.isTemporary());
+            if (dynamicTableSink instanceof SupportsStaging

Review Comment:
   You can check `tableConfig.get(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED)` first, so that we avoid creating the dynamicTableSink at this earlier stage when atomicity is disabled.
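   A minimal sketch of that reordering, reusing the names from the hunk above (hedged illustration only, not the final shape of the change):
   ```java
   // Skip the whole staging path when atomic CTAS is disabled, so the
   // DynamicTableSink is never created for the common non-atomic case.
   if (tableConfig.get(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED)
           && !TableFactoryUtil.isLegacyConnectorOptions(
                   catalog,
                   tableConfig,
                   isStreamingMode,
                   tableIdentifier,
                   catalogTable,
                   createTableOperation.isTemporary())) {
       DynamicTableSink dynamicTableSink =
               ExecutableOperationUtils.createDynamicTableSink(
                       catalog,
                       () -> moduleManager.getFactory(Module::getTableSinkFactory),
                       tableIdentifier,
                       catalogTable,
                       Collections.emptyMap(),
                       tableConfig,
                       resourceManager.getUserClassLoader(),
                       createTableOperation.isTemporary());
       if (dynamicTableSink instanceof SupportsStaging) {
           // ... existing SupportsStaging handling continues here
       }
   }
   ```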



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
+
+import java.io.Serializable;
+
+/**
+ * The {@link StagedTable} is designed to implement atomic semantic using a two-phase commit

Review Comment:
   ```suggestion
    * The {@link StagedTable} is designed to implement Flink's atomic semantics for the CTAS (CREATE TABLE AS SELECT) statement using a two-phase commit
   ```
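   For readers following along, a rough sketch of what an implementation could look like (the class, paths, and helper choices below are hypothetical, not part of this PR):
   ```java
   import java.io.File;

   import org.apache.flink.table.catalog.StagedTable;
   import org.apache.flink.util.FileUtils;

   /** Hypothetical example: stage data in a temp directory and expose it only on commit. */
   public class FileSystemStagedTable implements StagedTable {

       private final String stagingDir; // assumed layout, e.g. "<table-path>/.staging"
       private final String finalDir;   // assumed final table directory

       public FileSystemStagedTable(String stagingDir, String finalDir) {
           this.stagingDir = stagingDir;
           this.finalDir = finalDir;
       }

       @Override
       public void begin() {
           // called when the job is CREATED: prepare the staging location
           new File(stagingDir).mkdirs();
       }

       @Override
       public void commit() {
           // called when the job is FINISHED: publish the staged data atomically
           new File(stagingDir).renameTo(new File(finalDir));
       }

       @Override
       public void abort() {
           // called when the job is FAILED or CANCELED: drop the half-written data
           FileUtils.deleteDirectoryQuietly(new File(stagingDir));
       }
   }
   ```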



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
+
+import java.io.Serializable;
+
+/**
+ * The {@link StagedTable} is designed to implement atomic semantic using a two-phase commit
+ * protocol. The {@link StagedTable} is supposed to be returned via method {@link
+ * SupportsStaging#applyStaging} by the {@link DynamicTableSink} which implements the {@link
+ * SupportsStaging} interface.
+ *
+ * <p>When the Flink job for writing to a {@link DynamicTableSink} is CREATED, the {@link
+ * StagedTable#begin()} will be called; when the Flink job is FINISHED, the {@link
+ * StagedTable#commit()} will be called; when the Flink job is FAILED or CANCELED, the {@link
+ * StagedTable#abort()} will be called;
+ *
+ * <p>See more in {@link SupportsStaging}.
+ */
+@PublicEvolving
+public interface StagedTable extends Serializable {
+
+    /**
+     * This method will be called when the job is started. In Flink's atomic CTAS scenario, it is
+     * expected to do initialization work; For example, initializing the client of the underlying
+     * service, the tmp path of the underlying storage, or even call the start transaction API of
+     * the underlying service, etc.
+     */
+    void begin();
+
+    /**
+     * This method will be called when the job is succeeds. In Flink's atomic CTAS scenario, it is
+     * expected to do some commit work. For example, moving the underlying data to the target
+     * directory to make it visible, writing buffer data to the underlying storage service, or even
+     * call the commit transaction API of the underlying service, etc.
+     */
+    void commit();
+
+    /**
+     * This method will be called when the job is failed or canceled. In Flink's atomic CTAS

Review Comment:
   ```suggestion
        * This method will be called when the job fails or is canceled. In Flink's atomic CTAS
   ```



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/StagedTableITCase.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
+import org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory;
+import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+/** Tests staged table in batch mode. */
+public class StagedTableITCase extends BatchTestBase {
+
+    @TempDir Path temporaryFolder;
+
+    private File tmpDataFolder;
+
+    @BeforeEach
+    void setup() throws Exception {
+        super.before();
+        List<Row> sourceData = Arrays.asList(Row.of(1, "ZM"));
+
+        TestCollectionTableFactory.reset();
+        TestCollectionTableFactory.initData(sourceData);
+
+        String sourceDDL = "create table t1(a int, b varchar) with 
('connector' = 'COLLECTION')";
+        tEnv().executeSql(sourceDDL);
+        tmpDataFolder = TempDirUtils.newFolder(temporaryFolder);
+    }
+
+    @AfterEach
+    void close() {
+        if (tmpDataFolder != null) {
+            tmpDataFolder.delete();

Review Comment:
   Why delete it? Can't you provide a temp path for each method instead? Like:
   testStagedTableWithAtomicCtas(@TempDir Path temporaryFolder)
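   A small JUnit 5 sketch of that idea (parameter injection only; the test body is elided):
   ```java
   // JUnit 5 injects a fresh temporary directory per test method, so the
   // manual cleanup in @AfterEach would no longer be needed.
   @Test
   void testStagedTableWithAtomicCtas(@TempDir Path temporaryFolder) throws Exception {
       File tmpDataFolder = TempDirUtils.newFolder(temporaryFolder);
       String dataDir = tmpDataFolder.getAbsolutePath();
       // ... run the CTAS statement against dataDir as in the current test ...
   }
   ```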



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsStaging.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.execution.JobStatusHook;
+import org.apache.flink.table.catalog.StagedTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+
+/**
+ * Interface for {@link DynamicTableSink}s that support CTAS(CREATE TABLE AS SELECT) statement
+ * atomic semantic using a two-phase commit protocol. The table sink is responsible for telling
+ * planner how to implement atomicity semantics via {@link StagedTable}.
+ *
+ * <p>If the user turns on {@link TableConfigOptions#TABLE_CTAS_ATOMICITY_ENABLED}, and {@link

Review Comment:
   suggestion:
   
   If the user turns on {@link TableConfigOptions#TABLE_CTAS_ATOMICITY_ENABLED}, and the {@link DynamicTableSink} implements {@link SupportsStaging}, the planner will call {@link #applyStaging(StagingContext)} to get the {@link StagedTable} returned by the sink; Flink will then use that {@link StagedTable} to implement the two-phase commit.
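   Wording aside, here is a rough sketch of how a connector would opt in; the sink class below is made up, only `applyStaging` matters and the other `DynamicTableSink` methods are stubbed:
   ```java
   import org.apache.flink.table.catalog.StagedTable;
   import org.apache.flink.table.connector.ChangelogMode;
   import org.apache.flink.table.connector.sink.DynamicTableSink;
   import org.apache.flink.table.connector.sink.abilities.SupportsStaging;

   /** Made-up sink that hands the planner a StagedTable for atomic CTAS. */
   public class StagingSinkExample implements DynamicTableSink, SupportsStaging {

       @Override
       public StagedTable applyStaging(StagingContext context) {
           // the context carries the StagingPurpose (CREATE_TABLE_AS vs CREATE_TABLE_AS_IF_NOT_EXISTS)
           return new FileSystemStagedTable("/tmp/ctas/.staging", "/tmp/ctas/final"); // hypothetical, see earlier sketch
       }

       @Override
       public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
           return ChangelogMode.insertOnly();
       }

       @Override
       public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
           throw new UnsupportedOperationException("omitted in this sketch");
       }

       @Override
       public DynamicTableSink copy() {
           return new StagingSinkExample();
       }

       @Override
       public String asSummaryString() {
           return "staging-sink-example";
       }
   }
   ```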



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/StagedTableITCase.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
+import org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory;
+import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+/** Tests staged table in batch mode. */
+public class StagedTableITCase extends BatchTestBase {
+
+    @TempDir Path temporaryFolder;
+
+    private File tmpDataFolder;
+
+    @BeforeEach
+    void setup() throws Exception {
+        super.before();
+        List<Row> sourceData = Arrays.asList(Row.of(1, "ZM"));
+
+        TestCollectionTableFactory.reset();
+        TestCollectionTableFactory.initData(sourceData);
+
+        String sourceDDL = "create table t1(a int, b varchar) with 
('connector' = 'COLLECTION')";
+        tEnv().executeSql(sourceDDL);
+        tmpDataFolder = TempDirUtils.newFolder(temporaryFolder);
+    }
+
+    @AfterEach
+    void close() {
+        if (tmpDataFolder != null) {
+            tmpDataFolder.delete();
+        }
+    }
+
+    @Test
+    void testStagedTableWithAtomicCtas() throws Exception {

Review Comment:
   ```suggestion
       void testAtomicCtas() throws Exception {
   ```
   It's not for testing the staged table, it's for testing atomic CTAS.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/execution/CtasJobStatusHook.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.execution;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.execution.JobStatusHook;
+import org.apache.flink.table.catalog.StagedTable;
+
+/**
+ * This hook is used to implement atomic semantics for CTAS(CREATE TABLE AS SELECT) statement. It'll
+ * call the corresponding interfaces of the inner {@link StagedTable} on job status changing.

Review Comment:
   ```suggestion
    * call the corresponding interfaces of the inner {@link StagedTable} on job status changes.
   ```
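   For context, the delegation described here would look roughly like the sketch below; the callback names are my assumption about the `JobStatusHook` interface and the real class in this PR may differ:
   ```java
   import org.apache.flink.api.common.JobID;
   import org.apache.flink.core.execution.JobStatusHook;
   import org.apache.flink.table.catalog.StagedTable;

   /** Rough sketch only: forwards job status changes to the StagedTable. */
   public class CtasJobStatusHookSketch implements JobStatusHook {

       private final StagedTable stagedTable;

       public CtasJobStatusHookSketch(StagedTable stagedTable) {
           this.stagedTable = stagedTable;
       }

       @Override
       public void onCreated(JobID jobId) {
           stagedTable.begin();  // job created -> open the staging scope
       }

       @Override
       public void onFinished(JobID jobId) {
           stagedTable.commit(); // job finished -> make the data visible
       }

       @Override
       public void onFailed(JobID jobId, Throwable throwable) {
           stagedTable.abort();  // job failed -> discard staged data
       }

       @Override
       public void onCanceled(JobID jobId) {
           stagedTable.abort();  // job canceled -> discard staged data
       }
   }
   ```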



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java:
##########
@@ -197,6 +197,17 @@ private TableConfigOptions() {}
                     .withDescription(
                             "Local directory that is used by planner for 
storing downloaded resources.");
 
+    @Documentation.TableOption(execMode = 
Documentation.ExecMode.BATCH_STREAMING)
+    public static final ConfigOption<Boolean> TABLE_CTAS_ATOMICITY_ENABLED =
+            key("table.ctas.atomicity-enabled")

Review Comment:
   Ditto: don't forget to also update the description here.
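   For example, the option definition could carry the same corrected wording (sketch only, final phrasing up to you):
   ```java
   @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
   public static final ConfigOption<Boolean> TABLE_CTAS_ATOMICITY_ENABLED =
           key("table.ctas.atomicity-enabled")
                   .booleanType()
                   .defaultValue(false)
                   .withDescription(
                           "Specifies if the CREATE TABLE AS SELECT statement is executed atomically. "
                                   + "By default, the statement is non-atomic. The target table is created on the client side "
                                   + "and will not be dropped even if the job fails or is canceled. If this option is set to true "
                                   + "and the underlying DynamicTableSink implements the SupportsStaging interface, the statement "
                                   + "is expected to be executed atomically, the behavior of which depends on the actual DynamicTableSink.");
   ```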



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
+
+import java.io.Serializable;
+
+/**
+ * The {@link StagedTable} is designed to implement atomic semantic using a two-phase commit
+ * protocol. The {@link StagedTable} is supposed to be returned via method {@link
+ * SupportsStaging#applyStaging} by the {@link DynamicTableSink} which implements the {@link
+ * SupportsStaging} interface.
+ *
+ * <p>When the Flink job for writing to a {@link DynamicTableSink} is CREATED, the {@link
+ * StagedTable#begin()} will be called; when the Flink job is FINISHED, the {@link
+ * StagedTable#commit()} will be called; when the Flink job is FAILED or CANCELED, the {@link
+ * StagedTable#abort()} will be called;
+ *
+ * <p>See more in {@link SupportsStaging}.
+ */
+@PublicEvolving
+public interface StagedTable extends Serializable {
+
+    /**
+     * This method will be called when the job is started. In Flink's atomic CTAS scenario, it is
+     * expected to do initialization work; For example, initializing the client of the underlying
+     * service, the tmp path of the underlying storage, or even call the start transaction API of
+     * the underlying service, etc.
+     */
+    void begin();
+
+    /**
+     * This method will be called when the job is succeeds. In Flink's atomic CTAS scenario, it is

Review Comment:
   ```suggestion
        * This method will be called when the job succeeds. In Flink's atomic CTAS scenario, it is
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsStaging.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.execution.JobStatusHook;
+import org.apache.flink.table.catalog.StagedTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+
+/**
+ * Interface for {@link DynamicTableSink}s that support CTAS(CREATE TABLE AS SELECT) statement

Review Comment:
   suggestion:
   Interface for {@link DynamicTableSink}s that support atomic semantics for the CTAS (CREATE TABLE AS SELECT) statement using a two-phase commit protocol. The table sink is responsible for returning a {@link StagedTable} to tell Flink how to implement the atomicity semantics.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
+
+import java.io.Serializable;
+
+/**
+ * The {@link StagedTable} is designed to implement atomic semantic using a two-phase commit
+ * protocol. The {@link StagedTable} is supposed to be returned via method {@link
+ * SupportsStaging#applyStaging} by the {@link DynamicTableSink} which implements the {@link
+ * SupportsStaging} interface.
+ *
+ * <p>When the Flink job for writing to a {@link DynamicTableSink} is CREATED, the {@link
+ * StagedTable#begin()} will be called; when the Flink job is FINISHED, the {@link
+ * StagedTable#commit()} will be called; when the Flink job is FAILED or CANCELED, the {@link
+ * StagedTable#abort()} will be called;
+ *
+ * <p>See more in {@link SupportsStaging}.
+ */
+@PublicEvolving
+public interface StagedTable extends Serializable {
+
+    /**
+     * This method will be called when the job is started. In Flink's atomic CTAS scenario, it is
+     * expected to do initialization work; For example, initializing the client of the underlying
+     * service, the tmp path of the underlying storage, or even call the start transaction API of
+     * the underlying service, etc.
+     */
+    void begin();
+
+    /**
+     * This method will be called when the job is succeeds. In Flink's atomic CTAS scenario, it is
+     * expected to do some commit work. For example, moving the underlying data to the target
+     * directory to make it visible, writing buffer data to the underlying storage service, or even
+     * call the commit transaction API of the underlying service, etc.
+     */
+    void commit();
+
+    /**
+     * This method will be called when the job is failed or canceled. In Flink's atomic CTAS
+     * scenario, it is expected to do some cleaning work for a writing; For example, delete the data
+     * in tmp directory, delete the temporary data in the underlying storage service, or even call

Review Comment:
   ```suggestion
        * in the tmp directory, delete the temporary data in the underlying storage service, or even call
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
+
+import java.io.Serializable;
+
+/**
+ * The {@link StagedTable} is designed to implement atomic semantic using a two-phase commit
+ * protocol. The {@link StagedTable} is supposed to be returned via method {@link
+ * SupportsStaging#applyStaging} by the {@link DynamicTableSink} which implements the {@link
+ * SupportsStaging} interface.
+ *
+ * <p>When the Flink job for writing to a {@link DynamicTableSink} is CREATED, the {@link

Review Comment:
   ```suggestion
    * <p>When the Flink job for writing to a {@link DynamicTableSink} that supports atomic semantics is CREATED, the {@link
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
+
+import java.io.Serializable;
+
+/**
+ * The {@link StagedTable} is designed to implement atomic semantic using a two-phase commit
+ * protocol. The {@link StagedTable} is supposed to be returned via method {@link
+ * SupportsStaging#applyStaging} by the {@link DynamicTableSink} which implements the {@link
+ * SupportsStaging} interface.
+ *
+ * <p>When the Flink job for writing to a {@link DynamicTableSink} is CREATED, the {@link
+ * StagedTable#begin()} will be called; when the Flink job is FINISHED, the {@link
+ * StagedTable#commit()} will be called; when the Flink job is FAILED or CANCELED, the {@link
+ * StagedTable#abort()} will be called;
+ *
+ * <p>See more in {@link SupportsStaging}.
+ */
+@PublicEvolving
+public interface StagedTable extends Serializable {
+
+    /**
+     * This method will be called when the job is started. In Flink's atomic CTAS scenario, it is
+     * expected to do initialization work; For example, initializing the client of the underlying
+     * service, the tmp path of the underlying storage, or even call the start transaction API of
+     * the underlying service, etc.
+     */
+    void begin();
+
+    /**
+     * This method will be called when the job is succeeds. In Flink's atomic CTAS scenario, it is
+     * expected to do some commit work. For example, moving the underlying data to the target
+     * directory to make it visible, writing buffer data to the underlying storage service, or even
+     * call the commit transaction API of the underlying service, etc.
+     */
+    void commit();
+
+    /**
+     * This method will be called when the job is failed or canceled. In Flink's atomic CTAS
+     * scenario, it is expected to do some cleaning work for a writing; For example, delete the data

Review Comment:
   ```suggestion
        * scenario, it is expected to do some cleaning work for writing; For example, delete the data
   ```



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/StagedTableITCase.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql;
+
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
+import org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory;
+import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+/** Tests staged table in stream mode. */
+public class StagedTableITCase extends StreamingTestBase {

Review Comment:
   Ditto.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/StagedTableITCase.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
+import org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory;
+import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+/** Tests staged table in batch mode. */
+public class StagedTableITCase extends BatchTestBase {

Review Comment:
   ```suggestion
   public class AtomicCtasITCase extends BatchTestBase {
   ```
   ?



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/StagedTableITCase.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
+import 
org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory;
+import 
org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+/** Tests staged table in batch mode. */
+public class StagedTableITCase extends BatchTestBase {
+
+    @TempDir Path temporaryFolder;
+
+    private File tmpDataFolder;
+
+    @BeforeEach
+    void setup() throws Exception {
+        super.before();
+        List<Row> sourceData = Arrays.asList(Row.of(1, "ZM"));
+
+        TestCollectionTableFactory.reset();
+        TestCollectionTableFactory.initData(sourceData);
+
+        String sourceDDL = "create table t1(a int, b varchar) with 
('connector' = 'COLLECTION')";
+        tEnv().executeSql(sourceDDL);
+        tmpDataFolder = TempDirUtils.newFolder(temporaryFolder);
+    }
+
+    @AfterEach
+    void close() {
+        if (tmpDataFolder != null) {
+            tmpDataFolder.delete();
+        }
+    }
+
+    @Test
+    void testStagedTableWithAtomicCtas() throws Exception {
+        commonTestStagedTableWithAtomicCtas("ctas_batch_table", false);
+    }
+
+    @Test
+    void testStagedTableWithAtomicCtasIfNotExists() throws Exception {
+        commonTestStagedTableWithAtomicCtas("ctas_if_not_exists_batch_table", 
true);
+    }
+
+    void commonTestStagedTableWithAtomicCtas(String tableName, boolean 
ifNotExists)
+            throws Exception {
+        
tEnv().getConfig().set(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED, true);
+        String dataDir = tmpDataFolder.getAbsolutePath();
+        String sqlFragment = ifNotExists ? " if not exists " + tableName : 
tableName;
+        tEnv().executeSql(
+                        "create table "
+                                + sqlFragment
+                                + " with ('connector' = 'test-staging', 
'data-dir' = '"
+                                + dataDir
+                                + "') as select * from t1")
+                .await();
+        assertThat(tEnv().listTables()).doesNotContain(tableName);
+        File file = new File(dataDir, "data");
+        assertThat(file).exists();
+        assertThat(file).isFile();
+        assertThat(FileUtils.readFileUtf8(file)).isEqualTo("1,ZM");
+        
assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS).hasSize(2);
+        assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS)
+                .contains("begin", "commit");
+        
assertThat(TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST).hasSize(1);
+        if (ifNotExists) {
+            assertThat(TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST)
+                    
.contains(SupportsStaging.StagingPurpose.CREATE_TABLE_AS_IF_NOT_EXISTS);
+        } else {
+            assertThat(TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST)
+                    .contains(SupportsStaging.StagingPurpose.CREATE_TABLE_AS);
+        }
+    }
+
+    @Test
+    void testFailStagedTableWithAtomicCtas() {
+        
tEnv().getConfig().set(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED, true);
+        String dataDir = tmpDataFolder.getAbsolutePath();
+        assertThatCode(
+                        () ->
+                                tEnv().executeSql(
+                                                "create table 
ctas_batch_table_fail with ('connector' = 'test-staging', 'data-dir' = '"
+                                                        + dataDir
+                                                        + "', 'sink-fail' = '"
+                                                        + true
+                                                        + "') as select * from 
t1")
+                                        .await())
+                .hasRootCauseMessage("Test StagedTable abort method.");
+
+        
assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS).hasSize(2);
+        assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS)
+                .contains("begin", "abort");
+    }
+
+    @Test
+    void testStagedTableWithoutAtomicCtas() throws Exception {
+        
tEnv().getConfig().set(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED, false);
+        String dataDir = tmpDataFolder.getAbsolutePath();
+        tEnv().executeSql(
+                        "create table ctas_batch_table with ('connector' = 
'test-staging', 'data-dir' = '"
+                                + dataDir
+                                + "') as select * from t1")
+                .await();
+        assertThat(tEnv().listTables()).contains("ctas_batch_table");
+        // Not using StagedTable, so need to read the hidden file
+        File file = new File(dataDir, "_data");
+        assertThat(file).exists();

Review Comment:
   Please extract 
   ```
   assertThat(file).exists();
   assertThat(file).isFile();
   assertThat(FileUtils.readFileUtf8(file)).isEqualTo("1,ZM");
   ```
   into a common method.
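   Something like the helper below (the name is just a suggestion):
   ```java
   private static void assertDataFile(File file) throws IOException {
       assertThat(file).exists();
       assertThat(file).isFile();
       assertThat(FileUtils.readFileUtf8(file)).isEqualTo("1,ZM");
   }
   ```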



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

