This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c933bfd337 [core] keep lowest high-level kv for lookup compaction. (#5632)
c933bfd337 is described below

commit c933bfd337fee24dfedcea9469eaf1d21f8dbdb5
Author: zhoulii <[email protected]>
AuthorDate: Wed May 21 13:44:00 2025 +0800

    [core] keep lowest high-level kv for lookup compaction. (#5632)
---
 .../LookupChangelogMergeFunctionWrapper.java       |  2 +-
 .../mergetree/compact/LookupMergeFunction.java     | 21 +++++++-----
 .../LookupChangelogMergeFunctionWrapperTest.java   | 39 ++++++++++++++++++++++
 3 files changed, 52 insertions(+), 10 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
index c327474fa8..bbaa354401 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
@@ -114,7 +114,7 @@ public class LookupChangelogMergeFunctionWrapper<T>
             KeyValue kv = descending.next();
             if (kv.level() > 0) {
                 descending.remove();
-                if (highLevel == null) {
+                if (highLevel == null || kv.level() < highLevel.level()) {
                     highLevel = kv;
                 }
             } else {
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
index 6f999297aa..2aadbff49a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
@@ -22,7 +22,6 @@ import org.apache.paimon.KeyValue;
 
 import javax.annotation.Nullable;
 
-import java.util.Iterator;
 import java.util.LinkedList;
 
 /**
@@ -52,19 +51,23 @@ public class LookupMergeFunction implements MergeFunction<KeyValue> {
     @Override
     public KeyValue getResult() {
         // 1. Find the latest high level record
-        Iterator<KeyValue> descending = candidates.descendingIterator();
-        KeyValue highLevel = null;
-        while (descending.hasNext()) {
-            KeyValue kv = descending.next();
+        KeyValue keptHighLevel = null;
+        LinkedList<KeyValue> highLevels = new LinkedList<>();
+
+        for (KeyValue kv : candidates) {
             if (kv.level() > 0) {
-                if (highLevel != null) {
-                    descending.remove();
-                } else {
-                    highLevel = kv;
+                highLevels.add(kv);
+                if (keptHighLevel == null || kv.level() < keptHighLevel.level()) {
+                    keptHighLevel = kv;
                 }
             }
         }
 
+        if (highLevels.size() > 1) {
+            highLevels.remove(keptHighLevel);
+            candidates.removeAll(highLevels);
+        }
+
         // 2. Do the merge for inputs
         mergeFunction.reset();
         candidates.forEach(mergeFunction::add);
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index 9c7f32589a..5341c6db69 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -508,4 +508,43 @@ public class LookupChangelogMergeFunctionWrapperTest {
         kv = result.result();
         assertThat(kv).isNull();
     }
+
+    @Test
+    public void testKeepLowestHighLevel() {
+        Map<InternalRow, KeyValue> highLevel = new HashMap<>();
+        LookupChangelogMergeFunctionWrapper function =
+                new LookupChangelogMergeFunctionWrapper(
+                        LookupMergeFunction.wrap(DeduplicateMergeFunction.factory()),
+                        highLevel::get,
+                        null,
+                        LookupStrategy.from(false, true, false, false),
+                        null,
+                        null);
+
+        // Without level-0
+        function.reset();
+        function.add(new KeyValue().replace(row(1), 1, INSERT, row(2)).setLevel(1));
+        function.add(new KeyValue().replace(row(1), 1, INSERT, row(1)).setLevel(2));
+        ChangelogResult result = function.getResult();
+        assertThat(result).isNotNull();
+        assertThat(result.changelogs()).isEmpty();
+        KeyValue kv = result.result();
+        assertThat(kv).isNotNull();
+        assertThat(kv.value().getInt(0)).isEqualTo(2);
+
+        // With level-0 record, with multiple level-x (x > 0) record
+        function.reset();
+        function.add(new KeyValue().replace(row(1), 1, INSERT, row(1)).setLevel(1));
+        function.add(new KeyValue().replace(row(1), 1, INSERT, row(2)).setLevel(2));
+        function.add(new KeyValue().replace(row(1), 2, INSERT, row(3)).setLevel(0));
+        result = function.getResult();
+        List<KeyValue> changelogs = result.changelogs();
+        assertThat(changelogs).hasSize(2);
+        assertThat(changelogs.get(0).valueKind()).isEqualTo(UPDATE_BEFORE);
+        assertThat(changelogs.get(0).value().getInt(0)).isEqualTo(1);
+        assertThat(changelogs.get(1).valueKind()).isEqualTo(UPDATE_AFTER);
+        assertThat(changelogs.get(1).value().getInt(0)).isEqualTo(3);
+        kv = result.result();
+        assertThat(kv.value().getInt(0)).isEqualTo(3);
+    }
 }

Reply via email to