This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7715c60595 [core] Slightly refatory code in 
LookupChangelogMergeFunctionWrapper (#5010)
7715c60595 is described below

commit 7715c60595cbe987e0e766e63bc56e9ca47f5a2a
Author: YeJunHao <[email protected]>
AuthorDate: Tue Feb 11 09:56:23 2025 +0800

    [core] Slightly refatory code in LookupChangelogMergeFunctionWrapper (#5010)
---
 .../LookupChangelogMergeFunctionWrapper.java       | 31 +++++++++++++---------
 .../mergetree/compact/LookupMergeFunction.java     | 12 ++++++---
 2 files changed, 26 insertions(+), 17 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
index f182c7f4ad..450df52314 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
@@ -21,6 +21,7 @@ package org.apache.paimon.mergetree.compact;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.codegen.RecordEqualiser;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.lookup.LookupStrategy;
 import org.apache.paimon.mergetree.LookupLevels.PositionedKeyValue;
@@ -56,8 +57,7 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
 public class LookupChangelogMergeFunctionWrapper<T>
         implements MergeFunctionWrapper<ChangelogResult> {
 
-    private final LookupMergeFunction mergeFunction;
-    private final MergeFunction<KeyValue> mergeFunction2;
+    private final MergeFunction<KeyValue> mergeFunction;
     private final Function<InternalRow, T> lookup;
 
     private final ChangelogResult reusedResult = new ChangelogResult();
@@ -68,6 +68,10 @@ public class LookupChangelogMergeFunctionWrapper<T>
     private final @Nullable DeletionVectorsMaintainer 
deletionVectorsMaintainer;
     private final Comparator<KeyValue> comparator;
 
+    private final LinkedList<KeyValue> candidates = new LinkedList<>();
+    private final InternalRowSerializer keySerializer;
+    private final InternalRowSerializer valueSerializer;
+
     public LookupChangelogMergeFunctionWrapper(
             MergeFunctionFactory<KeyValue> mergeFunctionFactory,
             Function<InternalRow, T> lookup,
@@ -85,8 +89,10 @@ public class LookupChangelogMergeFunctionWrapper<T>
                     deletionVectorsMaintainer != null,
                     "deletionVectorsMaintainer should not be null, there is a 
bug.");
         }
-        this.mergeFunction = (LookupMergeFunction) mergeFunction;
-        this.mergeFunction2 = mergeFunctionFactory.create();
+        LookupMergeFunction lookupMergeFunction = (LookupMergeFunction) 
mergeFunction;
+        this.keySerializer = lookupMergeFunction.getKeySerializer();
+        this.valueSerializer = lookupMergeFunction.getValueSerializer();
+        this.mergeFunction = mergeFunctionFactory.create();
         this.lookup = lookup;
         this.valueEqualiser = valueEqualiser;
         this.lookupStrategy = lookupStrategy;
@@ -96,18 +102,17 @@ public class LookupChangelogMergeFunctionWrapper<T>
 
     @Override
     public void reset() {
-        mergeFunction.reset();
+        candidates.clear();
     }
 
     @Override
     public void add(KeyValue kv) {
-        mergeFunction.add(kv);
+        candidates.add(kv.copy(keySerializer, valueSerializer));
     }
 
     @Override
     public ChangelogResult getResult() {
         // 1. Compute the latest high level record and containLevel0 of 
candidates
-        LinkedList<KeyValue> candidates = mergeFunction.candidates();
         Iterator<KeyValue> descending = candidates.descendingIterator();
         KeyValue highLevel = null;
         boolean containLevel0 = false;
@@ -152,20 +157,20 @@ public class LookupChangelogMergeFunctionWrapper<T>
     }
 
     private KeyValue calculateResult(List<KeyValue> candidates, @Nullable 
KeyValue highLevel) {
-        mergeFunction2.reset();
+        mergeFunction.reset();
         for (KeyValue candidate : candidates) {
             if (highLevel != null && comparator.compare(highLevel, candidate) 
< 0) {
-                mergeFunction2.add(highLevel);
-                mergeFunction2.add(candidate);
+                mergeFunction.add(highLevel);
+                mergeFunction.add(candidate);
                 highLevel = null;
             } else {
-                mergeFunction2.add(candidate);
+                mergeFunction.add(candidate);
             }
         }
         if (highLevel != null) {
-            mergeFunction2.add(highLevel);
+            mergeFunction.add(highLevel);
         }
-        return mergeFunction2.getResult();
+        return mergeFunction.getResult();
     }
 
     private void setChangelog(@Nullable KeyValue before, KeyValue after) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
index 71425ef50d..a3a6af23cb 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
@@ -57,6 +57,14 @@ public class LookupMergeFunction implements 
MergeFunction<KeyValue> {
         candidates.add(kv.copy(keySerializer, valueSerializer));
     }
 
+    public InternalRowSerializer getKeySerializer() {
+        return keySerializer;
+    }
+
+    public InternalRowSerializer getValueSerializer() {
+        return valueSerializer;
+    }
+
     @Override
     public KeyValue getResult() {
         // 1. Find the latest high level record
@@ -79,10 +87,6 @@ public class LookupMergeFunction implements 
MergeFunction<KeyValue> {
         return mergeFunction.getResult();
     }
 
-    LinkedList<KeyValue> candidates() {
-        return candidates;
-    }
-
     public static MergeFunctionFactory<KeyValue> wrap(
             MergeFunctionFactory<KeyValue> wrapped, RowType keyType, RowType 
valueType) {
         if (wrapped.create() instanceof FirstRowMergeFunction) {

Reply via email to