lincoln-lil commented on code in PR #21658:
URL: https://github.com/apache/flink/pull/21658#discussion_r1070792730
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.abilities;
+
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
+import org.apache.flink.table.connector.source.ScanTableSource;
+
+import javax.annotation.Nullable;
+
+/**
+ * Interface for {@link ScanTableSource}s that support the row-level modification. The table source
+ * is responsible to return the information described by {@link RowLevelModificationScanContext}.

Review Comment:
   'responsible for returning'

##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.abilities;
+
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
+import org.apache.flink.table.connector.source.ScanTableSource;
+
+import javax.annotation.Nullable;
+
+/**
+ * Interface for {@link ScanTableSource}s that support the row-level modification. The table source
+ * is responsible to return the information described by {@link RowLevelModificationScanContext}.
+ * The context will be propagated to the sink which implements {@link SupportsRowLevelUpdate} or
+ * {@link SupportsRowLevelDelete}.
+ *
+ * <p>Note: This interface is optional for table sources to implement. For cases where the table
+ * source neither need to know the type of row-level modification nor propagate information to sink,
+ * the table source don't need to implement this interface. See more details at {@link

Review Comment:
   'For cases where the table source neither needs to know the type of row-level modification nor propagate information to the sink, the table source does not need to implement this interface'
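For readers following the new ability, here is a minimal sketch (not the PR's code) of how a connector could wire up this scan ability and hand information to the sink through a custom RowLevelModificationScanContext. The applyRowLevelModificationScan method and its nested RowLevelModificationType parameter are cut off in the excerpt above, so their exact shape is an assumption here; the package, the "My*" names, and the placeholder snapshot id are invented for illustration, and the scan runtime wiring is stubbed out.

// Minimal sketch under the assumptions stated above; not the PR's implementation.
package com.example.connector;

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.RowLevelModificationScanContext;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan;

import javax.annotation.Nullable;

public class MySnapshotTableSource implements ScanTableSource, SupportsRowLevelModificationScan {

    /** Hypothetical context: carries the snapshot id the DELETE/UPDATE scan is pinned to. */
    public static class MySnapshotScanContext implements RowLevelModificationScanContext {
        public final long snapshotId;

        public MySnapshotScanContext(long snapshotId) {
            this.snapshotId = snapshotId;
        }
    }

    // Placeholder; a real connector would resolve the current snapshot from its own metadata.
    private final long currentSnapshotId = 42L;

    @Override
    public RowLevelModificationScanContext applyRowLevelModificationScan(
            RowLevelModificationType rowLevelModificationType,
            @Nullable RowLevelModificationScanContext previousContext) {
        // The returned context is what the planner later passes to the sink's
        // applyRowLevelUpdate(...) / applyRowLevelDelete(...).
        return new MySnapshotScanContext(currentSnapshotId);
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        // Reading the pinned snapshot is out of scope for this sketch.
        throw new UnsupportedOperationException("Runtime implementation omitted in this sketch");
    }

    @Override
    public DynamicTableSource copy() {
        return new MySnapshotTableSource();
    }

    @Override
    public String asSummaryString() {
        return "MySnapshotTableSource";
    }
}

A sink-side ability can later downcast the planner-provided context back to MySnapshotScanContext to learn which snapshot the row-level scan was pinned to.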
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsRowLevelUpdate.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Interface for {@link DynamicTableSink}s that support update existing data according to row-level
+ * changes. The table sink is responsible for telling planner how to produce the row changes, and
+ * consuming them to achieve rows update purpose.
+ *
+ * <p>The planner will call method {@link #applyRowLevelUpdate(List,
+ * RowLevelModificationScanContext)} to get the {@link RowLevelUpdateInfo} returned by sink, and
+ * rewrite the update statement based on gotten {@link RowLevelUpdateInfo} to produce rows to {@link
+ * DynamicTableSink}.
+ */
+@PublicEvolving
+public interface SupportsRowLevelUpdate {
+
+    /**
+     * Applies row-level update with providing the updated columns and {@link
+     * RowLevelModificationScanContext}, and return {@link RowLevelUpdateInfo}.
+     *
+     * @param updatedColumns the columns updated by update statement in table column order.
+     * @param context the context passed by table source which implement {@link
+     *     SupportsRowLevelModificationScan}. It'll be null if the table source doesn't implement
+     *     it.
+     */
+    RowLevelUpdateInfo applyRowLevelUpdate(
+            List<Column> updatedColumns, @Nullable RowLevelModificationScanContext context);
+
+    /** The information that guides the planner on how to rewrite the update statement. */
+    @PublicEvolving
+    interface RowLevelUpdateInfo {
+
+        /**
+         * The required columns by the sink to perform row-level update. The rows consumed by sink
+         * will contain the required columns in order. If return Optional.empty(), it will contain
+         * all columns.
+         */
+        default Optional<List<Column>> requiredColumns() {
+            return Optional.empty();
+        }
+
+        /**
+         * Planner will rewrite the update statement to query base on the {@link
+         * RowLevelUpdateMode}, keeping the query of update unchanged by default(in `UPDATED_ROWS`
+         * mode), or changing the query to union the updated rows and the other rows (in `ALL_ROWS`
+         * mode).
+         *
+         * <p>Take the following SQL as an example:
+         *
+         * <pre>{@code
+         * UPDATE t SET x = 1 WHERE y = 2;
+         * }</pre>
+         *
+         * <p>If returns {@link RowLevelUpdateMode#UPDATED_ROWS}, the sink will get the update after
+         * rows which match the filter [y = 2].
+         *
+         * <p>If returns {@link RowLevelUpdateMode#ALL_ROWS}, the sink will get both the update
+         * after rows which match the filter [y = 2] and the other rows that don't match the filter
+         * [y = 2].
+         *
+         * <p>Note: All rows will have RowKind#UPDATE_AFTER when RowLevelUpdateMode is UPDATED_ROWS,
+         * and RowKind#INSERT when RowLevelUpdateMode is ALL_ROWS.
+         */
+        default RowLevelUpdateMode getRowLevelUpdateMode() {
+            return RowLevelUpdateMode.UPDATED_ROWS;
+        }
+    }
+
+    /**
+     * Type of delete modes that the sink expects for update purpose.

Review Comment:
   'delete' -> 'update'
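To make the contract above concrete, here is a minimal sketch of a hypothetical sink implementing SupportsRowLevelUpdate with the defaults discussed in the excerpt. The applyRowLevelUpdate signature, RowLevelUpdateInfo, and RowLevelUpdateMode.UPDATED_ROWS are taken from the quoted interface; the package, the "My*" class name, and the stubbed runtime provider are invented for illustration.

// Minimal sketch under the assumptions stated above; not the PR's implementation.
package com.example.connector;

import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.RowLevelModificationScanContext;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;

import javax.annotation.Nullable;

import java.util.List;
import java.util.Optional;

public class MyUpdatableTableSink implements DynamicTableSink, SupportsRowLevelUpdate {

    // Context handed over from a source that implements SupportsRowLevelModificationScan;
    // null when the source does not implement that ability. A real sink would read it when
    // committing the rewritten rows.
    @Nullable private RowLevelModificationScanContext scanContext;

    @Override
    public RowLevelUpdateInfo applyRowLevelUpdate(
            List<Column> updatedColumns, @Nullable RowLevelModificationScanContext context) {
        this.scanContext = context;
        return new RowLevelUpdateInfo() {
            @Override
            public Optional<List<Column>> requiredColumns() {
                // Optional.empty() asks the planner to deliver all columns, in table column order.
                return Optional.empty();
            }

            @Override
            public RowLevelUpdateMode getRowLevelUpdateMode() {
                // Only rows matching the UPDATE filter are delivered, as UPDATE_AFTER rows.
                return RowLevelUpdateMode.UPDATED_ROWS;
            }
        };
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return requestedMode;
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        // Writing the updated rows back to storage is out of scope for this sketch.
        throw new UnsupportedOperationException("Runtime implementation omitted in this sketch");
    }

    @Override
    public DynamicTableSink copy() {
        return new MyUpdatableTableSink();
    }

    @Override
    public String asSummaryString() {
        return "MyUpdatableTableSink";
    }
}

Returning Optional.empty() from requiredColumns() requests full rows in table column order; a connector that only needs, say, a key plus the updated columns could return that subset instead.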
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsRowLevelDelete.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Interface for {@link DynamicTableSink}s that support delete existing data according to row-level
+ * changes. The table sink is responsible for telling planner how to produce the row changes, and
+ * consuming them to achieve rows delete purpose.

Review Comment:
   -> 'achieve the purpose of row(s) deletion' ?
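And the delete-side counterpart: a minimal sketch of a hypothetical sink implementing SupportsRowLevelDelete. The excerpt above is truncated before the interface body, so the applyRowLevelDelete signature and the RowLevelDeleteMode constants below are assumptions modelled on the update ability; the package and "My*" names are invented for illustration.

// Minimal sketch under the assumptions stated above; not the PR's implementation.
package com.example.connector;

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.RowLevelModificationScanContext;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;

import javax.annotation.Nullable;

public class MyDeletableTableSink implements DynamicTableSink, SupportsRowLevelDelete {

    @Override
    public RowLevelDeleteInfo applyRowLevelDelete(
            @Nullable RowLevelModificationScanContext context) {
        // context is null unless the source implements SupportsRowLevelModificationScan.
        return new RowLevelDeleteInfo() {
            @Override
            public RowLevelDeleteMode getRowLevelDeleteMode() {
                // DELETED_ROWS: the sink only receives the rows matching the DELETE filter;
                // REMAINING_ROWS would instead deliver the rows that survive the statement.
                return RowLevelDeleteMode.DELETED_ROWS;
            }
        };
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return requestedMode;
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        // Applying the deletions to storage is out of scope for this sketch.
        throw new UnsupportedOperationException("Runtime implementation omitted in this sketch");
    }

    @Override
    public DynamicTableSink copy() {
        return new MyDeletableTableSink();
    }

    @Override
    public String asSummaryString() {
        return "MyDeletableTableSink";
    }
}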
