This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7715c60595 [core] Slightly refatory code in
LookupChangelogMergeFunctionWrapper (#5010)
7715c60595 is described below
commit 7715c60595cbe987e0e766e63bc56e9ca47f5a2a
Author: YeJunHao <[email protected]>
AuthorDate: Tue Feb 11 09:56:23 2025 +0800
[core] Slightly refatory code in LookupChangelogMergeFunctionWrapper (#5010)
---
.../LookupChangelogMergeFunctionWrapper.java | 31 +++++++++++++---------
.../mergetree/compact/LookupMergeFunction.java | 12 ++++++---
2 files changed, 26 insertions(+), 17 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
index f182c7f4ad..450df52314 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
@@ -21,6 +21,7 @@ package org.apache.paimon.mergetree.compact;
import org.apache.paimon.KeyValue;
import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.mergetree.LookupLevels.PositionedKeyValue;
@@ -56,8 +57,7 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
public class LookupChangelogMergeFunctionWrapper<T>
implements MergeFunctionWrapper<ChangelogResult> {
- private final LookupMergeFunction mergeFunction;
- private final MergeFunction<KeyValue> mergeFunction2;
+ private final MergeFunction<KeyValue> mergeFunction;
private final Function<InternalRow, T> lookup;
private final ChangelogResult reusedResult = new ChangelogResult();
@@ -68,6 +68,10 @@ public class LookupChangelogMergeFunctionWrapper<T>
private final @Nullable DeletionVectorsMaintainer
deletionVectorsMaintainer;
private final Comparator<KeyValue> comparator;
+ private final LinkedList<KeyValue> candidates = new LinkedList<>();
+ private final InternalRowSerializer keySerializer;
+ private final InternalRowSerializer valueSerializer;
+
public LookupChangelogMergeFunctionWrapper(
MergeFunctionFactory<KeyValue> mergeFunctionFactory,
Function<InternalRow, T> lookup,
@@ -85,8 +89,10 @@ public class LookupChangelogMergeFunctionWrapper<T>
deletionVectorsMaintainer != null,
"deletionVectorsMaintainer should not be null, there is a
bug.");
}
- this.mergeFunction = (LookupMergeFunction) mergeFunction;
- this.mergeFunction2 = mergeFunctionFactory.create();
+ LookupMergeFunction lookupMergeFunction = (LookupMergeFunction)
mergeFunction;
+ this.keySerializer = lookupMergeFunction.getKeySerializer();
+ this.valueSerializer = lookupMergeFunction.getValueSerializer();
+ this.mergeFunction = mergeFunctionFactory.create();
this.lookup = lookup;
this.valueEqualiser = valueEqualiser;
this.lookupStrategy = lookupStrategy;
@@ -96,18 +102,17 @@ public class LookupChangelogMergeFunctionWrapper<T>
@Override
public void reset() {
- mergeFunction.reset();
+ candidates.clear();
}
@Override
public void add(KeyValue kv) {
- mergeFunction.add(kv);
+ candidates.add(kv.copy(keySerializer, valueSerializer));
}
@Override
public ChangelogResult getResult() {
// 1. Compute the latest high level record and containLevel0 of
candidates
- LinkedList<KeyValue> candidates = mergeFunction.candidates();
Iterator<KeyValue> descending = candidates.descendingIterator();
KeyValue highLevel = null;
boolean containLevel0 = false;
@@ -152,20 +157,20 @@ public class LookupChangelogMergeFunctionWrapper<T>
}
private KeyValue calculateResult(List<KeyValue> candidates, @Nullable
KeyValue highLevel) {
- mergeFunction2.reset();
+ mergeFunction.reset();
for (KeyValue candidate : candidates) {
if (highLevel != null && comparator.compare(highLevel, candidate)
< 0) {
- mergeFunction2.add(highLevel);
- mergeFunction2.add(candidate);
+ mergeFunction.add(highLevel);
+ mergeFunction.add(candidate);
highLevel = null;
} else {
- mergeFunction2.add(candidate);
+ mergeFunction.add(candidate);
}
}
if (highLevel != null) {
- mergeFunction2.add(highLevel);
+ mergeFunction.add(highLevel);
}
- return mergeFunction2.getResult();
+ return mergeFunction.getResult();
}
private void setChangelog(@Nullable KeyValue before, KeyValue after) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
index 71425ef50d..a3a6af23cb 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
@@ -57,6 +57,14 @@ public class LookupMergeFunction implements
MergeFunction<KeyValue> {
candidates.add(kv.copy(keySerializer, valueSerializer));
}
+ public InternalRowSerializer getKeySerializer() {
+ return keySerializer;
+ }
+
+ public InternalRowSerializer getValueSerializer() {
+ return valueSerializer;
+ }
+
@Override
public KeyValue getResult() {
// 1. Find the latest high level record
@@ -79,10 +87,6 @@ public class LookupMergeFunction implements
MergeFunction<KeyValue> {
return mergeFunction.getResult();
}
- LinkedList<KeyValue> candidates() {
- return candidates;
- }
-
public static MergeFunctionFactory<KeyValue> wrap(
MergeFunctionFactory<KeyValue> wrapped, RowType keyType, RowType
valueType) {
if (wrapped.create() instanceof FirstRowMergeFunction) {