This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a35882f6 [core] Enhance retraction handle when using lookup changelog-producer (#2961)
4a35882f6 is described below

commit 4a35882f6d33f7ce5a19f6de63089aff9a5b04ce
Author: yuzelin <[email protected]>
AuthorDate: Fri Mar 8 19:09:42 2024 +0800

    [core] Enhance retraction handle when using lookup changelog-producer (#2961)
---
 .../data/serializer/InternalArraySerializer.java   |   2 +-
 .../LookupChangelogMergeFunctionWrapper.java       |  54 ++++---
 .../mergetree/compact/LookupMergeFunction.java     |  12 +-
 .../apache/paimon/flink/PreAggregationITCase.java  | 174 ++++++++++++++-------
 4 files changed, 156 insertions(+), 86 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalArraySerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalArraySerializer.java
index 8b7955ecd..b3126e4df 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalArraySerializer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalArraySerializer.java
@@ -67,7 +67,7 @@ public class InternalArraySerializer implements Serializer<InternalArray> {
         } else if (from instanceof BinaryArray) {
             return ((BinaryArray) from).copy();
         } else {
-            return toBinaryArray(from);
+            return toBinaryArray(from).copy();
         }
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
index bad97e8bb..319f5055a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
@@ -23,6 +23,8 @@ import org.apache.paimon.codegen.RecordEqualiser;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.types.RowKind;
 
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.function.Function;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -83,39 +85,43 @@ public class LookupChangelogMergeFunctionWrapper implements 
MergeFunctionWrapper
 
     @Override
     public ChangelogResult getResult() {
-        reusedResult.reset();
-
-        KeyValue result = mergeFunction.getResult();
-        if (result == null) {
-            return reusedResult;
+        // 1. Compute the latest high level record and containLevel0 of candidates
+        LinkedList<KeyValue> candidates = mergeFunction.candidates();
+        Iterator<KeyValue> descending = candidates.descendingIterator();
+        KeyValue highLevel = null;
+        boolean containLevel0 = false;
+        while (descending.hasNext()) {
+            KeyValue kv = descending.next();
+            if (kv.level() > 0) {
+                descending.remove();
+                if (highLevel == null) {
+                    highLevel = kv;
+                }
+            } else {
+                containLevel0 = true;
+            }
         }
 
-        KeyValue highLevel = mergeFunction.highLevel;
-        boolean containLevel0 = mergeFunction.containLevel0;
-
-        // 1. No level 0, just return
-        if (!containLevel0) {
-            return reusedResult.setResult(result);
+        // 2. Lookup if latest high level record is absent
+        if (highLevel == null) {
+            InternalRow lookupKey = candidates.get(0).key();
+            highLevel = lookup.apply(lookupKey);
         }
 
-        // 2. With level 0, with the latest high level, return changelog
+        // 3. Calculate result
+        mergeFunction2.reset();
         if (highLevel != null) {
-            setChangelog(highLevel, result);
-            return reusedResult.setResult(result);
+            mergeFunction2.add(highLevel);
         }
+        candidates.forEach(mergeFunction2::add);
+        KeyValue result = mergeFunction2.getResult();
 
-        // 3. Lookup to find the latest high level record
-        highLevel = lookup.apply(result.key());
-
-        if (highLevel != null) {
-            mergeFunction2.reset();
-            mergeFunction2.add(highLevel);
-            mergeFunction2.add(result);
-            result = mergeFunction2.getResult();
+        // 4. Set changelog when there's level-0 records
+        reusedResult.reset();
+        if (containLevel0) {
             setChangelog(highLevel, result);
-        } else {
-            setChangelog(null, result);
         }
+
         return reusedResult.setResult(result);
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
index c525b4676..71425ef50 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
@@ -40,9 +40,6 @@ public class LookupMergeFunction implements MergeFunction<KeyValue> {
     private final InternalRowSerializer keySerializer;
     private final InternalRowSerializer valueSerializer;
 
-    KeyValue highLevel;
-    boolean containLevel0;
-
     public LookupMergeFunction(
             MergeFunction<KeyValue> mergeFunction, RowType keyType, RowType 
valueType) {
         this.mergeFunction = mergeFunction;
@@ -53,8 +50,6 @@ public class LookupMergeFunction implements 
MergeFunction<KeyValue> {
     @Override
     public void reset() {
         candidates.clear();
-        highLevel = null;
-        containLevel0 = false;
     }
 
     @Override
@@ -66,6 +61,7 @@ public class LookupMergeFunction implements 
MergeFunction<KeyValue> {
     public KeyValue getResult() {
         // 1. Find the latest high level record
         Iterator<KeyValue> descending = candidates.descendingIterator();
+        KeyValue highLevel = null;
         while (descending.hasNext()) {
             KeyValue kv = descending.next();
             if (kv.level() > 0) {
@@ -74,8 +70,6 @@ public class LookupMergeFunction implements 
MergeFunction<KeyValue> {
                 } else {
                     highLevel = kv;
                 }
-            } else {
-                containLevel0 = true;
             }
         }
 
@@ -85,6 +79,10 @@ public class LookupMergeFunction implements 
MergeFunction<KeyValue> {
         return mergeFunction.getResult();
     }
 
+    LinkedList<KeyValue> candidates() {
+        return candidates;
+    }
+
     public static MergeFunctionFactory<KeyValue> wrap(
             MergeFunctionFactory<KeyValue> wrapped, RowType keyType, RowType 
valueType) {
         if (wrapped.create() instanceof FirstRowMergeFunction) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
index 6e1778917..85f5e93f0 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
@@ -30,6 +30,9 @@ import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.math.BigDecimal;
 import java.time.LocalDate;
@@ -1518,74 +1521,137 @@ public class PreAggregationITCase {
             checkOneRecord(result.get(2), 3, "car", "watch");
         }
 
-        @Test
-        public void testRetractWithAggregation() throws Exception {
+        private static List<Arguments> retractArguments() {
+            return Arrays.asList(
+                    Arguments.arguments("lookup", "aggregation"),
+                    Arguments.arguments("lookup", "partial-update"),
+                    Arguments.arguments("full-compaction", "aggregation"),
+                    Arguments.arguments("full-compaction", "partial-update"));
+        }
+
+        @ParameterizedTest(name = "changelog-producer = {0}, merge-engine = 
{1}")
+        @MethodSource("retractArguments")
+        public void testRetract(String changelogProducer, String mergeEngine) 
throws Exception {
+            String sequenceGroup = "";
+            if (mergeEngine.equals("partial-update")) {
+                sequenceGroup = ", 'fields.f1.sequence-group' = 'f0'";
+            }
             sql(
                     "CREATE TABLE test_collect("
                             + "  id INT PRIMARY KEY NOT ENFORCED,"
-                            + "  f0 ARRAY<STRING>"
+                            + "  f0 ARRAY<STRING>,"
+                            + "  f1 INT"
                             + ") WITH ("
-                            + "  'merge-engine' = 'aggregation',"
+                            + "  'changelog-producer' = '%s',"
+                            + "  'merge-engine' = '%s',"
                             + "  'fields.f0.aggregate-function' = 'collect'"
-                            + ")");
+                            + "  %s"
+                            + ")",
+                    changelogProducer, mergeEngine, sequenceGroup);
 
-            innerTestRetract(false);
-        }
+            BlockingIterator<Row, Row> select = streamSqlBlockIter("SELECT * 
FROM test_collect");
 
-        @Test
-        public void testRetractWithPartialUpdate() throws Exception {
-            sql(
-                    "CREATE TABLE test_collect("
+            String temporaryTableTemplate =
+                    "CREATE TEMPORARY TABLE %s ("
                             + "  id INT PRIMARY KEY NOT ENFORCED,"
                             + "  f0 ARRAY<STRING>,"
                             + "  f1 INT"
                             + ") WITH ("
-                            + "  'merge-engine' = 'partial-update',"
-                            + "  'fields.f0.aggregate-function' = 'collect',"
-                            + "  'fields.f1.sequence-group' = 'f0'"
-                            + ")");
-
-            innerTestRetract(true);
-        }
-
-        private void innerTestRetract(boolean partialUpdate) throws Exception {
-            String temporaryTable =
-                    "CREATE TEMPORARY TABLE INPUT ("
-                            + "  id INT PRIMARY KEY NOT ENFORCED,"
-                            + "  f0 ARRAY<STRING>"
-                            + "  %s) WITH (\n"
-                            + "  'connector' = 'values',\n"
-                            + "  'data-id' = '%s',\n"
-                            + "  'bounded' = 'true',\n"
-                            + "  'changelog-mode' = 'I,UA,UB'\n"
+                            + "  'connector' = 'values',"
+                            + "  'data-id' = '%s',"
+                            + "  'bounded' = 'true',"
+                            + "  'changelog-mode' = '%s'"
                             + ")";
 
-            String f1;
-            List<Row> inputRecords;
-            if (partialUpdate) {
-                f1 = ", f1 INT";
-                inputRecords =
-                        Arrays.asList(
-                                Row.ofKind(RowKind.INSERT, 1, new String[] 
{"A", "B"}, 10),
-                                Row.ofKind(RowKind.UPDATE_BEFORE, 1, new 
String[] {"A", "B"}, 10),
-                                Row.ofKind(RowKind.UPDATE_AFTER, 1, new 
String[] {"C", "D"}, 20));
-            } else {
-                f1 = "";
-                inputRecords =
-                        Arrays.asList(
-                                Row.ofKind(RowKind.INSERT, 1, new String[] 
{"A", "B"}),
-                                Row.ofKind(RowKind.UPDATE_BEFORE, 1, new 
String[] {"A", "B"}),
-                                Row.ofKind(RowKind.UPDATE_AFTER, 1, new 
String[] {"C", "D"}));
-            }
-            streamSqlIter(temporaryTable, f1, 
TestValuesTableFactory.registerData(inputRecords))
-                    .close();
+            // 1 only exists single key record
+            sql("INSERT INTO test_collect VALUES (1, ARRAY['A', 'B'], 1)");
+            List<Row> result = select.collect(1);
+            checkOneRecord(result.get(0), 1, "A", "B");
+
+            // 1.1 UB + UA (CANNOT handle)
+            List<Row> inputRecords =
+                    Arrays.asList(
+                            Row.ofKind(RowKind.UPDATE_BEFORE, 1, new String[] 
{"A", "B"}, 2),
+                            Row.ofKind(RowKind.UPDATE_AFTER, 1, new String[] 
{"C", "D"}, 3));
+            sEnv.executeSql(
+                            String.format(
+                                    temporaryTableTemplate,
+                                    "INPUT11",
+                                    
TestValuesTableFactory.registerData(inputRecords),
+                                    "UB,UA"))
+                    .await();
+            sEnv.executeSql("INSERT INTO test_collect SELECT * FROM 
INPUT11").await();
+
+            result = select.collect(2);
+            
assertThat(result.get(0).getKind()).isEqualTo(RowKind.UPDATE_BEFORE);
+            checkOneRecord(result.get(0), 1, "A", "B");
+            
assertThat(result.get(1).getKind()).isEqualTo(RowKind.UPDATE_AFTER);
+            checkOneRecord(result.get(1), 1, "A", "B", "C", "D");
+
+            // 1.2 -D
+            inputRecords =
+                    Collections.singletonList(
+                            Row.ofKind(RowKind.DELETE, 1, new String[] {"C", 
"D"}, 4));
+            sEnv.executeSql(
+                            String.format(
+                                    temporaryTableTemplate,
+                                    "INPUT12",
+                                    
TestValuesTableFactory.registerData(inputRecords),
+                                    "D"))
+                    .await();
+            sEnv.executeSql("INSERT INTO test_collect SELECT * FROM 
INPUT12").await();
+
+            result = select.collect(2);
+            
assertThat(result.get(0).getKind()).isEqualTo(RowKind.UPDATE_BEFORE);
+            checkOneRecord(result.get(0), 1, "A", "B", "C", "D");
+            
assertThat(result.get(1).getKind()).isEqualTo(RowKind.UPDATE_AFTER);
+            checkOneRecord(result.get(1), 1, "A", "B");
+
+            // 2 exists multiple key records
+            sql("INSERT INTO test_collect VALUES (2, ARRAY['A', 'B'], 5), (3, 
ARRAY['A', 'B'], 6)");
+            result = select.collect(2);
+            checkOneRecord(result.get(0), 2, "A", "B");
+            checkOneRecord(result.get(1), 3, "A", "B");
+
+            // 2.1 UB + UA (CANNOT handle)
+            inputRecords =
+                    Arrays.asList(
+                            Row.ofKind(RowKind.UPDATE_BEFORE, 2, new String[] 
{"A", "B"}, 7),
+                            Row.ofKind(RowKind.UPDATE_AFTER, 2, new String[] 
{"C", "D"}, 8));
+            sEnv.executeSql(
+                            String.format(
+                                    temporaryTableTemplate,
+                                    "INPUT21",
+                                    
TestValuesTableFactory.registerData(inputRecords),
+                                    "UB,UA"))
+                    .await();
+            sEnv.executeSql("INSERT INTO test_collect SELECT * FROM 
INPUT21").await();
+
+            result = select.collect(2);
+            
assertThat(result.get(0).getKind()).isEqualTo(RowKind.UPDATE_BEFORE);
+            checkOneRecord(result.get(0), 2, "A", "B");
+            
assertThat(result.get(1).getKind()).isEqualTo(RowKind.UPDATE_AFTER);
+            checkOneRecord(result.get(1), 2, "A", "B", "C", "D");
+
+            // 2.2 -D
+            inputRecords =
+                    Collections.singletonList(Row.ofKind(RowKind.DELETE, 3, 
new String[] {"A"}, 9));
+            sEnv.executeSql(
+                            String.format(
+                                    temporaryTableTemplate,
+                                    "INPUT22",
+                                    
TestValuesTableFactory.registerData(inputRecords),
+                                    "D"))
+                    .await();
+            sEnv.executeSql("INSERT INTO test_collect SELECT * FROM 
INPUT22").await();
+
+            result = select.collect(2);
+            
assertThat(result.get(0).getKind()).isEqualTo(RowKind.UPDATE_BEFORE);
+            checkOneRecord(result.get(0), 3, "A", "B");
+            
assertThat(result.get(1).getKind()).isEqualTo(RowKind.UPDATE_AFTER);
+            checkOneRecord(result.get(1), 3, "B");
 
-            sEnv.executeSql("INSERT INTO test_collect SELECT * FROM 
INPUT").await();
-
-            List<Row> result = sql("SELECT * FROM test_collect");
-            assertThat(result.size()).isEqualTo(1);
-            // if not retracted, the result would be ['A', 'B', 'C', 'D']
-            checkOneRecord(result.get(0), 1, "C", "D");
+            select.close();
         }
 
         private void checkOneRecord(Row row, int id, String... elements) {

Reply via email to