This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e9d5ebceb [core] Fix the merge order in lookup compaction (#3286)
e9d5ebceb is described below

commit e9d5ebceb8b6e482bc3092ad21027a27bb95bdb5
Author: Aitozi <[email protected]>
AuthorDate: Tue Apr 30 10:56:45 2024 +0800

    [core] Fix the merge order in lookup compaction (#3286)
---
 .../LookupChangelogMergeFunctionWrapper.java       | 48 ++++++++++--
 .../compact/LookupMergeTreeCompactRewriter.java    |  9 ++-
 .../paimon/operation/KeyValueFileStoreWrite.java   |  3 +-
 .../LookupChangelogMergeFunctionWrapperTest.java   | 87 ++++++++++++++++++++++
 4 files changed, 137 insertions(+), 10 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
index aa5555fd5..7f90871ac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
@@ -25,11 +25,15 @@ import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.lookup.LookupStrategy;
 import org.apache.paimon.mergetree.LookupLevels.PositionedKeyValue;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.FieldsComparator;
+import org.apache.paimon.utils.UserDefinedSeqComparator;
 
 import javax.annotation.Nullable;
 
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.function.Function;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -63,6 +67,7 @@ public class LookupChangelogMergeFunctionWrapper<T>
     private final boolean changelogRowDeduplicate;
     private final LookupStrategy lookupStrategy;
     private final @Nullable DeletionVectorsMaintainer 
deletionVectorsMaintainer;
+    private final Comparator<KeyValue> comparator;
 
     public LookupChangelogMergeFunctionWrapper(
             MergeFunctionFactory<KeyValue> mergeFunctionFactory,
@@ -70,7 +75,8 @@ public class LookupChangelogMergeFunctionWrapper<T>
             RecordEqualiser valueEqualiser,
             boolean changelogRowDeduplicate,
             LookupStrategy lookupStrategy,
-            @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
+            @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer,
+            UserDefinedSeqComparator userDefinedSeqComparator) {
         MergeFunction<KeyValue> mergeFunction = mergeFunctionFactory.create();
         checkArgument(
                 mergeFunction instanceof LookupMergeFunction,
@@ -88,6 +94,7 @@ public class LookupChangelogMergeFunctionWrapper<T>
         this.changelogRowDeduplicate = changelogRowDeduplicate;
         this.lookupStrategy = lookupStrategy;
         this.deletionVectorsMaintainer = deletionVectorsMaintainer;
+        this.comparator = createSequenceComparator(userDefinedSeqComparator);
     }
 
     @Override
@@ -136,12 +143,7 @@ public class LookupChangelogMergeFunctionWrapper<T>
         }
 
         // 3. Calculate result
-        mergeFunction2.reset();
-        if (highLevel != null) {
-            mergeFunction2.add(highLevel);
-        }
-        candidates.forEach(mergeFunction2::add);
-        KeyValue result = mergeFunction2.getResult();
+        KeyValue result = calculateResult(candidates, highLevel);
 
         // 4. Set changelog when there's level-0 records
         reusedResult.reset();
@@ -152,6 +154,23 @@ public class LookupChangelogMergeFunctionWrapper<T>
         return reusedResult.setResult(result);
     }
 
+    /** Merges the candidates, inserting {@code highLevel} before the first one ordered after it. */
+    private KeyValue calculateResult(List<KeyValue> candidates, @Nullable KeyValue highLevel) {
+        mergeFunction2.reset();
+        for (KeyValue candidate : candidates) {
+            // Emit the high-level record once, as soon as a candidate compares greater than it.
+            if (highLevel != null && comparator.compare(highLevel, candidate) < 0) {
+                mergeFunction2.add(highLevel);
+                highLevel = null;
+            }
+            mergeFunction2.add(candidate);
+        }
+        if (highLevel != null) {
+            mergeFunction2.add(highLevel);
+        }
+        return mergeFunction2.getResult();
+    }
+
     private void setChangelog(@Nullable KeyValue before, KeyValue after) {
         if (before == null || !before.isAdd()) {
             if (after.isAdd()) {
@@ -180,4 +199,19 @@ public class LookupChangelogMergeFunctionWrapper<T>
     private KeyValue replace(KeyValue reused, RowKind valueKind, KeyValue 
from) {
         return reused.replace(from.key(), from.sequenceNumber(), valueKind, 
from.value());
     }
+
+    /**
+     * Orders by user-defined sequence fields first (if configured), then by sequence number.
+     */
+    private static Comparator<KeyValue> createSequenceComparator(
+            @Nullable FieldsComparator userDefinedSeqComparator) {
+        Comparator<KeyValue> bySequence = Comparator.comparingLong(KeyValue::sequenceNumber);
+        if (userDefinedSeqComparator == null) {
+            return bySequence;
+        }
+        return (o1, o2) -> {
+            int result = userDefinedSeqComparator.compare(o1.value(), o2.value());
+            return result != 0 ? result : bySequence.compare(o1, o2);
+        };
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
index 88fa682d2..fea64539b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
@@ -31,6 +31,7 @@ import org.apache.paimon.mergetree.LookupLevels;
 import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.SortedRun;
 import org.apache.paimon.utils.FieldsComparator;
+import org.apache.paimon.utils.UserDefinedSeqComparator;
 
 import javax.annotation.Nullable;
 
@@ -150,14 +151,17 @@ public class LookupMergeTreeCompactRewriter<T> extends ChangelogMergeTreeRewrite
         private final RecordEqualiser valueEqualiser;
         private final boolean changelogRowDeduplicate;
         private final LookupStrategy lookupStrategy;
+        @Nullable private final UserDefinedSeqComparator 
userDefinedSeqComparator;
 
         public LookupMergeFunctionWrapperFactory(
                 RecordEqualiser valueEqualiser,
                 boolean changelogRowDeduplicate,
-                LookupStrategy lookupStrategy) {
+                LookupStrategy lookupStrategy,
+                @Nullable UserDefinedSeqComparator userDefinedSeqComparator) {
             this.valueEqualiser = valueEqualiser;
             this.changelogRowDeduplicate = changelogRowDeduplicate;
             this.lookupStrategy = lookupStrategy;
+            this.userDefinedSeqComparator = userDefinedSeqComparator;
         }
 
         @Override
@@ -178,7 +182,8 @@ public class LookupMergeTreeCompactRewriter<T> extends ChangelogMergeTreeRewrite
                     valueEqualiser,
                     changelogRowDeduplicate,
                     lookupStrategy,
-                    deletionVectorsMaintainer);
+                    deletionVectorsMaintainer,
+                    userDefinedSeqComparator);
         }
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index a4b153239..8343ba8b8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -307,7 +307,8 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
                         new LookupMergeFunctionWrapperFactory<>(
                                 valueEqualiserSupplier.get(),
                                 options.changelogRowDeduplicate(),
-                                lookupStrategy);
+                                lookupStrategy,
+                                UserDefinedSeqComparator.create(valueType, 
options));
             }
             return new LookupMergeTreeCompactRewriter(
                     maxLevel,
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index 0a9d83ee3..870df10f9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.mergetree.compact;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.codegen.RecordEqualiser;
 import org.apache.paimon.data.InternalRow;
@@ -25,11 +26,15 @@ import org.apache.paimon.data.InternalRow.FieldGetter;
 import org.apache.paimon.lookup.LookupStrategy;
 import org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction;
 import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
+import org.apache.paimon.mergetree.compact.aggregate.FieldLastValueAgg;
 import org.apache.paimon.mergetree.compact.aggregate.FieldSumAgg;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.UserDefinedSeqComparator;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
 
 import org.assertj.core.util.Lists;
 import org.junit.jupiter.api.Test;
@@ -69,6 +74,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
                         EQUALISER,
                         changelogRowDeduplicate,
                         LookupStrategy.CHANGELOG_ONLY,
+                        null,
                         null);
 
         // Without level-0
