luoyuxia commented on code in PR #22839:
URL: https://github.com/apache/flink/pull/22839#discussion_r1246593727


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestSupportsStagingTableFactory.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.catalog.StagedTable;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.FileUtils;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+/** A factory to create table to support staging for test purpose. */
+public class TestSupportsStagingTableFactory implements 
DynamicTableSinkFactory {
+
+    public static final String IDENTIFIER = "test-staging";
+
+    public static final List<String> JOB_STATUS_CHANGE_PROCESS = new 
LinkedList<>();
+
+    private static final ConfigOption<String> DATA_DIR =
+            ConfigOptions.key("data-dir")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The data id used to write the rows.");
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+        helper.validate();
+        String dataDir = helper.getOptions().get(DATA_DIR);
+        return new SupportsStagingTableSink(dataDir);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return Collections.singleton(DATA_DIR);
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        return Collections.emptySet();
+    }
+
+    /** A sink that supports staging. */
+    private static class SupportsStagingTableSink implements DynamicTableSink, 
SupportsStaging {
+
+        private String dataDir;
+        private TestStagedTable stagedTable;
+
+        public SupportsStagingTableSink(String dataDir) {
+            this(dataDir, null);
+        }
+
+        public SupportsStagingTableSink(String dataDir, TestStagedTable 
stagedTable) {
+            this.dataDir = dataDir;
+            this.stagedTable = stagedTable;
+        }
+
+        @Override
+        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+            return ChangelogMode.insertOnly();
+        }
+
+        @Override
+        public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+            return new DataStreamSinkProvider() {
+                @Override
+                public DataStreamSink<?> consumeDataStream(
+                        ProviderContext providerContext, DataStream<RowData> 
dataStream) {
+                    if (stagedTable != null) {
+                        return dataStream
+                                .addSink(new StagedSinkFunction(dataDir))
+                                .setParallelism(1);
+                    } else {
+                        // otherwise, do nothing
+                        return dataStream.addSink(new DiscardingSink<>());
+                    }
+                }
+            };
+        }
+
+        @Override
+        public DynamicTableSink copy() {
+            return new SupportsStagingTableSink(dataDir, stagedTable);
+        }
+
+        @Override
+        public String asSummaryString() {
+            return "SupportsStagingTableSink";
+        }
+
+        @Override
+        public StagedTable applyStaging(StagingContext context) {
+            JOB_STATUS_CHANGE_PROCESS.clear();

Review Comment:
   Don't forget to check the `StagingContext#stagingPurpose`.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java:
##########
@@ -197,6 +197,17 @@ private TableConfigOptions() {}
                     .withDescription(
                             "Local directory that is used by planner for 
storing downloaded resources.");
 
+    @Documentation.TableOption(execMode = 
Documentation.ExecMode.BATCH_STREAMING)
+    public static final ConfigOption<Boolean> TABLE_CTAS_ATOMICITY_ENABLED =
+            key("table.ctas.atomicity-enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(

Review Comment:
   suggestion:
   ```
   "Specifies if the CREATE TABLE AS SELECT statement is executed atomically. "
                                       + "By default, the statement is non-atomic. The target table is created on the client side, and it will not be dropped even if the job fails or is cancelled. "
                                       + "If this option is set to true and the DynamicTableSink implements the SupportsStaging interface, the statement is expected to be executed atomically; "
                                       + "the exact behavior depends on the actual DynamicTableSink."
   ```
   Please also don't forget to update `table_config_configuration.html`



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -819,6 +828,57 @@ public TableResultInternal 
executeInternal(List<ModifyOperation> operations) {
         return executeInternal(transformations, sinkIdentifierNames);
     }
 
+    private void executeCreateTableASOperation(
+            CreateTableASOperation ctasOperation, List<ModifyOperation> 
mapOperations) {
+        CreateTableOperation createTableOperation = 
ctasOperation.getCreateTableOperation();
+        ObjectIdentifier tableIdentifier = 
createTableOperation.getTableIdentifier();
+        Catalog catalog = 
catalogManager.getCatalog(tableIdentifier.getCatalogName()).orElse(null);
+        ResolvedCatalogTable catalogTable =
+                
catalogManager.resolveCatalogTable(createTableOperation.getCatalogTable());
+
+        if (!TableFactoryUtil.isLegacyConnectorOptions(
+                catalog,
+                tableConfig,
+                isStreamingMode,
+                tableIdentifier,
+                catalogTable,
+                createTableOperation.isTemporary())) {
+            DynamicTableSink dynamicTableSink =
+                    ExecutableOperationUtils.createDynamicTableSink(
+                            catalog,
+                            () -> 
moduleManager.getFactory((Module::getTableSinkFactory)),
+                            tableIdentifier,
+                            catalogTable,
+                            Collections.emptyMap(),
+                            tableConfig,
+                            resourceManager.getUserClassLoader(),
+                            createTableOperation.isTemporary());
+            if (dynamicTableSink instanceof SupportsStaging
+                    && 
tableConfig.get(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED)) {
+                // use atomic ctas
+                SupportsStaging.StagingPurpose stagingPurpose =
+                        createTableOperation.isIgnoreIfExists()
+                                ? 
SupportsStaging.StagingPurpose.CREATE_TABLE_AS_IF_NOT_EXISTS
+                                : 
SupportsStaging.StagingPurpose.CREATE_TABLE_AS;
+                StagedTable stagedTable =
+                        ((SupportsStaging) dynamicTableSink)
+                                .applyStaging(new 
SinkStagingContext(stagingPurpose));
+                CtasJobStatusHook ctasJobStatusHook = new 
CtasJobStatusHook(stagedTable);
+                mapOperations.add(
+                        ctasOperation.toStagedSinkModifyOperation(
+                                createTableOperation.getTableIdentifier(),
+                                catalogTable,
+                                catalog,
+                                dynamicTableSink));
+                jobStatusHookList.add(ctasJobStatusHook);

Review Comment:
   I'm concerned about making `jobStatusHookList` a class field and then clearing the `jobStatusHookList` in `createPipeline`. To me, that's too obscure and hard to understand.
   Could it be something like:
   ```
   jobStatusHookList = new ArrayList<>();
   // the following method will add jobStatusHook to `jobStatusHookList`
   getOperation(ctasOperation, jobStatusHookList);
   executeInternal(transformations, sinkIdentifierNames, jobStatusHookList);
   ```
   Of course, we would then need a new `executeInternal` overload that accepts `jobStatusHookList`.
   



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
+
+import java.io.Serializable;
+

Review Comment:
   suggestion:
   ```
   /**
    * The {@link StagedTable} is designed to implement atomic semantics using a two-phase commit
    * protocol. The {@link StagedTable} is supposed to be returned via method 
{@link
    * SupportsStaging#applyStaging} by the {@link DynamicTableSink} which 
implements the {@link
    * SupportsStaging} interface.
    *
    * <p>When the Flink job for writing to a {@link DynamicTableSink} is 
CREATED, the {@link
    * StagedTable#begin()} will be called; when the Flink job is FINISHED, the 
{@link
    * StagedTable#commit()} will be called; when the Flink job is FAILED or CANCELED, the {@link
    * StagedTable#abort()} will be called.
    *
    * <p>See more in {@link SupportsStaging}.
    */
   ```



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestSupportsStagingTableFactory.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.catalog.StagedTable;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.FileUtils;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+/** A factory to create table to support staging for test purpose. */
+public class TestSupportsStagingTableFactory implements 
DynamicTableSinkFactory {
+
+    public static final String IDENTIFIER = "test-staging";
+
+    public static final List<String> JOB_STATUS_CHANGE_PROCESS = new 
LinkedList<>();
+
+    private static final ConfigOption<String> DATA_DIR =
+            ConfigOptions.key("data-dir")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The data id used to write the rows.");
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+        helper.validate();
+        String dataDir = helper.getOptions().get(DATA_DIR);
+        return new SupportsStagingTableSink(dataDir);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return Collections.singleton(DATA_DIR);
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        return Collections.emptySet();
+    }
+
+    /** A sink that supports staging. */
+    private static class SupportsStagingTableSink implements DynamicTableSink, 
SupportsStaging {
+
+        private String dataDir;

Review Comment:
   nit:
   ```
   private final String dataDir;
   ```



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/StagedTableITCase.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.config.TableConfigOptions;
+import 
org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory;
+import 
org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests staged table in batch mode. */
+public class StagedTableITCase extends BatchTestBase {
+
+    @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+    @Before
+    @Override
+    public void before() throws Exception {
+        super.before();
+        List<Row> sourceData = Arrays.asList(Row.of(1, "ZM"));
+
+        TestCollectionTableFactory.reset();
+        TestCollectionTableFactory.initData(sourceData);
+
+        String sourceDDL = "create table t1(a int, b varchar) with 
('connector' = 'COLLECTION')";
+        tEnv().executeSql(sourceDDL);
+    }
+
+    @Test

Review Comment:
   Use `org.junit.jupiter.api.Test` (JUnit 5) instead of `org.junit.Test`.



##########
flink-python/src/main/java/org/apache/flink/table/executor/python/ChainingOptimizingExecutor.java:
##########
@@ -63,6 +66,24 @@ public Pipeline createPipeline(
         return executor.createPipeline(chainedTransformations, configuration, 
defaultJobName);

Review Comment:
   Please don't forget this comment. I mean that the method
   ```
   createPipeline(
               List<Transformation<?>> transformations,
               ReadableConfig configuration,
               String defaultJobName)
   ```
   can simply delegate to
   ```
   return createPipeline(transformations, configuration, defaultJobName, 
Collections.emptyList());
   ```
   



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -784,11 +795,9 @@ public CompiledPlan compilePlan(List<ModifyOperation> 
operations) {
     public TableResultInternal executeInternal(List<ModifyOperation> 
operations) {
         List<ModifyOperation> mapOperations = new ArrayList<>();
         for (ModifyOperation modify : operations) {
-            // execute CREATE TABLE first for CTAS statements
             if (modify instanceof CreateTableASOperation) {
                 CreateTableASOperation ctasOperation = 
(CreateTableASOperation) modify;
-                executeInternal(ctasOperation.getCreateTableOperation());
-                
mapOperations.add(ctasOperation.toSinkModifyOperation(catalogManager));
+                executeCreateTableASOperation(ctasOperation, mapOperations);

Review Comment:
   The method name confuses me, since it doesn't actually execute anything. How about renaming it to something like
   `getOperation(CreateTableASOperation ctasOperation)` and then adding the returned operation to `mapOperations`?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/StagedSinkModifyOperation.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+/**
+ * DML operation that tells to write to a sink.

Review Comment:
   ```suggestion
    * DML operation that tells to write to a sink which implements {@link SupportsStaging}. Currently, this operation is only used for CTAS (CREATE TABLE AS SELECT) statements.
   ```



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/execution/CtasJobStatusHook.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.execution;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.execution.JobStatusHook;
+import org.apache.flink.table.catalog.StagedTable;
+
+/**
+ * This hook is used to implement atomic semantics for CTAS(CREATE TABLE AS 
SELECT) statement. It'll
+ * call the corresponding interfaces of {@link StagedTable} on job status 
changing.

Review Comment:
   ```suggestion
    * call the corresponding methods of the inner {@link StagedTable} when the job status changes.
   ```



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/StagedTableITCase.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.config.TableConfigOptions;
+import 
org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory;
+import 
org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests staged table in batch mode. */
+public class StagedTableITCase extends BatchTestBase {
+
+    @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();

Review Comment:
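   The new test should follow JUnit 5 style (e.g. `@BeforeEach`, `org.junit.jupiter.api.Test`, and `@TempDir` instead of the JUnit 4 `TemporaryFolder` rule).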



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/StagedTableITCase.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.config.TableConfigOptions;
+import 
org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory;
+import 
org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests staged table in batch mode. */
+public class StagedTableITCase extends BatchTestBase {
+
+    @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+    @Before
+    @Override
+    public void before() throws Exception {
+        super.before();
+        List<Row> sourceData = Arrays.asList(Row.of(1, "ZM"));
+
+        TestCollectionTableFactory.reset();
+        TestCollectionTableFactory.initData(sourceData);
+
+        String sourceDDL = "create table t1(a int, b varchar) with 
('connector' = 'COLLECTION')";
+        tEnv().executeSql(sourceDDL);
+    }
+
+    @Test
+    public void testStagedTableWithAtomicCtas() throws Exception {

Review Comment:
   I think the test can still be improved in the following aspects:
   - IIUC, if `TABLE_CTAS_ATOMICITY_ENABLED` is true, the table won't be created by the framework itself. So please make sure the table hasn't been created in your test.
   - Please test the case where `TABLE_CTAS_ATOMICITY_ENABLED` is false, which should behave like non-atomic CTAS.
   - Please test the `abort` method: you can use a config option to control whether writing data fails, and make it fail so that the `abort` method gets called.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/StagedSinkModifyOperation.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+/**
+ * DML operation that tells to write to a sink.
+ *
+ * <p>The sink is described by {@link #getContextResolvedTable()}, and in 
general is used for every
+ * sink which implementation is defined with {@link DynamicTableSink}.
+ *
+ * <p>StagedSinkModifyOperation is an extension of SinkModifyOperation to the 
atomic CTAS scenario.

Review Comment:
   suggestion:
   ```
   * <p>StagedSinkModifyOperation is an extension of SinkModifyOperation for the atomic CTAS scenario.
   *  While checking whether the corresponding sink supports atomic CTAS or not, we need to get the DynamicTableSink first, check whether it implements {@link SupportsStaging}, and then call the method {@link SupportsStaging#applyStaging}. We keep the DynamicTableSink in this operation so that we can reuse it instead of creating a new DynamicTableSink again, which is error-prone.
   ```



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/StagedTableITCase.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.config.TableConfigOptions;
+import 
org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory;
+import 
org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests staged table in batch mode. */
+public class StagedTableITCase extends BatchTestBase {
+
+    @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+    @Before
+    @Override
+    public void before() throws Exception {
+        super.before();
+        List<Row> sourceData = Arrays.asList(Row.of(1, "ZM"));
+
+        TestCollectionTableFactory.reset();
+        TestCollectionTableFactory.initData(sourceData);
+
+        String sourceDDL = "create table t1(a int, b varchar) with 
('connector' = 'COLLECTION')";
+        tEnv().executeSql(sourceDDL);
+    }
+
+    @Test
+    public void testStagedTableWithAtomicCtas() throws Exception {

Review Comment:
   ```suggestion
       void testStagedTableWithAtomicCtas() throws Exception {
   ```



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/StagedTableITCase.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.config.TableConfigOptions;
+import 
org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory;
+import 
org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests staged table in batch mode. */
+public class StagedTableITCase extends BatchTestBase {
+
+    @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+    @Before

Review Comment:
   ```suggestion
       @BeforeEach
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsStaging.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.execution.JobStatusHook;
+import org.apache.flink.table.catalog.StagedTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+
+/**
+ * Enables different staged operations to ensure atomicity in a {@link 
DynamicTableSink}.
+ *
+ * <p>By default, if this interface is not implemented, indicating that atomic 
operations are not
+ * supported, then a non-atomic implementation is used.
+ */
+@PublicEvolving
+public interface SupportsStaging {
+
+    /**
+     * Provides a {@link StagedTable} that provided transaction abstraction. 
StagedTable will be
+     * combined with {@link JobStatusHook} to achieve atomicity support in the 
Flink framework. Call
+     * the relevant API of StagedTable when the Job state is switched.
+     *
+     * <p>This method will be called at the compile stage.
+     *
+     * @param context Tell DynamicTableSink, the operation type of this 
StagedTable, expandable
+     * @return {@link StagedTable} that can be serialized and provides atomic 
operations
+     */
+    StagedTable applyStaging(StagingContext context);
+
+    /**
+     * The context is intended to tell DynamicTableSink the type of this 
operation. In this way,
+     * DynamicTableSink can return the corresponding implementation of 
StagedTable according to the
+     * specific operation. More types of operations can be extended in the 
future.
+     */
+    @PublicEvolving
+    interface StagingContext {
+        StagingPurpose getStagingPurpose();
+    }
+
+    /**
+     * The type of StagedTable final visibility that was expects for staging 
purpose.

Review Comment:
   Please don't ignore my earlier comment here.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/StagedTableITCase.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.config.TableConfigOptions;
+import 
org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory;
+import 
org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests staged table in batch mode. */
+public class StagedTableITCase extends BatchTestBase {
+
+    @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();

Review Comment:
   Please replace the JUnit 4 `TemporaryFolder` rule with JUnit 5's `@TempDir Path temporaryFolder;`.
   You can find examples in the codebase.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to