This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 80128c4878 [flink] Flink batch delete supports
aggregation.remove-record-on-delete option (#5402)
80128c4878 is described below
commit 80128c4878c074abe94ce026430b21906e03afb6
Author: yuzelin <[email protected]>
AuthorDate: Tue Apr 8 17:29:25 2025 +0800
[flink] Flink batch delete supports aggregation.remove-record-on-delete
option (#5402)
---
.../flink/sink/SupportsRowLevelOperationFlinkTableSink.java | 11 +++++++++++
1 file changed, 11 insertions(+)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
index 86be6f347a..6bdd17055c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
@@ -53,6 +53,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
+import static
org.apache.paimon.CoreOptions.AGGREGATION_REMOVE_RECORD_ON_DELETE;
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
@@ -201,6 +202,16 @@ public abstract class
SupportsRowLevelOperationFlinkTableSink extends FlinkTable
SEQUENCE_GROUP,
PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP));
}
+ case AGGREGATE:
+ if (options.get(AGGREGATION_REMOVE_RECORD_ON_DELETE)) {
+ return;
+ } else {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Merge engine %s doesn't support batch
delete by default. To support batch delete, "
+ + "please set %s to true.",
+ mergeEngine,
AGGREGATION_REMOVE_RECORD_ON_DELETE.key()));
+ }
default:
throw new UnsupportedOperationException(
String.format(