@@ -143,6 +149,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
 
         // With level-0 'delete' record, without level-x record, query fail
         function.reset();
+        highLevel.clear();
         function.add(new KeyValue().replace(row(1), 1, UPDATE_BEFORE, 
row(2)).setLevel(0));
         result = function.getResult();
         assertThat(result).isNotNull();
@@ -227,6 +234,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
                         EQUALISER,
                         changelogRowDeduplicate,
                         LookupStrategy.CHANGELOG_ONLY,
+                        null,
                         null);
 
         // Without level-0
@@ -295,6 +303,85 @@ public class LookupChangelogMergeFunctionWrapperTest {
         assertThat(kv.value().getInt(0)).isEqualTo(2);
     }
 
+    @Test
+    public void testMergeHighLevelOrder() {
+        Map<InternalRow, KeyValue> highLevel = new HashMap<>();
+        LookupChangelogMergeFunctionWrapper function =
+                new LookupChangelogMergeFunctionWrapper(
+                        LookupMergeFunction.wrap(
+                                projection ->
+                                        new AggregateMergeFunction(
+                                                new FieldGetter[] {
+                                                    row -> row.isNullAt(0) ? null : row.getInt(0)
+                                                },
+                                                new FieldAggregator[] {
+                                                    new FieldLastValueAgg(DataTypes.INT())
+                                                }),
+                                RowType.of(DataTypes.INT()),
+                                RowType.of(DataTypes.INT())),
+                        highLevel::get,
+                        EQUALISER,
+                        false,
+                        LookupStrategy.CHANGELOG_ONLY,
+                        null,
+                        UserDefinedSeqComparator.create(
+                                RowType.builder().field("f0", DataTypes.INT()).build(),
+                                CoreOptions.fromMap(ImmutableMap.of("sequence.field", "f0"))));
+
+        // Only a level-0 record; the high-level record has a higher sequence.field value.
+        highLevel.put(row(1), new KeyValue().replace(row(1), 1, INSERT, row(3)).setLevel(3));
+        function.reset();
+        function.add(new KeyValue().replace(row(1), 2, INSERT, row(2)).setLevel(0));
+        ChangelogResult result = function.getResult();
+        assertThat(result).isNotNull();
+        List<KeyValue> changelogs = result.changelogs();
+        // The high-level value 3 is merged last and wins: 3 -> 3
+        assertThat(changelogs).hasSize(2);
+        assertThat(changelogs.get(0).valueKind()).isEqualTo(UPDATE_BEFORE);
+        assertThat(changelogs.get(0).value().getInt(0)).isEqualTo(3);
+        assertThat(changelogs.get(1).valueKind()).isEqualTo(UPDATE_AFTER);
+        assertThat(changelogs.get(1).value().getInt(0)).isEqualTo(3);
+        KeyValue kv = result.result();
+        assertThat(kv).isNotNull();
+        assertThat(kv.value().getInt(0)).isEqualTo(3);
+
+        // Only a level-0 record; the high-level record has a lower sequence.field value.
+        highLevel.put(row(1), new KeyValue().replace(row(1), 1, INSERT, row(1)).setLevel(3));
+        function.reset();
+        function.add(new KeyValue().replace(row(1), 2, INSERT, row(2)).setLevel(0));
+        result = function.getResult();
+        assertThat(result).isNotNull();
+        changelogs = result.changelogs();
+        // The level-0 value 2 is merged last and wins: 1 -> 2
+        assertThat(changelogs).hasSize(2);
+        assertThat(changelogs.get(0).valueKind()).isEqualTo(UPDATE_BEFORE);
+        assertThat(changelogs.get(0).value().getInt(0)).isEqualTo(1);
+        assertThat(changelogs.get(1).valueKind()).isEqualTo(UPDATE_AFTER);
+        assertThat(changelogs.get(1).value().getInt(0)).isEqualTo(2);
+        kv = result.result();
+        assertThat(kv).isNotNull();
+        assertThat(kv.value().getInt(0)).isEqualTo(2);
+
+        // Only level-0 records; the high-level sequence.field value lies between theirs.
+        // Expected merge order by f0: 1 (level-0), 2 (high level), 3 (level-0).
+        highLevel.put(row(1), new KeyValue().replace(row(1), 1, INSERT, row(2)).setLevel(3));
+        function.reset();
+        function.add(new KeyValue().replace(row(1), 2, INSERT, row(1)).setLevel(0));
+        function.add(new KeyValue().replace(row(1), 3, INSERT, row(3)).setLevel(0));
+        result = function.getResult();
+        assertThat(result).isNotNull();
+        changelogs = result.changelogs();
+        // The level-0 value 3 is merged last and wins: 2 -> 3
+        assertThat(changelogs).hasSize(2);
+        assertThat(changelogs.get(0).valueKind()).isEqualTo(UPDATE_BEFORE);
+        assertThat(changelogs.get(0).value().getInt(0)).isEqualTo(2);
+        assertThat(changelogs.get(1).valueKind()).isEqualTo(UPDATE_AFTER);
+        assertThat(changelogs.get(1).value().getInt(0)).isEqualTo(3);
+        kv = result.result();
+        assertThat(kv).isNotNull();
+        assertThat(kv.value().getInt(0)).isEqualTo(3);
+    }
+
     @Test
     public void testFirstRow() {
         Set<InternalRow> highLevel = new HashSet<>();

Reply via email to