This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.0 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 2cbdc1df99b5ac3847fa40c2b0352a441980850a Author: yuzelin <[email protected]> AuthorDate: Tue Jan 21 10:22:21 2025 +0800 [core] Fix NPE when retracting collect and merge-map (#4960) = --- .../compact/aggregate/FieldCollectAgg.java | 10 ++- .../compact/aggregate/FieldMergeMapAgg.java | 10 ++- .../apache/paimon/flink/PreAggregationITCase.java | 81 ++++++++++++++++++++++ 3 files changed, 99 insertions(+), 2 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java index afe5e05e70..d9e706f6e8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java @@ -155,18 +155,26 @@ public class FieldCollectAgg extends FieldAggregator { @Override public Object retract(Object accumulator, Object retractField) { + // it's hard to mark the input is retracted without accumulator if (accumulator == null) { return null; } - InternalArray acc = (InternalArray) accumulator; + // nothing to be retracted + if (retractField == null) { + return accumulator; + } InternalArray retract = (InternalArray) retractField; + if (retract.size() == 0) { + return accumulator; + } List<Object> retractedElements = new ArrayList<>(); for (int i = 0; i < retract.size(); i++) { retractedElements.add(elementGetter.getElementOrNull(retract, i)); } + InternalArray acc = (InternalArray) accumulator; List<Object> accElements = new ArrayList<>(); for (int i = 0; i < acc.size(); i++) { Object candidate = elementGetter.getElementOrNull(acc, i); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java index 9965339afd..487f20e3fd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java @@ -69,12 +69,19 @@ public class FieldMergeMapAgg extends FieldAggregator { @Override public Object retract(Object accumulator, Object retractField) { + // it's hard to mark the input is retracted without accumulator if (accumulator == null) { return null; } - InternalMap acc = (InternalMap) accumulator; + // nothing to be retracted + if (retractField == null) { + return accumulator; + } InternalMap retract = (InternalMap) retractField; + if (retract.size() == 0) { + return accumulator; + } InternalArray retractKeyArray = retract.keyArray(); Set<Object> retractKeys = new HashSet<>(); @@ -82,6 +89,7 @@ public class FieldMergeMapAgg extends FieldAggregator { retractKeys.add(keyGetter.getElementOrNull(retractKeyArray, i)); } + InternalMap acc = (InternalMap) accumulator; Map<Object, Object> resultMap = new HashMap<>(); InternalArray accKeyArray = acc.keyArray(); InternalArray accValueArray = acc.valueArray(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java index b8dfd8f6a8..954c1455d4 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java @@ -1709,6 +1709,46 @@ public class PreAggregationITCase { select.close(); } + @Test + public void testRetractInputNull() throws Exception { + sql( + "CREATE TABLE test_collect (" + + " id INT PRIMARY KEY NOT ENFORCED," + + " f0 ARRAY<STRING>," + + " f1 INT" + + ") WITH (" + + " 'changelog-producer' = 'lookup'," + + " 'merge-engine' = 'partial-update'," + + " 'fields.f0.aggregate-function' = 'collect'," + + " 'fields.f1.sequence-group' = 'f0'" + + ")"); + + List<Row> input = + Arrays.asList( + Row.ofKind(RowKind.INSERT, 1, null, 1), + Row.ofKind(RowKind.INSERT, 1, new String[] {"A"}, 2), + Row.ofKind(RowKind.UPDATE_BEFORE, 1, null, 1), + Row.ofKind(RowKind.UPDATE_AFTER, 1, new String[] {"B"}, 3)); + sEnv.executeSql( + String.format( + "CREATE TEMPORARY TABLE input (" + + " id INT PRIMARY KEY NOT ENFORCED," + + " f0 ARRAY<STRING>," + + " f1 INT" + + ") WITH (" + + " 'connector' = 'values'," + + " 'data-id' = '%s'," + + " 'bounded' = 'true'," + + " 'changelog-mode' = 'UB,UA'" + + ")", + TestValuesTableFactory.registerData(input))) + .await(); + sEnv.executeSql("INSERT INTO test_collect SELECT * FROM input").await(); + + assertThat(sql("SELECT * FROM test_collect")) + .containsExactly(Row.of(1, new String[] {"A", "B"}, 3)); + } + private void checkOneRecord(Row row, int id, String... elements) { assertThat(row.getField(0)).isEqualTo(id); if (elements == null || elements.length == 0) { @@ -1759,6 +1799,47 @@ public class PreAggregationITCase { checkOneRecord(result.get(2), 3, toMap(1, "a", 2, "b", 3, "c")); } + @Test + public void testRetractInputNull() throws Exception { + sql( + "CREATE TABLE test_merge_map1 (" + + " id INT PRIMARY KEY NOT ENFORCED," + + " f0 MAP<INT, STRING>," + + " f1 INT" + + ") WITH (" + + " 'changelog-producer' = 'lookup'," + + " 'merge-engine' = 'partial-update'," + + " 'fields.f0.aggregate-function' = 'merge_map'," + + " 'fields.f1.sequence-group' = 'f0'" + + ")"); + + List<Row> input = + Arrays.asList( + Row.ofKind(RowKind.INSERT, 1, null, 1), + Row.ofKind(RowKind.INSERT, 1, Collections.singletonMap(1, "A"), 2), + Row.ofKind(RowKind.UPDATE_BEFORE, 1, null, 1), + Row.ofKind( + RowKind.UPDATE_AFTER, 1, Collections.singletonMap(2, "B"), 3)); + sEnv.executeSql( + String.format( + "CREATE TEMPORARY TABLE input (" + + " id INT PRIMARY KEY NOT ENFORCED," + + " f0 MAP<INT, STRING>," + + " f1 INT" + + ") WITH (" + + " 'connector' = 'values'," + + " 'data-id' = '%s'," + + " 'bounded' = 'true'," + + " 'changelog-mode' = 'UB,UA'" + + ")", + TestValuesTableFactory.registerData(input))) + .await(); + sEnv.executeSql("INSERT INTO test_merge_map1 SELECT * FROM input").await(); + + assertThat(sql("SELECT * FROM test_merge_map1")) + .containsExactly(Row.of(1, toMap(1, "A", 2, "B"), 3)); + } + private Map<Object, Object> toMap(Object... kvs) { Map<Object, Object> result = new HashMap<>(); for (int i = 0; i < kvs.length; i += 2) {
