This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 69837fc6145d7491c0dc7aeace43317f56c0e6aa Author: luoyuxia <[email protected]> AuthorDate: Fri Jan 13 11:29:31 2023 +0800 [FLINK-30661][table] introduce SupportsDeletePushDown interface --- .../sink/abilities/SupportsDeletePushDown.java | 82 ++++++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsDeletePushDown.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsDeletePushDown.java new file mode 100644 index 00000000000..8220812dbad --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsDeletePushDown.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.connector.sink.abilities; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.expressions.ResolvedExpression; + +import java.util.List; +import java.util.Optional; + +/** + * Enables to push down filters decomposed from the {@code WHERE} clause in delete statement to + * {@link DynamicTableSink}. The table sink can delete existing data directly according to the + * filters. + * + * <p>Flink will get the filters in conjunctive form and push down the filters into sink by calling + * method {@link #applyDeleteFilters(List)} in the planning phase. If it returns true, Flink will + * then call {@link #executeDeletion()} to execute the actual deletion during execution phase. + * + * <p>Given the following SQL: + * + * <pre>{@code + * DELETE FROM t WHERE (a = '1' OR a = '2') AND b IS NOT NULL;* + * }</pre> + * + * <p>In the example above, the {@code WHERE} clause will be decomposed into two filters + * + * <ul> + * <li>{@code [a = '1' OR a = '2']} + * <li>{@code [b IS NOT NULL]} + * </ul> + * + * <p>If the sink can accept both filters which means the sink can delete data directly according to + * the filters, {@link #applyDeleteFilters(List)} should return true. Otherwise, it should return + * false. + * + * <p>Note: For the cases where the filter expression is not available, e.g., sub-query or {@link + * #applyDeleteFilters(List)} returns false, if the sink implements {@link SupportsRowLevelDelete}, + * Flink will try to rewrite the delete statement and produce row-level changes, see {@link + * SupportsRowLevelDelete} for more details. Otherwise, Flink will throw {@link + * UnsupportedOperationException}. + */ +@PublicEvolving +public interface SupportsDeletePushDown { + + /** + * Provides a list of filters specified by {@code WHERE} clause in conjunctive form and return + * the acceptance status to planner during planning phase. + * + * @param filters a list of resolved filter expressions. + * @return true if the sink accepts all filters; false otherwise. + */ + boolean applyDeleteFilters(List<ResolvedExpression> filters); + + /** + * Deletes data during execution phase. + * + * <p>Note: The method will be involved iff the method {@link #applyDeleteFilters(List)} returns + * true. + * + * @return the number of the estimated rows to be deleted, or {@link Optional#empty()} for the + * unknown condition. + */ + Optional<Long> executeDeletion(); +}
