This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b7e63fa419 [core] Fix NPE when retracting collect and merge-map (#4960)
b7e63fa419 is described below
commit b7e63fa419a5a7fd93e558675de68e66867e7f6d
Author: yuzelin <[email protected]>
AuthorDate: Tue Jan 21 10:22:21 2025 +0800
[core] Fix NPE when retracting collect and merge-map (#4960)
=
---
.../compact/aggregate/FieldCollectAgg.java | 10 ++-
.../compact/aggregate/FieldMergeMapAgg.java | 10 ++-
.../apache/paimon/flink/PreAggregationITCase.java | 81 ++++++++++++++++++++++
3 files changed, 99 insertions(+), 2 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java
index afe5e05e70..d9e706f6e8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java
@@ -155,18 +155,26 @@ public class FieldCollectAgg extends FieldAggregator {
@Override
public Object retract(Object accumulator, Object retractField) {
+ // it's hard to mark the input is retracted without accumulator
if (accumulator == null) {
return null;
}
- InternalArray acc = (InternalArray) accumulator;
+ // nothing to be retracted
+ if (retractField == null) {
+ return accumulator;
+ }
InternalArray retract = (InternalArray) retractField;
+ if (retract.size() == 0) {
+ return accumulator;
+ }
List<Object> retractedElements = new ArrayList<>();
for (int i = 0; i < retract.size(); i++) {
retractedElements.add(elementGetter.getElementOrNull(retract, i));
}
+ InternalArray acc = (InternalArray) accumulator;
List<Object> accElements = new ArrayList<>();
for (int i = 0; i < acc.size(); i++) {
Object candidate = elementGetter.getElementOrNull(acc, i);
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java
index 9965339afd..487f20e3fd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java
@@ -69,12 +69,19 @@ public class FieldMergeMapAgg extends FieldAggregator {
@Override
public Object retract(Object accumulator, Object retractField) {
+ // it's hard to mark the input is retracted without accumulator
if (accumulator == null) {
return null;
}
- InternalMap acc = (InternalMap) accumulator;
+ // nothing to be retracted
+ if (retractField == null) {
+ return accumulator;
+ }
InternalMap retract = (InternalMap) retractField;
+ if (retract.size() == 0) {
+ return accumulator;
+ }
InternalArray retractKeyArray = retract.keyArray();
Set<Object> retractKeys = new HashSet<>();
@@ -82,6 +89,7 @@ public class FieldMergeMapAgg extends FieldAggregator {
retractKeys.add(keyGetter.getElementOrNull(retractKeyArray, i));
}
+ InternalMap acc = (InternalMap) accumulator;
Map<Object, Object> resultMap = new HashMap<>();
InternalArray accKeyArray = acc.keyArray();
InternalArray accValueArray = acc.valueArray();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
index b8dfd8f6a8..954c1455d4 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
@@ -1709,6 +1709,46 @@ public class PreAggregationITCase {
select.close();
}
+ @Test
+ public void testRetractInputNull() throws Exception {
+ sql(
+ "CREATE TABLE test_collect ("
+ + " id INT PRIMARY KEY NOT ENFORCED,"
+ + " f0 ARRAY<STRING>,"
+ + " f1 INT"
+ + ") WITH ("
+ + " 'changelog-producer' = 'lookup',"
+ + " 'merge-engine' = 'partial-update',"
+ + " 'fields.f0.aggregate-function' = 'collect',"
+ + " 'fields.f1.sequence-group' = 'f0'"
+ + ")");
+
+ List<Row> input =
+ Arrays.asList(
+ Row.ofKind(RowKind.INSERT, 1, null, 1),
+ Row.ofKind(RowKind.INSERT, 1, new String[] {"A"}, 2),
+ Row.ofKind(RowKind.UPDATE_BEFORE, 1, null, 1),
+ Row.ofKind(RowKind.UPDATE_AFTER, 1, new String[] {"B"}, 3));
+ sEnv.executeSql(
+ String.format(
+ "CREATE TEMPORARY TABLE input ("
+ + " id INT PRIMARY KEY NOT ENFORCED,"
+ + " f0 ARRAY<STRING>,"
+ + " f1 INT"
+ + ") WITH ("
+ + " 'connector' = 'values',"
+ + " 'data-id' = '%s',"
+ + " 'bounded' = 'true',"
+ + " 'changelog-mode' = 'UB,UA'"
+ + ")",
+ TestValuesTableFactory.registerData(input)))
+ .await();
+ sEnv.executeSql("INSERT INTO test_collect SELECT * FROM input").await();
+
+ assertThat(sql("SELECT * FROM test_collect"))
+ .containsExactly(Row.of(1, new String[] {"A", "B"}, 3));
+ }
+
private void checkOneRecord(Row row, int id, String... elements) {
assertThat(row.getField(0)).isEqualTo(id);
if (elements == null || elements.length == 0) {
@@ -1759,6 +1799,47 @@ public class PreAggregationITCase {
checkOneRecord(result.get(2), 3, toMap(1, "a", 2, "b", 3, "c"));
}
+ @Test
+ public void testRetractInputNull() throws Exception {
+ sql(
+ "CREATE TABLE test_merge_map1 ("
+ + " id INT PRIMARY KEY NOT ENFORCED,"
+ + " f0 MAP<INT, STRING>,"
+ + " f1 INT"
+ + ") WITH ("
+ + " 'changelog-producer' = 'lookup',"
+ + " 'merge-engine' = 'partial-update',"
+ + " 'fields.f0.aggregate-function' = 'merge_map',"
+ + " 'fields.f1.sequence-group' = 'f0'"
+ + ")");
+
+ List<Row> input =
+ Arrays.asList(
+ Row.ofKind(RowKind.INSERT, 1, null, 1),
+ Row.ofKind(RowKind.INSERT, 1, Collections.singletonMap(1, "A"), 2),
+ Row.ofKind(RowKind.UPDATE_BEFORE, 1, null, 1),
+ Row.ofKind(
+ RowKind.UPDATE_AFTER, 1, Collections.singletonMap(2, "B"), 3));
+ sEnv.executeSql(
+ String.format(
+ "CREATE TEMPORARY TABLE input ("
+ + " id INT PRIMARY KEY NOT ENFORCED,"
+ + " f0 MAP<INT, STRING>,"
+ + " f1 INT"
+ + ") WITH ("
+ + " 'connector' = 'values',"
+ + " 'data-id' = '%s',"
+ + " 'bounded' = 'true',"
+ + " 'changelog-mode' = 'UB,UA'"
+ + ")",
+ TestValuesTableFactory.registerData(input)))
+ .await();
+ sEnv.executeSql("INSERT INTO test_merge_map1 SELECT * FROM input").await();
+
+ assertThat(sql("SELECT * FROM test_merge_map1"))
+ .containsExactly(Row.of(1, toMap(1, "A", 2, "B"), 3));
+ }
+
private Map<Object, Object> toMap(Object... kvs) {
Map<Object, Object> result = new HashMap<>();
for (int i = 0; i < kvs.length; i += 2) {