luoyuxia commented on code in PR #22839:
URL: https://github.com/apache/flink/pull/22839#discussion_r1250008294
##########
docs/layouts/shortcodes/generated/table_config_configuration.html:
##########
@@ -20,6 +20,12 @@
         <td>String</td>
         <td>The name of the default database in the initial catalog to be created when instantiating TableEnvironment.</td>
     </tr>
+    <tr>
+        <td><h5>table.ctas.atomicity-enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
+        <td style="word-wrap: break-word;">false</td>
+        <td>Boolean</td>
+        <td>Specifies if the CREATE TABLE AS SELECT statement is executed atomically. By default, the statement is non-atomic. The target table is created in client side, and it will not be dropped even though the job fails or is cancelled. If set this option to true and DynamicTableSink implements the SupportsStaging interface, the statement is expected to be executed atomically, the behavior of which depends on the actual DynamicTableSink.</td>
Review Comment:
nit
```suggestion
<td>Specifies if the CREATE TABLE AS SELECT statement is
executed atomically. By default, the statement is non-atomic. The target table
is created on the client side and it will not be dropped even though the job
fails or is canceled. If set this option to true and the underlying
DynamicTableSink implements the SupportsStaging interface, the statement is
expected to be executed atomically, the behavior of which depends on the actual
DynamicTableSink.</td>
```
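For readers trying the option out, a minimal usage sketch, assuming a TableEnvironment named `tEnv`, a connector whose DynamicTableSink implements SupportsStaging, and illustrative table/connector names:
```java
// Enable atomic CTAS for this session (the option defaults to false).
tEnv.getConfig().set(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED, true);

// With the option on and a SupportsStaging-capable sink, the target table is made
// visible via a two-phase commit instead of being created eagerly on the client side.
tEnv.executeSql(
        "CREATE TABLE target_table WITH ('connector' = 'some-staging-connector') "
                + "AS SELECT * FROM source_table")
    .await();
```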
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -816,7 +825,56 @@ public TableResultInternal executeInternal(List<ModifyOperation> operations) {
         List<Transformation<?>> transformations = translate(mapOperations);
         List<String> sinkIdentifierNames = extractSinkIdentifierNames(mapOperations);
-        return executeInternal(transformations, sinkIdentifierNames);
+        return executeInternal(transformations, sinkIdentifierNames, jobStatusHookList);
+    }
+
+    private ModifyOperation getOperation(
+            CreateTableASOperation ctasOperation, List<JobStatusHook> jobStatusHookList) {
+        CreateTableOperation createTableOperation = ctasOperation.getCreateTableOperation();
+        ObjectIdentifier tableIdentifier = createTableOperation.getTableIdentifier();
+        Catalog catalog = catalogManager.getCatalog(tableIdentifier.getCatalogName()).orElse(null);
+        ResolvedCatalogTable catalogTable =
+                catalogManager.resolveCatalogTable(createTableOperation.getCatalogTable());
+
+        if (!TableFactoryUtil.isLegacyConnectorOptions(
+                catalog,
+                tableConfig,
+                isStreamingMode,
+                tableIdentifier,
+                catalogTable,
+                createTableOperation.isTemporary())) {
+            DynamicTableSink dynamicTableSink =
+                    ExecutableOperationUtils.createDynamicTableSink(
+                            catalog,
+                            () -> moduleManager.getFactory((Module::getTableSinkFactory)),
+                            tableIdentifier,
+                            catalogTable,
+                            Collections.emptyMap(),
+                            tableConfig,
+                            resourceManager.getUserClassLoader(),
+                            createTableOperation.isTemporary());
+            if (dynamicTableSink instanceof SupportsStaging
Review Comment:
You can check `tableConfig.get(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED)` first, so that we avoid creating the dynamicTableSink at all when atomic CTAS is not enabled.
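A minimal sketch of that reordering against the getOperation method quoted above; the non-atomic fallback helper name is hypothetical:
```java
private ModifyOperation getOperation(
        CreateTableASOperation ctasOperation, List<JobStatusHook> jobStatusHookList) {
    // Check the option first: if atomic CTAS is disabled, take the non-atomic path
    // without resolving the catalog table or instantiating a DynamicTableSink at all.
    if (!tableConfig.get(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED)) {
        return getNonAtomicCreateTableASOperation(ctasOperation); // hypothetical fallback
    }
    // ... only now create the DynamicTableSink, check for SupportsStaging,
    // and register the CTAS job status hook as shown in the diff above ...
}
```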
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
+
+import java.io.Serializable;
+
+/**
+ * The {@link StagedTable} is designed to implement atomic semantic using a two-phase commit
Review Comment:
```suggestion
 * The {@link StagedTable} is designed to implement Flink's atomic semantics for CTAS (CREATE TABLE AS SELECT) statements using a two-phase commit
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
+
+import java.io.Serializable;
+
+/**
+ * The {@link StagedTable} is designed to implement atomic semantic using a
two-phase commit
+ * protocol. The {@link StagedTable} is supposed to be returned via method
{@link
+ * SupportsStaging#applyStaging} by the {@link DynamicTableSink} which
implements the {@link
+ * SupportsStaging} interface.
+ *
+ * <p>When the Flink job for writing to a {@link DynamicTableSink} is CREATED,
the {@link
+ * StagedTable#begin()} will be called; when the Flink job is FINISHED, the
{@link
+ * StagedTable#commit()} will be called; when the Flink job is FAILED or
CANCELED, the {@link
+ * StagedTable#abort()} will be called;
+ *
+ * <p>See more in {@link SupportsStaging}.
+ */
+@PublicEvolving
+public interface StagedTable extends Serializable {
+
+ /**
+ * This method will be called when the job is started. In Flink's atomic
CTAS scenario, it is
+ * expected to do initialization work; For example, initializing the
client of the underlying
+ * service, the tmp path of the underlying storage, or even call the start
transaction API of
+ * the underlying service, etc.
+ */
+ void begin();
+
+ /**
+ * This method will be called when the job is succeeds. In Flink's atomic
CTAS scenario, it is
+ * expected to do some commit work. For example, moving the underlying
data to the target
+ * directory to make it visible, writing buffer data to the underlying
storage service, or even
+ * call the commit transaction API of the underlying service, etc.
+ */
+ void commit();
+
+ /**
+ * This method will be called when the job is failed or canceled. In
Flink's atomic CTAS
Review Comment:
```suggestion
     * This method will be called when the job fails or is canceled. In Flink's atomic CTAS
```
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/StagedTableITCase.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
+import
org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory;
+import
org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+/** Tests staged table in batch mode. */
+public class StagedTableITCase extends BatchTestBase {
+
+ @TempDir Path temporaryFolder;
+
+ private File tmpDataFolder;
+
+ @BeforeEach
+ void setup() throws Exception {
+ super.before();
+ List<Row> sourceData = Arrays.asList(Row.of(1, "ZM"));
+
+ TestCollectionTableFactory.reset();
+ TestCollectionTableFactory.initData(sourceData);
+
+ String sourceDDL = "create table t1(a int, b varchar) with
('connector' = 'COLLECTION')";
+ tEnv().executeSql(sourceDDL);
+ tmpDataFolder = TempDirUtils.newFolder(temporaryFolder);
+ }
+
+ @AfterEach
+ void close() {
+ if (tmpDataFolder != null) {
+ tmpDataFolder.delete();
Review Comment:
Why delete it? Can't you provide a temp path for each method instead?
like:
testStagedTableWithAtomicCtas(@TempDir Path temporaryFolder)
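A minimal sketch of the suggested per-method injection; JUnit 5 creates and removes the directory per test, so the manual @AfterEach cleanup becomes unnecessary (the body is elided):
```java
@Test
void testStagedTableWithAtomicCtas(@TempDir Path temporaryFolder) throws Exception {
    // a fresh directory per test method, cleaned up by JUnit itself
    File tmpDataFolder = TempDirUtils.newFolder(temporaryFolder);
    // ... run the CTAS statement against tmpDataFolder.getAbsolutePath() ...
}
```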
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsStaging.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.execution.JobStatusHook;
+import org.apache.flink.table.catalog.StagedTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+
+/**
+ * Interface for {@link DynamicTableSink}s that support CTAS(CREATE TABLE AS
SELECT) statement
+ * atomic semantic using a two-phase commit protocol. The table sink is
responsible for telling
+ * planner how to implement atomicity semantics via {@link StagedTable}.
+ *
+ * <p>If the user turns on {@link
TableConfigOptions#TABLE_CTAS_ATOMICITY_ENABLED}, and {@link
Review Comment:
suggestion:
If the user turns on {@link TableConfigOptions#TABLE_CTAS_ATOMICITY_ENABLED} and the {@link DynamicTableSink} implements {@link SupportsStaging}, the planner will call the method {@link #applyStaging(StagingContext)} to get the {@link StagedTable} returned by the sink; the returned {@link StagedTable} will then be used by Flink to drive the two-phase commit, with the actual behavior determined by the {@link StagedTable} implementation.
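For connector authors, a minimal sketch of how a sink might opt in (class names are illustrative, the remaining DynamicTableSink methods are elided, and the exact contents of StagingContext are defined elsewhere in this PR):
```java
/** Illustrative sink that opts into atomic CTAS; not the actual PR implementation. */
public class MyStagingSink implements DynamicTableSink, SupportsStaging {

    @Override
    public StagedTable applyStaging(StagingContext context) {
        // The context describes the staging purpose (CREATE TABLE AS vs.
        // CREATE TABLE AS IF NOT EXISTS); the sink returns its own StagedTable,
        // which the planner later drives through begin()/commit()/abort().
        return new MyStagedTable(/* connector-specific state, e.g. a staging directory */);
    }

    // ... getChangelogMode, getSinkRuntimeProvider, copy, asSummaryString omitted ...
}
```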
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/StagedTableITCase.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
+import
org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory;
+import
org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+/** Tests staged table in batch mode. */
+public class StagedTableITCase extends BatchTestBase {
+
+ @TempDir Path temporaryFolder;
+
+ private File tmpDataFolder;
+
+ @BeforeEach
+ void setup() throws Exception {
+ super.before();
+ List<Row> sourceData = Arrays.asList(Row.of(1, "ZM"));
+
+ TestCollectionTableFactory.reset();
+ TestCollectionTableFactory.initData(sourceData);
+
+ String sourceDDL = "create table t1(a int, b varchar) with
('connector' = 'COLLECTION')";
+ tEnv().executeSql(sourceDDL);
+ tmpDataFolder = TempDirUtils.newFolder(temporaryFolder);
+ }
+
+ @AfterEach
+ void close() {
+ if (tmpDataFolder != null) {
+ tmpDataFolder.delete();
+ }
+ }
+
+ @Test
+ void testStagedTableWithAtomicCtas() throws Exception {
Review Comment:
```suggestion
void testAtomicCtas() throws Exception {
```
It's not testing the staged table itself; it's testing atomic CTAS.
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/execution/CtasJobStatusHook.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.execution;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.execution.JobStatusHook;
+import org.apache.flink.table.catalog.StagedTable;
+
+/**
+ * This hook is used to implement atomic semantics for CTAS(CREATE TABLE AS SELECT) statement. It'll
+ * call the corresponding interfaces of the inner {@link StagedTable} on job status changing.
Review Comment:
```suggestion
 * call the corresponding interfaces of the inner {@link StagedTable} on job status changes.
```
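For context, a sketch of how such a hook plausibly forwards job status changes to the staged table, assuming the JobStatusHook callbacks onCreated/onFinished/onFailed/onCanceled; the constructor shape is an assumption, not necessarily the PR's exact code:
```java
/** Sketch: drives the StagedTable of an atomic CTAS from job lifecycle events. */
public class CtasJobStatusHook implements JobStatusHook {

    private final StagedTable stagedTable;

    public CtasJobStatusHook(StagedTable stagedTable) {
        this.stagedTable = stagedTable;
    }

    @Override
    public void onCreated(JobID jobId) {
        stagedTable.begin(); // job created -> start staging
    }

    @Override
    public void onFinished(JobID jobId) {
        stagedTable.commit(); // job finished -> make the data/table visible
    }

    @Override
    public void onFailed(JobID jobId, Throwable throwable) {
        stagedTable.abort(); // job failed -> clean up staged data
    }

    @Override
    public void onCanceled(JobID jobId) {
        stagedTable.abort(); // job canceled -> clean up staged data
    }
}
```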
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java:
##########
@@ -197,6 +197,17 @@ private TableConfigOptions() {}
.withDescription(
"Local directory that is used by planner for
storing downloaded resources.");
+ @Documentation.TableOption(execMode =
Documentation.ExecMode.BATCH_STREAMING)
+ public static final ConfigOption<Boolean> TABLE_CTAS_ATOMICITY_ENABLED =
+ key("table.ctas.atomicity-enabled")
Review Comment:
Ditto: don't forget to also update it here.
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
+
+import java.io.Serializable;
+
+/**
+ * The {@link StagedTable} is designed to implement atomic semantic using a
two-phase commit
+ * protocol. The {@link StagedTable} is supposed to be returned via method
{@link
+ * SupportsStaging#applyStaging} by the {@link DynamicTableSink} which
implements the {@link
+ * SupportsStaging} interface.
+ *
+ * <p>When the Flink job for writing to a {@link DynamicTableSink} is CREATED,
the {@link
+ * StagedTable#begin()} will be called; when the Flink job is FINISHED, the
{@link
+ * StagedTable#commit()} will be called; when the Flink job is FAILED or
CANCELED, the {@link
+ * StagedTable#abort()} will be called;
+ *
+ * <p>See more in {@link SupportsStaging}.
+ */
+@PublicEvolving
+public interface StagedTable extends Serializable {
+
+ /**
+ * This method will be called when the job is started. In Flink's atomic
CTAS scenario, it is
+ * expected to do initialization work; For example, initializing the
client of the underlying
+ * service, the tmp path of the underlying storage, or even call the start
transaction API of
+ * the underlying service, etc.
+ */
+ void begin();
+
+ /**
+ * This method will be called when the job is succeeds. In Flink's atomic
CTAS scenario, it is
Review Comment:
```suggestion
     * This method will be called when the job succeeds. In Flink's atomic CTAS scenario, it is
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsStaging.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.execution.JobStatusHook;
+import org.apache.flink.table.catalog.StagedTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+
+/**
+ * Interface for {@link DynamicTableSink}s that support CTAS(CREATE TABLE AS
SELECT) statement
Review Comment:
suggestion:
Interface for {@link DynamicTableSink}s that support atomic semantics for the CTAS (CREATE TABLE AS SELECT) statement using a two-phase commit protocol. The table sink is responsible for returning a {@link StagedTable} that tells Flink how to implement the atomicity semantics.
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
+
+import java.io.Serializable;
+
+/**
+ * The {@link StagedTable} is designed to implement atomic semantic using a
two-phase commit
+ * protocol. The {@link StagedTable} is supposed to be returned via method
{@link
+ * SupportsStaging#applyStaging} by the {@link DynamicTableSink} which
implements the {@link
+ * SupportsStaging} interface.
+ *
+ * <p>When the Flink job for writing to a {@link DynamicTableSink} is CREATED,
the {@link
+ * StagedTable#begin()} will be called; when the Flink job is FINISHED, the
{@link
+ * StagedTable#commit()} will be called; when the Flink job is FAILED or
CANCELED, the {@link
+ * StagedTable#abort()} will be called;
+ *
+ * <p>See more in {@link SupportsStaging}.
+ */
+@PublicEvolving
+public interface StagedTable extends Serializable {
+
+ /**
+ * This method will be called when the job is started. In Flink's atomic
CTAS scenario, it is
+ * expected to do initialization work; For example, initializing the
client of the underlying
+ * service, the tmp path of the underlying storage, or even call the start
transaction API of
+ * the underlying service, etc.
+ */
+ void begin();
+
+ /**
+ * This method will be called when the job is succeeds. In Flink's atomic
CTAS scenario, it is
+ * expected to do some commit work. For example, moving the underlying
data to the target
+ * directory to make it visible, writing buffer data to the underlying
storage service, or even
+ * call the commit transaction API of the underlying service, etc.
+ */
+ void commit();
+
+ /**
+ * This method will be called when the job is failed or canceled. In
Flink's atomic CTAS
+ * scenario, it is expected to do some cleaning work for a writing; For
example, delete the data
+ * in tmp directory, delete the temporary data in the underlying storage
service, or even call
Review Comment:
```suggestion
     * in the tmp directory, delete the temporary data in the underlying storage service, or even call
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
+
+import java.io.Serializable;
+
+/**
+ * The {@link StagedTable} is designed to implement atomic semantic using a
two-phase commit
+ * protocol. The {@link StagedTable} is supposed to be returned via method
{@link
+ * SupportsStaging#applyStaging} by the {@link DynamicTableSink} which
implements the {@link
+ * SupportsStaging} interface.
+ *
+ * <p>When the Flink job for writing to a {@link DynamicTableSink} is CREATED,
the {@link
Review Comment:
```suggestion
* <p>When the Flink job for writing to a {@link DynamicTableSink} with
atomic semantic supporting is CREATED, the {@link
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
+
+import java.io.Serializable;
+
+/**
+ * The {@link StagedTable} is designed to implement atomic semantic using a
two-phase commit
+ * protocol. The {@link StagedTable} is supposed to be returned via method
{@link
+ * SupportsStaging#applyStaging} by the {@link DynamicTableSink} which
implements the {@link
+ * SupportsStaging} interface.
+ *
+ * <p>When the Flink job for writing to a {@link DynamicTableSink} is CREATED,
the {@link
+ * StagedTable#begin()} will be called; when the Flink job is FINISHED, the
{@link
+ * StagedTable#commit()} will be called; when the Flink job is FAILED or
CANCELED, the {@link
+ * StagedTable#abort()} will be called;
+ *
+ * <p>See more in {@link SupportsStaging}.
+ */
+@PublicEvolving
+public interface StagedTable extends Serializable {
+
+ /**
+ * This method will be called when the job is started. In Flink's atomic
CTAS scenario, it is
+ * expected to do initialization work; For example, initializing the
client of the underlying
+ * service, the tmp path of the underlying storage, or even call the start
transaction API of
+ * the underlying service, etc.
+ */
+ void begin();
+
+ /**
+ * This method will be called when the job is succeeds. In Flink's atomic
CTAS scenario, it is
+ * expected to do some commit work. For example, moving the underlying
data to the target
+ * directory to make it visible, writing buffer data to the underlying
storage service, or even
+ * call the commit transaction API of the underlying service, etc.
+ */
+ void commit();
+
+ /**
+ * This method will be called when the job is failed or canceled. In
Flink's atomic CTAS
+ * scenario, it is expected to do some cleaning work for a writing; For
example, delete the data
Review Comment:
```suggestion
     * scenario, it is expected to do some cleaning work for writing; For example, delete the data
```
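To make the begin/commit/abort contract concrete, a sketch of a file-based StagedTable in the spirit of the test connector used later in this PR; the class, paths, and rename-based commit are illustrative only:
```java
import org.apache.flink.table.catalog.StagedTable;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

/** Sketch: stages data in a hidden file and publishes it on commit. */
public class FileStagedTable implements StagedTable {

    private final String stagingPath; // e.g. "<data-dir>/_data"
    private final String targetPath;  // e.g. "<data-dir>/data"

    public FileStagedTable(String stagingPath, String targetPath) {
        this.stagingPath = stagingPath;
        this.targetPath = targetPath;
    }

    @Override
    public void begin() {
        // initialize staging resources here, e.g. create the staging directory
    }

    @Override
    public void commit() {
        try {
            // publish atomically: rename the staged file to the visible target path
            Files.move(Paths.get(stagingPath), Paths.get(targetPath), StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new RuntimeException("Failed to commit staged data", e);
        }
    }

    @Override
    public void abort() {
        try {
            // drop the temporary data so a failed or canceled job leaves nothing behind
            Files.deleteIfExists(Paths.get(stagingPath));
        } catch (IOException e) {
            throw new RuntimeException("Failed to clean up staged data", e);
        }
    }
}
```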
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/StagedTableITCase.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql;
+
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
+import
org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory;
+import
org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+/** Tests staged table in stream mode. */
+public class StagedTableITCase extends StreamingTestBase {
Review Comment:
Ditto.
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/StagedTableITCase.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
+import
org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory;
+import
org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+/** Tests staged table in batch mode. */
+public class StagedTableITCase extends BatchTestBase {
Review Comment:
```suggestion
public class AtomicCtasITCase extends BatchTestBase {
```
?
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/StagedTableITCase.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
+import
org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory;
+import
org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+/** Tests staged table in batch mode. */
+public class StagedTableITCase extends BatchTestBase {
+
+ @TempDir Path temporaryFolder;
+
+ private File tmpDataFolder;
+
+ @BeforeEach
+ void setup() throws Exception {
+ super.before();
+ List<Row> sourceData = Arrays.asList(Row.of(1, "ZM"));
+
+ TestCollectionTableFactory.reset();
+ TestCollectionTableFactory.initData(sourceData);
+
+ String sourceDDL = "create table t1(a int, b varchar) with
('connector' = 'COLLECTION')";
+ tEnv().executeSql(sourceDDL);
+ tmpDataFolder = TempDirUtils.newFolder(temporaryFolder);
+ }
+
+ @AfterEach
+ void close() {
+ if (tmpDataFolder != null) {
+ tmpDataFolder.delete();
+ }
+ }
+
+ @Test
+ void testStagedTableWithAtomicCtas() throws Exception {
+ commonTestStagedTableWithAtomicCtas("ctas_batch_table", false);
+ }
+
+ @Test
+ void testStagedTableWithAtomicCtasIfNotExists() throws Exception {
+ commonTestStagedTableWithAtomicCtas("ctas_if_not_exists_batch_table",
true);
+ }
+
+ void commonTestStagedTableWithAtomicCtas(String tableName, boolean
ifNotExists)
+ throws Exception {
+
tEnv().getConfig().set(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED, true);
+ String dataDir = tmpDataFolder.getAbsolutePath();
+ String sqlFragment = ifNotExists ? " if not exists " + tableName :
tableName;
+ tEnv().executeSql(
+ "create table "
+ + sqlFragment
+ + " with ('connector' = 'test-staging',
'data-dir' = '"
+ + dataDir
+ + "') as select * from t1")
+ .await();
+ assertThat(tEnv().listTables()).doesNotContain(tableName);
+ File file = new File(dataDir, "data");
+ assertThat(file).exists();
+ assertThat(file).isFile();
+ assertThat(FileUtils.readFileUtf8(file)).isEqualTo("1,ZM");
+
assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS).hasSize(2);
+ assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS)
+ .contains("begin", "commit");
+
assertThat(TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST).hasSize(1);
+ if (ifNotExists) {
+ assertThat(TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST)
+
.contains(SupportsStaging.StagingPurpose.CREATE_TABLE_AS_IF_NOT_EXISTS);
+ } else {
+ assertThat(TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST)
+ .contains(SupportsStaging.StagingPurpose.CREATE_TABLE_AS);
+ }
+ }
+
+ @Test
+ void testFailStagedTableWithAtomicCtas() {
+
tEnv().getConfig().set(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED, true);
+ String dataDir = tmpDataFolder.getAbsolutePath();
+ assertThatCode(
+ () ->
+ tEnv().executeSql(
+ "create table
ctas_batch_table_fail with ('connector' = 'test-staging', 'data-dir' = '"
+ + dataDir
+ + "', 'sink-fail' = '"
+ + true
+ + "') as select * from
t1")
+ .await())
+ .hasRootCauseMessage("Test StagedTable abort method.");
+
+
assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS).hasSize(2);
+ assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS)
+ .contains("begin", "abort");
+ }
+
+ @Test
+ void testStagedTableWithoutAtomicCtas() throws Exception {
+
tEnv().getConfig().set(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED, false);
+ String dataDir = tmpDataFolder.getAbsolutePath();
+ tEnv().executeSql(
+ "create table ctas_batch_table with ('connector' =
'test-staging', 'data-dir' = '"
+ + dataDir
+ + "') as select * from t1")
+ .await();
+ assertThat(tEnv().listTables()).contains("ctas_batch_table");
+ // Not using StagedTable, so need to read the hidden file
+ File file = new File(dataDir, "_data");
+ assertThat(file).exists();
Review Comment:
Please extract
```
assertThat(file).exists();
assertThat(file).isFile();
assertThat(FileUtils.readFileUtf8(file)).isEqualTo("1,ZM");
```
to a common method
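A minimal sketch of the extracted helper the reviewer has in mind (name and placement are illustrative):
```java
private static void assertDataFile(File file) throws IOException {
    // shared assertions for the file written by the test-staging connector
    assertThat(file).exists();
    assertThat(file).isFile();
    assertThat(FileUtils.readFileUtf8(file)).isEqualTo("1,ZM");
}
```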