lincoln-lil commented on code in PR #21698:
URL: https://github.com/apache/flink/pull/21698#discussion_r1089853488
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.batch.sql;
+
+import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import scala.collection.Seq;
+
+/** Test for row-level update. */
+@RunWith(Parameterized.class)
+public class RowLevelUpdateTest extends TableTestBase {
+
+    private final Seq<ExplainDetail> explainDetails =
+            JavaScalaConversionUtil.toScala(
+                    Collections.singletonList(ExplainDetail.JSON_EXECUTION_PLAN));
+    private final SupportsRowLevelUpdate.RowLevelUpdateMode updateMode;
+
+    private BatchTableTestUtil util;
+
+    @Parameterized.Parameters(name = "updateMode = {0}")
+    public static Collection<SupportsRowLevelUpdate.RowLevelUpdateMode> data() {
+        return Arrays.asList(
+                SupportsRowLevelUpdate.RowLevelUpdateMode.UPDATED_ROWS,
+                SupportsRowLevelUpdate.RowLevelUpdateMode.ALL_ROWS);
+    }
+
+    public RowLevelUpdateTest(SupportsRowLevelUpdate.RowLevelUpdateMode updateMode) {
+        this.updateMode = updateMode;
+    }
+
+    @Before
+    public void before() {
+        util = batchTestUtil(TableConfig.getDefault());
+        util.tableEnv()
+                .getConfig()
+                .getConfiguration()
+                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 12);
+    }
+
+    @Test
+    public void testUpdateWithoutFilter() {
+        createTableForUpdate();
+        util.verifyExplainInsert("UPDATE t SET b = 'n1', a = char_length(b) * a ", explainDetails);
+    }
+
+    @Test
+    public void testUpdateWithFilter() {
+        createTableForUpdate();
+        util.verifyExplainInsert(
+                "UPDATE t SET b = 'v2' WHERE a = 123 AND b = 'v1'", explainDetails);
+    }
+
+    @Test
+    public void testUpdateWithSubQuery() {
+        createTableForUpdate();
+        util.tableEnv()
+                .executeSql(
+                        String.format(
+                                "CREATE TABLE t1 (a int, b string) WITH "
+                                        + "('connector' = 'test-update-delete', 'update-mode' = '%s') ",
+                                updateMode));
+        util.verifyExplainInsert(
+                "UPDATE t SET b = 'v2' WHERE a = (SELECT count(*) FROM t1)", explainDetails);
+    }
+
+    @Test
+    public void testUpdateWithOnlyRequireUpdatedCols() {

Review Comment:
   add a new case where the update clause does not contain all of the table's columns, e.g., a source table with more columns here: `TABLE t (f0, f1, ..., a int, b string, c double, ...)`; see the sketch below.
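   A minimal sketch of such a case, following the style of the existing tests here; the table name, the extra columns, and the predicate are only placeholders:

   ```java
   @Test
   public void testUpdatePartialColumnsOfWideTable() {
       // Hypothetical wider table: the UPDATE below touches only a subset of its columns.
       util.tableEnv()
               .executeSql(
                       String.format(
                               "CREATE TABLE wide_t (f0 int, f1 string, a int, b string, c double) WITH "
                                       + "('connector' = 'test-update-delete', 'update-mode' = '%s') ",
                               updateMode));
       util.verifyExplainInsert(
               "UPDATE wide_t SET b = 'v2', c = c + 1 WHERE a = 123", explainDetails);
   }
   ```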
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/SqlToOperationConverterTestUtils.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Utils for {@link org.apache.flink.table.planner.operations.SqlToOperationConverterTest} . */
+public class SqlToOperationConverterTestUtils {

Review Comment:
   Since `SqlToOperationConverterTest` has exceeded the maximum file line limit, we might as well split it by SQL statement type, such as DDL, DML, and other commands: extract a `SqlToOperationConverterTestBase` to keep all the helper methods, and split the test into three pieces: `SqlDDLToOperationConverterTest`, `SqlDMLToOperationConverterTest`, and `SqlCommandToOperationConverterTest`. WDYT? A rough skeleton follows.
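   A rough skeleton of the proposed split; everything except the three class names already suggested above is a placeholder:

   ```java
   /** Sketch only: keeps the shared fixtures and helper methods from SqlToOperationConverterTest. */
   public abstract class SqlToOperationConverterTestBase {
       // In the real split this would hold the catalog manager, planner context,
       // and the parse/convert helpers that all three subclasses need.
   }

   /** DDL statements: CREATE / DROP / ALTER ... */
   public class SqlDDLToOperationConverterTest extends SqlToOperationConverterTestBase {}

   /** DML statements: INSERT / UPDATE / DELETE ... */
   public class SqlDMLToOperationConverterTest extends SqlToOperationConverterTestBase {}

   /** Commands: SET / RESET / SHOW ... */
   public class SqlCommandToOperationConverterTest extends SqlToOperationConverterTestBase {}
   ```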
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java:
##########
@@ -123,6 +131,20 @@
                     .withDescription(
                             "The columns' name for the required columns in row-level delete");
 
+    private static final ConfigOption<List<String>> REQUIRED_COLUMNS_FOR_UPDATE =
+            ConfigOptions.key("required-columns-for-update")
+                    .stringType()
+                    .asList()
+                    .noDefaultValue()
+                    .withDescription("The name for the required columns in row-level update");
+
+    private static final ConfigOption<Boolean> ONLY_REQUIRE_UPDATED_COLUMNS_FOR_UPDATE =
+            ConfigOptions.key("only_require_updated_columns_for_update")

Review Comment:
   'only_require_updated_columns_for_update' -> 'only-require-updated-columns-for-update'

##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java:
##########
@@ -403,6 +418,59 @@ private static RelNode convertDelete(
         }
     }
 
+    private static RelNode convertUpdate(
+            LogicalTableModify tableModify,
+            DynamicTableSink sink,
+            ContextResolvedTable contextResolvedTable,
+            String tableDebugName,
+            DataTypeFactory dataTypeFactory,
+            FlinkTypeFactory typeFactory,
+            List<SinkAbilitySpec> sinkAbilitySpecs) {
+        if (!(sink instanceof SupportsRowLevelUpdate)) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Can't perform update operation of the table %s because the corresponding dynamic table sink has not yet implemented %s.",
+                            tableDebugName, SupportsRowLevelUpdate.class.getName()));
+        }
+        SupportsRowLevelUpdate supportsRowLevelUpdate = (SupportsRowLevelUpdate) sink;
+        ResolvedSchema resolvedSchema = contextResolvedTable.getResolvedSchema();
+        List<Column> updatedColumns = getUpdatedColumns(tableModify, resolvedSchema);
+        SupportsRowLevelUpdate.RowLevelUpdateInfo updateInfo =
+                supportsRowLevelUpdate.applyRowLevelUpdate(
+                        updatedColumns, RowLevelModificationContextUtils.getScanContext());

Review Comment:
   nit: add a local variable for `RowLevelModificationContextUtils.getScanContext()`; see the sketch below.
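   A sketch of the nit; the variable name is only a suggestion:

   ```java
   RowLevelModificationScanContext scanContext =
           RowLevelModificationContextUtils.getScanContext();
   SupportsRowLevelUpdate.RowLevelUpdateInfo updateInfo =
           supportsRowLevelUpdate.applyRowLevelUpdate(updatedColumns, scanContext);
   ```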
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelUpdateSpec.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.abilities.sink;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * A sub-class of {@link SinkAbilitySpec} that can not only serialize/deserialize the row-level
+ * update mode & updated columns to/from JSON, but also can update existing data for {@link

Review Comment:
   'update mode & updated columns' -> 'update mode & columns'

##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java:
##########
@@ -1545,6 +1548,23 @@ private Operation convertDelete(SqlDelete sqlDelete) {
         contextResolvedTable, queryOperation, SinkModifyOperation.ModifyType.DELETE);
     }
 
+    private Operation convertUpdate(SqlUpdate sqlUpdate) {
+        // set it's update
+        RowLevelModificationContextUtils.setModificationType(
+                SupportsRowLevelModificationScan.RowLevelModificationType.UPDATE);
+        RelRoot updateRelational = flinkPlanner.rel(sqlUpdate);
+        // get target sink table
+        LogicalTableModify tableModify = (LogicalTableModify) updateRelational.rel;
+        List<String> targetTablePath = tableModify.getTable().getQualifiedName();

Review Comment:
   nit: use `UnresolvedIdentifier` directly to keep consistent with the delete operation (a sketch is at the end of this message).

##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java:
##########
@@ -123,6 +131,20 @@
                     .withDescription(
                             "The columns' name for the required columns in row-level delete");
 
+    private static final ConfigOption<List<String>> REQUIRED_COLUMNS_FOR_UPDATE =
+            ConfigOptions.key("required-columns-for-update")
+                    .stringType()
+                    .asList()
+                    .noDefaultValue()
+                    .withDescription("The name for the required columns in row-level update");
+
+    private static final ConfigOption<Boolean> ONLY_REQUIRE_UPDATED_COLUMNS_FOR_UPDATE =
+            ConfigOptions.key("only_require_updated_columns_for_update")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to only require the updated columns for update statement. ");

Review Comment:
   append to the description: ', require all columns by default'; see the sketch below.
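   Combining the key rename from the earlier comment with this description note, the option could read as follows (the exact wording is only a suggestion):

   ```java
   private static final ConfigOption<Boolean> ONLY_REQUIRE_UPDATED_COLUMNS_FOR_UPDATE =
           ConfigOptions.key("only-require-updated-columns-for-update")
                   .booleanType()
                   .defaultValue(false)
                   .withDescription(
                           "Whether to only require the updated columns for update statement, "
                                   + "require all columns by default.");
   ```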
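   And a sketch for the `convertUpdate` nit above, assuming the delete path resolves its target table the same way (the delete-side code is not shown in this diff, so treat this as an approximation rather than the exact change):

   ```java
   List<String> targetTablePath = tableModify.getTable().getQualifiedName();
   // Build an UnresolvedIdentifier and let the catalog manager qualify it,
   // mirroring how convertDelete resolves its target table.
   UnresolvedIdentifier unresolvedTableIdentifier =
           UnresolvedIdentifier.of(targetTablePath.toArray(new String[0]));
   ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(unresolvedTableIdentifier);
   ContextResolvedTable contextResolvedTable = catalogManager.getTableOrError(tableIdentifier);
   ```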
