This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6cf7e5d2ba48bd565ab3ec2841059606a49668b1
Author: luoyuxia <[email protected]>
AuthorDate: Fri Jan 13 11:24:38 2023 +0800

    [FLINK-30661][table] introduce SupportsRowLevelModificationScan interface
---
 .../connector/RowLevelModificationScanContext.java | 37 ++++++++++
 .../SupportsRowLevelModificationScan.java          | 78 ++++++++++++++++++++++
 2 files changed, 115 insertions(+)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/RowLevelModificationScanContext.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/RowLevelModificationScanContext.java
new file mode 100644
index 00000000000..f0e3c35ba36
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/RowLevelModificationScanContext.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector;
+
+import org.apache.flink.annotation.PublicEvolving;
+import 
org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan;
+
+/**
+ * The context is intended to provide the relevant table scan information 
needed by the sink to
+ * perform row-level update/delete. It'll be generated by a table source that 
implements {@link
+ * SupportsRowLevelModificationScan}, and then passed to a sink which 
implements {@link
+ * SupportsRowLevelUpdate} or {@link SupportsRowLevelDelete} for executing 
UPDATE/DELETE statement
+ * during compilation phase.
+ *
+ * <p>This mechanism enables the coordination between the table sources and 
the table sink which is
+ * to be updated/deleted.
+ *
+ * <p>Connectors can implement this interface to provide custom information.
+ */
+@PublicEvolving
+public interface RowLevelModificationScanContext {}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java
new file mode 100644
index 00000000000..bbf0bfa969b
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsRowLevelModificationScan.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.abilities;
+
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.source.ScanTableSource;
+
+import javax.annotation.Nullable;
+
+/**
+ * Interface for {@link ScanTableSource}s that support the row-level 
modification. The table source
+ * is responsible for returning the information described by {@link
+ * RowLevelModificationScanContext}. The context will be propagated to the 
sink which implements
+ * {@link SupportsRowLevelUpdate} or {@link SupportsRowLevelDelete}.
+ *
+ * <p>Note: This interface is optional for table sources to implement. For 
cases where the table
+ * source neither needs to know the type of row-level modification nor 
propagate information to
+ * sink, the table source does't need to implement this interface. See more 
details at {@link
+ * #applyRowLevelModificationScan(RowLevelModificationType, 
RowLevelModificationScanContext)}.
+ */
+public interface SupportsRowLevelModificationScan {
+
+    /**
+     * Applies the type of row-level modification and the previous {@link
+     * RowLevelModificationScanContext} returned by previous table source 
scan, return a new {@link
+     * RowLevelModificationScanContext}. If the table source is the last one, 
the {@link
+     * RowLevelModificationScanContext} will be passed to the table sink. 
Otherwise, it will be
+     * passed to the following table source.
+     *
+     * <p>Note: For the all tables in the UPDATE/DELETE statement, this method 
will be involved for
+     * the corresponding table source scan.
+     *
+     * <p>Note: It may have multiple table sources in the case of sub-query. 
In such case, it will
+     * return multiple {@link RowLevelModificationScanContext}s. To handle 
such case, the planner
+     * will also pass the previous {@link RowLevelModificationScanContext} to 
the current table
+     * source scan which is expected to decide what to do with the previous 
{@link
+     * RowLevelModificationScanContext}. The order is consistent with the 
compilation order of the
+     * table sources. The planer will only pass the last context returned to 
the sink.
+     *
+     * @param previousContext the context returned by previous table source, 
if there's no previous
+     *     context, it'll be null.
+     */
+    RowLevelModificationScanContext applyRowLevelModificationScan(
+            RowLevelModificationType rowLevelModificationType,
+            @Nullable RowLevelModificationScanContext previousContext);
+
+    /**
+     * Type of the row-level modification for table.
+     *
+     * <p>Currently, two types are supported:
+     *
+     * <ul>
+     *   <li>UPDATE
+     *   <li>DELETE
+     * </ul>
+     */
+    enum RowLevelModificationType {
+        UPDATE,
+
+        DELETE
+    }
+}

Reply via email to