This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e9d5ebceb [core] Fix the merge order in lookup compaction (#3286)
e9d5ebceb is described below
commit e9d5ebceb8b6e482bc3092ad21027a27bb95bdb5
Author: Aitozi <[email protected]>
AuthorDate: Tue Apr 30 10:56:45 2024 +0800
[core] Fix the merge order in lookup compaction (#3286)
---
.../LookupChangelogMergeFunctionWrapper.java | 48 ++++++++++--
.../compact/LookupMergeTreeCompactRewriter.java | 9 ++-
.../paimon/operation/KeyValueFileStoreWrite.java | 3 +-
.../LookupChangelogMergeFunctionWrapperTest.java | 87 ++++++++++++++++++++++
4 files changed, 137 insertions(+), 10 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
index aa5555fd5..7f90871ac 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
@@ -25,11 +25,15 @@ import
org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.mergetree.LookupLevels.PositionedKeyValue;
import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.FieldsComparator;
+import org.apache.paimon.utils.UserDefinedSeqComparator;
import javax.annotation.Nullable;
+import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
+import java.util.List;
import java.util.function.Function;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -63,6 +67,7 @@ public class LookupChangelogMergeFunctionWrapper<T>
private final boolean changelogRowDeduplicate;
private final LookupStrategy lookupStrategy;
private final @Nullable DeletionVectorsMaintainer
deletionVectorsMaintainer;
+ private final Comparator<KeyValue> comparator;
public LookupChangelogMergeFunctionWrapper(
MergeFunctionFactory<KeyValue> mergeFunctionFactory,
@@ -70,7 +75,8 @@ public class LookupChangelogMergeFunctionWrapper<T>
RecordEqualiser valueEqualiser,
boolean changelogRowDeduplicate,
LookupStrategy lookupStrategy,
- @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
+ @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer,
+ @Nullable UserDefinedSeqComparator userDefinedSeqComparator) {
MergeFunction<KeyValue> mergeFunction = mergeFunctionFactory.create();
checkArgument(
mergeFunction instanceof LookupMergeFunction,
@@ -88,6 +94,7 @@ public class LookupChangelogMergeFunctionWrapper<T>
this.changelogRowDeduplicate = changelogRowDeduplicate;
this.lookupStrategy = lookupStrategy;
this.deletionVectorsMaintainer = deletionVectorsMaintainer;
+ this.comparator = createSequenceComparator(userDefinedSeqComparator);
}
@Override
@@ -136,12 +143,7 @@ public class LookupChangelogMergeFunctionWrapper<T>
}
// 3. Calculate result
- mergeFunction2.reset();
- if (highLevel != null) {
- mergeFunction2.add(highLevel);
- }
- candidates.forEach(mergeFunction2::add);
- KeyValue result = mergeFunction2.getResult();
+ KeyValue result = calculateResult(candidates, highLevel);
// 4. Set changelog when there's level-0 records
reusedResult.reset();
@@ -152,6 +154,23 @@ public class LookupChangelogMergeFunctionWrapper<T>
return reusedResult.setResult(result);
}
+    /**
+     * Merges the candidate records with the optional high-level record and returns the result.
+     *
+     * <p>Records are fed to {@code mergeFunction2} in ascending {@code comparator} order. The
+     * high-level record is inserted just before the first candidate that sorts after it. If no
+     * candidate sorts after it, it is added last. This replaces the previous behavior of always
+     * adding the high-level record first.
+     *
+     * <p>NOTE(review): the merge is only fully ordered if {@code candidates} is itself ascending
+     * under the same comparator. Confirm the caller guarantees this.
+     *
+     * @param candidates records gathered for the current key, merged in list order
+     * @param highLevel record found in a higher level, or {@code null} if there is none
+     * @return the record produced by {@code mergeFunction2}
+     */
+    private KeyValue calculateResult(List<KeyValue> candidates, @Nullable KeyValue highLevel) {
+        mergeFunction2.reset();
+        KeyValue pendingHighLevel = highLevel;
+        for (KeyValue candidate : candidates) {
+            if (pendingHighLevel != null && comparator.compare(pendingHighLevel, candidate) < 0) {
+                // The high-level record sorts before this candidate, so it is merged first.
+                mergeFunction2.add(pendingHighLevel);
+                pendingHighLevel = null;
+            }
+            mergeFunction2.add(candidate);
+        }
+        if (pendingHighLevel != null) {
+            // No candidate sorted after the high-level record, so it is merged last.
+            mergeFunction2.add(pendingHighLevel);
+        }
+        return mergeFunction2.getResult();
+    }
+
private void setChangelog(@Nullable KeyValue before, KeyValue after) {
if (before == null || !before.isAdd()) {
if (after.isAdd()) {
@@ -180,4 +199,19 @@ public class LookupChangelogMergeFunctionWrapper<T>
private KeyValue replace(KeyValue reused, RowKind valueKind, KeyValue
from) {
return reused.replace(from.key(), from.sequenceNumber(), valueKind,
from.value());
}
+
+    /**
+     * Builds the ordering used to position the high-level record among the merged candidates.
+     *
+     * @param userDefinedSeqComparator comparator over record values on the user-defined sequence
+     *     fields, or {@code null} to order purely by {@link KeyValue#sequenceNumber()}
+     * @return a comparator that compares by user-defined sequence fields first (when given) and
+     *     breaks ties with the record sequence number
+     */
+    private Comparator<KeyValue> createSequenceComparator(
+            @Nullable FieldsComparator userDefinedSeqComparator) {
+        if (userDefinedSeqComparator == null) {
+            return Comparator.comparingLong(KeyValue::sequenceNumber);
+        }
+        Comparator<KeyValue> byUserSequence =
+                (left, right) -> userDefinedSeqComparator.compare(left.value(), right.value());
+        return byUserSequence.thenComparingLong(KeyValue::sequenceNumber);
+    }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
index 88fa682d2..fea64539b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
@@ -31,6 +31,7 @@ import org.apache.paimon.mergetree.LookupLevels;
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.utils.FieldsComparator;
+import org.apache.paimon.utils.UserDefinedSeqComparator;
import javax.annotation.Nullable;
@@ -150,14 +151,17 @@ public class LookupMergeTreeCompactRewriter<T> extends
ChangelogMergeTreeRewrite
private final RecordEqualiser valueEqualiser;
private final boolean changelogRowDeduplicate;
private final LookupStrategy lookupStrategy;
+ @Nullable private final UserDefinedSeqComparator
userDefinedSeqComparator;
public LookupMergeFunctionWrapperFactory(
RecordEqualiser valueEqualiser,
boolean changelogRowDeduplicate,
- LookupStrategy lookupStrategy) {
+ LookupStrategy lookupStrategy,
+ // Optional ordering on user-defined sequence fields, passed through to the merge-function
+ // wrapper this factory creates. In LookupChangelogMergeFunctionWrapper, null means records
+ // are ordered by sequence number only.
+ @Nullable UserDefinedSeqComparator userDefinedSeqComparator) {
this.valueEqualiser = valueEqualiser;
this.changelogRowDeduplicate = changelogRowDeduplicate;
this.lookupStrategy = lookupStrategy;
+ this.userDefinedSeqComparator = userDefinedSeqComparator;
}
@Override
@@ -178,7 +182,8 @@ public class LookupMergeTreeCompactRewriter<T> extends
ChangelogMergeTreeRewrite
valueEqualiser,
changelogRowDeduplicate,
lookupStrategy,
- deletionVectorsMaintainer);
+ deletionVectorsMaintainer,
+ userDefinedSeqComparator);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index a4b153239..8343ba8b8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -307,7 +307,8 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
new LookupMergeFunctionWrapperFactory<>(
valueEqualiserSupplier.get(),
options.changelogRowDeduplicate(),
- lookupStrategy);
+ lookupStrategy,
+ UserDefinedSeqComparator.create(valueType,
options));
}
return new LookupMergeTreeCompactRewriter(
maxLevel,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index 0a9d83ee3..870df10f9 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.mergetree.compact;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.data.InternalRow;
@@ -25,11 +26,15 @@ import org.apache.paimon.data.InternalRow.FieldGetter;
import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction;
import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
+import org.apache.paimon.mergetree.compact.aggregate.FieldLastValueAgg;
import org.apache.paimon.mergetree.compact.aggregate.FieldSumAgg;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.UserDefinedSeqComparator;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.Test;
@@ -69,6 +74,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
EQUALISER,
changelogRowDeduplicate,
LookupStrategy.CHANGELOG_ONLY,
+ null,
null);
// Without level-0
@@ -143,6 +149,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
// With level-0 'delete' record, without level-x record, query fail
function.reset();
+ highLevel.clear();
function.add(new KeyValue().replace(row(1), 1, UPDATE_BEFORE,
row(2)).setLevel(0));
result = function.getResult();
assertThat(result).isNotNull();
@@ -227,6 +234,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
EQUALISER,
changelogRowDeduplicate,
LookupStrategy.CHANGELOG_ONLY,
+ null,
null);
// Without level-0
@@ -295,6 +303,85 @@ public class LookupChangelogMergeFunctionWrapperTest {
assertThat(kv.value().getInt(0)).isEqualTo(2);
}
+    /**
+     * Verifies that, with a user-defined sequence field, the high-level record is merged at the
+     * position its sequence-field value dictates relative to the level-0 records, rather than
+     * always being merged first.
+     */
+    @Test
+    public void testMergeHighLevelOrder() {
+        Map<InternalRow, KeyValue> highLevel = new HashMap<>();
+        LookupChangelogMergeFunctionWrapper function =
+                new LookupChangelogMergeFunctionWrapper(
+                        LookupMergeFunction.wrap(
+                                projection ->
+                                        new AggregateMergeFunction(
+                                                new FieldGetter[] {
+                                                    row -> row.isNullAt(0) ? null : row.getInt(0)
+                                                },
+                                                new FieldAggregator[] {
+                                                    new FieldLastValueAgg(DataTypes.INT())
+                                                }),
+                                RowType.of(DataTypes.INT()),
+                                RowType.of(DataTypes.INT())),
+                        highLevel::get,
+                        EQUALISER,
+                        false,
+                        LookupStrategy.CHANGELOG_ONLY,
+                        null,
+                        UserDefinedSeqComparator.create(
+                                RowType.builder().field("f0", DataTypes.INT()).build(),
+                                CoreOptions.fromMap(ImmutableMap.of("sequence.field", "f0"))));
+
+        // High-level record has the larger sequence.field value, so it stays the result: 3 -> 3.
+        highLevel.put(row(1), new KeyValue().replace(row(1), 1, INSERT, row(3)).setLevel(3));
+        function.reset();
+        function.add(new KeyValue().replace(row(1), 2, INSERT, row(2)).setLevel(0));
+        assertUpdateChangelog(function.getResult(), 3, 3);
+
+        // High-level record has the smaller sequence.field value, so level-0 wins: 1 -> 2.
+        highLevel.put(row(1), new KeyValue().replace(row(1), 1, INSERT, row(1)).setLevel(3));
+        function.reset();
+        function.add(new KeyValue().replace(row(1), 2, INSERT, row(2)).setLevel(0));
+        assertUpdateChangelog(function.getResult(), 1, 2);
+
+        // High-level value 2 falls between the level-0 values 1 and 3: 2 -> 3.
+        highLevel.put(row(1), new KeyValue().replace(row(1), 1, INSERT, row(2)).setLevel(3));
+        function.reset();
+        function.add(new KeyValue().replace(row(1), 2, INSERT, row(1)).setLevel(0));
+        function.add(new KeyValue().replace(row(1), 2, INSERT, row(3)).setLevel(0));
+        assertUpdateChangelog(function.getResult(), 2, 3);
+    }
+
+    /**
+     * Asserts that {@code result} holds exactly an UPDATE_BEFORE({@code before}) /
+     * UPDATE_AFTER({@code after}) changelog pair and that its merged value is {@code after}.
+     */
+    private static void assertUpdateChangelog(ChangelogResult result, int before, int after) {
+        assertThat(result).isNotNull();
+        List<KeyValue> changelogs = result.changelogs();
+        assertThat(changelogs).hasSize(2);
+        assertThat(changelogs.get(0).valueKind()).isEqualTo(UPDATE_BEFORE);
+        assertThat(changelogs.get(0).value().getInt(0)).isEqualTo(before);
+        assertThat(changelogs.get(1).valueKind()).isEqualTo(UPDATE_AFTER);
+        assertThat(changelogs.get(1).value().getInt(0)).isEqualTo(after);
+        KeyValue kv = result.result();
+        assertThat(kv).isNotNull();
+        assertThat(kv.value().getInt(0)).isEqualTo(after);
+    }
+
@Test
public void testFirstRow() {
Set<InternalRow> highLevel = new HashSet<>();