luoyuxia commented on code in PR #22839:
URL: https://github.com/apache/flink/pull/22839#discussion_r1241151298


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -774,11 +783,56 @@ public CompiledPlan compilePlan(List<ModifyOperation> operations) {
     public TableResultInternal executeInternal(List<ModifyOperation> operations) {
         List<ModifyOperation> mapOperations = new ArrayList<>();
         for (ModifyOperation modify : operations) {
-            // execute CREATE TABLE first for CTAS statements
             if (modify instanceof CreateTableASOperation) {
                 CreateTableASOperation ctasOperation = (CreateTableASOperation) modify;
-                executeInternal(ctasOperation.getCreateTableOperation());
-                mapOperations.add(ctasOperation.toSinkModifyOperation(catalogManager));
+                CreateTableOperation createTableOperation = ctasOperation.getCreateTableOperation();

Review Comment:
   I'm wondering whether this CTAS code can be extracted into a method; for now it looks tedious to me.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/execution/CtasJobStatusHook.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.execution;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.execution.JobStatusHook;
+import org.apache.flink.table.catalog.StagedTable;
+
+/**

Review Comment:
   nit:
   ```
   /**
    * This hook is used to implement atomic semantics for CTAS (CREATE TABLE AS SELECT) statements. It
    * calls the corresponding methods of {@link StagedTable} when the job status changes.
    */
   ```
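
   To make the suggested doc concrete, here is a minimal sketch of how such a hook could map job status callbacks onto the `StagedTable` lifecycle described in the `StagedTable` Javadoc (`onCreated` calls `begin`, `onFinished` calls `commit`, `onFailed`/`onCanceled` call `abort`). `StagedTable` and `JobStatusHook` below are simplified stand-ins rather than the real Flink interfaces (the real callbacks take a `JobID`, replaced here by a `String`), and `RecordingStagedTable` is a hypothetical test double.

   ```java
   import java.io.Serializable;
   import java.util.ArrayList;
   import java.util.List;

   // Simplified stand-in for org.apache.flink.table.catalog.StagedTable.
   interface StagedTable extends Serializable {
       void begin();

       void commit();

       void abort();
   }

   // Simplified stand-in for org.apache.flink.core.execution.JobStatusHook;
   // the job ID is a String here instead of a JobID to keep the sketch self-contained.
   interface JobStatusHook extends Serializable {
       void onCreated(String jobId);

       void onFinished(String jobId);

       void onFailed(String jobId, Throwable throwable);

       void onCanceled(String jobId);
   }

   // Drives the two-phase commit of a StagedTable from job status changes.
   class CtasJobStatusHook implements JobStatusHook {
       private final StagedTable stagedTable;

       CtasJobStatusHook(StagedTable stagedTable) {
           this.stagedTable = stagedTable;
       }

       @Override
       public void onCreated(String jobId) {
           stagedTable.begin();
       }

       @Override
       public void onFinished(String jobId) {
           stagedTable.commit();
       }

       @Override
       public void onFailed(String jobId, Throwable throwable) {
           stagedTable.abort();
       }

       @Override
       public void onCanceled(String jobId) {
           stagedTable.abort();
       }
   }

   // Hypothetical test double that records the lifecycle calls in order.
   class RecordingStagedTable implements StagedTable {
       final List<String> calls = new ArrayList<>();

       @Override
       public void begin() {
           calls.add("begin");
       }

       @Override
       public void commit() {
           calls.add("commit");
       }

       @Override
       public void abort() {
           calls.add("abort");
       }
   }
   ```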



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -774,11 +783,56 @@ public CompiledPlan compilePlan(List<ModifyOperation> operations) {
     public TableResultInternal executeInternal(List<ModifyOperation> operations) {
         List<ModifyOperation> mapOperations = new ArrayList<>();
         for (ModifyOperation modify : operations) {
-            // execute CREATE TABLE first for CTAS statements
             if (modify instanceof CreateTableASOperation) {
                 CreateTableASOperation ctasOperation = (CreateTableASOperation) modify;
-                executeInternal(ctasOperation.getCreateTableOperation());
-                mapOperations.add(ctasOperation.toSinkModifyOperation(catalogManager));
+                CreateTableOperation createTableOperation = ctasOperation.getCreateTableOperation();
+                ObjectIdentifier tableIdentifier = createTableOperation.getTableIdentifier();
+                Catalog catalog = catalogManager.getCatalog(tableIdentifier.getCatalogName()).get();
+                ResolvedCatalogTable catalogTable =
+                        catalogManager.resolveCatalogTable(createTableOperation.getCatalogTable());
+
+                Optional<DynamicTableSink> dynamicTableSinkOptional =
+                        TableFactoryUtil.getDynamicTableSink(
+                                catalogTable,
+                                tableIdentifier,
+                                createTableOperation.isTemporary(),
+                                catalog,
+                                moduleManager,
+                                tableConfig,
+                                isStreamingMode,
+                                resourceManager.getUserClassLoader());
+                if (dynamicTableSinkOptional.isPresent()
+                        && dynamicTableSinkOptional.get() instanceof SupportsStaging
+                        && tableConfig.get(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED)) {
+                    DynamicTableSink dynamicTableSink = dynamicTableSinkOptional.get();
+                    StagedTable stagedTable =
+                            ((SupportsStaging) dynamicTableSink)
+                                    .applyStaging(
+                                            new SupportsStaging.StagingContext() {

Review Comment:
   I'm not a fan of the anonymous class here. I think we can create a class named `SinkStagingContext`, just like `SinkRuntimeProviderContext`.
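
   A minimal sketch of what such a named class could look like, assuming `StagingContext` only exposes `getStagingPurpose()` as in this PR. `StagingPurpose` and `StagingContext` are simplified top-level stand-ins for the nested types in `SupportsStaging`, and the `forCtas` factory is a hypothetical helper, not an existing API.

   ```java
   import java.util.Objects;

   // Simplified stand-ins for SupportsStaging.StagingPurpose and SupportsStaging.StagingContext.
   enum StagingPurpose {
       CREATE_TABLE_AS,
       CREATE_TABLE_AS_IF_NOT_EXISTS
   }

   interface StagingContext {
       StagingPurpose getStagingPurpose();
   }

   // A named StagingContext implementation, analogous to SinkRuntimeProviderContext,
   // that replaces the anonymous class at the call site.
   final class SinkStagingContext implements StagingContext {
       private final StagingPurpose stagingPurpose;

       SinkStagingContext(StagingPurpose stagingPurpose) {
           this.stagingPurpose = Objects.requireNonNull(stagingPurpose, "stagingPurpose");
       }

       // Hypothetical convenience factory: picks the purpose from the CTAS IF NOT EXISTS flag.
       static SinkStagingContext forCtas(boolean ignoreIfExists) {
           return new SinkStagingContext(
                   ignoreIfExists
                           ? StagingPurpose.CREATE_TABLE_AS_IF_NOT_EXISTS
                           : StagingPurpose.CREATE_TABLE_AS);
       }

       @Override
       public StagingPurpose getStagingPurpose() {
           return stagingPurpose;
       }
   }
   ```

   At the call site this would read roughly `((SupportsStaging) dynamicTableSink).applyStaging(SinkStagingContext.forCtas(ignoreIfExists))`, where `ignoreIfExists` is assumed to come from the `CreateTableOperation`.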
   



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Executor.java:
##########
@@ -61,6 +62,21 @@ Pipeline createPipeline(
             ReadableConfig tableConfiguration,
             @Nullable String defaultJobName);
 
+    /**
+     * Translates the given transformations to a {@link Pipeline}.

Review Comment:
   nit:
   ```suggestion
        * Translates the given transformations with a list of {@link JobStatusHook}s to a {@link Pipeline}.
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsStaging.java:
##########
@@ -0,0 +1,73 @@
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.execution.JobStatusHook;
+import org.apache.flink.table.catalog.StagedTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+
+/**

Review Comment:
   Please elaborate on this interface in the Javadoc, since it's the core interface: what the interface is for, when it will be considered, and which method will be called once it is considered.
   Good documentation makes the mechanism easy to understand and makes it convenient for connectors to integrate.
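
   For reference when writing that doc, the condition in `TableEnvironmentImpl#executeInternal` in this PR boils down to roughly the following: the atomic path is taken only if a sink could be created, it implements `SupportsStaging`, and `TABLE_CTAS_ATOMICITY_ENABLED` is on; otherwise CTAS falls back to the non-atomic path. This is a simplified sketch with stand-in types; `AtomicCtasDecision`, `maybeStage`, and `ExampleStagingSink` are hypothetical names.

   ```java
   import java.util.Optional;

   // Simplified stand-ins for the Flink types involved.
   interface DynamicTableSink {}

   interface StagedTable {
       void begin();

       void commit();

       void abort();
   }

   enum StagingPurpose {
       CREATE_TABLE_AS,
       CREATE_TABLE_AS_IF_NOT_EXISTS
   }

   interface StagingContext {
       StagingPurpose getStagingPurpose();
   }

   interface SupportsStaging {
       StagedTable applyStaging(StagingContext context);
   }

   final class AtomicCtasDecision {
       private AtomicCtasDecision() {}

       // Returns the StagedTable to drive from job status hooks, or empty when the
       // non-atomic path (create the table first, then insert into it) must be used.
       static Optional<StagedTable> maybeStage(
               Optional<DynamicTableSink> sink, boolean atomicityEnabled, StagingContext context) {
           if (!atomicityEnabled || !sink.isPresent() || !(sink.get() instanceof SupportsStaging)) {
               return Optional.empty();
           }
           // applyStaging is called at compile time, before the job is submitted.
           return Optional.of(((SupportsStaging) sink.get()).applyStaging(context));
       }
   }

   // Hypothetical sink that supports staging and returns a no-op StagedTable.
   class ExampleStagingSink implements DynamicTableSink, SupportsStaging {
       @Override
       public StagedTable applyStaging(StagingContext context) {
           return new StagedTable() {
               @Override
               public void begin() {}

               @Override
               public void commit() {}

               @Override
               public void abort() {}
           };
       }
   }
   ```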



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsStaging.java:
##########
@@ -0,0 +1,73 @@
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.execution.JobStatusHook;
+import org.apache.flink.table.catalog.StagedTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+
+/**
+ * Enables different staged operations to ensure atomicity in a {@link DynamicTableSink}.
+ *
+ * <p>By default, if this interface is not implemented, indicating that atomic operations are not
+ * supported, then a non-atomic implementation is used.
+ */
+@PublicEvolving
+public interface SupportsStaging {
+
+    /**
+     * Provides a {@link StagedTable} that provided transaction abstraction. StagedTable will be
+     * combined with {@link JobStatusHook} to achieve atomicity support in the Flink framework. Call
+     * the relevant API of StagedTable when the Job state is switched.
+     *
+     * <p>This method will be called at the compile stage.
+     *
+     * @param context Tell DynamicTableSink, the operation type of this StagedTable, expandable
+     * @return {@link StagedTable} that can be serialized and provides atomic operations
+     */
+    StagedTable applyStaging(StagingContext context);
+
+    /**

Review Comment:
   nit:
   ```
   /**
        * The context is intended to tell DynamicTableSink the type of this operation. Currently, it'll
        * provide what kind of operation the staging sink is for. In this way, the DynamicTableSink can
        * return the corresponding implementation of StagedTable according to the specific operation.
        * More types of operations can be extended in the future.
        */
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java:
##########
@@ -0,0 +1,65 @@
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.execution.JobStatusHook;
+
+import java.io.Serializable;
+
+/**
+ * A {@link StagedTable} for atomic semantics using a two-phase commit protocol, combined with
+ * {@link JobStatusHook} for atomic CTAS. {@link StagedTable} will be a member variable of
+ * CtasJobStatusHook and can be serialized;
+ *
+ * <p>CtasJobStatusHook#onCreated will call the begin method of StagedTable;
+ * CtasJobStatusHook#onFinished will call the commit method of StagedTable;
+ * CtasJobStatusHook#onFailed and CtasJobStatusHook#onCanceled will call the abort method of
+ * StagedTable;
+ */
+@PublicEvolving
+public interface StagedTable extends Serializable {
+
+    /**
+     * This method will be called when the job is started. Similar to what it means to open a
+     * transaction in a relational database; In Flink's atomic CTAS scenario, it is used to do some
+     * initialization work; For example, initializing the client of the underlying service, the tmp
+     * path of the underlying storage, or even call the start transaction API of the underlying
+     * service, etc.
+     */
+    void begin();
+
+    /**
+     * This method will be called when the job is succeeds. Similar to what it means to commit the
+     * transaction in a relational database; In Flink's atomic CTAS scenario, it is used to do some
+     * data visibility related work; For example, moving the underlying data to the target
+     * directory, writing buffer data to the underlying storage service, or even call the commit
+     * transaction API of the underlying service, etc.
+     */
+    void commit();
+
+    /**
+     * This method will be called when the job is failed or canceled. Similar to what it means to

Review Comment:
   Suggestion:
   ```
   /**
        * This method will be called when the job fails or is canceled. In Flink's atomic CTAS
        * scenario, it is expected to clean up after the write; for example, deleting the data in the
        * tmp directory, deleting the temporary data in the underlying storage service, or even calling
        * the rollback transaction API of the underlying service, etc.
        */
   ```
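
   To illustrate the begin/commit/abort contract these docs describe, here is a minimal sketch of a file-system-style `StagedTable`: data is written into a staging directory that only becomes visible under the target path on `commit()`, and `abort()` deletes it. `StagedTable` is a simplified stand-in and `FileStagedTable` is hypothetical; a real connector would use its own storage and transaction APIs.

   ```java
   import java.io.IOException;
   import java.io.Serializable;
   import java.io.UncheckedIOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.Paths;
   import java.nio.file.StandardCopyOption;
   import java.util.Comparator;
   import java.util.stream.Stream;

   // Simplified stand-in for org.apache.flink.table.catalog.StagedTable.
   interface StagedTable extends Serializable {
       void begin();

       void commit();

       void abort();
   }

   // Hypothetical StagedTable backed by a local staging directory.
   class FileStagedTable implements StagedTable {
       // Kept as Strings because java.nio.file.Path is not Serializable.
       private final String stagingDir;
       private final String targetDir;

       FileStagedTable(String stagingDir, String targetDir) {
           this.stagingDir = stagingDir;
           this.targetDir = targetDir;
       }

       @Override
       public void begin() {
           // Prepare the location that writers will use; nothing is visible yet.
           try {
               Files.createDirectories(Paths.get(stagingDir));
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }

       @Override
       public void commit() {
           // Publish the staged data in one rename; ATOMIC_MOVE fails instead of
           // falling back to a copy if the file system cannot rename atomically.
           try {
               Files.move(Paths.get(stagingDir), Paths.get(targetDir), StandardCopyOption.ATOMIC_MOVE);
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }

       @Override
       public void abort() {
           // Delete the staged data, children before their parent directories.
           Path staging = Paths.get(stagingDir);
           if (!Files.exists(staging)) {
               return;
           }
           try (Stream<Path> paths = Files.walk(staging)) {
               paths.sorted(Comparator.reverseOrder()).forEach(FileStagedTable::delete);
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }

       private static void delete(Path path) {
           try {
               Files.delete(path);
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }
   }
   ```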



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/StagedTableITCase.java:
##########
@@ -0,0 +1,70 @@
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests staged table in a table environment. */
+public class StagedTableITCase extends BatchTestBase {
+
+    @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+    @Before
+    @Override
+    public void before() throws Exception {
+        super.before();
+        List<Row> sourceData = Arrays.asList(Row.of(1, "ZM"));
+
+        TestCollectionTableFactory.reset();
+        TestCollectionTableFactory.initData(sourceData);
+
+        String sourceDDL = "create table t1(a int, b varchar) with ('connector' = 'COLLECTION')";
+        tEnv().executeSql(sourceDDL);
+    }
+
+    @Test
+    public void testStagedTableWithAtomicCtas() throws Exception {

Review Comment:
   How can you make sure the data is invisible before committing in this test?
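
   One possible approach, sketched under the assumption that the test job runs in the same JVM (e.g. on a MiniCluster): since the `StagedTable` is a serialized member of the hook, the test cannot keep a reference to the instance Flink calls, so a test-only `StagedTable` can record into static, thread-safe state, including how many rows were visible right before `commit()` published them. `TestStagedTable` and its fields are hypothetical, and `StagedTable` is a simplified stand-in.

   ```java
   import java.io.Serializable;
   import java.util.List;
   import java.util.concurrent.CopyOnWriteArrayList;

   // Simplified stand-in for org.apache.flink.table.catalog.StagedTable.
   interface StagedTable extends Serializable {
       void begin();

       void commit();

       void abort();
   }

   // Hypothetical test-only StagedTable. State is static so that it survives
   // serialization of the hook and is observable from the test thread.
   class TestStagedTable implements StagedTable {
       static final List<String> EVENTS = new CopyOnWriteArrayList<>();
       // Rows a test sink would write before commit; not yet visible to readers.
       static final List<String> STAGED_ROWS = new CopyOnWriteArrayList<>();
       // Rows visible to readers of the target table.
       static final List<String> VISIBLE_ROWS = new CopyOnWriteArrayList<>();

       static void reset() {
           EVENTS.clear();
           STAGED_ROWS.clear();
           VISIBLE_ROWS.clear();
       }

       @Override
       public void begin() {
           EVENTS.add("begin");
       }

       @Override
       public void commit() {
           // Record visibility right before publishing so the test can assert it was empty.
           EVENTS.add("commit:visibleBefore=" + VISIBLE_ROWS.size());
           VISIBLE_ROWS.addAll(STAGED_ROWS);
           STAGED_ROWS.clear();
       }

       @Override
       public void abort() {
           EVENTS.add("abort");
           STAGED_ROWS.clear();
       }
   }
   ```

   The ITCase could then assert, after the job finishes, that `EVENTS` contains `begin` followed by `commit:visibleBefore=0`, and, for a failing job, that `VISIBLE_ROWS` stays empty.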



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java:
##########
@@ -0,0 +1,65 @@
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.execution.JobStatusHook;
+
+import java.io.Serializable;
+
+/**
+ * A {@link StagedTable} for atomic semantics using a two-phase commit protocol, combined with
+ * {@link JobStatusHook} for atomic CTAS. {@link StagedTable} will be a member variable of

Review Comment:
   Also, please elaborate on this interface in the Javadoc, since it's the core interface: what the interface is for, who should return it, and, once the interface is considered, which method will be called under which condition.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/execution/CtasJobStatusHook.java:
##########
@@ -0,0 +1,56 @@
+
+package org.apache.flink.table.execution;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.execution.JobStatusHook;
+import org.apache.flink.table.catalog.StagedTable;
+
+/**

Review Comment:
   Another minor comment on the Javadoc of `JobStatusHook`:
   `Hooks provided by users on job status changing`,
   but the hook shouldn't be provided by users anymore, since it has been marked as internal.
   Change it to `Hooks on job status changing`?
   That means we don't intend users to use it, although users can still use it anyway.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Executor.java:
##########
@@ -61,6 +62,21 @@ Pipeline createPipeline(
             ReadableConfig tableConfiguration,
             @Nullable String defaultJobName);
 
+    /**
+     * Translates the given transformations to a {@link Pipeline}.
+     *
+     * @param transformations list of transformations
+     * @param tableConfiguration table-specific configuration options
+     * @param defaultJobName default job name if not specified via {@link PipelineOptions#NAME}
+     * @param jobStatusHookList list of JobStatusHooks

Review Comment:
   nit:
   ```suggestion
        * @param jobStatusHookList list of {@link JobStatusHook}s
   ```



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java:
##########
@@ -164,4 +171,54 @@ public static boolean isLegacyConnectorOptions(
             }
         }
     }
+
+    /**
+     * Create a DynamicTableSink for ResolvedCatalogTable using table factory associated with the
+     * catalog.
+     */
+    public static Optional<DynamicTableSink> getDynamicTableSink(

Review Comment:
   `TableFactoryUtil` is not a suitable place for this method. I also need this util method while supporting truncate table in my [pr](https://github.com/apache/flink/pull/22696/files#diff-b4a2c05dbc05190ed95ee36ccae3084356dea84a1c316bf89254e6217471d9d7R47).
   Would you mind rebasing after my PR is merged?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/StagedSinkModifyOperation.java:
##########
@@ -0,0 +1,87 @@
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+/**

Review Comment:
   nit:
   ```
   /**
    * DML operation that tells to write to a sink which implements {@link SupportsStaging}.
    *
    * <p>The sink is described by {@link #getContextResolvedTable()}, and in general is used for every
    * sink whose implementation is defined with {@link DynamicTableSink}.
    *
    * <p>StagedSinkModifyOperation is an extension of SinkModifyOperation in the atomic CTAS scenario.
    * While checking whether the corresponding sink supports atomic CTAS or not, we first need to get
    * the DynamicTableSink, check whether it implements {@link SupportsStaging}, and then call
    * {@link SupportsStaging#applyStaging}. We maintain the DynamicTableSink in this operation so that
    * we can reuse it instead of creating a new DynamicTableSink, which is error-prone.
    */
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultExecutor.java:
##########
@@ -85,6 +86,21 @@ public Pipeline createPipeline(
         return streamGraph;
     }
 
+    @Override
+    public Pipeline createPipeline(
+            List<Transformation<?>> transformations,
+            ReadableConfig tableConfiguration,
+            @Nullable String defaultJobName,
+            List<JobStatusHook> jobStatusHookList) {
+        StreamGraph streamGraph =
+                (StreamGraph) createPipeline(transformations, tableConfiguration, defaultJobName);
+        for (JobStatusHook hook : jobStatusHookList) {
+            streamGraph.registerJobStatusHook(hook);
+        }
+        jobStatusHookList.clear();

Review Comment:
   Why clear the `jobStatusHookList`?
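
   If the goal is only to register the hooks, one option is to leave the caller's list untouched: `clear()` is a side effect on an argument the method does not own, and it throws `UnsupportedOperationException` when the caller passes an immutable list such as `List.of(...)`. A minimal sketch, where `JobStatusHook` and `HookRegistry` are simplified stand-ins for the real hook interface and `StreamGraph`, and `PipelineSketch` is hypothetical:

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;

   // Simplified stand-in for org.apache.flink.core.execution.JobStatusHook.
   interface JobStatusHook {}

   // Simplified stand-in for the hook registry kept by StreamGraph.
   class HookRegistry {
       private final List<JobStatusHook> registered = new ArrayList<>();

       void registerJobStatusHook(JobStatusHook hook) {
           registered.add(hook);
       }

       List<JobStatusHook> registeredHooks() {
           return Collections.unmodifiableList(registered);
       }
   }

   final class PipelineSketch {
       private PipelineSketch() {}

       // Registers every hook without mutating jobStatusHookList.
       static HookRegistry createPipeline(List<JobStatusHook> jobStatusHookList) {
           HookRegistry streamGraph = new HookRegistry();
           for (JobStatusHook hook : jobStatusHookList) {
               streamGraph.registerJobStatusHook(hook);
           }
           return streamGraph;
       }
   }
   ```

   If the clearing is meant to stop hooks from leaking into a later pipeline, it may be clearer to reset the list where it is owned.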



##########
flink-python/src/main/java/org/apache/flink/table/executor/python/ChainingOptimizingExecutor.java:
##########
@@ -63,6 +66,24 @@ public Pipeline createPipeline(
         return executor.createPipeline(chainedTransformations, configuration, defaultJobName);

Review Comment:
   This method can be simplified to
   `return createPipeline(transformations, configuration, defaultJobName, Collections.emptyList());`
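
   A minimal sketch of the suggested delegation, with simplified stand-in types (`Pipeline`, `JobStatusHook`, a `Map` instead of `ReadableConfig`, and `String` transformations); `ChainingExecutorSketch` and its placeholder `chain` step are hypothetical and do not reproduce the actual Python operator chaining.

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   import java.util.Map;

   // Simplified stand-in for org.apache.flink.core.execution.JobStatusHook.
   interface JobStatusHook {}

   // Simplified stand-in for the Pipeline returned by an Executor.
   final class Pipeline {
       final List<String> transformations;
       final String jobName;
       final List<JobStatusHook> hooks;

       Pipeline(List<String> transformations, String jobName, List<JobStatusHook> hooks) {
           this.transformations = transformations;
           this.jobName = jobName;
           this.hooks = hooks;
       }
   }

   class ChainingExecutorSketch {
       // The hook-less overload delegates with no hooks, so the chaining logic lives in one place.
       Pipeline createPipeline(
               List<String> transformations, Map<String, String> configuration, String defaultJobName) {
           return createPipeline(
                   transformations, configuration, defaultJobName, Collections.emptyList());
       }

       Pipeline createPipeline(
               List<String> transformations,
               Map<String, String> configuration,
               String defaultJobName,
               List<JobStatusHook> jobStatusHookList) {
           List<String> chainedTransformations = chain(transformations);
           return new Pipeline(chainedTransformations, defaultJobName, jobStatusHookList);
       }

       // Placeholder for the real chaining optimization: returns an unchanged copy.
       private static List<String> chain(List<String> transformations) {
           return new ArrayList<>(transformations);
       }
   }
   ```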



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsStaging.java:
##########
@@ -0,0 +1,73 @@
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.execution.JobStatusHook;
+import org.apache.flink.table.catalog.StagedTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+
+/**
+ * Enables different staged operations to ensure atomicity in a {@link DynamicTableSink}.
+ *
+ * <p>By default, if this interface is not implemented, indicating that atomic operations are not
+ * supported, then a non-atomic implementation is used.
+ */
+@PublicEvolving
+public interface SupportsStaging {
+
+    /**
+     * Provides a {@link StagedTable} that provided transaction abstraction. StagedTable will be

Review Comment:
   Suggestion:
   ```
   /**
        * Provides a {@link StagingContext} for the sink modification and returns a {@link StagedTable}.
        * The {@link StagedTable} provides a transaction abstraction to support atomicity for CTAS. Flink
        * will call the relevant API of StagedTable when the job status switches.
        *
        * <p>Note: This method will be called at the compile stage.
        *
        * @param context The context for the sink modification
        * @return {@link StagedTable} that will be leveraged by the Flink framework to provide atomicity
        *     semantics.
        */
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsStaging.java:
##########
@@ -0,0 +1,73 @@
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.execution.JobStatusHook;
+import org.apache.flink.table.catalog.StagedTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+
+/**
+ * Enables different staged operations to ensure atomicity in a {@link DynamicTableSink}.
+ *
+ * <p>By default, if this interface is not implemented, indicating that atomic operations are not
+ * supported, then a non-atomic implementation is used.
+ */
+@PublicEvolving
+public interface SupportsStaging {
+
+    /**
+     * Provides a {@link StagedTable} that provided transaction abstraction. StagedTable will be
+     * combined with {@link JobStatusHook} to achieve atomicity support in the Flink framework. Call
+     * the relevant API of StagedTable when the Job state is switched.
+     *
+     * <p>This method will be called at the compile stage.
+     *
+     * @param context Tell DynamicTableSink, the operation type of this StagedTable, expandable
+     * @return {@link StagedTable} that can be serialized and provides atomic operations
+     */
+    StagedTable applyStaging(StagingContext context);
+
+    /**
+     * The context is intended to tell DynamicTableSink the type of this operation. In this way,
+     * DynamicTableSink can return the corresponding implementation of StagedTable according to the
+     * specific operation. More types of operations can be extended in the future.
+     */
+    @PublicEvolving
+    interface StagingContext {
+        StagingPurpose getStagingPurpose();
+    }
+
+    /**
+     * The type of StagedTable final visibility that was expects for staging purpose.

Review Comment:
   suggestion:
   ```
   /**
        * The type of operation the staging sink is for.
        *
        * <p>Currently, two types of operation are supported:
        *
        * <ul>
        *   <li>CREATE_TABLE_AS - for the operation of a CREATE TABLE AS SELECT statement.
        *   <li>CREATE_TABLE_AS_IF_NOT_EXISTS - for the operation of a CREATE TABLE IF NOT EXISTS AS
        *       SELECT statement.
        * </ul>
        */
   ```
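
   As an illustration of how a connector might use the purpose, here is a minimal sketch of a sink that returns a `StagedTable` per `StagingPurpose`. All types are simplified stand-ins, and `ExampleSink`/`ExampleStagedTable` are hypothetical; what each purpose should change (e.g. how `commit()` treats a table that already exists) is up to the connector and is not shown.

   ```java
   import java.io.Serializable;

   // Simplified stand-ins for the types proposed in this PR.
   enum StagingPurpose {
       CREATE_TABLE_AS,
       CREATE_TABLE_AS_IF_NOT_EXISTS
   }

   interface StagingContext {
       StagingPurpose getStagingPurpose();
   }

   interface StagedTable extends Serializable {
       void begin();

       void commit();

       void abort();
   }

   interface SupportsStaging {
       StagedTable applyStaging(StagingContext context);
   }

   // Hypothetical StagedTable that remembers which operation it was created for.
   class ExampleStagedTable implements StagedTable {
       final StagingPurpose purpose;

       ExampleStagedTable(StagingPurpose purpose) {
           this.purpose = purpose;
       }

       @Override
       public void begin() {}

       @Override
       public void commit() {}

       @Override
       public void abort() {}
   }

   // Hypothetical connector sink choosing a StagedTable per staging purpose.
   class ExampleSink implements SupportsStaging {
       @Override
       public StagedTable applyStaging(StagingContext context) {
           StagingPurpose purpose = context.getStagingPurpose();
           switch (purpose) {
               case CREATE_TABLE_AS:
               case CREATE_TABLE_AS_IF_NOT_EXISTS:
                   return new ExampleStagedTable(purpose);
               default:
                   // Fail fast on purposes added later that this connector does not handle yet.
                   throw new UnsupportedOperationException("Unsupported staging purpose: " + purpose);
           }
       }
   }
   ```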



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java:
##########
@@ -0,0 +1,65 @@
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.execution.JobStatusHook;
+
+import java.io.Serializable;
+
+/**
+ * A {@link StagedTable} for atomic semantics using a two-phase commit protocol, combined with
+ * {@link JobStatusHook} for atomic CTAS. {@link StagedTable} will be a member variable of
+ * CtasJobStatusHook and can be serialized;
+ *
+ * <p>CtasJobStatusHook#onCreated will call the begin method of StagedTable;
+ * CtasJobStatusHook#onFinished will call the commit method of StagedTable;
+ * CtasJobStatusHook#onFailed and CtasJobStatusHook#onCanceled will call the abort method of
+ * StagedTable;
+ */
+@PublicEvolving
+public interface StagedTable extends Serializable {
+
+    /**
+     * This method will be called when the job is started. Similar to what it means to open a
+     * transaction in a relational database; In Flink's atomic CTAS scenario, it is used to do some
+     * initialization work; For example, initializing the client of the underlying service, the tmp
+     * path of the underlying storage, or even call the start transaction API of the underlying
+     * service, etc.
+     */
+    void begin();
+
+    /**

Review Comment:
   Suggestion:
   ```
   /**
        * This method will be called when the job succeeds. In Flink's atomic CTAS scenario, it is
        * expected to do some commit work; for example, moving the underlying data to the target
        * directory to make it visible, writing buffered data to the underlying storage service, or even
        * calling the commit transaction API of the underlying service, etc.
        */
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java:
##########
@@ -0,0 +1,65 @@
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.execution.JobStatusHook;
+
+import java.io.Serializable;
+
+/**
+ * A {@link StagedTable} for atomic semantics using a two-phase commit protocol, combined with
+ * {@link JobStatusHook} for atomic CTAS. {@link StagedTable} will be a member variable of
+ * CtasJobStatusHook and can be serialized;
+ *
+ * <p>CtasJobStatusHook#onCreated will call the begin method of StagedTable;
+ * CtasJobStatusHook#onFinished will call the commit method of StagedTable;
+ * CtasJobStatusHook#onFailed and CtasJobStatusHook#onCanceled will call the abort method of
+ * StagedTable;
+ */
+@PublicEvolving
+public interface StagedTable extends Serializable {
+
+    /**

Review Comment:
   Suggestion:
   ```
       /**
        * This method will be called when the job is started. In Flink's atomic CTAS scenario, it is
        * expected to do initialization work, for example, initializing the client of the underlying
        * service, the tmp path of the underlying storage, or even calling the start transaction API
        * of the underlying service, etc.
        */
   ```
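A sketch of the initialization described above, assuming the staging area is a uniquely named temporary directory under some base path. `StagedTableSketch` is a local stand-in that mirrors the `begin`/`commit`/`abort` methods of the `StagedTable` interface in this PR so the snippet compiles on its own, and `TmpDirStagedTable` is hypothetical. Since the class javadoc says the staged table is held by the hook and serialized, the sketch keeps its state in `String` fields (`java.nio.file.Path` is not `Serializable`).

```java
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.UUID;
import java.util.stream.Stream;

/** Local stand-in mirroring StagedTable's begin/commit/abort so the sketch is self-contained. */
interface StagedTableSketch extends Serializable {
    void begin();

    void commit();

    void abort();
}

/** Hypothetical staged table whose begin() prepares a unique temporary staging directory. */
class TmpDirStagedTable implements StagedTableSketch {
    private static final long serialVersionUID = 1L;

    // Plain strings survive serialization; java.nio.file.Path is not Serializable.
    private final String basePath;
    private String stagingPath;

    TmpDirStagedTable(String basePath) {
        this.basePath = basePath;
    }

    @Override
    public void begin() {
        try {
            // A unique name keeps different jobs from sharing a staging area.
            Path dir = Paths.get(basePath, "_staging-" + UUID.randomUUID());
            Files.createDirectories(dir);
            stagingPath = dir.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void commit() {
        // Publishing the staged data (e.g. an atomic rename) is out of scope for this sketch.
    }

    @Override
    public void abort() {
        if (stagingPath == null) {
            return; // begin() never ran, so there is nothing to clean up
        }
        try (Stream<Path> paths = Files.walk(Paths.get(stagingPath))) {
            // Reverse order deletes files before the directories that contain them.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    String stagingPath() {
        return stagingPath;
    }
}
```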



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java:
##########
@@ -0,0 +1,65 @@
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.execution.JobStatusHook;
+
+import java.io.Serializable;
+
+/**
+ * A {@link StagedTable} for atomic semantics using a two-phase commit protocol, combined with
+ * {@link JobStatusHook} for atomic CTAS. {@link StagedTable} will be a member variable of

Review Comment:
   Please don't refer to `JobStatusHook` here since it's not a public API. Mentioning `JobStatusHook` will confuse devs.
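The contract in the class javadoc can be phrased purely in terms of the job lifecycle instead. The hypothetical dispatcher below restates the mapping from the current javadoc (job created calls `begin`, job finished calls `commit`, job failed or canceled calls `abort`) without naming the internal hook; `CtasJobEvent`, `StagedTableLifecycle` and the `StagedTableSketch` stand-in are illustrative names, not Flink APIs.

```java
import java.io.Serializable;

/** Local stand-in mirroring StagedTable's begin/commit/abort so the sketch is self-contained. */
interface StagedTableSketch extends Serializable {
    void begin();

    void commit();

    void abort();
}

/** Hypothetical job events relevant to an atomic CTAS; not Flink's JobStatus enum. */
enum CtasJobEvent {
    CREATED,
    FINISHED,
    FAILED,
    CANCELED
}

/** Hypothetical dispatcher describing the staged-table contract in job-lifecycle terms. */
final class StagedTableLifecycle {
    private StagedTableLifecycle() {}

    static void onJobEvent(StagedTableSketch table, CtasJobEvent event) {
        switch (event) {
            case CREATED:
                table.begin(); // prepare the staging area before any data is written
                break;
            case FINISHED:
                table.commit(); // job succeeded: make the staged data visible
                break;
            case FAILED:
            case CANCELED:
                table.abort(); // job did not succeed: discard the staged data
                break;
            default:
                throw new IllegalArgumentException("Unknown event: " + event);
        }
    }
}
```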
   



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/StagedTableITCase.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests staged table in a table environment. */
+public class StagedTableITCase extends BatchTestBase {

Review Comment:
   Please test it in both batch and stream mode.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestSupportsStagingTableFactory.java:
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.catalog.StagedTable;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.FileUtils;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.Set;
+
+/** A factory to create table to support staging for test purpose. */
+public class TestSupportsStagingTableFactory implements DynamicTableSinkFactory {
+
+    public static final String IDENTIFIER = "test-staging";
+
+    private static final ConfigOption<String> DATA_DIR =
+            ConfigOptions.key("data-dir")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The data id used to write the rows.");
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+        helper.validate();
+        String dataDir = helper.getOptions().get(DATA_DIR);
+        return new SupportsStagingTableSink(dataDir);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return Collections.singleton(DATA_DIR);
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        return Collections.emptySet();
+    }
+
+    /** A sink that supports staging. */
+    private static class SupportsStagingTableSink implements DynamicTableSink, SupportsStaging {
+
+        private String dataDir;
+        private TestStagedTable stagedTable;
+
+        public SupportsStagingTableSink(String dataDir) {
+            this(dataDir, null);
+        }
+
+        public SupportsStagingTableSink(String dataDir, TestStagedTable stagedTable) {
+            this.dataDir = dataDir;
+            this.stagedTable = stagedTable;
+        }
+
+        @Override
+        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+            return ChangelogMode.insertOnly();
+        }
+
+        @Override
+        public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+            return new DataStreamSinkProvider() {
+                @Override
+                public DataStreamSink<?> consumeDataStream(
+                        ProviderContext providerContext, DataStream<RowData> dataStream) {
+                    if (stagedTable != null) {
+                        return dataStream
+                                .addSink(new StagedSinkFunction(dataDir))
+                                .setParallelism(1);
+                    } else {
+                        // otherwise, do nothing
+                        return dataStream.addSink(new DiscardingSink<>());
+                    }
+                }
+            };
+        }
+
+        @Override
+        public DynamicTableSink copy() {
+            return new SupportsStagingTableSink(dataDir, stagedTable);
+        }
+
+        @Override
+        public String asSummaryString() {
+            return "SupportsStagingTableSink";
+        }
+
+        @Override
+        public StagedTable applyStaging(StagingContext context) {
+            stagedTable = new TestStagedTable();
+            return stagedTable;
+        }
+    }
+
+    /** The sink for delete existing data. */
+    private static class StagedSinkFunction extends RichSinkFunction<RowData> {
+
+        private String dataDir;
+
+        public StagedSinkFunction(String dataDir) {
+            this.dataDir = dataDir;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            super.open(parameters);
+            File parentDir = new File(dataDir);
+            if (parentDir.exists()) {
+                parentDir.delete();
+            }
+            parentDir.mkdirs();
+            new File(dataDir, "data").createNewFile();
+        }
+
+        @Override
+        public void invoke(RowData value, Context context) throws Exception {
+            FileUtils.writeFileUtf8(
+                    new File(dataDir, "data"), value.getInt(0) + "," + value.getString(1));
+        }
+    }
+
+    /** A StagedTable for test. */
+    private static class TestStagedTable implements StagedTable {
+
+        @Override
+        public void begin() {}

Review Comment:
   If these methods do nothing, we can never verify that they are really called when the job begins, finishes, or aborts.
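One way to make these calls observable is a test double that records every invocation, sketched below. The record is kept in a static list because the staged table is serialized, so the instance that is actually invoked may be a copy rather than the one the test created; this assumes the job runs in the same JVM as the test (as with a local mini cluster). `RecordingStagedTable` and the `StagedTableSketch` stand-in are hypothetical names.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Local stand-in mirroring StagedTable's begin/commit/abort so the sketch is self-contained. */
interface StagedTableSketch extends Serializable {
    void begin();

    void commit();

    void abort();
}

/**
 * Hypothetical test double that records every lifecycle call. The record is static because the
 * staged table is serialized, so the invoked instance may be a deserialized copy.
 */
class RecordingStagedTable implements StagedTableSketch {
    private static final long serialVersionUID = 1L;

    // Shared across all copies in this JVM; synchronized because callbacks may come from
    // threads other than the test thread.
    static final List<String> CALLS = Collections.synchronizedList(new ArrayList<>());

    static void reset() {
        CALLS.clear();
    }

    @Override
    public void begin() {
        CALLS.add("begin");
    }

    @Override
    public void commit() {
        CALLS.add("commit");
    }

    @Override
    public void abort() {
        CALLS.add("abort");
    }
}
```

A test could then call `reset()` before running the CTAS statement and assert `[begin, commit]` for a successful job, or `[begin, abort]` for a failed or canceled one.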



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.execution.JobStatusHook;
+
+import java.io.Serializable;
+
+/**
+ * A {@link StagedTable} for atomic semantics using a two-phase commit protocol, combined with
+ * {@link JobStatusHook} for atomic CTAS. {@link StagedTable} will be a member variable of
+ * CtasJobStatusHook and can be serialized;
+ *
+ * <p>CtasJobStatusHook#onCreated will call the begin method of StagedTable;
+ * CtasJobStatusHook#onFinished will call the commit method of StagedTable;
+ * CtasJobStatusHook#onFailed and CtasJobStatusHook#onCanceled will call the abort method of
+ * StagedTable;
+ */
+@PublicEvolving
+public interface StagedTable extends Serializable {
+
+    /**

Review Comment:
   No need to refer to `relational database`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
