This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.0
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 2cbdc1df99b5ac3847fa40c2b0352a441980850a
Author: yuzelin <[email protected]>
AuthorDate: Tue Jan 21 10:22:21 2025 +0800

    [core] Fix NPE when retracting collect and merge-map (#4960)
    
    =
---
 .../compact/aggregate/FieldCollectAgg.java         | 10 ++-
 .../compact/aggregate/FieldMergeMapAgg.java        | 10 ++-
 .../apache/paimon/flink/PreAggregationITCase.java  | 81 ++++++++++++++++++++++
 3 files changed, 99 insertions(+), 2 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java
index afe5e05e70..d9e706f6e8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java
@@ -155,18 +155,26 @@ public class FieldCollectAgg extends FieldAggregator {
 
     @Override
     public Object retract(Object accumulator, Object retractField) {
+        // it's hard to mark the input is retracted without accumulator
         if (accumulator == null) {
             return null;
         }
 
-        InternalArray acc = (InternalArray) accumulator;
+        // nothing to be retracted
+        if (retractField == null) {
+            return accumulator;
+        }
         InternalArray retract = (InternalArray) retractField;
+        if (retract.size() == 0) {
+            return accumulator;
+        }
 
         List<Object> retractedElements = new ArrayList<>();
         for (int i = 0; i < retract.size(); i++) {
             retractedElements.add(elementGetter.getElementOrNull(retract, i));
         }
 
+        InternalArray acc = (InternalArray) accumulator;
         List<Object> accElements = new ArrayList<>();
         for (int i = 0; i < acc.size(); i++) {
             Object candidate = elementGetter.getElementOrNull(acc, i);
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java
index 9965339afd..487f20e3fd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java
@@ -69,12 +69,19 @@ public class FieldMergeMapAgg extends FieldAggregator {
 
     @Override
     public Object retract(Object accumulator, Object retractField) {
+        // it's hard to mark the input is retracted without accumulator
         if (accumulator == null) {
             return null;
         }
 
-        InternalMap acc = (InternalMap) accumulator;
+        // nothing to be retracted
+        if (retractField == null) {
+            return accumulator;
+        }
         InternalMap retract = (InternalMap) retractField;
+        if (retract.size() == 0) {
+            return accumulator;
+        }
 
         InternalArray retractKeyArray = retract.keyArray();
         Set<Object> retractKeys = new HashSet<>();
@@ -82,6 +89,7 @@ public class FieldMergeMapAgg extends FieldAggregator {
             retractKeys.add(keyGetter.getElementOrNull(retractKeyArray, i));
         }
 
+        InternalMap acc = (InternalMap) accumulator;
         Map<Object, Object> resultMap = new HashMap<>();
         InternalArray accKeyArray = acc.keyArray();
         InternalArray accValueArray = acc.valueArray();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
index b8dfd8f6a8..954c1455d4 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
@@ -1709,6 +1709,46 @@ public class PreAggregationITCase {
             select.close();
         }
 
+        @Test
+        public void testRetractInputNull() throws Exception {
+            sql(
+                    "CREATE TABLE test_collect ("
+                            + "  id INT PRIMARY KEY NOT ENFORCED,"
+                            + "  f0 ARRAY<STRING>,"
+                            + "  f1 INT"
+                            + ") WITH ("
+                            + "  'changelog-producer' = 'lookup',"
+                            + "  'merge-engine' = 'partial-update',"
+                            + "  'fields.f0.aggregate-function' = 'collect',"
+                            + "  'fields.f1.sequence-group' = 'f0'"
+                            + ")");
+
+            List<Row> input =
+                    Arrays.asList(
+                            Row.ofKind(RowKind.INSERT, 1, null, 1),
+                            Row.ofKind(RowKind.INSERT, 1, new String[] {"A"}, 
2),
+                            Row.ofKind(RowKind.UPDATE_BEFORE, 1, null, 1),
+                            Row.ofKind(RowKind.UPDATE_AFTER, 1, new String[] 
{"B"}, 3));
+            sEnv.executeSql(
+                            String.format(
+                                    "CREATE TEMPORARY TABLE input ("
+                                            + "  id INT PRIMARY KEY NOT 
ENFORCED,"
+                                            + "  f0 ARRAY<STRING>,"
+                                            + "  f1 INT"
+                                            + ") WITH ("
+                                            + "  'connector' = 'values',"
+                                            + "  'data-id' = '%s',"
+                                            + "  'bounded' = 'true',"
+                                            + "  'changelog-mode' = 'UB,UA'"
+                                            + ")",
+                                    
TestValuesTableFactory.registerData(input)))
+                    .await();
+            sEnv.executeSql("INSERT INTO test_collect SELECT * FROM 
input").await();
+
+            assertThat(sql("SELECT * FROM test_collect"))
+                    .containsExactly(Row.of(1, new String[] {"A", "B"}, 3));
+        }
+
         private void checkOneRecord(Row row, int id, String... elements) {
             assertThat(row.getField(0)).isEqualTo(id);
             if (elements == null || elements.length == 0) {
@@ -1759,6 +1799,47 @@ public class PreAggregationITCase {
             checkOneRecord(result.get(2), 3, toMap(1, "a", 2, "b", 3, "c"));
         }
 
+        @Test
+        public void testRetractInputNull() throws Exception {
+            sql(
+                    "CREATE TABLE test_merge_map1 ("
+                            + "  id INT PRIMARY KEY NOT ENFORCED,"
+                            + "  f0 MAP<INT, STRING>,"
+                            + "  f1 INT"
+                            + ") WITH ("
+                            + "  'changelog-producer' = 'lookup',"
+                            + "  'merge-engine' = 'partial-update',"
+                            + "  'fields.f0.aggregate-function' = 'merge_map',"
+                            + "  'fields.f1.sequence-group' = 'f0'"
+                            + ")");
+
+            List<Row> input =
+                    Arrays.asList(
+                            Row.ofKind(RowKind.INSERT, 1, null, 1),
+                            Row.ofKind(RowKind.INSERT, 1, 
Collections.singletonMap(1, "A"), 2),
+                            Row.ofKind(RowKind.UPDATE_BEFORE, 1, null, 1),
+                            Row.ofKind(
+                                    RowKind.UPDATE_AFTER, 1, 
Collections.singletonMap(2, "B"), 3));
+            sEnv.executeSql(
+                            String.format(
+                                    "CREATE TEMPORARY TABLE input ("
+                                            + "  id INT PRIMARY KEY NOT 
ENFORCED,"
+                                            + "  f0 MAP<INT, STRING>,"
+                                            + "  f1 INT"
+                                            + ") WITH ("
+                                            + "  'connector' = 'values',"
+                                            + "  'data-id' = '%s',"
+                                            + "  'bounded' = 'true',"
+                                            + "  'changelog-mode' = 'UB,UA'"
+                                            + ")",
+                                    
TestValuesTableFactory.registerData(input)))
+                    .await();
+            sEnv.executeSql("INSERT INTO test_merge_map1 SELECT * FROM 
input").await();
+
+            assertThat(sql("SELECT * FROM test_merge_map1"))
+                    .containsExactly(Row.of(1, toMap(1, "A", 2, "B"), 3));
+        }
+
         private Map<Object, Object> toMap(Object... kvs) {
             Map<Object, Object> result = new HashMap<>();
             for (int i = 0; i < kvs.length; i += 2) {

Reply via email to