This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a84c3826b8 [core] Fix LookupMergeFunction to use sequence.field for picking high level records (#7221)
a84c3826b8 is described below
commit a84c3826b8fee6fc854a95a86299088e57c7770b
Author: Liulietong <[email protected]>
AuthorDate: Sun May 24 10:20:52 2026 +0800
[core] Fix LookupMergeFunction to use sequence.field for picking high level records (#7221)
---
.../LookupChangelogMergeFunctionWrapper.java | 8 +
.../mergetree/compact/LookupMergeFunction.java | 37 +++-
.../LookupChangelogMergeFunctionWrapperTest.java | 186 +++++++++++++++++++++
3 files changed, 225 insertions(+), 6 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
index 7283a3030d..04d9141602 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
@@ -88,6 +88,14 @@ public class LookupChangelogMergeFunctionWrapper<T>
this.lookupStrategy = lookupStrategy;
this.deletionVectorsMaintainer = deletionVectorsMaintainer;
this.comparator = createSequenceComparator(userDefinedSeqComparator);
+ // Only set sequence comparator when user-defined sequence field is configured
+ // to preserve original behavior (pick by level) when sequence.field is not set.
+ // Note: We use the same comparator for both insertInto and pickHighLevel.
+ // The comparator's semantics (ascending/descending) are already handled correctly
+ // by UserDefinedSeqComparator based on sequence.field.sort-order configuration.
+ if (userDefinedSeqComparator != null) {
+ this.mergeFunction.setSequenceComparator(this.comparator);
+ }
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
index cb494b86d0..eb2c8859b7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
@@ -27,7 +27,9 @@ import org.apache.paimon.utils.CloseableIterator;
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.Comparator;
+import java.util.List;
/**
* A {@link MergeFunction} for lookup, this wrapper only considers the latest high level record,
@@ -41,6 +43,7 @@ public class LookupMergeFunction implements
MergeFunction<KeyValue> {
private final KeyValueBuffer candidates;
private boolean containLevel0;
private InternalRow currentKey;
+ @Nullable private Comparator<KeyValue> sequenceComparator;
public LookupMergeFunction(
MergeFunction<KeyValue> mergeFunction,
@@ -52,6 +55,11 @@ public class LookupMergeFunction implements
MergeFunction<KeyValue> {
this.candidates = KeyValueBuffer.createHybridBuffer(options, keyType,
valueType, ioManager);
}
+ /** Set the sequence comparator for picking high level records. */
+ public void setSequenceComparator(@Nullable Comparator<KeyValue>
sequenceComparator) {
+ this.sequenceComparator = sequenceComparator;
+ }
+
@Override
public void reset() {
candidates.reset();
@@ -83,9 +91,16 @@ public class LookupMergeFunction implements
MergeFunction<KeyValue> {
if (kv.level() <= 0) {
continue;
}
- // For high-level comparison logic (not involving Level 0), only the value of the
- // minimum Level should be selected
- if (highLevel == null || kv.level() < highLevel.level()) {
+ if (highLevel == null) {
+ highLevel = kv;
+ } else if (sequenceComparator != null) {
+ // When sequence comparator is set, use it to pick the record with highest
+ // sequence value, which represents the latest record
+ if (sequenceComparator.compare(kv, highLevel) > 0) {
+ highLevel = kv;
+ }
+ } else if (kv.level() < highLevel.level()) {
+ // Without sequence comparator, fall back to picking the minimum level
highLevel = kv;
}
}
@@ -107,18 +122,28 @@ public class LookupMergeFunction implements
MergeFunction<KeyValue> {
public KeyValue getResult() {
mergeFunction.reset();
KeyValue highLevel = pickHighLevel();
+
+ // Collect records to merge: level-0 records and the picked high level record
+ List<KeyValue> toMerge = new ArrayList<>();
try (CloseableIterator<KeyValue> iterator = candidates.iterator()) {
while (iterator.hasNext()) {
KeyValue kv = iterator.next();
- // records that has not been stored on the disk yet, such as the data in the write
- // buffer being at level -1
if (kv.level() <= 0 || kv == highLevel) {
- mergeFunction.add(kv);
+ toMerge.add(kv);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
+
+ // When sequence comparator is set, sort by sequence so highest sequence is added last
+ if (sequenceComparator != null) {
+ toMerge.sort(sequenceComparator);
+ }
+
+ for (KeyValue kv : toMerge) {
+ mergeFunction.add(kv);
+ }
return mergeFunction.getResult();
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index 57d99557ca..470e84525a 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -556,4 +556,190 @@ public class LookupChangelogMergeFunctionWrapperTest {
kv = result.result();
assertThat(kv.value().getInt(0)).isEqualTo(3);
}
+
+ /**
+ * Test that sequence.field is correctly used to pick the high level record with the highest
+ * sequence value, even when it's at a higher level number.
+ *
+ * <p>Scenario: L1 has older sequence (7), L2 has newer sequence (8), L0 has oldest (6). The
+ * correct behavior should pick L2 (sequence=8) as the high level record.
+ */
+ @Test
+ public void testSequenceFieldWithMultipleLevels() {
+ // Define value type with sequence field as the second column
+ RowType valueType =
+ RowType.builder()
+ .fields(
+ new DataType[] {DataTypes.INT(),
DataTypes.INT()},
+ new String[] {"value", "sequence"})
+ .build();
+
+ // Create user-defined sequence comparator on the second field
+ UserDefinedSeqComparator userDefinedSeqComparator =
+ UserDefinedSeqComparator.create(
+ valueType,
+ CoreOptions.fromMap(ImmutableMap.of("sequence.field",
"sequence")));
+ assertThat(userDefinedSeqComparator).isNotNull();
+
+ Map<InternalRow, KeyValue> highLevel = new HashMap<>();
+
+ LookupChangelogMergeFunctionWrapper function =
+ new LookupChangelogMergeFunctionWrapper(
+ LookupMergeFunction.wrap(
+ DeduplicateMergeFunction.factory(), null,
null, null),
+ highLevel::get,
+ null,
+ LookupStrategy.from(false, true, false, false),
+ null,
+ userDefinedSeqComparator);
+
+ // Test scenario:
+ // L1: (key=1, value=100, sequence=7) <- Level 1, but older sequence
+ // L2: (key=1, value=200, sequence=8) <- Level 2, but newer sequence
(should be picked!)
+ // L0: (key=1, value=50, sequence=6) <- Level 0, oldest sequence
+
+ function.reset();
+ function.add(
+ new KeyValue()
+ .replace(row(1), 1, INSERT, row(100, 7))
+ .setLevel(1)); // Level 1, seq=7
+ function.add(
+ new KeyValue()
+ .replace(row(1), 1, INSERT, row(200, 8))
+ .setLevel(2)); // Level 2, seq=8
+ function.add(
+ new KeyValue()
+ .replace(row(1), 2, INSERT, row(50, 6))
+ .setLevel(0)); // Level 0, seq=6
+
+ ChangelogResult result = function.getResult();
+ assertThat(result).isNotNull();
+
+ KeyValue kv = result.result();
+ assertThat(kv).isNotNull();
+
+ // Should return the record with highest sequence (seq=8 from L2)
+ int actualSequence = kv.value().getInt(1);
+ int actualValue = kv.value().getInt(0);
+
+ assertThat(actualSequence)
+ .as("Should return record with highest sequence field (8)")
+ .isEqualTo(8);
+ assertThat(actualValue).isEqualTo(200);
+
+ // Verify changelog: before should be L2 (seq=8), after should be
merged result
+ List<KeyValue> changelogs = result.changelogs();
+ assertThat(changelogs).hasSize(2);
+ assertThat(changelogs.get(0).valueKind()).isEqualTo(UPDATE_BEFORE);
+ assertThat(changelogs.get(0).value().getInt(1)).isEqualTo(8); //
before is L2
+ assertThat(changelogs.get(1).valueKind()).isEqualTo(UPDATE_AFTER);
+ }
+
+ /**
+ * Test that without sequence.field, the original behavior is preserved: pick the record with
+ * the lowest level number.
+ */
+ @Test
+ public void testWithoutSequenceFieldPreservesOriginalBehavior() {
+ Map<InternalRow, KeyValue> highLevel = new HashMap<>();
+
+ // No userDefinedSeqComparator (null)
+ LookupChangelogMergeFunctionWrapper function =
+ new LookupChangelogMergeFunctionWrapper(
+ LookupMergeFunction.wrap(
+ DeduplicateMergeFunction.factory(), null,
null, null),
+ highLevel::get,
+ null,
+ LookupStrategy.from(false, true, false, false),
+ null,
+ null); // No sequence comparator
+
+ // L1: value=100, L2: value=200
+ // Without sequence.field, should pick L1 (level 1 < level 2)
+ function.reset();
+ function.add(new KeyValue().replace(row(1), 1, INSERT,
row(100)).setLevel(1));
+ function.add(new KeyValue().replace(row(1), 1, INSERT,
row(200)).setLevel(2));
+ function.add(new KeyValue().replace(row(1), 2, INSERT,
row(50)).setLevel(0));
+
+ ChangelogResult result = function.getResult();
+ assertThat(result).isNotNull();
+
+ // Without sequence.field, L1 is picked as highLevel, and L0 is the
latest
+ // So the result should be L0's value (50) since
DeduplicateMergeFunction keeps the last
+ KeyValue kv = result.result();
+ assertThat(kv).isNotNull();
+ assertThat(kv.value().getInt(0)).isEqualTo(50);
+
+ // Changelog: before=L1(100), after=L0(50)
+ List<KeyValue> changelogs = result.changelogs();
+ assertThat(changelogs).hasSize(2);
+ assertThat(changelogs.get(0).value().getInt(0)).isEqualTo(100); //
before is L1
+ assertThat(changelogs.get(1).value().getInt(0)).isEqualTo(50); //
after is L0
+ }
+
+ /**
+ * Test sequence.field with descending sort order. When sort-order=descending, smaller sequence
+ * values are considered "newer".
+ *
+ * <p>Note: We use a 3-field schema with sequence at index 2 to avoid cache collision with other
+ * tests, because CodeGenUtils caches comparators by field types and indices without considering
+ * sort order.
+ */
+ @Test
+ public void testSequenceFieldWithDescendingSortOrder() {
+ RowType valueType =
+ RowType.builder()
+ .fields(
+ new DataType[] {DataTypes.INT(),
DataTypes.INT(), DataTypes.INT()},
+ new String[] {"value", "extra", "sequence"})
+ .build();
+
+ // Create comparator with descending order
+ UserDefinedSeqComparator userDefinedSeqComparator =
+ UserDefinedSeqComparator.create(
+ valueType,
+ CoreOptions.fromMap(
+ ImmutableMap.of(
+ "sequence.field", "sequence",
+ "sequence.field.sort-order",
"descending")));
+ assertThat(userDefinedSeqComparator).isNotNull();
+
+ Map<InternalRow, KeyValue> highLevel = new HashMap<>();
+
+ LookupChangelogMergeFunctionWrapper function =
+ new LookupChangelogMergeFunctionWrapper(
+ LookupMergeFunction.wrap(
+ DeduplicateMergeFunction.factory(), null,
null, null),
+ highLevel::get,
+ null,
+ LookupStrategy.from(false, true, false, false),
+ null,
+ userDefinedSeqComparator);
+
+ // With descending order, smaller sequence = newer
+ // L1: (key=1, value=100, extra=0, sequence=7) <- Level 1, newer (7 <
8)
+ // L2: (key=1, value=200, extra=0, sequence=8) <- Level 2, older (8 >
7)
+ // L0: (key=1, value=50, extra=0, sequence=9) <- Level 0, oldest (9
> 8 > 7)
+
+ function.reset();
+ function.add(new KeyValue().replace(row(1), 1, INSERT, row(100, 0,
7)).setLevel(1));
+ function.add(new KeyValue().replace(row(1), 1, INSERT, row(200, 0,
8)).setLevel(2));
+ function.add(new KeyValue().replace(row(1), 2, INSERT, row(50, 0,
9)).setLevel(0));
+
+ ChangelogResult result = function.getResult();
+ assertThat(result).isNotNull();
+
+ KeyValue kv = result.result();
+ assertThat(kv).isNotNull();
+
+ int actualSequence = kv.value().getInt(2);
+ int actualValue = kv.value().getInt(0);
+
+ // With descending order, L1 (seq=7) is the newest high-level record
+ // The result should be L1's value (100) since it's the newest
+ assertThat(actualSequence)
+ .as("With descending order, should return record with smallest
sequence (7)")
+ .isEqualTo(7);
+ assertThat(actualValue).isEqualTo(100);
+ }
}