LadyForest commented on code in PR #21658:
URL: https://github.com/apache/flink/pull/21658#discussion_r1070211065
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsDeletePushDown.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.expressions.ResolvedExpression;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Enable to delete existing data in a {@link DynamicTableSink} directly according to the filter
+ * expressions in {@code DELETE} clause.
+ *
+ * <p>Given the following SQL:
+ *
+ * <pre>{@code
+ * DELETE FROM t WHERE (a = '1' OR a = '2') AND b IS NOT NULL;
+ *
+ * }</pre>
+ *
+ * <p>In the example above, {@code [a = '1' OR a = '2']} and {@code [b IS NOT NULL]} are acceptable
+ * filters.
+ *
+ * <p>Flink will get the filters in conjunctive form and push down the filters into sink by calling
+ * method {@link #applyDeleteFilters} in the planner phase. If it returns true, Flink will then call
+ * method to execute the actual delete in execution phase.
+ *
+ * <p>Note: in the case that the filter expression is not available, e.g., sub-query or {@link
+ * #applyDeleteFilters} returns false, if the sink implements {@link SupportsRowLevelDelete}, Flink
+ * will try to rewrite the delete operation and produce row-level changes, see {@link
+ * SupportsRowLevelDelete} for more details. Otherwise, Flink will throw {@link
+ * UnsupportedOperationException}.
+ */

Review Comment:
```suggestion
/**
 * Enables {@link DynamicTableSink} to delete existing data directly according to the filter
 * expressions specified by the {@code DELETE} clause.
 *
 * <p>Flink will get the filters in conjunctive form and push them down into the sink by calling
 * {@link #applyDeleteFilters} in the planning phase. If it returns true, Flink will then call
 * {@link #executeDeletion()} to execute the actual deletion during the execution phase.
 *
 * <p>Given the following SQL:
 *
 * <pre>{@code
 * DELETE FROM t WHERE (a = '1' OR a = '2') AND b IS NOT NULL;
 * }</pre>
 *
 * <p>In the example above, the {@code WHERE} clause will be decomposed into two filters
 *
 * <ul>
 *   <li>{@code [a = '1' OR a = '2']}
 *   <li>{@code [b IS NOT NULL]}
 * </ul>
 *
 * <p>If the sink accepts both filters, {@link #applyDeleteFilters(List)} will return true.
 *
 * <p>Note: if the filter expressions are not available (e.g., for a sub-query) or {@link
 * #applyDeleteFilters} returns false, and the sink implements {@link SupportsRowLevelDelete},
 * Flink will try to rewrite the delete operation and produce row-level changes; see {@link
 * SupportsRowLevelDelete} for more details.
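
For readers following along, the decision chain this Javadoc describes can be sketched roughly as follows. This is illustrative pseudocode of the planner-side behavior, not actual planner code; `DeletePlanningSketch` and `planDelete` are made-up names:

```java
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown;
import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
import org.apache.flink.table.expressions.ResolvedExpression;

import javax.annotation.Nullable;

import java.util.List;

final class DeletePlanningSketch {

    // Roughly the decision chain the Javadoc describes, in code form.
    static void planDelete(DynamicTableSink sink, @Nullable List<ResolvedExpression> filters) {
        if (filters != null
                && sink instanceof SupportsDeletePushDown
                && ((SupportsDeletePushDown) sink).applyDeleteFilters(filters)) {
            // Fast path: the sink handles the DELETE itself;
            // executeDeletion() will be called in the execution phase.
        } else if (sink instanceof SupportsRowLevelDelete) {
            // Fallback: rewrite the DELETE into a scan that produces row-level changes.
        } else {
            throw new UnsupportedOperationException(
                    "The sink supports neither delete push-down nor row-level delete.");
        }
    }
}
```
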
 * Otherwise, Flink will throw an {@link
 * UnsupportedOperationException}.
 */
```

##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.abilities;
+
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
+import org.apache.flink.table.connector.source.ScanTableSource;
+
+import javax.annotation.Nullable;
+
+/**
+ * Enables to tell the {@link ScanTableSource} the type of row-level modification the table source
+ * scan is for, and return {@link RowLevelModificationScanContext} which will then be passed to the
+ * sink which implements {@link SupportsRowLevelUpdate} or {@link SupportsRowLevelDelete}. It allows
+ * the table source to pass information to the table sink which is to be updated/deleted.
+ *
+ * <p>Note: This interface is optional for table source to support update/delete existing data. For
+ * the case that the table source won't need to know the type of row-level modification or pass
+ * information to sink, the table source doesn't need to implement it.
+ *
+ * <p>Note: Only if the {@link ScanTableSource} implements this interface, and the table is to be
+ * scanned to update/delete a table, the method {@link #applyRowLevelModificationScan} for the
+ * corresponding table source will be involved . For more details, please see the method {@link
+ * #applyRowLevelModificationScan}.
+ */
+public interface SupportsRowLevelModificationScan {
+
+    /**
+     * Apply the type of row-level modification and the previous {@link
+     * RowLevelModificationScanContext} returned by previous table source scan, return a new {@link
+     * RowLevelModificationScanContext} which will then finally to be passed to the table sink.
+     *
+     * <p>Note: for the all tables in the UPDATE/DELETE statement, this method will be involved for
+     * the corresponding table source scan.
+     *
+     * <p>Note: it may have multiple table sources in the case of sub-query. In such case, it will
+     * return multiple {@link RowLevelModificationScanContext}. To handle such case, the planner
+     * will also pass the previous {@link RowLevelModificationScanContext} to the current table
+     * source scan to make it decide how to deal with the previous {@link
+     * RowLevelModificationScanContext}. The order is consistent to the order that the table source
+     * compiles. The planer will only pass the last context returned to the sink.
+     *
+     * @param previousContext the context returned by previous table source, if there's no previous
+     *     context, it'll be null.
+     */
+    RowLevelModificationScanContext applyRowLevelModificationScan(
+            RowLevelModificationType rowLevelModificationType,
+            @Nullable RowLevelModificationScanContext previousContext);
+
+    /**
+     * Type of the row-level modification for table.
+     *
+     * <p>Currently, two types are supported:
+     *
+     * <ul>
+     *   <li>UPDATE
+     *   <li>DELETE
+     * </ul>
+     */
+    enum RowLevelModificationType {
+        UPDATE,

Review Comment:
Nit: add a new line here
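
To make the context-chaining contract above concrete, here is a minimal sketch of a source-side implementation. The class names, the snapshot-id idea, and `SnapshotContext` are all hypothetical; it also assumes, as the quoted Javadoc suggests, that `RowLevelModificationScanContext` is a marker-style interface the connector fills with its own fields. A real source would additionally implement `ScanTableSource`:

```java
import org.apache.flink.table.connector.RowLevelModificationScanContext;
import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan;

import javax.annotation.Nullable;

/** Hypothetical scan ability; a real source would also implement ScanTableSource. */
public class SnapshotAwareScan implements SupportsRowLevelModificationScan {

    @Override
    public RowLevelModificationScanContext applyRowLevelModificationScan(
            RowLevelModificationType type,
            @Nullable RowLevelModificationScanContext previousContext) {
        // First scan in the statement: previousContext is null, so create a fresh one.
        if (previousContext == null) {
            return new SnapshotContext(currentSnapshotId(), type);
        }
        // Sub-query case: a context from an earlier scan is passed in; the last
        // context returned here is the one the planner hands to the sink.
        return previousContext;
    }

    private long currentSnapshotId() {
        return 42L; // placeholder: e.g., the snapshot pinned for this scan
    }

    /** Hypothetical context carrying the snapshot the delete/update is based on. */
    private static final class SnapshotContext implements RowLevelModificationScanContext {
        final long snapshotId; // read later by the sink side
        final RowLevelModificationType type;

        SnapshotContext(long snapshotId, RowLevelModificationType type) {
            this.snapshotId = snapshotId;
            this.type = type;
        }
    }
}
```
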
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsDeletePushDown.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.expressions.ResolvedExpression;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Enable to delete existing data in a {@link DynamicTableSink} directly according to the filter
+ * expressions in {@code DELETE} clause.
+ *
+ * <p>Given the following SQL:
+ *
+ * <pre>{@code
+ * DELETE FROM t WHERE (a = '1' OR a = '2') AND b IS NOT NULL;
+ *
+ * }</pre>
+ *
+ * <p>In the example above, {@code [a = '1' OR a = '2']} and {@code [b IS NOT NULL]} are acceptable
+ * filters.
+ *
+ * <p>Flink will get the filters in conjunctive form and push down the filters into sink by calling
+ * method {@link #applyDeleteFilters} in the planner phase. If it returns true, Flink will then call
+ * method to execute the actual delete in execution phase.
+ *
+ * <p>Note: in the case that the filter expression is not available, e.g., sub-query or {@link
+ * #applyDeleteFilters} returns false, if the sink implements {@link SupportsRowLevelDelete}, Flink
+ * will try to rewrite the delete operation and produce row-level changes, see {@link
+ * SupportsRowLevelDelete} for more details. Otherwise, Flink will throw {@link
+ * UnsupportedOperationException}.
+ */
+@PublicEvolving
+public interface SupportsDeletePushDown {
+
+    /**
+     * Provides a list of filters from delete operation in conjunctive form in planning phase. A
+     * sink can either return true if it can accept all filters or return false if it can not
+     * accept.
+     *
+     * <p>If it returns true, Flink will then call the method {@link #executeDeletion} in execution
+     * phase to do the actual deletion.
+     *
+     * <p>If it returns false, and the sink still implements {@link SupportsRowLevelDelete}, Flink
+     * will rewrite the delete operation and produce row-level changes. Otherwise, Flink will throw
+     * {@link UnsupportedOperationException}.
+     */

Review Comment:
```suggestion
    /**
     * Applies a list of filters specified by the {@code DELETE} clause in conjunctive form and
     * returns the acceptance status to the planner during the planning phase.
     *
     * @param filters a list of resolved filter expressions.
     * @return true if the sink accepts all filters; false otherwise.
     */
```
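
As a concrete illustration of what "accepting" a filter could mean, a sink might inspect each conjunct and only accept simple `field = literal` comparisons. The expression classes below are real flink-table-common API, but the acceptance policy and the `DeleteFilterSupport` helper are invented for this example:

```java
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;

import java.util.List;

/** Hypothetical helper deciding whether every conjunctive filter is "field = literal". */
final class DeleteFilterSupport {

    static boolean acceptsAll(List<ResolvedExpression> conjunctiveFilters) {
        // The planner hands over the WHERE clause already split on AND,
        // so each element can be inspected independently.
        return conjunctiveFilters.stream().allMatch(DeleteFilterSupport::isFieldEqualsLiteral);
    }

    private static boolean isFieldEqualsLiteral(ResolvedExpression filter) {
        if (!(filter instanceof CallExpression)) {
            return false;
        }
        CallExpression call = (CallExpression) filter;
        if (call.getFunctionDefinition() != BuiltInFunctionDefinitions.EQUALS) {
            return false;
        }
        List<ResolvedExpression> children = call.getResolvedChildren();
        return children.size() == 2
                && children.get(0) instanceof FieldReferenceExpression
                && children.get(1) instanceof ValueLiteralExpression;
    }
}
```
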
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsRowLevelDelete.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Enable to delete existing data in a {@link DynamicTableSink} according to row-level changes.
+ *
+ * <p>The planner will call method {@link #applyRowLevelDelete} to get the {@link
+ * RowLevelDeleteInfo} that the sink returns, and rewrite the delete operation based on gotten
+ * {@link RowLevelDeleteInfo} to produce rows (may rows to be deleted or remaining rows after the
+ * delete operation depending on sink's implementation) to {@link DynamicTableSink}. The sink is
+ * expected to consume these rows to achieve rows delete purpose.
+ *
+ * <p>Note: if the sink also implements {@link SupportsDeletePushDown}, the planner will always
+ * prefer {@link SupportsDeletePushDown}, and only the filters aren't available or {@link
+ * SupportsDeletePushDown#applyDeleteFilters} returns false, this interface will be considered and
+ * to rewrite the delete operation to produce the rows to the sink.
+ */
+@PublicEvolving
+public interface SupportsRowLevelDelete {
+
+    /**
+     * Apply row level delete with {@link RowLevelModificationScanContext} passed by table source,
+     * and return {@link RowLevelDeleteInfo} to guide the planner on how to rewrite the delete
+     * operation.
+     *
+     * <p>Note: if the table source doesn't implement {@link SupportsRowLevelModificationScan},
+     * there won't be any {@link RowLevelModificationScanContext} passed, so the {@param context}
+     * will be null.
+     */
+    RowLevelDeleteInfo applyRowLevelDelete(@Nullable RowLevelModificationScanContext context);
+
+    /** The information that guides the planer on how to rewrite the delete operation. */
+    @PublicEvolving
+    interface RowLevelDeleteInfo {
+
+        /**
+         * The required columns that the sink expects for deletion, the rows consumed by sink will
+         * contain the columns with the order consistent with the order of returned columns. If
+         * return Optional.empty(), it will select all columns.
+         */
+        default Optional<List<Column>> requiredColumns() {
+            return Optional.empty();
+        }
+
+        /**
+         * Planner will rewrite delete to query base on the {@link RowLevelDeleteInfo}, keeps the
+         * query of delete unchanged by default(in `DELETE_ROWS` mode), or change the query to the
+         * complementary set in REMAINING_ROWS mode.
+         *
+         * <p>Take the following SQL as an example:
+         *
+         * <pre>{@code
+         * DELETE FROM t WHERE y = 2;
+         * }</pre>
+         *
+         * <p>If returns {@link SupportsRowLevelDelete.RowLevelDeleteMode#DELETED_ROWS}, the sink
+         * will get rows to be deleted which match the filter [y = 2].
+         *
+         * <p>If returns {@link SupportsRowLevelDelete.RowLevelDeleteMode#REMAINING_ROWS}, the sink
+         * will get the rows which doesn't match the filter [y = 2].
+         *
+         * <p>Note: All rows will have RowKind#DELETE when RowLevelDeleteMode is DELETED_ROWS, and
+         * RowKind#INSERT when RowLevelDeleteMode is REMAINING_ROWS.
+         */
+        default RowLevelDeleteMode getRowLevelDeleteMode() {
+            return RowLevelDeleteMode.DELETED_ROWS;
+        }
+    }
+
+    /**
+     * Type of delete modes that the sink expects for delete purpose.
+     *
+     * <p>Currently, two modes are supported:
+     *
+     * <ul>
+     *   <li>DELETED_ROWS - in this mode, the sink will only get the rows that need to be deleted.
+     *   <li>REMAINING_ROWS - in this mode, the sink will only get the remaining rows as if the the
+     *       delete operation had been done.
+     * </ul>
+     */
+    @PublicEvolving
+    enum RowLevelDeleteMode {
+        DELETED_ROWS,

Review Comment:
Nit: add a new line here.
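
A sink-side sketch of this ability might look like the following. The class name is hypothetical and only the interface, its nested `RowLevelDeleteInfo`, and the delete modes come from the PR; a real sink would also implement `DynamicTableSink`:

```java
import org.apache.flink.table.connector.RowLevelModificationScanContext;
import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;

import javax.annotation.Nullable;

/** Hypothetical sink ability; a real sink would also implement DynamicTableSink. */
public class RowLevelDeleteSink implements SupportsRowLevelDelete {

    @Override
    public RowLevelDeleteInfo applyRowLevelDelete(
            @Nullable RowLevelModificationScanContext context) {
        // context is null unless the source implements SupportsRowLevelModificationScan.
        return new RowLevelDeleteInfo() {
            @Override
            public RowLevelDeleteMode getRowLevelDeleteMode() {
                // DELETED_ROWS: the sink receives the rows matching the WHERE clause
                // with RowKind.DELETE; REMAINING_ROWS would deliver the complement
                // with RowKind.INSERT instead.
                return RowLevelDeleteMode.DELETED_ROWS;
            }
            // requiredColumns() keeps its default Optional.empty(), i.e., all columns.
        };
    }
}
```
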
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.abilities;
+
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
+import org.apache.flink.table.connector.source.ScanTableSource;
+
+import javax.annotation.Nullable;
+
+/**
+ * Enables to tell the {@link ScanTableSource} the type of row-level modification the table source
+ * scan is for, and return {@link RowLevelModificationScanContext} which will then be passed to the
+ * sink which implements {@link SupportsRowLevelUpdate} or {@link SupportsRowLevelDelete}. It allows
+ * the table source to pass information to the table sink which is to be updated/deleted.
+ *
+ * <p>Note: This interface is optional for table source to support update/delete existing data. For
+ * the case that the table source won't need to know the type of row-level modification or pass
+ * information to sink, the table source doesn't need to implement it.
+ *
+ * <p>Note: Only if the {@link ScanTableSource} implements this interface, and the table is to be
+ * scanned to update/delete a table, the method {@link #applyRowLevelModificationScan} for the
+ * corresponding table source will be involved . For more details, please see the method {@link
+ * #applyRowLevelModificationScan}.
+ */
+public interface SupportsRowLevelModificationScan {

Review Comment:
Personally, I feel like the doc is a little bit colloquial and verbose. What about

```
/**
 * Enables {@link ScanTableSource} to support row-level modification and to return the update or
 * delete information described by {@link RowLevelModificationScanContext}. The context will be
 * propagated to the sink which implements {@link SupportsRowLevelUpdate} or {@link
 * SupportsRowLevelDelete}.
 *
 * <p>Note: This interface is optional for table sources to implement. See more details at {@link
 * #applyRowLevelModificationScan}.
 */
```

Ref: according to https://www.oxfordlearnersdictionaries.com/definition/english/enable, the frequent usage of "enable" is "<1> enable sb./sth. to do sth. <2> enable sth."
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsDeletePushDown.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.expressions.ResolvedExpression;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Enable to delete existing data in a {@link DynamicTableSink} directly according to the filter
+ * expressions in {@code DELETE} clause.
+ *
+ * <p>Given the following SQL:
+ *
+ * <pre>{@code
+ * DELETE FROM t WHERE (a = '1' OR a = '2') AND b IS NOT NULL;
+ *
+ * }</pre>
+ *
+ * <p>In the example above, {@code [a = '1' OR a = '2']} and {@code [b IS NOT NULL]} are acceptable
+ * filters.
+ *
+ * <p>Flink will get the filters in conjunctive form and push down the filters into sink by calling
+ * method {@link #applyDeleteFilters} in the planner phase. If it returns true, Flink will then call
+ * method to execute the actual delete in execution phase.
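
The last note in the quoted Javadoc (the sink receives no filters in `executeDeletion` and must remember them itself) could be illustrated as follows. This is a hypothetical sketch that assumes the `Optional<Long> executeDeletion()` signature implied by the Javadoc; the SQL rendering is invented and not part of the interface:

```java
import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown;
import org.apache.flink.table.expressions.ResolvedExpression;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/** Hypothetical sketch; a real sink would also implement DynamicTableSink. */
public class RememberingDeleteSink implements SupportsDeletePushDown {

    // executeDeletion() takes no arguments, so the accepted filters are kept here.
    private List<ResolvedExpression> filters;

    @Override
    public boolean applyDeleteFilters(List<ResolvedExpression> filters) {
        this.filters = filters;
        return true; // accept everything, purely for illustration
    }

    @Override
    public Optional<Long> executeDeletion() {
        // Render the remembered conjunctive filters into a WHERE clause for an
        // external store that speaks SQL. asSummaryString() is a debug-style
        // rendering; a real connector would translate each expression properly.
        String where = filters.stream()
                .map(ResolvedExpression::asSummaryString)
                .collect(Collectors.joining(" AND "));
        System.out.println("DELETE FROM t WHERE " + where);
        return Optional.empty(); // number of deleted rows unknown
    }
}
```
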
+ *
+ * <p>Note: in the case that the filter expression is not available, e.g., sub-query or {@link
+ * #applyDeleteFilters} returns false, if the sink implements {@link SupportsRowLevelDelete}, Flink
+ * will try to rewrite the delete operation and produce row-level changes, see {@link
+ * SupportsRowLevelDelete} for more details. Otherwise, Flink will throw {@link
+ * UnsupportedOperationException}.
+ */
+@PublicEvolving
+public interface SupportsDeletePushDown {
+
+    /**
+     * Provides a list of filters from delete operation in conjunctive form in planning phase. A
+     * sink can either return true if it can accept all filters or return false if it can not
+     * accept.
+     *
+     * <p>If it returns true, Flink will then call the method {@link #executeDeletion} in execution
+     * phase to do the actual deletion.
+     *
+     * <p>If it returns false, and the sink still implements {@link SupportsRowLevelDelete}, Flink
+     * will rewrite the delete operation and produce row-level changes. Otherwise, Flink will throw
+     * {@link UnsupportedOperationException}.
+     */
+    boolean applyDeleteFilters(List<ResolvedExpression> filters);
+
+    /**
+     * Do the actual deletion in the execution phase, and return how many rows has been deleted,
+     * Optional.empty() is for unknown delete rows.
+     *
+     * <p>Note that this method will be involved if and only if {@link #applyDeleteFilters(List
+     * ResolvedExpression)} returns true. So, please make sure the implementation for this method
+     * will do delete the data correctly.
+     *
+     * <p>Note that in this method, the sink won't get the filters since they have been passed to
+     * the method {@link #applyDeleteFilters} before. The sink may need to keep these filters, so
+     * that it can get the filters if necessary to finish the deletion.
+     */

Review Comment:
```suggestion
    /**
     * Deletes data during the execution phase.
     *
     * <p>Note: this method is invoked to delete data iff {@link #applyDeleteFilters(List)}
     * returns true.
     *
     * @return the estimated number of rows to be deleted, or {@link Optional#empty()} if the
     *     number is unknown.
     */
```

##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/RowLevelModificationScanContext.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
+import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan;
+
+/**
+ * The context for scan the table to do updated/delete, designed to provide some necessary
+ * information that the sink may need to know to from table source to perform update/delete. It'll
+ * be generated by table source which implements {@link SupportsRowLevelModificationScan}, and then
+ * passed to the sink which implements {@link SupportsRowLevelUpdate} or {@link
+ * SupportsRowLevelDelete} for UPDATE/DELETE statement in compile phase.
+ *
+ * <p>This mechanism enables the coordination between the source and the sink for the corresponding
+ * table to be updated/deleted.
+ *
+ * <p>The connector can implement this interface to provide custom information.

Review Comment:
Correct me if I'm wrong, but should we be more specific and clarify that it is the "source connector" that provides the custom context?
