luoyuxia commented on code in PR #22839:
URL: https://github.com/apache/flink/pull/22839#discussion_r1241151298
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -774,11 +783,56 @@ public CompiledPlan compilePlan(List<ModifyOperation> operations) {
     public TableResultInternal executeInternal(List<ModifyOperation> operations) {
         List<ModifyOperation> mapOperations = new ArrayList<>();
         for (ModifyOperation modify : operations) {
-            // execute CREATE TABLE first for CTAS statements
             if (modify instanceof CreateTableASOperation) {
                 CreateTableASOperation ctasOperation = (CreateTableASOperation) modify;
-                executeInternal(ctasOperation.getCreateTableOperation());
-
                 mapOperations.add(ctasOperation.toSinkModifyOperation(catalogManager));
+                CreateTableOperation createTableOperation =
+                        ctasOperation.getCreateTableOperation();
Review Comment:
I'm wondering whether this CTAS-related code can be extracted into a separate method; right now it looks rather tedious to me.
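For example, something roughly like the following (the helper name and exact signature are only an illustration, not a concrete requirement):
```java
// Hypothetical helper extracted from executeInternal(); just a sketch.
private ModifyOperation resolveCtasModifyOperation(CreateTableASOperation ctasOperation) {
    CreateTableOperation createTableOperation = ctasOperation.getCreateTableOperation();
    ObjectIdentifier tableIdentifier = createTableOperation.getTableIdentifier();
    Catalog catalog = catalogManager.getCatalog(tableIdentifier.getCatalogName()).get();
    ResolvedCatalogTable catalogTable =
            catalogManager.resolveCatalogTable(createTableOperation.getCatalogTable());
    // ... look up the DynamicTableSink, check SupportsStaging and the atomicity option,
    // register the CTAS job status hook, and build the staged sink operation here ...
    return ctasOperation.toSinkModifyOperation(catalogManager);
}
```
Then `executeInternal` would only need a single call per CTAS operation.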
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/execution/CtasJobStatusHook.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.execution;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.execution.JobStatusHook;
+import org.apache.flink.table.catalog.StagedTable;
+
+/**
Review Comment:
nit:
```
/**
 * This hook is used to implement atomic semantics for the CTAS (CREATE TABLE AS SELECT) statement.
 * It'll call the corresponding interfaces of {@link StagedTable} on job status changing.
 */
```
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -774,11 +783,56 @@ public CompiledPlan compilePlan(List<ModifyOperation> operations) {
     public TableResultInternal executeInternal(List<ModifyOperation> operations) {
         List<ModifyOperation> mapOperations = new ArrayList<>();
         for (ModifyOperation modify : operations) {
-            // execute CREATE TABLE first for CTAS statements
             if (modify instanceof CreateTableASOperation) {
                 CreateTableASOperation ctasOperation = (CreateTableASOperation) modify;
-                executeInternal(ctasOperation.getCreateTableOperation());
-
                 mapOperations.add(ctasOperation.toSinkModifyOperation(catalogManager));
+                CreateTableOperation createTableOperation =
+                        ctasOperation.getCreateTableOperation();
+                ObjectIdentifier tableIdentifier = createTableOperation.getTableIdentifier();
+                Catalog catalog =
+                        catalogManager.getCatalog(tableIdentifier.getCatalogName()).get();
+                ResolvedCatalogTable catalogTable =
+                        catalogManager.resolveCatalogTable(createTableOperation.getCatalogTable());
+
+                Optional<DynamicTableSink> dynamicTableSinkOptional =
+                        TableFactoryUtil.getDynamicTableSink(
+                                catalogTable,
+                                tableIdentifier,
+                                createTableOperation.isTemporary(),
+                                catalog,
+                                moduleManager,
+                                tableConfig,
+                                isStreamingMode,
+                                resourceManager.getUserClassLoader());
+                if (dynamicTableSinkOptional.isPresent()
+                        && dynamicTableSinkOptional.get() instanceof SupportsStaging
+                        && tableConfig.get(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED)) {
+                    DynamicTableSink dynamicTableSink = dynamicTableSinkOptional.get();
+                    StagedTable stagedTable =
+                            ((SupportsStaging) dynamicTableSink)
+                                    .applyStaging(
+                                            new SupportsStaging.StagingContext() {
Review Comment:
I'm not a fan of using an anonymous class / lambda here. I think we can create a class named `SinkStagingContext`, just like `SinkRuntimeProviderContext`.
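A minimal sketch of what I have in mind (class name, package, and constructor are just a suggestion):
```java
/** Hypothetical context implementation, mirroring how SinkRuntimeProviderContext is structured. */
public class SinkStagingContext implements SupportsStaging.StagingContext {

    private final SupportsStaging.StagingPurpose stagingPurpose;

    public SinkStagingContext(SupportsStaging.StagingPurpose stagingPurpose) {
        this.stagingPurpose = stagingPurpose;
    }

    @Override
    public SupportsStaging.StagingPurpose getStagingPurpose() {
        return stagingPurpose;
    }
}
```
The call site then becomes `applyStaging(new SinkStagingContext(StagingPurpose.CREATE_TABLE_AS))` (or the IF NOT EXISTS variant).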
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Executor.java:
##########
@@ -61,6 +62,21 @@ Pipeline createPipeline(
ReadableConfig tableConfiguration,
@Nullable String defaultJobName);
+ /**
+ * Translates the given transformations to a {@link Pipeline}.
Review Comment:
nit:
```suggestion
     * Translates the given transformations with a list of {@link JobStatusHook}s to a {@link Pipeline}.
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsStaging.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.execution.JobStatusHook;
+import org.apache.flink.table.catalog.StagedTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+
+/**
Review Comment:
Please elaborate on the interface in the Javadoc, as it's the core interface: what the interface is for, when it will be considered, and, once it is considered, which method will be called.
A good doc makes it easy to understand the mechanism and convenient for connectors to integrate.
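For example, something along these lines could be a starting point (wording is only a sketch):
```java
/**
 * Enables a {@link DynamicTableSink} to stage the data it writes, so that statements such as
 * CREATE TABLE AS SELECT can be executed atomically.
 *
 * <p>This interface is only considered when atomic CTAS is enabled. If the sink implements it,
 * {@link #applyStaging(StagingContext)} is called at the compile stage, and the returned
 * {@link StagedTable} is invoked on job status changes: begin on start, commit on success, abort
 * on failure or cancellation. Otherwise, the existing non-atomic behavior is used.
 */
```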
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsStaging.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.execution.JobStatusHook;
+import org.apache.flink.table.catalog.StagedTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+
+/**
+ * Enables different staged operations to ensure atomicity in a {@link DynamicTableSink}.
+ *
+ * <p>By default, if this interface is not implemented, indicating that atomic operations are not
+ * supported, then a non-atomic implementation is used.
+ */
+@PublicEvolving
+public interface SupportsStaging {
+
+    /**
+     * Provides a {@link StagedTable} that provided transaction abstraction. StagedTable will be
+     * combined with {@link JobStatusHook} to achieve atomicity support in the Flink framework. Call
+     * the relevant API of StagedTable when the Job state is switched.
+     *
+     * <p>This method will be called at the compile stage.
+     *
+     * @param context Tell DynamicTableSink, the operation type of this StagedTable, expandable
+     * @return {@link StagedTable} that can be serialized and provides atomic operations
+     */
+    StagedTable applyStaging(StagingContext context);
+
+ /**
Review Comment:
nit:
```
/**
 * The context is intended to tell DynamicTableSink the type of this operation. Currently, it'll
 * provide what kind of operation the staging sink is for. In this way, the DynamicTableSink can
 * return the corresponding implementation of StagedTable according to the specific operation.
 * More types of operations can be extended in the future.
 */
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.execution.JobStatusHook;
+
+import java.io.Serializable;
+
+/**
+ * A {@link StagedTable} for atomic semantics using a two-phase commit protocol, combined with
+ * {@link JobStatusHook} for atomic CTAS. {@link StagedTable} will be a member variable of
+ * CtasJobStatusHook and can be serialized;
+ *
+ * <p>CtasJobStatusHook#onCreated will call the begin method of StagedTable;
+ * CtasJobStatusHook#onFinished will call the commit method of StagedTable;
+ * CtasJobStatusHook#onFailed and CtasJobStatusHook#onCanceled will call the abort method of
+ * StagedTable;
+ */
+@PublicEvolving
+public interface StagedTable extends Serializable {
+
+    /**
+     * This method will be called when the job is started. Similar to what it means to open a
+     * transaction in a relational database; In Flink's atomic CTAS scenario, it is used to do some
+     * initialization work; For example, initializing the client of the underlying service, the tmp
+     * path of the underlying storage, or even call the start transaction API of the underlying
+     * service, etc.
+     */
+    void begin();
+
+    /**
+     * This method will be called when the job is succeeds. Similar to what it means to commit the
+     * transaction in a relational database; In Flink's atomic CTAS scenario, it is used to do some
+     * data visibility related work; For example, moving the underlying data to the target
+     * directory, writing buffer data to the underlying storage service, or even call the commit
+     * transaction API of the underlying service, etc.
+     */
+    void commit();
+
+    /**
+     * This method will be called when the job is failed or canceled. Similar to what it means to
Review Comment:
Suggestion:
```
/**
 * This method will be called when the job is failed or canceled. In Flink's atomic CTAS
 * scenario, it is expected to do some cleaning work for a writing; For example, delete the data
 * in tmp directory, delete the temporary data in the underlying storage service, or even call
 * the rollback transaction API of the underlying service, etc.
 */
```
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/StagedTableITCase.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests staged table in a table environment. */
+public class StagedTableITCase extends BatchTestBase {
+
+ @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ @Override
+ public void before() throws Exception {
+ super.before();
+ List<Row> sourceData = Arrays.asList(Row.of(1, "ZM"));
+
+ TestCollectionTableFactory.reset();
+ TestCollectionTableFactory.initData(sourceData);
+
+        String sourceDDL = "create table t1(a int, b varchar) with ('connector' = 'COLLECTION')";
+ tEnv().executeSql(sourceDDL);
+ }
+
+ @Test
+ public void testStagedTableWithAtomicCtas() throws Exception {
Review Comment:
How can you make sure the data is invisible before committing in this test?
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.execution.JobStatusHook;
+
+import java.io.Serializable;
+
+/**
+ * A {@link StagedTable} for atomic semantics using a two-phase commit protocol, combined with
+ * {@link JobStatusHook} for atomic CTAS. {@link StagedTable} will be a member variable of
Review Comment:
Also, please elaborate on the interface in the Javadoc, as it's the core interface: what the interface is for, who should return it, and, when the interface is considered, which method will be called under which condition.
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/execution/CtasJobStatusHook.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.execution;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.execution.JobStatusHook;
+import org.apache.flink.table.catalog.StagedTable;
+
+/**
Review Comment:
Another minor comment on the Javadoc of `JobStatusHook`:
```Hooks provided by users on job status changing```,
but the hook shouldn't be provided by users anymore since it has been marked as internal.
Change it to `Hooks on job status changing`?
That way we signal we don't intend users to use it, although users can use it anyway.
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Executor.java:
##########
@@ -61,6 +62,21 @@ Pipeline createPipeline(
ReadableConfig tableConfiguration,
@Nullable String defaultJobName);
+ /**
+ * Translates the given transformations to a {@link Pipeline}.
+ *
+ * @param transformations list of transformations
+ * @param tableConfiguration table-specific configuration options
+     * @param defaultJobName default job name if not specified via {@link PipelineOptions#NAME}
+ * @param jobStatusHookList list of JobStatusHooks
Review Comment:
nit:
```suggestion
* @param jobStatusHookList list of {@link JobStatusHook}s
```
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java:
##########
@@ -164,4 +171,54 @@ public static boolean isLegacyConnectorOptions(
}
}
}
+
+ /**
+     * Create a DynamicTableSink for ResolvedCatalogTable using table factory associated with the
+     * catalog.
+ */
+ public static Optional<DynamicTableSink> getDynamicTableSink(
Review Comment:
`TableFactoryUtil` is not a suitable place for this method. I also need this util method while adding support for truncating tables in my [pr](https://github.com/apache/flink/pull/22696/files#diff-b4a2c05dbc05190ed95ee36ccae3084356dea84a1c316bf89254e6217471d9d7R47).
Would you mind rebasing after my PR is merged?
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/StagedSinkModifyOperation.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+/**
Review Comment:
nit:
```
/**
 * DML operation that tells to write to a sink which implements {@link SupportsStaging}.
 *
 * <p>The sink is described by {@link #getContextResolvedTable()}, and in general is used for every
 * sink which implementation is defined with {@link DynamicTableSink}.
 *
 * <p>StagedSinkModifyOperation is an extension of SinkModifyOperation in the atomic CTAS scenario.
 * While checking whether the corresponding sink supports atomic CTAS or not, we need to get the
 * DynamicTableSink first and check whether it implements {@link SupportsStaging}, and then call
 * the method {@link SupportsStaging#applyStaging}. We maintain the DynamicTableSink in this
 * operation so that we can reuse this DynamicTableSink instead of creating a new DynamicTableSink,
 * which is error-prone.
 */
```
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultExecutor.java:
##########
@@ -85,6 +86,21 @@ public Pipeline createPipeline(
return streamGraph;
}
+ @Override
+ public Pipeline createPipeline(
+ List<Transformation<?>> transformations,
+ ReadableConfig tableConfiguration,
+ @Nullable String defaultJobName,
+ List<JobStatusHook> jobStatusHookList) {
+        StreamGraph streamGraph =
+                (StreamGraph) createPipeline(transformations, tableConfiguration, defaultJobName);
+ for (JobStatusHook hook : jobStatusHookList) {
+ streamGraph.registerJobStatusHook(hook);
+ }
+ jobStatusHookList.clear();
Review Comment:
Why clear the `jobStatusHookList`?
##########
flink-python/src/main/java/org/apache/flink/table/executor/python/ChainingOptimizingExecutor.java:
##########
@@ -63,6 +66,24 @@ public Pipeline createPipeline(
         return executor.createPipeline(chainedTransformations, configuration, defaultJobName);
Review Comment:
this method can be simplified to
```return createPipeline(transformations, configuration, defaultJobName, Collections.emptyList());```
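i.e. the existing override could just delegate (a sketch, assuming the new four-argument overload added in this PR also performs the chaining optimization):
```java
@Override
public Pipeline createPipeline(
        List<Transformation<?>> transformations,
        ReadableConfig configuration,
        @Nullable String defaultJobName) {
    // delegate to the new overload with an empty hook list
    return createPipeline(transformations, configuration, defaultJobName, Collections.emptyList());
}
```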
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsStaging.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.execution.JobStatusHook;
+import org.apache.flink.table.catalog.StagedTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+
+/**
+ * Enables different staged operations to ensure atomicity in a {@link DynamicTableSink}.
+ *
+ * <p>By default, if this interface is not implemented, indicating that atomic operations are not
+ * supported, then a non-atomic implementation is used.
+ */
+@PublicEvolving
+public interface SupportsStaging {
+
+    /**
+     * Provides a {@link StagedTable} that provided transaction abstraction. StagedTable will be
Review Comment:
Suggestion:
```
/**
 * Provides a {@link StagingContext} for the sink modification and returns a {@link StagedTable}.
 * The {@link StagedTable} provides transaction abstraction to support atomicity for CTAS. Flink
 * will call the relevant API of StagedTable when the Job status switches.
 *
 * <p>Note: This method will be called at the compile stage.
 *
 * @param context The context for the sink modification
 * @return {@link StagedTable} that will be leveraged by the Flink framework to provide atomicity
 *     semantics.
 */
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsStaging.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.execution.JobStatusHook;
+import org.apache.flink.table.catalog.StagedTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+
+/**
+ * Enables different staged operations to ensure atomicity in a {@link DynamicTableSink}.
+ *
+ * <p>By default, if this interface is not implemented, indicating that atomic operations are not
+ * supported, then a non-atomic implementation is used.
+ */
+@PublicEvolving
+public interface SupportsStaging {
+
+    /**
+     * Provides a {@link StagedTable} that provided transaction abstraction. StagedTable will be
+     * combined with {@link JobStatusHook} to achieve atomicity support in the Flink framework. Call
+     * the relevant API of StagedTable when the Job state is switched.
+     *
+     * <p>This method will be called at the compile stage.
+     *
+     * @param context Tell DynamicTableSink, the operation type of this StagedTable, expandable
+     * @return {@link StagedTable} that can be serialized and provides atomic operations
+     */
+    StagedTable applyStaging(StagingContext context);
+
+    /**
+     * The context is intended to tell DynamicTableSink the type of this operation. In this way,
+     * DynamicTableSink can return the corresponding implementation of StagedTable according to the
+     * specific operation. More types of operations can be extended in the future.
+     */
+    @PublicEvolving
+    interface StagingContext {
+        StagingPurpose getStagingPurpose();
+    }
+
+    /**
+     * The type of StagedTable final visibility that was expects for staging purpose.
Review Comment:
suggestion:
```
/**
 * The type of operation the staging sink is for.
 *
 * <p>Currently, two types of operation are supported:
 *
 * <ul>
 *   <li>CREATE_TABLE_AS - for the operation of CREATE TABLE AS SELECT statement
 *   <li>CREATE_TABLE_AS_IF_NOT_EXISTS - for the operation of CREATE TABLE AS SELECT IF NOT
 *       EXISTS statement.
 * </ul>
 */
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.execution.JobStatusHook;
+
+import java.io.Serializable;
+
+/**
+ * A {@link StagedTable} for atomic semantics using a two-phase commit protocol, combined with
+ * {@link JobStatusHook} for atomic CTAS. {@link StagedTable} will be a member variable of
+ * CtasJobStatusHook and can be serialized;
+ *
+ * <p>CtasJobStatusHook#onCreated will call the begin method of StagedTable;
+ * CtasJobStatusHook#onFinished will call the commit method of StagedTable;
+ * CtasJobStatusHook#onFailed and CtasJobStatusHook#onCanceled will call the abort method of
+ * StagedTable;
+ */
+@PublicEvolving
+public interface StagedTable extends Serializable {
+
+    /**
+     * This method will be called when the job is started. Similar to what it means to open a
+     * transaction in a relational database; In Flink's atomic CTAS scenario, it is used to do some
+     * initialization work; For example, initializing the client of the underlying service, the tmp
+     * path of the underlying storage, or even call the start transaction API of the underlying
+     * service, etc.
+     */
+ void begin();
+
+ /**
Review Comment:
Suggestion:
```
/**
 * This method will be called when the job succeeds. In Flink's atomic CTAS scenario, it is
 * expected to do some commit work. For example, moving the underlying data to the target
 * directory to make it visible, writing buffer data to the underlying storage service, or even
 * call the commit transaction API of the underlying service, etc.
 */
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.execution.JobStatusHook;
+
+import java.io.Serializable;
+
+/**
+ * A {@link StagedTable} for atomic semantics using a two-phase commit protocol, combined with
+ * {@link JobStatusHook} for atomic CTAS. {@link StagedTable} will be a member variable of
+ * CtasJobStatusHook and can be serialized;
+ *
+ * <p>CtasJobStatusHook#onCreated will call the begin method of StagedTable;
+ * CtasJobStatusHook#onFinished will call the commit method of StagedTable;
+ * CtasJobStatusHook#onFailed and CtasJobStatusHook#onCanceled will call the abort method of
+ * StagedTable;
+ */
+@PublicEvolving
+public interface StagedTable extends Serializable {
+
+ /**
Review Comment:
Suggestion:
```
/**
 * This method will be called when the job is started. In Flink's atomic CTAS scenario, it is
 * expected to do initialization work; For example, initializing the client of the underlying
 * service, the tmp path of the underlying storage, or even call the start transaction API of
 * the underlying service, etc.
 */
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.execution.JobStatusHook;
+
+import java.io.Serializable;
+
+/**
+ * A {@link StagedTable} for atomic semantics using a two-phase commit protocol, combined with
+ * {@link JobStatusHook} for atomic CTAS. {@link StagedTable} will be a member variable of
Review Comment:
Please don't refer to `JobStatusHook` here since it's not a public API. Talking about `JobStatusHook` will confuse devs.
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/StagedTableITCase.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests staged table in a table environment. */
+public class StagedTableITCase extends BatchTestBase {
Review Comment:
Please test it in both batch and stream mode.
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestSupportsStagingTableFactory.java:
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.catalog.StagedTable;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.FileUtils;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.Set;
+
+/** A factory to create table to support staging for test purpose. */
+public class TestSupportsStagingTableFactory implements DynamicTableSinkFactory {
+
+ public static final String IDENTIFIER = "test-staging";
+
+ private static final ConfigOption<String> DATA_DIR =
+ ConfigOptions.key("data-dir")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The data id used to write the rows.");
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+ helper.validate();
+ String dataDir = helper.getOptions().get(DATA_DIR);
+ return new SupportsStagingTableSink(dataDir);
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return Collections.singleton(DATA_DIR);
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ return Collections.emptySet();
+ }
+
+ /** A sink that supports staging. */
+    private static class SupportsStagingTableSink implements DynamicTableSink, SupportsStaging {
+
+ private String dataDir;
+ private TestStagedTable stagedTable;
+
+ public SupportsStagingTableSink(String dataDir) {
+ this(dataDir, null);
+ }
+
+        public SupportsStagingTableSink(String dataDir, TestStagedTable stagedTable) {
+ this.dataDir = dataDir;
+ this.stagedTable = stagedTable;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ return ChangelogMode.insertOnly();
+ }
+
+ @Override
+ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+ return new DataStreamSinkProvider() {
+ @Override
+                public DataStreamSink<?> consumeDataStream(
+                        ProviderContext providerContext, DataStream<RowData> dataStream) {
+ if (stagedTable != null) {
+ return dataStream
+ .addSink(new StagedSinkFunction(dataDir))
+ .setParallelism(1);
+ } else {
+ // otherwise, do nothing
+ return dataStream.addSink(new DiscardingSink<>());
+ }
+ }
+ };
+ }
+
+ @Override
+ public DynamicTableSink copy() {
+ return new SupportsStagingTableSink(dataDir, stagedTable);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "SupportsStagingTableSink";
+ }
+
+ @Override
+ public StagedTable applyStaging(StagingContext context) {
+ stagedTable = new TestStagedTable();
+ return stagedTable;
+ }
+ }
+
+ /** The sink for delete existing data. */
+ private static class StagedSinkFunction extends RichSinkFunction<RowData> {
+
+ private String dataDir;
+
+ public StagedSinkFunction(String dataDir) {
+ this.dataDir = dataDir;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ File parentDir = new File(dataDir);
+ if (parentDir.exists()) {
+ parentDir.delete();
+ }
+ parentDir.mkdirs();
+ new File(dataDir, "data").createNewFile();
+ }
+
+ @Override
+ public void invoke(RowData value, Context context) throws Exception {
+            FileUtils.writeFileUtf8(
+                    new File(dataDir, "data"), value.getInt(0) + "," + value.getString(1));
+ }
+ }
+
+ /** A StagedTable for test. */
+ private static class TestStagedTable implements StagedTable {
+
+ @Override
+ public void begin() {}
Review Comment:
If we do nothing in these methods, we can never verify that they are really called when the job begins, finishes, or aborts.
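For example, the test table could record each call so the ITCase can assert on it afterwards (just a sketch; the marker-file approach and names are only illustrative):
```java
/** A StagedTable for test that records which lifecycle methods were invoked. */
private static class TestStagedTable implements StagedTable {

    private final String dataDir;

    TestStagedTable(String dataDir) {
        this.dataDir = dataDir;
    }

    @Override
    public void begin() {
        createMarkerFile("_begin");
    }

    @Override
    public void commit() {
        createMarkerFile("_commit");
    }

    @Override
    public void abort() {
        createMarkerFile("_abort");
    }

    private void createMarkerFile(String name) {
        try {
            // the ITCase can then assert which marker files exist after the job ends
            new File(dataDir, name).createNewFile();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```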
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.execution.JobStatusHook;
+
+import java.io.Serializable;
+
+/**
+ * A {@link StagedTable} for atomic semantics using a two-phase commit protocol, combined with
+ * {@link JobStatusHook} for atomic CTAS. {@link StagedTable} will be a member variable of
+ * CtasJobStatusHook and can be serialized;
+ *
+ * <p>CtasJobStatusHook#onCreated will call the begin method of StagedTable;
+ * CtasJobStatusHook#onFinished will call the commit method of StagedTable;
+ * CtasJobStatusHook#onFailed and CtasJobStatusHook#onCanceled will call the abort method of
+ * StagedTable;
+ */
+@PublicEvolving
+public interface StagedTable extends Serializable {
+
+ /**
Review Comment:
No need to refer to `relational database` here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]