This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a84c3826b8 [core] Fix LookupMergeFunction to use sequence.field for picking high level records (#7221)
a84c3826b8 is described below

commit a84c3826b8fee6fc854a95a86299088e57c7770b
Author: Liulietong <[email protected]>
AuthorDate: Sun May 24 10:20:52 2026 +0800

    [core] Fix LookupMergeFunction to use sequence.field for picking high level records (#7221)
---
 .../LookupChangelogMergeFunctionWrapper.java       |   8 +
 .../mergetree/compact/LookupMergeFunction.java     |  37 +++-
 .../LookupChangelogMergeFunctionWrapperTest.java   | 186 +++++++++++++++++++++
 3 files changed, 225 insertions(+), 6 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
index 7283a3030d..04d9141602 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
@@ -88,6 +88,14 @@ public class LookupChangelogMergeFunctionWrapper<T>
         this.lookupStrategy = lookupStrategy;
         this.deletionVectorsMaintainer = deletionVectorsMaintainer;
         this.comparator = createSequenceComparator(userDefinedSeqComparator);
+        // Only set sequence comparator when user-defined sequence field is 
configured
+        // to preserve original behavior (pick by level) when sequence.field 
is not set.
+        // Note: We use the same comparator for both insertInto and 
pickHighLevel.
+        // The comparator's semantics (ascending/descending) are already 
handled correctly
+        // by UserDefinedSeqComparator based on sequence.field.sort-order 
configuration.
+        if (userDefinedSeqComparator != null) {
+            this.mergeFunction.setSequenceComparator(this.comparator);
+        }
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
index cb494b86d0..eb2c8859b7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
@@ -27,7 +27,9 @@ import org.apache.paimon.utils.CloseableIterator;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.List;
 
 /**
  * A {@link MergeFunction} for lookup, this wrapper only considers the latest 
high level record,
@@ -41,6 +43,7 @@ public class LookupMergeFunction implements 
MergeFunction<KeyValue> {
     private final KeyValueBuffer candidates;
     private boolean containLevel0;
     private InternalRow currentKey;
+    @Nullable private Comparator<KeyValue> sequenceComparator;
 
     public LookupMergeFunction(
             MergeFunction<KeyValue> mergeFunction,
@@ -52,6 +55,11 @@ public class LookupMergeFunction implements 
MergeFunction<KeyValue> {
         this.candidates = KeyValueBuffer.createHybridBuffer(options, keyType, 
valueType, ioManager);
     }
 
+    /** Sets the comparator used by pickHighLevel() and getResult() to order records; {@code null} keeps level-based picking. */
+    public void setSequenceComparator(@Nullable Comparator<KeyValue> 
sequenceComparator) {
+        this.sequenceComparator = sequenceComparator;
+    }
+
     @Override
     public void reset() {
         candidates.reset();
@@ -83,9 +91,16 @@ public class LookupMergeFunction implements 
MergeFunction<KeyValue> {
                 if (kv.level() <= 0) {
                     continue;
                 }
-                // For high-level comparison logic (not involving Level 0), 
only the value of the
-                // minimum Level should be selected
-                if (highLevel == null || kv.level() < highLevel.level()) {
+                if (highLevel == null) {
+                    highLevel = kv;
+                } else if (sequenceComparator != null) {
+                    // When sequence comparator is set, use it to pick the 
record with highest
+                    // sequence value, which represents the latest record
+                    if (sequenceComparator.compare(kv, highLevel) > 0) {
+                        highLevel = kv;
+                    }
+                } else if (kv.level() < highLevel.level()) {
+                    // Without sequence comparator, fall back to picking the 
minimum level
                     highLevel = kv;
                 }
             }
@@ -107,18 +122,28 @@ public class LookupMergeFunction implements 
MergeFunction<KeyValue> {
     public KeyValue getResult() {
         mergeFunction.reset();
         KeyValue highLevel = pickHighLevel();
+        // Merge every record at level <= 0 (not yet on disk, e.g. write-buffer data at level -1) plus the picked high-level record.
+        // NOTE(review): kv references are kept after iterator.next(); this assumes candidates.iterator() does not reuse KeyValue objects — confirm for a spilled buffer.
+        List<KeyValue> toMerge = new ArrayList<>();
         try (CloseableIterator<KeyValue> iterator = candidates.iterator()) {
             while (iterator.hasNext()) {
                 KeyValue kv = iterator.next();
-                // records that has not been stored on the disk yet, such as 
the data in the write
-                // buffer being at level -1
                 if (kv.level() <= 0 || kv == highLevel) {
-                    mergeFunction.add(kv);
+                    toMerge.add(kv);
                 }
             }
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
+
+        // With a sequence comparator, sort so the highest-sequence record is added last (List.sort is stable, so ties keep buffer order); without one, buffer order is kept.
+        if (sequenceComparator != null) {
+            toMerge.sort(sequenceComparator);
+        }
+
+        for (KeyValue kv : toMerge) {
+            mergeFunction.add(kv);
+        }
         return mergeFunction.getResult();
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index 57d99557ca..470e84525a 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -556,4 +556,190 @@ public class LookupChangelogMergeFunctionWrapperTest {
         kv = result.result();
         assertThat(kv.value().getInt(0)).isEqualTo(3);
     }
+
+    /**
+     * Verifies that, with sequence.field configured, the high-level record with the highest user-defined
+     * sequence value is picked even when it sits at a larger level number.
+     *
+     * <p>Scenario: L1 has an older sequence (7), L2 a newer one (8) and L0 the oldest (6), so L2
+     * must be picked as the high-level record and must also win the merge with L0.
+     */
+    @Test
+    public void testSequenceFieldWithMultipleLevels() {
+        // Value schema (value INT, sequence INT); "sequence" is the user-defined sequence field
+        RowType valueType =
+                RowType.builder()
+                        .fields(
+                                new DataType[] {DataTypes.INT(), 
DataTypes.INT()},
+                                new String[] {"value", "sequence"})
+                        .build();
+
+        // Build the user-defined sequence comparator on the "sequence" column (index 1)
+        UserDefinedSeqComparator userDefinedSeqComparator =
+                UserDefinedSeqComparator.create(
+                        valueType,
+                        CoreOptions.fromMap(ImmutableMap.of("sequence.field", 
"sequence")));
+        assertThat(userDefinedSeqComparator).isNotNull();
+
+        Map<InternalRow, KeyValue> highLevel = new HashMap<>();
+
+        LookupChangelogMergeFunctionWrapper function =
+                new LookupChangelogMergeFunctionWrapper(
+                        LookupMergeFunction.wrap(
+                                DeduplicateMergeFunction.factory(), null, 
null, null),
+                        highLevel::get,
+                        null,
+                        LookupStrategy.from(false, true, false, false),
+                        null,
+                        userDefinedSeqComparator);
+
+        // Test scenario:
+        // L1: (key=1, value=100, sequence=7)  <- Level 1, but older sequence
+        // L2: (key=1, value=200, sequence=8)  <- Level 2, but newer sequence (should be picked!)
+        // L0: (key=1, value=50,  sequence=6)  <- Level 0, oldest sequence
+
+        function.reset();
+        function.add(
+                new KeyValue()
+                        .replace(row(1), 1, INSERT, row(100, 7))
+                        .setLevel(1)); // Level 1, seq=7
+        function.add(
+                new KeyValue()
+                        .replace(row(1), 1, INSERT, row(200, 8))
+                        .setLevel(2)); // Level 2, seq=8
+        function.add(
+                new KeyValue()
+                        .replace(row(1), 2, INSERT, row(50, 6))
+                        .setLevel(0)); // Level 0, seq=6
+
+        ChangelogResult result = function.getResult();
+        assertThat(result).isNotNull();
+
+        KeyValue kv = result.result();
+        assertThat(kv).isNotNull();
+
+        // L2 (seq=8) is newer than L0 (seq=6), so the merged result keeps L2's row
+        int actualSequence = kv.value().getInt(1);
+        int actualValue = kv.value().getInt(0);
+
+        assertThat(actualSequence)
+                .as("Should return record with highest sequence field (8)")
+                .isEqualTo(8);
+        assertThat(actualValue).isEqualTo(200);
+
+        // Changelog: UPDATE_BEFORE carries the picked high-level record (L2, seq=8), UPDATE_AFTER the merged result
+        List<KeyValue> changelogs = result.changelogs();
+        assertThat(changelogs).hasSize(2);
+        assertThat(changelogs.get(0).valueKind()).isEqualTo(UPDATE_BEFORE);
+        assertThat(changelogs.get(0).value().getInt(1)).isEqualTo(8); // before is L2
+        assertThat(changelogs.get(1).valueKind()).isEqualTo(UPDATE_AFTER);
+    }
+
+    /**
+     * Verifies that without sequence.field the original behavior is preserved: the high-level
+     * record with the lowest level number is picked.
+     */
+    @Test
+    public void testWithoutSequenceFieldPreservesOriginalBehavior() {
+        Map<InternalRow, KeyValue> highLevel = new HashMap<>();
+
+        // No user-defined sequence comparator (null), so high-level picking falls back to the lowest level
+        LookupChangelogMergeFunctionWrapper function =
+                new LookupChangelogMergeFunctionWrapper(
+                        LookupMergeFunction.wrap(
+                                DeduplicateMergeFunction.factory(), null, 
null, null),
+                        highLevel::get,
+                        null,
+                        LookupStrategy.from(false, true, false, false),
+                        null,
+                        null); // No sequence comparator
+
+        // L1: value=100, L2: value=200
+        // Without sequence.field, should pick L1 (level 1 < level 2)
+        function.reset();
+        function.add(new KeyValue().replace(row(1), 1, INSERT, 
row(100)).setLevel(1));
+        function.add(new KeyValue().replace(row(1), 1, INSERT, 
row(200)).setLevel(2));
+        function.add(new KeyValue().replace(row(1), 2, INSERT, 
row(50)).setLevel(0));
+
+        ChangelogResult result = function.getResult();
+        assertThat(result).isNotNull();
+
+        // L1 is picked as highLevel (L2 is dropped), and L0 is added after L1 in insertion order;
+        // DeduplicateMergeFunction keeps the last record, so the result is L0's value (50)
+        KeyValue kv = result.result();
+        assertThat(kv).isNotNull();
+        assertThat(kv.value().getInt(0)).isEqualTo(50);
+
+        // Changelog: before=L1(100), after=L0(50)
+        List<KeyValue> changelogs = result.changelogs();
+        assertThat(changelogs).hasSize(2);
+        assertThat(changelogs.get(0).value().getInt(0)).isEqualTo(100); // before is L1
+        assertThat(changelogs.get(1).value().getInt(0)).isEqualTo(50); // after is L0
+    }
+
+    /**
+     * Verifies sequence.field with sort-order=descending, where a smaller sequence value is
+     * considered newer, so L1 (seq=7) must win over both L2 (seq=8) and L0 (seq=9).
+     *
+     * <p>Note: a 3-field schema with the sequence at index 2 is used to avoid a cache collision with
+     * other tests, because CodeGenUtils caches comparators by field types and indices without
+     * considering sort order. NOTE(review): this sidesteps that cache key rather than fixing it — confirm.
+     */
+    @Test
+    public void testSequenceFieldWithDescendingSortOrder() {
+        RowType valueType =
+                RowType.builder()
+                        .fields(
+                                new DataType[] {DataTypes.INT(), 
DataTypes.INT(), DataTypes.INT()},
+                                new String[] {"value", "extra", "sequence"})
+                        .build();
+
+        // Comparator on "sequence" (index 2) with sort-order=descending
+        UserDefinedSeqComparator userDefinedSeqComparator =
+                UserDefinedSeqComparator.create(
+                        valueType,
+                        CoreOptions.fromMap(
+                                ImmutableMap.of(
+                                        "sequence.field", "sequence",
+                                        "sequence.field.sort-order", 
"descending")));
+        assertThat(userDefinedSeqComparator).isNotNull();
+
+        Map<InternalRow, KeyValue> highLevel = new HashMap<>();
+
+        LookupChangelogMergeFunctionWrapper function =
+                new LookupChangelogMergeFunctionWrapper(
+                        LookupMergeFunction.wrap(
+                                DeduplicateMergeFunction.factory(), null, 
null, null),
+                        highLevel::get,
+                        null,
+                        LookupStrategy.from(false, true, false, false),
+                        null,
+                        userDefinedSeqComparator);
+
+        // With descending order, smaller sequence = newer
+        // L1: (key=1, value=100, extra=0, sequence=7)  <- Level 1, newer (7 < 8)
+        // L2: (key=1, value=200, extra=0, sequence=8)  <- Level 2, older (8 > 7)
+        // L0: (key=1, value=50,  extra=0, sequence=9)  <- Level 0, oldest (9 > 8 > 7)
+
+        function.reset();
+        function.add(new KeyValue().replace(row(1), 1, INSERT, row(100, 0, 
7)).setLevel(1));
+        function.add(new KeyValue().replace(row(1), 1, INSERT, row(200, 0, 
8)).setLevel(2));
+        function.add(new KeyValue().replace(row(1), 2, INSERT, row(50, 0, 
9)).setLevel(0));
+
+        ChangelogResult result = function.getResult();
+        assertThat(result).isNotNull();
+
+        KeyValue kv = result.result();
+        assertThat(kv).isNotNull();
+
+        int actualSequence = kv.value().getInt(2);
+        int actualValue = kv.value().getInt(0);
+
+        // L1 (seq=7) is picked over L2 (seq=8) and, being newest, is ordered after L0 (seq=9),
+        // so the merged result is L1's value (100)
+        assertThat(actualSequence)
+                .as("With descending order, should return record with smallest 
sequence (7)")
+                .isEqualTo(7);
+        assertThat(actualValue).isEqualTo(100);
+    }
 }

Reply via email to