LadyForest commented on code in PR #21658:
URL: https://github.com/apache/flink/pull/21658#discussion_r1070211065


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsDeletePushDown.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.expressions.ResolvedExpression;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Enable to delete existing data in a {@link DynamicTableSink} directly according to the filter
+ * expressions in {@code DELETE} clause.
+ *
+ * <p>Given the following SQL:
+ *
+ * <pre>{@code
+ * DELETE FROM t WHERE (a = '1' OR a = '2') AND b IS NOT NULL;
+ *
+ * }</pre>
+ *
+ * <p>In the example above, {@code [a = '1' OR a = '2']} and {@code [b IS NOT NULL]} are acceptable
+ * filters.
+ *
+ * <p>Flink will get the filters in conjunctive form and push down the filters into sink by calling
+ * method {@link #applyDeleteFilters} in the planner phase. If it returns true, Flink will then call
+ * method to execute the actual delete in execution phase.
+ *
+ * <p>Note: in the case that the filter expression is not available, e.g., sub-query or {@link
+ * #applyDeleteFilters} returns false, if the sink implements {@link SupportsRowLevelDelete}, Flink
+ * will try to rewrite the delete operation and produce row-level changes, see {@link
+ * SupportsRowLevelDelete} for more details. Otherwise, Flink will throw {@link
+ * UnsupportedOperationException}.
+ */

Review Comment:
   ```suggestion
   /**
    * Enables {@link DynamicTableSink} to delete existing data directly according to the filter
    * expressions specified by {@code DELETE} clause.
    *
    * <p>Flink will get the filters in conjunctive form and push down the filters into sink by calling
    * method {@link #applyDeleteFilters} in the planner phase. If it returns true, Flink will then call
    * {@link #executeDeletion()} to execute the actual deletion during execution phase.
    *
    * <p>Given the following SQL:
    *
    * <pre>{@code
    * DELETE FROM t WHERE (a = '1' OR a = '2') AND b IS NOT NULL;
    *
    * }</pre>
    *
    * <p>In the example above, the {@code WHERE} clause will be decomposed into two filters
    *
    * <ul>
    *   <li>{@code [a = '1' OR a = '2']}
    *   <li>{@code [b IS NOT NULL]}
    * </ul>
    *
    * <p>If the sink accepts both filters, {@link #applyDeleteFilters(List)} will return true.
    *
    * <p>Note: in the case that the filter expression is not available, e.g., sub-query or {@link
    * #applyDeleteFilters} returns false, if the sink implements {@link SupportsRowLevelDelete}, Flink
    * will try to rewrite the delete operation and produce row-level changes, see {@link
    * SupportsRowLevelDelete} for more details. Otherwise, Flink will throw {@link
    * UnsupportedOperationException}.
    */
   ```
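
   For connector authors following this thread, a minimal sketch of a sink implementing this ability could look like the snippet below. The class name and the `externalStorageCanEvaluate` check are hypothetical, and the `Optional<Long>` return type of `executeDeletion()` is assumed from the suggested `@return` wording rather than taken from this diff:
   ```java
   import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown;
   import org.apache.flink.table.expressions.ResolvedExpression;

   import java.util.List;
   import java.util.Optional;

   /** Hypothetical sink ability implementation; storage access is only sketched. */
   class ExampleDeletePushDownSink implements SupportsDeletePushDown {

       // Filters accepted during planning, kept so executeDeletion() can reuse them.
       private List<ResolvedExpression> acceptedFilters;

       @Override
       public boolean applyDeleteFilters(List<ResolvedExpression> filters) {
           // For the SQL above, the planner passes the two conjuncts
           // [a = '1' OR a = '2'] and [b IS NOT NULL]; accept all of them or none.
           if (externalStorageCanEvaluate(filters)) {
               this.acceptedFilters = filters;
               return true;
           }
           return false;
       }

       @Override
       public Optional<Long> executeDeletion() {
           // Issue the storage-native DELETE using the filters remembered above;
           // Optional.empty() signals that the deleted row count is unknown.
           return Optional.empty();
       }

       private boolean externalStorageCanEvaluate(List<ResolvedExpression> filters) {
           // Placeholder: a real connector would inspect each expression here.
           return !filters.isEmpty();
       }
   }
   ```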



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.abilities;
+
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
+import org.apache.flink.table.connector.source.ScanTableSource;
+
+import javax.annotation.Nullable;
+
+/**
+ * Enables to tell the {@link ScanTableSource} the type of row-level modification the table source
+ * scan is for, and return {@link RowLevelModificationScanContext} which will then be passed to the
+ * sink which implements {@link SupportsRowLevelUpdate} or {@link SupportsRowLevelDelete}. It allows
+ * the table source to pass information to the table sink which is to be updated/deleted.
+ *
+ * <p>Note: This interface is optional for table source to support update/delete existing data. For
+ * the case that the table source won't need to know the type of row-level modification or pass
+ * information to sink, the table source doesn't need to implement it.
+ *
+ * <p>Note: Only if the {@link ScanTableSource} implements this interface, and the table is to be
+ * scanned to update/delete a table, the method {@link #applyRowLevelModificationScan} for the
+ * corresponding table source will be involved. For more details, please see the method {@link
+ * #applyRowLevelModificationScan}.
+ */
+public interface SupportsRowLevelModificationScan {
+
+    /**
+     * Apply the type of row-level modification and the previous {@link
+     * RowLevelModificationScanContext} returned by previous table source scan, return a new {@link
+     * RowLevelModificationScanContext} which will then finally to be passed to the table sink.
+     *
+     * <p>Note: for the all tables in the UPDATE/DELETE statement, this method will be involved for
+     * the corresponding table source scan.
+     *
+     * <p>Note: it may have multiple table sources in the case of sub-query. In such case, it will
+     * return multiple {@link RowLevelModificationScanContext}. To handle such case, the planner
+     * will also pass the previous {@link RowLevelModificationScanContext} to the current table
+     * source scan to make it decide how to deal with the previous {@link
+     * RowLevelModificationScanContext}. The order is consistent to the order that the table source
+     * compiles. The planer will only pass the last context returned to the sink.
+     *
+     * @param previousContext the context returned by previous table source, if there's no previous
+     *     context, it'll be null.
+     */
+    RowLevelModificationScanContext applyRowLevelModificationScan(
+            RowLevelModificationType rowLevelModificationType,
+            @Nullable RowLevelModificationScanContext previousContext);
+
+    /**
+     * Type of the row-level modification for table.
+     *
+     * <p>Currently, two types are supported:
+     *
+     * <ul>
+     *   <li>UPDATE
+     *   <li>DELETE
+     * </ul>
+     */
+    enum RowLevelModificationType {
+        UPDATE,

Review Comment:
   Nit: add a new line here
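
   While we are here, a rough sketch of how a source might implement this interface, assuming `RowLevelModificationScanContext` stays a plain marker interface as the quoted Javadoc suggests (the `SnapshotScanContext` type and the snapshot id are made up for illustration; this PR only defines the interfaces):
   ```java
   import org.apache.flink.table.connector.RowLevelModificationScanContext;
   import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan;

   import javax.annotation.Nullable;

   /** Hypothetical table source ability implementation. */
   class ExampleModificationScanSource implements SupportsRowLevelModificationScan {

       @Override
       public RowLevelModificationScanContext applyRowLevelModificationScan(
               RowLevelModificationType rowLevelModificationType,
               @Nullable RowLevelModificationScanContext previousContext) {
           // The source now knows whether this scan serves an UPDATE or a DELETE
           // and can pin the snapshot it scans, so the sink commits against the
           // same version. previousContext is non-null only for multi-scan plans.
           return new SnapshotScanContext(currentSnapshotId());
       }

       private long currentSnapshotId() {
           return 42L; // placeholder for the connector-specific lookup
       }

       /** Hypothetical context carrying the scanned snapshot id to the sink. */
       static class SnapshotScanContext implements RowLevelModificationScanContext {
           final long snapshotId;

           SnapshotScanContext(long snapshotId) {
               this.snapshotId = snapshotId;
           }
       }
   }
   ```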



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsDeletePushDown.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.expressions.ResolvedExpression;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Enable to delete existing data in a {@link DynamicTableSink} directly according to the filter
+ * expressions in {@code DELETE} clause.
+ *
+ * <p>Given the following SQL:
+ *
+ * <pre>{@code
+ * DELETE FROM t WHERE (a = '1' OR a = '2') AND b IS NOT NULL;
+ *
+ * }</pre>
+ *
+ * <p>In the example above, {@code [a = '1' OR a = '2']} and {@code [b IS NOT NULL]} are acceptable
+ * filters.
+ *
+ * <p>Flink will get the filters in conjunctive form and push down the filters into sink by calling
+ * method {@link #applyDeleteFilters} in the planner phase. If it returns true, Flink will then call
+ * method to execute the actual delete in execution phase.
+ *
+ * <p>Note: in the case that the filter expression is not available, e.g., sub-query or {@link
+ * #applyDeleteFilters} returns false, if the sink implements {@link SupportsRowLevelDelete}, Flink
+ * will try to rewrite the delete operation and produce row-level changes, see {@link
+ * SupportsRowLevelDelete} for more details. Otherwise, Flink will throw {@link
+ * UnsupportedOperationException}.
+ */
+@PublicEvolving
+public interface SupportsDeletePushDown {
+
+    /**
+     * Provides a list of filters from delete operation in conjunctive form in planning phase. A
+     * sink can either return true if it can accept all filters or return false if it can not
+     * accept.
+     *
+     * <p>If it returns true, Flink will then call the method {@link #executeDeletion} in execution
+     * phase to do the actual deletion.
+     *
+     * <p>If it returns false, and the sink still implements {@link SupportsRowLevelDelete}, Flink
+     * will rewrite the delete operation and produce row-level changes. Otherwise, Flink will throw
+     * {@link UnsupportedOperationException}.
+     */

Review Comment:
   ```suggestion
       /**
        * Applies a list of filters specified by {@code DELETE} clause in conjunctive form and returns
        * the acceptance status to the planner during the planning phase.
        *
        * @param filters a list of resolved filter expressions.
        * @return true if the sink accepts all filters; false otherwise.
        */
   ```
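
   To make the contract concrete, the planning-phase decision roughly follows this shape (a simplified sketch, not the actual implementation in flink-table-planner):
   ```java
   import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown;
   import org.apache.flink.table.expressions.ResolvedExpression;

   import java.util.List;

   /** Simplified sketch of the planning-phase decision for a DELETE statement. */
   class DeletePushDownPlanningSketch {

       static boolean tryPushDownDelete(
               SupportsDeletePushDown sink, List<ResolvedExpression> conjuncts) {
           if (sink.applyDeleteFilters(conjuncts)) {
               // Push-down accepted: executeDeletion() runs in the execution phase.
               return true;
           }
           // Rejected: fall back to SupportsRowLevelDelete if the sink implements it,
           // otherwise the statement fails with UnsupportedOperationException.
           return false;
       }
   }
   ```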



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsRowLevelDelete.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Enable to delete existing data in a {@link DynamicTableSink} according to row-level changes.
+ *
+ * <p>The planner will call method {@link #applyRowLevelDelete} to get the {@link
+ * RowLevelDeleteInfo} that the sink returns, and rewrite the delete operation based on gotten
+ * {@link RowLevelDeleteInfo} to produce rows (may rows to be deleted or remaining rows after the
+ * delete operation depending on sink's implementation) to {@link DynamicTableSink}. The sink is
+ * expected to consume these rows to achieve rows delete purpose.
+ *
+ * <p>Note: if the sink also implements {@link SupportsDeletePushDown}, the planner will always
+ * prefer {@link SupportsDeletePushDown}, and only the filters aren't available or {@link
+ * SupportsDeletePushDown#applyDeleteFilters} returns false, this interface will be considered and
+ * to rewrite the delete operation to produce the rows to the sink.
+ * to rewrite the delete operation to produce the rows to the sink.
+ */
+@PublicEvolving
+public interface SupportsRowLevelDelete {
+
+    /**
+     * Apply row level delete with {@link RowLevelModificationScanContext} passed by table source,
+     * and return {@link RowLevelDeleteInfo} to guide the planner on how to rewrite the delete
+     * operation.
+     *
+     * <p>Note: if the table source doesn't implement {@link SupportsRowLevelModificationScan},
+     * there won't be any {@link RowLevelModificationScanContext} passed, so the {@param context}
+     * will be null.
+     */
+    RowLevelDeleteInfo applyRowLevelDelete(@Nullable RowLevelModificationScanContext context);
+
+    /** The information that guides the planer on how to rewrite the delete operation. */
+    @PublicEvolving
+    interface RowLevelDeleteInfo {
+
+        /**
+         * The required columns that the sink expects for deletion, the rows consumed by sink will
+         * contain the columns with the order consistent with the order of returned columns. If
+         * return Optional.empty(), it will select all columns.
+         */
+        default Optional<List<Column>> requiredColumns() {
+            return Optional.empty();
+        }
+
+        /**
+         * Planner will rewrite delete to query base on the {@link RowLevelDeleteInfo}, keeps the
+         * query of delete unchanged by default(in `DELETE_ROWS` mode), or change the query to the
+         * complementary set in REMAINING_ROWS mode.
+         *
+         * <p>Take the following SQL as an example:
+         *
+         * <pre>{@code
+         * DELETE FROM t WHERE y = 2;
+         * }</pre>
+         *
+         * <p>If returns {@link SupportsRowLevelDelete.RowLevelDeleteMode#DELETED_ROWS}, the sink
+         * will get rows to be deleted which match the filter [y = 2].
+         *
+         * <p>If returns {@link SupportsRowLevelDelete.RowLevelDeleteMode#REMAINING_ROWS}, the sink
+         * will get the rows which doesn't match the filter [y = 2].
+         *
+         * <p>Note: All rows will have RowKind#DELETE when RowLevelDeleteMode is DELETED_ROWS, and
+         * RowKind#INSERT when RowLevelDeleteMode is REMAINING_ROWS.
+         */
+        default RowLevelDeleteMode getRowLevelDeleteMode() {
+            return RowLevelDeleteMode.DELETED_ROWS;
+        }
+    }
+
+    /**
+     * Type of delete modes that the sink expects for delete purpose.
+     *
+     * <p>Currently, two modes are supported:
+     *
+     * <ul>
+     *   <li>DELETED_ROWS - in this mode, the sink will only get the rows that need to be deleted.
+     *   <li>REMAINING_ROWS - in this mode, the sink will only get the remaining rows as if the the
+     *       delete operation had been done.
+     * </ul>
+     */
+    @PublicEvolving
+    enum RowLevelDeleteMode {
+        DELETED_ROWS,

Review Comment:
   Nit: add a new line here.
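
   As a side note for implementers, choosing between the two modes could look like this (a hypothetical copy-on-write sink; only the default methods quoted above are assumed):
   ```java
   import org.apache.flink.table.connector.RowLevelModificationScanContext;
   import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;

   import javax.annotation.Nullable;

   /** Hypothetical copy-on-write sink that prefers receiving the remaining rows. */
   class CopyOnWriteDeleteSink implements SupportsRowLevelDelete {

       @Override
       public RowLevelDeleteInfo applyRowLevelDelete(
               @Nullable RowLevelModificationScanContext context) {
           return new RowLevelDeleteInfo() {
               @Override
               public RowLevelDeleteMode getRowLevelDeleteMode() {
                   // For DELETE FROM t WHERE y = 2 the sink then receives the rows
                   // with y <> 2 (all RowKind#INSERT) and rewrites its files with them;
                   // DELETED_ROWS would instead deliver the matching rows as RowKind#DELETE.
                   return RowLevelDeleteMode.REMAINING_ROWS;
               }
           };
       }
   }
   ```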



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.abilities;
+
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
+import org.apache.flink.table.connector.source.ScanTableSource;
+
+import javax.annotation.Nullable;
+
+/**
+ * Enables to tell the {@link ScanTableSource} the type of row-level modification the table source
+ * scan is for, and return {@link RowLevelModificationScanContext} which will then be passed to the
+ * sink which implements {@link SupportsRowLevelUpdate} or {@link SupportsRowLevelDelete}. It allows
+ * the table source to pass information to the table sink which is to be updated/deleted.
+ *
+ * <p>Note: This interface is optional for table source to support update/delete existing data. For
+ * the case that the table source won't need to know the type of row-level modification or pass
+ * information to sink, the table source doesn't need to implement it.
+ *
+ * <p>Note: Only if the {@link ScanTableSource} implements this interface, and the table is to be
+ * scanned to update/delete a table, the method {@link #applyRowLevelModificationScan} for the
+ * corresponding table source will be involved. For more details, please see the method {@link
+ * #applyRowLevelModificationScan}.
+ */
+public interface SupportsRowLevelModificationScan {

Review Comment:
   Personally, I feel like the doc is a little bit colloquial and verbose. What about
   ```
   /**
    * Enables {@link ScanTableSource} to support the row-level modification and return the updated or
    * deleted information described by {@link RowLevelModificationScanContext}. The context will be
    * propagated to the sink which implements {@link SupportsRowLevelUpdate} or {@link
    * SupportsRowLevelDelete}.
    *
    * <p>Note: This interface is optional for table sources to implement. See more details at {@link
    * #applyRowLevelModificationScan}.
    */
   ```
   Ref: according to https://www.oxfordlearnersdictionaries.com/definition/english/enable,
   the frequent usage of "enable" is "<1> enable sb./sth. to do sth. <2> enable sth."



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsDeletePushDown.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.expressions.ResolvedExpression;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Enable to delete existing data in a {@link DynamicTableSink} directly according to the filter
+ * expressions in {@code DELETE} clause.
+ *
+ * <p>Given the following SQL:
+ *
+ * <pre>{@code
+ * DELETE FROM t WHERE (a = '1' OR a = '2') AND b IS NOT NULL;
+ *
+ * }</pre>
+ *
+ * <p>In the example above, {@code [a = '1' OR a = '2']} and {@code [b IS NOT NULL]} are acceptable
+ * filters.
+ *
+ * <p>Flink will get the filters in conjunctive form and push down the filters into sink by calling
+ * method {@link #applyDeleteFilters} in the planner phase. If it returns true, Flink will then call
+ * method to execute the actual delete in execution phase.
+ *
+ * <p>Note: in the case that the filter expression is not available, e.g., sub-query or {@link
+ * #applyDeleteFilters} returns false, if the sink implements {@link SupportsRowLevelDelete}, Flink
+ * will try to rewrite the delete operation and produce row-level changes, see {@link
+ * SupportsRowLevelDelete} for more details. Otherwise, Flink will throw {@link
+ * UnsupportedOperationException}.
+ */
+@PublicEvolving
+public interface SupportsDeletePushDown {
+
+    /**
+     * Provides a list of filters from delete operation in conjunctive form in planning phase. A
+     * sink can either return true if it can accept all filters or return false if it can not
+     * accept.
+     *
+     * <p>If it returns true, Flink will then call the method {@link #executeDeletion} in execution
+     * phase to do the actual deletion.
+     *
+     * <p>If it returns false, and the sink still implements {@link SupportsRowLevelDelete}, Flink
+     * will rewrite the delete operation and produce row-level changes. Otherwise, Flink will throw
+     * {@link UnsupportedOperationException}.
+     */
+    boolean applyDeleteFilters(List<ResolvedExpression> filters);
+
+    /**
+     * Do the actual deletion in the execution phase, and return how many rows has been deleted,
+     * Optional.empty() is for unknown delete rows.
+     *
+     * <p>Note that this method will be involved if and only if {@link #applyDeleteFilters(List
+     * ResolvedExpression)} returns true. So, please make sure the implementation for this method
+     * will do delete the data correctly.
+     *
+     * <p>Note that in this method, the sink won't get the filters since they have been passed to
+     * the method {@link #applyDeleteFilters} before. The sink may need to keep these filters, so
+     * that it can get the filters if necessary to finish the deletion.
+     */

Review Comment:
   ```suggestion
       /**
        * Deletes data during execution phase.
        *
        * <p>Note: sink connectors which implement the interface need to invoke this method to delete
        * data iff {@link #applyDeleteFilters(List)} returns true.
        *
        * @return the number of the estimated rows to be deleted, or {@link Optional#empty()} for the
        *     unknown condition.
        *     unknown condition.
        */
   ```
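
   For completeness, a sink that remembers the filters as the Javadoc recommends might execute the deletion like this (again assuming the `Optional<Long>` signature; the SQL rendering via `asSerializableString()` and the `runNativeDelete` helper are illustrative assumptions):
   ```java
   import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown;
   import org.apache.flink.table.expressions.ResolvedExpression;

   import java.util.List;
   import java.util.Optional;
   import java.util.stream.Collectors;

   /** Hypothetical SQL-backed sink keeping the planning-phase filters for execution. */
   class SqlBackedDeleteSink implements SupportsDeletePushDown {

       private List<ResolvedExpression> acceptedFilters;

       @Override
       public boolean applyDeleteFilters(List<ResolvedExpression> filters) {
           this.acceptedFilters = filters; // keep them for executeDeletion()
           return true;
       }

       @Override
       public Optional<Long> executeDeletion() {
           // Rebuild a WHERE clause from the filters kept during planning.
           String where =
                   acceptedFilters.stream()
                           .map(ResolvedExpression::asSerializableString)
                           .collect(Collectors.joining(" AND "));
           long deleted = runNativeDelete("DELETE FROM t WHERE " + where);
           return Optional.of(deleted);
       }

       private long runNativeDelete(String sql) {
           return 0L; // placeholder for the storage-specific call
       }
   }
   ```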



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/RowLevelModificationScanContext.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
+import org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan;
+
+/**
+ * The context for scan the table to do updated/delete, designed to provide some necessary
+ * information that the sink may need to know to from table source to perform update/delete. It'll
+ * be generated by table source which implements {@link SupportsRowLevelModificationScan}, and then
+ * passed to the sink which implements {@link SupportsRowLevelUpdate} or {@link
+ * SupportsRowLevelDelete} for UPDATE/DELETE statement in compile phase.
+ *
+ * <p>This mechanism enables the coordination between the source and the sink for the corresponding
+ * table to be updated/deleted.
+ *
+ * <p>The connector can implement this interface to provide custom information.

Review Comment:
   Correct me if I'm wrong, but should we be more specific and clarify that it is the "source connector" that provides the custom context?


