This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4a35882f6 [core] Enhance retraction handle when using lookup changelog-producer (#2961)
4a35882f6 is described below
commit 4a35882f6d33f7ce5a19f6de63089aff9a5b04ce
Author: yuzelin <[email protected]>
AuthorDate: Fri Mar 8 19:09:42 2024 +0800
[core] Enhance retraction handle when using lookup changelog-producer (#2961)
---
.../data/serializer/InternalArraySerializer.java | 2 +-
.../LookupChangelogMergeFunctionWrapper.java | 54 ++++---
.../mergetree/compact/LookupMergeFunction.java | 12 +-
.../apache/paimon/flink/PreAggregationITCase.java | 174 ++++++++++++++-------
4 files changed, 156 insertions(+), 86 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalArraySerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalArraySerializer.java
index 8b7955ecd..b3126e4df 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalArraySerializer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalArraySerializer.java
@@ -67,7 +67,7 @@ public class InternalArraySerializer implements Serializer<InternalArray> {
} else if (from instanceof BinaryArray) {
return ((BinaryArray) from).copy();
} else {
- return toBinaryArray(from);
+ return toBinaryArray(from).copy();
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
index bad97e8bb..319f5055a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
@@ -23,6 +23,8 @@ import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.types.RowKind;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.function.Function;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -83,39 +85,43 @@ public class LookupChangelogMergeFunctionWrapper implements MergeFunctionWrapper
@Override
public ChangelogResult getResult() {
- reusedResult.reset();
-
- KeyValue result = mergeFunction.getResult();
- if (result == null) {
- return reusedResult;
+ // 1. Compute the latest high level record and containLevel0 of candidates
+ LinkedList<KeyValue> candidates = mergeFunction.candidates();
+ Iterator<KeyValue> descending = candidates.descendingIterator();
+ KeyValue highLevel = null;
+ boolean containLevel0 = false;
+ while (descending.hasNext()) {
+ KeyValue kv = descending.next();
+ if (kv.level() > 0) {
+ descending.remove();
+ if (highLevel == null) {
+ highLevel = kv;
+ }
+ } else {
+ containLevel0 = true;
+ }
}
- KeyValue highLevel = mergeFunction.highLevel;
- boolean containLevel0 = mergeFunction.containLevel0;
-
- // 1. No level 0, just return
- if (!containLevel0) {
- return reusedResult.setResult(result);
+ // 2. Lookup if latest high level record is absent
+ if (highLevel == null) {
+ InternalRow lookupKey = candidates.get(0).key();
+ highLevel = lookup.apply(lookupKey);
}
- // 2. With level 0, with the latest high level, return changelog
+ // 3. Calculate result
+ mergeFunction2.reset();
if (highLevel != null) {
- setChangelog(highLevel, result);
- return reusedResult.setResult(result);
+ mergeFunction2.add(highLevel);
}
+ candidates.forEach(mergeFunction2::add);
+ KeyValue result = mergeFunction2.getResult();
- // 3. Lookup to find the latest high level record
- highLevel = lookup.apply(result.key());
-
- if (highLevel != null) {
- mergeFunction2.reset();
- mergeFunction2.add(highLevel);
- mergeFunction2.add(result);
- result = mergeFunction2.getResult();
+ // 4. Set changelog when there's level-0 records
+ reusedResult.reset();
+ if (containLevel0) {
setChangelog(highLevel, result);
- } else {
- setChangelog(null, result);
}
+
return reusedResult.setResult(result);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
index c525b4676..71425ef50 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
@@ -40,9 +40,6 @@ public class LookupMergeFunction implements
MergeFunction<KeyValue> {
private final InternalRowSerializer keySerializer;
private final InternalRowSerializer valueSerializer;
- KeyValue highLevel;
- boolean containLevel0;
-
public LookupMergeFunction(
MergeFunction<KeyValue> mergeFunction, RowType keyType, RowType
valueType) {
this.mergeFunction = mergeFunction;
@@ -53,8 +50,6 @@ public class LookupMergeFunction implements
MergeFunction<KeyValue> {
@Override
public void reset() {
candidates.clear();
- highLevel = null;
- containLevel0 = false;
}
@Override
@@ -66,6 +61,7 @@ public class LookupMergeFunction implements
MergeFunction<KeyValue> {
public KeyValue getResult() {
// 1. Find the latest high level record
Iterator<KeyValue> descending = candidates.descendingIterator();
+ KeyValue highLevel = null;
while (descending.hasNext()) {
KeyValue kv = descending.next();
if (kv.level() > 0) {
@@ -74,8 +70,6 @@ public class LookupMergeFunction implements
MergeFunction<KeyValue> {
} else {
highLevel = kv;
}
- } else {
- containLevel0 = true;
}
}
@@ -85,6 +79,10 @@ public class LookupMergeFunction implements
MergeFunction<KeyValue> {
return mergeFunction.getResult();
}
+ LinkedList<KeyValue> candidates() {
+ return candidates;
+ }
+
public static MergeFunctionFactory<KeyValue> wrap(
MergeFunctionFactory<KeyValue> wrapped, RowType keyType, RowType
valueType) {
if (wrapped.create() instanceof FirstRowMergeFunction) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
index 6e1778917..85f5e93f0 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
@@ -30,6 +30,9 @@ import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.math.BigDecimal;
import java.time.LocalDate;
@@ -1518,74 +1521,137 @@ public class PreAggregationITCase {
checkOneRecord(result.get(2), 3, "car", "watch");
}
- @Test
- public void testRetractWithAggregation() throws Exception {
+ private static List<Arguments> retractArguments() {
+ return Arrays.asList(
+ Arguments.arguments("lookup", "aggregation"),
+ Arguments.arguments("lookup", "partial-update"),
+ Arguments.arguments("full-compaction", "aggregation"),
+ Arguments.arguments("full-compaction", "partial-update"));
+ }
+
+ @ParameterizedTest(name = "changelog-producer = {0}, merge-engine = {1}")
+ @MethodSource("retractArguments")
+ public void testRetract(String changelogProducer, String mergeEngine) throws Exception {
+ String sequenceGroup = "";
+ if (mergeEngine.equals("partial-update")) {
+ sequenceGroup = ", 'fields.f1.sequence-group' = 'f0'";
+ }
sql(
"CREATE TABLE test_collect("
+ " id INT PRIMARY KEY NOT ENFORCED,"
- + " f0 ARRAY<STRING>"
+ + " f0 ARRAY<STRING>,"
+ + " f1 INT"
+ ") WITH ("
- + " 'merge-engine' = 'aggregation',"
+ + " 'changelog-producer' = '%s',"
+ + " 'merge-engine' = '%s',"
+ " 'fields.f0.aggregate-function' = 'collect'"
- + ")");
+ + " %s"
+ + ")",
+ changelogProducer, mergeEngine, sequenceGroup);
- innerTestRetract(false);
- }
+ BlockingIterator<Row, Row> select = streamSqlBlockIter("SELECT *
FROM test_collect");
- @Test
- public void testRetractWithPartialUpdate() throws Exception {
- sql(
- "CREATE TABLE test_collect("
+ String temporaryTableTemplate =
+ "CREATE TEMPORARY TABLE %s ("
+ " id INT PRIMARY KEY NOT ENFORCED,"
+ " f0 ARRAY<STRING>,"
+ " f1 INT"
+ ") WITH ("
- + " 'merge-engine' = 'partial-update',"
- + " 'fields.f0.aggregate-function' = 'collect',"
- + " 'fields.f1.sequence-group' = 'f0'"
- + ")");
-
- innerTestRetract(true);
- }
-
- private void innerTestRetract(boolean partialUpdate) throws Exception {
- String temporaryTable =
- "CREATE TEMPORARY TABLE INPUT ("
- + " id INT PRIMARY KEY NOT ENFORCED,"
- + " f0 ARRAY<STRING>"
- + " %s) WITH (\n"
- + " 'connector' = 'values',\n"
- + " 'data-id' = '%s',\n"
- + " 'bounded' = 'true',\n"
- + " 'changelog-mode' = 'I,UA,UB'\n"
+ + " 'connector' = 'values',"
+ + " 'data-id' = '%s',"
+ + " 'bounded' = 'true',"
+ + " 'changelog-mode' = '%s'"
+ ")";
- String f1;
- List<Row> inputRecords;
- if (partialUpdate) {
- f1 = ", f1 INT";
- inputRecords =
- Arrays.asList(
- Row.ofKind(RowKind.INSERT, 1, new String[]
{"A", "B"}, 10),
- Row.ofKind(RowKind.UPDATE_BEFORE, 1, new
String[] {"A", "B"}, 10),
- Row.ofKind(RowKind.UPDATE_AFTER, 1, new
String[] {"C", "D"}, 20));
- } else {
- f1 = "";
- inputRecords =
- Arrays.asList(
- Row.ofKind(RowKind.INSERT, 1, new String[]
{"A", "B"}),
- Row.ofKind(RowKind.UPDATE_BEFORE, 1, new
String[] {"A", "B"}),
- Row.ofKind(RowKind.UPDATE_AFTER, 1, new
String[] {"C", "D"}));
- }
- streamSqlIter(temporaryTable, f1,
TestValuesTableFactory.registerData(inputRecords))
- .close();
+ // 1 only exists single key record
+ sql("INSERT INTO test_collect VALUES (1, ARRAY['A', 'B'], 1)");
+ List<Row> result = select.collect(1);
+ checkOneRecord(result.get(0), 1, "A", "B");
+
+ // 1.1 UB + UA (CANNOT handle)
+ List<Row> inputRecords =
+ Arrays.asList(
+ Row.ofKind(RowKind.UPDATE_BEFORE, 1, new String[]
{"A", "B"}, 2),
+ Row.ofKind(RowKind.UPDATE_AFTER, 1, new String[]
{"C", "D"}, 3));
+ sEnv.executeSql(
+ String.format(
+ temporaryTableTemplate,
+ "INPUT11",
+
TestValuesTableFactory.registerData(inputRecords),
+ "UB,UA"))
+ .await();
+ sEnv.executeSql("INSERT INTO test_collect SELECT * FROM
INPUT11").await();
+
+ result = select.collect(2);
+
assertThat(result.get(0).getKind()).isEqualTo(RowKind.UPDATE_BEFORE);
+ checkOneRecord(result.get(0), 1, "A", "B");
+
assertThat(result.get(1).getKind()).isEqualTo(RowKind.UPDATE_AFTER);
+ checkOneRecord(result.get(1), 1, "A", "B", "C", "D");
+
+ // 1.2 -D
+ inputRecords =
+ Collections.singletonList(
+ Row.ofKind(RowKind.DELETE, 1, new String[] {"C",
"D"}, 4));
+ sEnv.executeSql(
+ String.format(
+ temporaryTableTemplate,
+ "INPUT12",
+
TestValuesTableFactory.registerData(inputRecords),
+ "D"))
+ .await();
+ sEnv.executeSql("INSERT INTO test_collect SELECT * FROM
INPUT12").await();
+
+ result = select.collect(2);
+
assertThat(result.get(0).getKind()).isEqualTo(RowKind.UPDATE_BEFORE);
+ checkOneRecord(result.get(0), 1, "A", "B", "C", "D");
+
assertThat(result.get(1).getKind()).isEqualTo(RowKind.UPDATE_AFTER);
+ checkOneRecord(result.get(1), 1, "A", "B");
+
+ // 2 exists multiple key records
+ sql("INSERT INTO test_collect VALUES (2, ARRAY['A', 'B'], 5), (3,
ARRAY['A', 'B'], 6)");
+ result = select.collect(2);
+ checkOneRecord(result.get(0), 2, "A", "B");
+ checkOneRecord(result.get(1), 3, "A", "B");
+
+ // 2.1 UB + UA (CANNOT handle)
+ inputRecords =
+ Arrays.asList(
+ Row.ofKind(RowKind.UPDATE_BEFORE, 2, new String[]
{"A", "B"}, 7),
+ Row.ofKind(RowKind.UPDATE_AFTER, 2, new String[]
{"C", "D"}, 8));
+ sEnv.executeSql(
+ String.format(
+ temporaryTableTemplate,
+ "INPUT21",
+
TestValuesTableFactory.registerData(inputRecords),
+ "UB,UA"))
+ .await();
+ sEnv.executeSql("INSERT INTO test_collect SELECT * FROM
INPUT21").await();
+
+ result = select.collect(2);
+
assertThat(result.get(0).getKind()).isEqualTo(RowKind.UPDATE_BEFORE);
+ checkOneRecord(result.get(0), 2, "A", "B");
+
assertThat(result.get(1).getKind()).isEqualTo(RowKind.UPDATE_AFTER);
+ checkOneRecord(result.get(1), 2, "A", "B", "C", "D");
+
+ // 2.2 -D
+ inputRecords =
+ Collections.singletonList(Row.ofKind(RowKind.DELETE, 3,
new String[] {"A"}, 9));
+ sEnv.executeSql(
+ String.format(
+ temporaryTableTemplate,
+ "INPUT22",
+
TestValuesTableFactory.registerData(inputRecords),
+ "D"))
+ .await();
+ sEnv.executeSql("INSERT INTO test_collect SELECT * FROM
INPUT22").await();
+
+ result = select.collect(2);
+
assertThat(result.get(0).getKind()).isEqualTo(RowKind.UPDATE_BEFORE);
+ checkOneRecord(result.get(0), 3, "A", "B");
+
assertThat(result.get(1).getKind()).isEqualTo(RowKind.UPDATE_AFTER);
+ checkOneRecord(result.get(1), 3, "B");
- sEnv.executeSql("INSERT INTO test_collect SELECT * FROM
INPUT").await();
-
- List<Row> result = sql("SELECT * FROM test_collect");
- assertThat(result.size()).isEqualTo(1);
- // if not retracted, the result would be ['A', 'B', 'C', 'D']
- checkOneRecord(result.get(0), 1, "C", "D");
+ select.close();
}
private void checkOneRecord(Row row, int id, String... elements) {