luoyuxia commented on code in PR #22839: URL: https://github.com/apache/flink/pull/22839#discussion_r1246593727
########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestSupportsStagingTableFactory.java: ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.factories; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.table.catalog.StagedTable; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.ProviderContext; +import org.apache.flink.table.connector.sink.DataStreamSinkProvider; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.abilities.SupportsStaging; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import 
org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.util.FileUtils; + +import java.io.File; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +/** A factory to create table to support staging for test purpose. */ +public class TestSupportsStagingTableFactory implements DynamicTableSinkFactory { + + public static final String IDENTIFIER = "test-staging"; + + public static final List<String> JOB_STATUS_CHANGE_PROCESS = new LinkedList<>(); + + private static final ConfigOption<String> DATA_DIR = + ConfigOptions.key("data-dir") + .stringType() + .noDefaultValue() + .withDescription("The data id used to write the rows."); + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + helper.validate(); + String dataDir = helper.getOptions().get(DATA_DIR); + return new SupportsStagingTableSink(dataDir); + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + return Collections.singleton(DATA_DIR); + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + return Collections.emptySet(); + } + + /** A sink that supports staging. 
*/ + private static class SupportsStagingTableSink implements DynamicTableSink, SupportsStaging { + + private String dataDir; + private TestStagedTable stagedTable; + + public SupportsStagingTableSink(String dataDir) { + this(dataDir, null); + } + + public SupportsStagingTableSink(String dataDir, TestStagedTable stagedTable) { + this.dataDir = dataDir; + this.stagedTable = stagedTable; + } + + @Override + public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { + return ChangelogMode.insertOnly(); + } + + @Override + public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + return new DataStreamSinkProvider() { + @Override + public DataStreamSink<?> consumeDataStream( + ProviderContext providerContext, DataStream<RowData> dataStream) { + if (stagedTable != null) { + return dataStream + .addSink(new StagedSinkFunction(dataDir)) + .setParallelism(1); + } else { + // otherwise, do nothing + return dataStream.addSink(new DiscardingSink<>()); + } + } + }; + } + + @Override + public DynamicTableSink copy() { + return new SupportsStagingTableSink(dataDir, stagedTable); + } + + @Override + public String asSummaryString() { + return "SupportsStagingTableSink"; + } + + @Override + public StagedTable applyStaging(StagingContext context) { + JOB_STATUS_CHANGE_PROCESS.clear(); Review Comment: Don't forget to check the `StagingContext#stagingPurpose`. 
########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java: ########## @@ -197,6 +197,17 @@ private TableConfigOptions() {} .withDescription( "Local directory that is used by planner for storing downloaded resources."); + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) + public static final ConfigOption<Boolean> TABLE_CTAS_ATOMICITY_ENABLED = + key("table.ctas.atomicity-enabled") + .booleanType() + .defaultValue(false) + .withDescription( Review Comment: suggestion: ``` "Specifies if the CREATE TABLE AS SELECT statement is executed atomically. " + "By default, the statement is non-atomic. The target table is created on the client side, and it will not be dropped even if the job fails or is cancelled. " + "If this option is set to true and the DynamicTableSink implements the SupportsStaging interface, the statement is expected to be executed atomically; " + "the exact behavior depends on the actual DynamicTableSink."
``` Please also don't forget to update `table_config_configuration.html` ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java: ########## @@ -819,6 +828,57 @@ public TableResultInternal executeInternal(List<ModifyOperation> operations) { return executeInternal(transformations, sinkIdentifierNames); } + private void executeCreateTableASOperation( + CreateTableASOperation ctasOperation, List<ModifyOperation> mapOperations) { + CreateTableOperation createTableOperation = ctasOperation.getCreateTableOperation(); + ObjectIdentifier tableIdentifier = createTableOperation.getTableIdentifier(); + Catalog catalog = catalogManager.getCatalog(tableIdentifier.getCatalogName()).orElse(null); + ResolvedCatalogTable catalogTable = + catalogManager.resolveCatalogTable(createTableOperation.getCatalogTable()); + + if (!TableFactoryUtil.isLegacyConnectorOptions( + catalog, + tableConfig, + isStreamingMode, + tableIdentifier, + catalogTable, + createTableOperation.isTemporary())) { + DynamicTableSink dynamicTableSink = + ExecutableOperationUtils.createDynamicTableSink( + catalog, + () -> moduleManager.getFactory((Module::getTableSinkFactory)), + tableIdentifier, + catalogTable, + Collections.emptyMap(), + tableConfig, + resourceManager.getUserClassLoader(), + createTableOperation.isTemporary()); + if (dynamicTableSink instanceof SupportsStaging + && tableConfig.get(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED)) { + // use atomic ctas + SupportsStaging.StagingPurpose stagingPurpose = + createTableOperation.isIgnoreIfExists() + ? 
+ SupportsStaging.StagingPurpose.CREATE_TABLE_AS_IF_NOT_EXISTS + : SupportsStaging.StagingPurpose.CREATE_TABLE_AS; + StagedTable stagedTable = + ((SupportsStaging) dynamicTableSink) + .applyStaging(new SinkStagingContext(stagingPurpose)); + CtasJobStatusHook ctasJobStatusHook = new CtasJobStatusHook(stagedTable); + mapOperations.add( + ctasOperation.toStagedSinkModifyOperation( + createTableOperation.getTableIdentifier(), + catalogTable, + catalog, + dynamicTableSink)); + jobStatusHookList.add(ctasJobStatusHook); Review Comment: I'm concerned about making `jobStatusHookList` a class member variable and then clearing it in `createPipeline`. To me, that is too obscure and hard to understand. Could it be something like: ``` jobStatusHookList = new ArrayList<>(); // the following method will add jobStatusHook to `jobStatusHookList` getOperation(ctasOperation, jobStatusHookList); executeInternal(transformations, sinkIdentifierNames, jobStatusHookList); ``` Of course, we would then need a new `executeInternal` method that accepts `jobStatusHookList`. ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java: ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.abilities.SupportsStaging; + +import java.io.Serializable; + Review Comment: suggestion ``` /** * The {@link StagedTable} is designed to implement atomic semantics using a two-phase commit * protocol. The {@link StagedTable} is supposed to be returned via method {@link * SupportsStaging#applyStaging} by the {@link DynamicTableSink} which implements the {@link * SupportsStaging} interface. * * <p>When the Flink job for writing to a {@link DynamicTableSink} is CREATED, the {@link * StagedTable#begin()} will be called; when the Flink job is FINISHED, the {@link * StagedTable#commit()} will be called; when the Flink job is FAILED or CANCELED, the {@link * StagedTable#abort()} will be called. * * <p>See more in {@link SupportsStaging}. */ ``` ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestSupportsStagingTableFactory.java: ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.factories; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.table.catalog.StagedTable; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.ProviderContext; +import org.apache.flink.table.connector.sink.DataStreamSinkProvider; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.abilities.SupportsStaging; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.util.FileUtils; + +import java.io.File; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +/** A factory to create table to support staging for test purpose. 
*/ +public class TestSupportsStagingTableFactory implements DynamicTableSinkFactory { + + public static final String IDENTIFIER = "test-staging"; + + public static final List<String> JOB_STATUS_CHANGE_PROCESS = new LinkedList<>(); + + private static final ConfigOption<String> DATA_DIR = + ConfigOptions.key("data-dir") + .stringType() + .noDefaultValue() + .withDescription("The data id used to write the rows."); + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + helper.validate(); + String dataDir = helper.getOptions().get(DATA_DIR); + return new SupportsStagingTableSink(dataDir); + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + return Collections.singleton(DATA_DIR); + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + return Collections.emptySet(); + } + + /** A sink that supports staging. */ + private static class SupportsStagingTableSink implements DynamicTableSink, SupportsStaging { + + private String dataDir; Review Comment: nit: ``` private final String dataDir; ``` ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/StagedTableITCase.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.runtime.batch.sql; + +import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory; +import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory; +import org.apache.flink.table.planner.runtime.utils.BatchTestBase; +import org.apache.flink.types.Row; +import org.apache.flink.util.FileUtils; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests staged table in batch mode. 
*/ +public class StagedTableITCase extends BatchTestBase { + + @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + @Override + public void before() throws Exception { + super.before(); + List<Row> sourceData = Arrays.asList(Row.of(1, "ZM")); + + TestCollectionTableFactory.reset(); + TestCollectionTableFactory.initData(sourceData); + + String sourceDDL = "create table t1(a int, b varchar) with ('connector' = 'COLLECTION')"; + tEnv().executeSql(sourceDDL); + } + + @Test Review Comment: Use `org.junit.jupiter.api.Test`. ########## flink-python/src/main/java/org/apache/flink/table/executor/python/ChainingOptimizingExecutor.java: ########## @@ -63,6 +66,24 @@ public Pipeline createPipeline( return executor.createPipeline(chainedTransformations, configuration, defaultJobName); Review Comment: Please don't forget this comment. I mean that the method ``` createPipeline( List<Transformation<?>> transformations, ReadableConfig configuration, String defaultJobName) ``` can simply delegate: ``` return createPipeline(transformations, configuration, defaultJobName, Collections.emptyList()); ``` ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java: ########## @@ -784,11 +795,9 @@ public CompiledPlan compilePlan(List<ModifyOperation> operations) { public TableResultInternal executeInternal(List<ModifyOperation> operations) { List<ModifyOperation> mapOperations = new ArrayList<>(); for (ModifyOperation modify : operations) { - // execute CREATE TABLE first for CTAS statements if (modify instanceof CreateTableASOperation) { CreateTableASOperation ctasOperation = (CreateTableASOperation) modify; - executeInternal(ctasOperation.getCreateTableOperation()); - mapOperations.add(ctasOperation.toSinkModifyOperation(catalogManager)); + executeCreateTableASOperation(ctasOperation, mapOperations); Review Comment: The method name confuses me, because the method doesn't actually execute anything.
Could you rename it to something like `getOperation(CreateTableASOperation ctasOperation)`, and then add the returned operation to `mapOperations`? ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/StagedSinkModifyOperation.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.ContextResolvedTable; +import org.apache.flink.table.connector.sink.DynamicTableSink; + +import javax.annotation.Nullable; + +import java.util.Map; + +/** + * DML operation that tells to write to a sink. Review Comment: ```suggestion * DML operation that tells to write to a sink which implements {@link SupportsStaging}. Currently, this operation is only used for the CTAS (CREATE TABLE AS SELECT) statement. ``` ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/execution/CtasJobStatusHook.java: ########## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.execution; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.execution.JobStatusHook; +import org.apache.flink.table.catalog.StagedTable; + +/** + * This hook is used to implement atomic semantics for CTAS(CREATE TABLE AS SELECT) statement. It'll + * call the corresponding interfaces of {@link StagedTable} on job status changing. Review Comment: ```suggestion * call the corresponding interfaces of the inner {@link StagedTable} on job status changing. ``` ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/StagedTableITCase.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.runtime.batch.sql; + +import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory; +import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory; +import org.apache.flink.table.planner.runtime.utils.BatchTestBase; +import org.apache.flink.types.Row; +import org.apache.flink.util.FileUtils; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests staged table in batch mode. */ +public class StagedTableITCase extends BatchTestBase { + + @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); Review Comment: The new test should follow Junit5 style: ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/StagedTableITCase.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.runtime.batch.sql; + +import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory; +import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory; +import org.apache.flink.table.planner.runtime.utils.BatchTestBase; +import org.apache.flink.types.Row; +import org.apache.flink.util.FileUtils; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests staged table in batch mode. */ +public class StagedTableITCase extends BatchTestBase { + + @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + @Override + public void before() throws Exception { + super.before(); + List<Row> sourceData = Arrays.asList(Row.of(1, "ZM")); + + TestCollectionTableFactory.reset(); + TestCollectionTableFactory.initData(sourceData); + + String sourceDDL = "create table t1(a int, b varchar) with ('connector' = 'COLLECTION')"; + tEnv().executeSql(sourceDDL); + } + + @Test + public void testStagedTableWithAtomicCtas() throws Exception { Review Comment: I think the test can still be improved in the following aspects: - IIUC, if `TABLE_CTAS_ATOMICITY_ENABLED` is true, the table won't be created by the framework itself.
So, please make sure the table hasn't been created in your test. - Please test the case where `TABLE_CTAS_ATOMICITY_ENABLED` is false, which should behave like non-atomic CTAS. - Please test the `abort` method: you can control whether writing data fails via a config option, then make it fail so that the abort method is called. ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/StagedSinkModifyOperation.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.ContextResolvedTable; +import org.apache.flink.table.connector.sink.DynamicTableSink; + +import javax.annotation.Nullable; + +import java.util.Map; + +/** + * DML operation that tells to write to a sink. + * + * <p>The sink is described by {@link #getContextResolvedTable()}, and in general is used for every + * sink which implementation is defined with {@link DynamicTableSink}. + * + * <p>StagedSinkModifyOperation is an extension of SinkModifyOperation to the atomic CTAS scenario.
Review Comment: suggestion: ``` * <p>StagedSinkModifyOperation is an extension of SinkModifyOperation for the atomic CTAS scenario. * While checking whether the corresponding sink supports atomic CTAS, we need to get the DynamicTableSink first, check whether it implements {@link SupportsStaging}, and then call {@link SupportsStaging#applyStaging}. We keep the DynamicTableSink in this operation so that it can be reused, instead of creating a new DynamicTableSink again, which is error-prone. ``` ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/StagedTableITCase.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.table.planner.runtime.batch.sql; + +import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory; +import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory; +import org.apache.flink.table.planner.runtime.utils.BatchTestBase; +import org.apache.flink.types.Row; +import org.apache.flink.util.FileUtils; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests staged table in batch mode. */ +public class StagedTableITCase extends BatchTestBase { + + @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + @Override + public void before() throws Exception { + super.before(); + List<Row> sourceData = Arrays.asList(Row.of(1, "ZM")); + + TestCollectionTableFactory.reset(); + TestCollectionTableFactory.initData(sourceData); + + String sourceDDL = "create table t1(a int, b varchar) with ('connector' = 'COLLECTION')"; + tEnv().executeSql(sourceDDL); + } + + @Test + public void testStagedTableWithAtomicCtas() throws Exception { Review Comment: ```suggestion void testStagedTableWithAtomicCtas() throws Exception { ``` ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/StagedTableITCase.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.runtime.batch.sql; + +import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory; +import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory; +import org.apache.flink.table.planner.runtime.utils.BatchTestBase; +import org.apache.flink.types.Row; +import org.apache.flink.util.FileUtils; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests staged table in batch mode. */ +public class StagedTableITCase extends BatchTestBase { + + @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before Review Comment: ```suggestion @BeforeEach ``` ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsStaging.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.connector.sink.abilities; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.core.execution.JobStatusHook; +import org.apache.flink.table.catalog.StagedTable; +import org.apache.flink.table.connector.sink.DynamicTableSink; + +/** + * Enables different staged operations to ensure atomicity in a {@link DynamicTableSink}. + * + * <p>By default, if this interface is not implemented, indicating that atomic operations are not + * supported, then a non-atomic implementation is used. + */ +@PublicEvolving +public interface SupportsStaging { + + /** + * Provides a {@link StagedTable} that provided transaction abstraction. StagedTable will be + * combined with {@link JobStatusHook} to achieve atomicity support in the Flink framework. Call + * the relevant API of StagedTable when the Job state is switched. + * + * <p>This method will be called at the compile stage. + * + * @param context Tell DynamicTableSink, the operation type of this StagedTable, expandable + * @return {@link StagedTable} that can be serialized and provides atomic operations + */ + StagedTable applyStaging(StagingContext context); + + /** + * The context is intended to tell DynamicTableSink the type of this operation. In this way, + * DynamicTableSink can return the corresponding implementation of StagedTable according to the + * specific operation. More types of operations can be extended in the future. 
+ */ + @PublicEvolving + interface StagingContext { + StagingPurpose getStagingPurpose(); + } + + /** + * The type of StagedTable final visibility that was expects for staging purpose. Review Comment: please don't ignore my comment in here. ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/StagedTableITCase.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.runtime.batch.sql; + +import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory; +import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory; +import org.apache.flink.table.planner.runtime.utils.BatchTestBase; +import org.apache.flink.types.Row; +import org.apache.flink.util.FileUtils; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests staged table in batch mode. 
*/ +public class StagedTableITCase extends BatchTestBase { + + @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); Review Comment: Please replace this JUnit 4 rule with JUnit 5's `@TempDir Path temporaryFolder;` — you can find examples in the codebase. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
