This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 80128c4878 [flink] Flink batch delete supports 
aggregation.remove-record-on-delete option (#5402)
80128c4878 is described below

commit 80128c4878c074abe94ce026430b21906e03afb6
Author: yuzelin <[email protected]>
AuthorDate: Tue Apr 8 17:29:25 2025 +0800

    [flink] Flink batch delete supports aggregation.remove-record-on-delete 
option (#5402)
---
 .../flink/sink/SupportsRowLevelOperationFlinkTableSink.java   | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
index 86be6f347a..6bdd17055c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
@@ -53,6 +53,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static 
org.apache.paimon.CoreOptions.AGGREGATION_REMOVE_RECORD_ON_DELETE;
 import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
 import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
 import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
@@ -201,6 +202,16 @@ public abstract class 
SupportsRowLevelOperationFlinkTableSink extends FlinkTable
                                     SEQUENCE_GROUP,
                                     
PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP));
                 }
+            case AGGREGATE:
+                if (options.get(AGGREGATION_REMOVE_RECORD_ON_DELETE)) {
+                    return;
+                } else {
+                    throw new UnsupportedOperationException(
+                            String.format(
+                                    "Merge engine %s doesn't support batch 
delete by default. To support batch delete, "
+                                            + "please set %s to true.",
+                                    mergeEngine, 
AGGREGATION_REMOVE_RECORD_ON_DELETE.key()));
+                }
             default:
                 throw new UnsupportedOperationException(
                         String.format(

Reply via email to