This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6cf7e5d2ba48bd565ab3ec2841059606a49668b1 Author: luoyuxia <[email protected]> AuthorDate: Fri Jan 13 11:24:38 2023 +0800 [FLINK-30661][table] introduce SupportsRowLevelModificationScan interface --- .../connector/RowLevelModificationScanContext.java | 37 ++++++++++ .../SupportsRowLevelModificationScan.java | 78 ++++++++++++++++++++++ 2 files changed, 115 insertions(+) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/RowLevelModificationScanContext.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/RowLevelModificationScanContext.java new file mode 100644 index 00000000000..f0e3c35ba36 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/RowLevelModificationScanContext.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.connector; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan; + +/** + * The context is intended to provide the relevant table scan information needed by the sink to + * perform row-level update/delete. 
It'll be generated by a table source that implements {@link + * SupportsRowLevelModificationScan}, and then passed to a sink which implements {@link + * SupportsRowLevelUpdate} or {@link SupportsRowLevelDelete} for executing UPDATE/DELETE statement + * during compilation phase. + * + * <p>This mechanism enables the coordination between the table sources and the table sink which is + * to be updated/deleted. + * + * <p>Connectors can implement this interface to provide custom information. + */ +@PublicEvolving +public interface RowLevelModificationScanContext {} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java new file mode 100644 index 00000000000..bbf0bfa969b --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.connector.source.abilities; + +import org.apache.flink.table.connector.RowLevelModificationScanContext; +import org.apache.flink.table.connector.source.ScanTableSource; + +import javax.annotation.Nullable; + +/** + * Interface for {@link ScanTableSource}s that support the row-level modification. The table source + * is responsible for returning the information described by {@link + * RowLevelModificationScanContext}. The context will be propagated to the sink which implements + * {@link SupportsRowLevelUpdate} or {@link SupportsRowLevelDelete}. + * + * <p>Note: This interface is optional for table sources to implement. For cases where the table + * source neither needs to know the type of row-level modification nor propagate information to + * sink, the table source doesn't need to implement this interface. See more details at {@link + * #applyRowLevelModificationScan(RowLevelModificationType, RowLevelModificationScanContext)}. + */ +public interface SupportsRowLevelModificationScan { + + /** + * Applies the type of row-level modification and the previous {@link + * RowLevelModificationScanContext} returned by previous table source scan, return a new {@link + * RowLevelModificationScanContext}. If the table source is the last one, the {@link + * RowLevelModificationScanContext} will be passed to the table sink. Otherwise, it will be + * passed to the following table source. + * + * <p>Note: For all the tables in the UPDATE/DELETE statement, this method will be invoked for + * the corresponding table source scan. + * + * <p>Note: It may have multiple table sources in the case of sub-query. In such case, it will + * return multiple {@link RowLevelModificationScanContext}s. To handle such case, the planner + * will also pass the previous {@link RowLevelModificationScanContext} to the current table + * source scan which is expected to decide what to do with the previous {@link + * RowLevelModificationScanContext}.
The order is consistent with the compilation order of the + * table sources. The planner will only pass the last context returned to the sink. + * + * @param previousContext the context returned by previous table source, if there's no previous + * context, it'll be null. + */ + RowLevelModificationScanContext applyRowLevelModificationScan( + RowLevelModificationType rowLevelModificationType, + @Nullable RowLevelModificationScanContext previousContext); + + /** + * Type of the row-level modification for table. + * + * <p>Currently, two types are supported: + * + * <ul> + * <li>UPDATE + * <li>DELETE + * </ul> + */ + enum RowLevelModificationType { + UPDATE, + + DELETE + } +}
