This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ba8b8c125 [core] Refactor ChangelogMergeTreeRewriter to lookup free (#2986)
ba8b8c125 is described below
commit ba8b8c1258f66e6fd6b5f93d0b87cd443a38f426
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Mar 11 17:01:29 2024 +0800
[core] Refactor ChangelogMergeTreeRewriter to lookup free (#2986)
---
.../compact/ChangelogMergeTreeRewriter.java | 43 ++++++++++------------
.../FullChangelogMergeTreeCompactRewriter.java | 5 +--
.../compact/LookupMergeTreeCompactRewriter.java | 29 ++++++++-------
.../compact/MergeTreeCompactRewriter.java | 14 ++-----
.../paimon/operation/KeyValueFileStoreWrite.java | 18 +++------
5 files changed, 47 insertions(+), 62 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
index c807fca23..0024d79ce 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
@@ -22,12 +22,10 @@ import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.KeyValue;
import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.io.RollingFileWriter;
-import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.MergeTreeReaders;
import org.apache.paimon.mergetree.SortedRun;
@@ -49,7 +47,8 @@ public abstract class ChangelogMergeTreeRewriter extends
MergeTreeCompactRewrite
protected final int maxLevel;
protected final MergeEngine mergeEngine;
- protected final LookupStrategy lookupStrategy;
+ private final boolean produceChangelog;
+ private final boolean forceDropDelete;
public ChangelogMergeTreeRewriter(
int maxLevel,
@@ -60,19 +59,19 @@ public abstract class ChangelogMergeTreeRewriter extends
MergeTreeCompactRewrite
@Nullable FieldsComparator userDefinedSeqComparator,
MergeFunctionFactory<KeyValue> mfFactory,
MergeSorter mergeSorter,
- LookupStrategy lookupStrategy,
- @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
+ boolean produceChangelog,
+ boolean forceDropDelete) {
super(
readerFactory,
writerFactory,
keyComparator,
userDefinedSeqComparator,
mfFactory,
- mergeSorter,
- deletionVectorsMaintainer);
+ mergeSorter);
this.maxLevel = maxLevel;
this.mergeEngine = mergeEngine;
- this.lookupStrategy = lookupStrategy;
+ this.produceChangelog = produceChangelog;
+ this.forceDropDelete = forceDropDelete;
}
protected abstract boolean rewriteChangelog(
@@ -143,17 +142,19 @@ public abstract class ChangelogMergeTreeRewriter extends
MergeTreeCompactRewrite
if (rewriteCompactFile) {
compactFileWriter =
writerFactory.createRollingMergeTreeFileWriter(outputLevel);
}
- if (lookupStrategy.produceChangelog) {
+ if (produceChangelog) {
changelogFileWriter =
writerFactory.createRollingChangelogFileWriter(outputLevel);
}
while (iterator.hasNext()) {
ChangelogResult result = iterator.next();
KeyValue keyValue = result.result();
- if (rewriteCompactFile && keyValue != null && (!dropDelete ||
keyValue.isAdd())) {
+ if (compactFileWriter != null
+ && keyValue != null
+ && (!dropDelete || keyValue.isAdd())) {
compactFileWriter.write(keyValue);
}
- if (lookupStrategy.produceChangelog) {
+ if (produceChangelog) {
for (KeyValue kv : result.changelogs()) {
changelogFileWriter.write(kv);
}
@@ -173,24 +174,19 @@ public abstract class ChangelogMergeTreeRewriter extends
MergeTreeCompactRewrite
List<DataFileMeta> before = extractFilesFromSections(sections);
List<DataFileMeta> after =
- rewriteCompactFile
+ compactFileWriter != null
? compactFileWriter.result()
: before.stream()
.map(x -> x.upgrade(outputLevel))
.collect(Collectors.toList());
- if (deletionVectorsMaintainer != null) {
- for (DataFileMeta dataFileMeta : before) {
-
deletionVectorsMaintainer.removeDeletionVectorOf(dataFileMeta.fileName());
- }
- }
+ notifyCompactBefore(before);
- return new CompactResult(
- before,
- after,
- lookupStrategy.produceChangelog
+ List<DataFileMeta> changelogFiles =
+ changelogFileWriter != null
? changelogFileWriter.result()
- : Collections.emptyList());
+ : Collections.emptyList();
+ return new CompactResult(before, after, changelogFiles);
}
@Override
@@ -201,8 +197,7 @@ public abstract class ChangelogMergeTreeRewriter extends
MergeTreeCompactRewrite
outputLevel,
Collections.singletonList(
Collections.singletonList(SortedRun.fromSingle(file))),
- // In deletion vector mode, we always drop deletion
- lookupStrategy.deletionVector,
+ forceDropDelete,
strategy.rewrite);
} else {
return super.upgrade(outputLevel, file);
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
index 1eaa76cf4..5415427f4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
@@ -25,7 +25,6 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
-import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.utils.FieldsComparator;
@@ -66,8 +65,8 @@ public class FullChangelogMergeTreeCompactRewriter extends
ChangelogMergeTreeRew
userDefinedSeqComparator,
mfFactory,
mergeSorter,
- LookupStrategy.CHANGELOG_ONLY,
- null);
+ true,
+ false);
this.valueEqualiser = valueEqualiser;
this.changelogRowDeduplicate = changelogRowDeduplicate;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
index 95b7ab78c..23d2cdb26 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
@@ -42,7 +42,6 @@ import java.util.List;
import static
org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_NO_REWRITE;
import static
org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_WITH_REWRITE;
import static
org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.NO_CHANGELOG;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
/**
* A {@link MergeTreeCompactRewriter} which produces changelog files by lookup
for the compaction
@@ -52,6 +51,7 @@ public class LookupMergeTreeCompactRewriter<T> extends
ChangelogMergeTreeRewrite
private final LookupLevels<T> lookupLevels;
private final MergeFunctionWrapperFactory<T> wrapperFactory;
+ @Nullable private final DeletionVectorsMaintainer dvMaintainer;
public LookupMergeTreeCompactRewriter(
int maxLevel,
@@ -64,8 +64,8 @@ public class LookupMergeTreeCompactRewriter<T> extends
ChangelogMergeTreeRewrite
MergeFunctionFactory<KeyValue> mfFactory,
MergeSorter mergeSorter,
MergeFunctionWrapperFactory<T> wrapperFactory,
- LookupStrategy lookupStrategy,
- @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
+ boolean produceChangelog,
+ @Nullable DeletionVectorsMaintainer dvMaintainer) {
super(
maxLevel,
mergeEngine,
@@ -75,17 +75,20 @@ public class LookupMergeTreeCompactRewriter<T> extends
ChangelogMergeTreeRewrite
userDefinedSeqComparator,
mfFactory,
mergeSorter,
- lookupStrategy,
- deletionVectorsMaintainer);
- if (lookupStrategy.deletionVector) {
- checkArgument(
- deletionVectorsMaintainer != null,
- "deletionVectorsMaintainer should not be null, there is a
bug.");
- }
+ produceChangelog,
+ dvMaintainer != null);
+ this.dvMaintainer = dvMaintainer;
this.lookupLevels = lookupLevels;
this.wrapperFactory = wrapperFactory;
}
+ @Override
+ protected void notifyCompactBefore(List<DataFileMeta> files) {
+ if (dvMaintainer != null) {
+ files.forEach(file ->
dvMaintainer.removeDeletionVectorOf(file.fileName()));
+ }
+ }
+
@Override
protected boolean rewriteChangelog(
int outputLevel, boolean dropDelete, List<List<SortedRun>>
sections) {
@@ -99,7 +102,8 @@ public class LookupMergeTreeCompactRewriter<T> extends
ChangelogMergeTreeRewrite
}
// In deletionVector mode, since drop delete is required, rewrite is
always required.
- if (lookupStrategy.deletionVector) {
+ // TODO wait https://github.com/apache/incubator-paimon/pull/2962
+ if (dvMaintainer != null) {
return CHANGELOG_WITH_REWRITE;
}
@@ -121,8 +125,7 @@ public class LookupMergeTreeCompactRewriter<T> extends
ChangelogMergeTreeRewrite
@Override
protected MergeFunctionWrapper<ChangelogResult> createMergeWrapper(int
outputLevel) {
- return wrapperFactory.create(
- mfFactory, outputLevel, lookupLevels,
deletionVectorsMaintainer);
+ return wrapperFactory.create(mfFactory, outputLevel, lookupLevels,
dvMaintainer);
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
index 8e8f67207..b4659db48 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
@@ -21,7 +21,6 @@ package org.apache.paimon.mergetree.compact;
import org.apache.paimon.KeyValue;
import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
@@ -47,7 +46,6 @@ public class MergeTreeCompactRewriter extends
AbstractCompactRewriter {
@Nullable protected final FieldsComparator userDefinedSeqComparator;
protected final MergeFunctionFactory<KeyValue> mfFactory;
protected final MergeSorter mergeSorter;
- @Nullable protected final DeletionVectorsMaintainer
deletionVectorsMaintainer;
public MergeTreeCompactRewriter(
KeyValueFileReaderFactory readerFactory,
@@ -55,15 +53,13 @@ public class MergeTreeCompactRewriter extends
AbstractCompactRewriter {
Comparator<InternalRow> keyComparator,
@Nullable FieldsComparator userDefinedSeqComparator,
MergeFunctionFactory<KeyValue> mfFactory,
- MergeSorter mergeSorter,
- @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
+ MergeSorter mergeSorter) {
this.readerFactory = readerFactory;
this.writerFactory = writerFactory;
this.keyComparator = keyComparator;
this.userDefinedSeqComparator = userDefinedSeqComparator;
this.mfFactory = mfFactory;
this.mergeSorter = mergeSorter;
- this.deletionVectorsMaintainer = deletionVectorsMaintainer;
}
@Override
@@ -88,11 +84,9 @@ public class MergeTreeCompactRewriter extends
AbstractCompactRewriter {
writer.write(new RecordReaderIterator<>(sectionsReader));
writer.close();
List<DataFileMeta> before = extractFilesFromSections(sections);
- if (deletionVectorsMaintainer != null) {
- for (DataFileMeta dataFileMeta : before) {
-
deletionVectorsMaintainer.removeDeletionVectorOf(dataFileMeta.fileName());
- }
- }
+ notifyCompactBefore(before);
return new CompactResult(before, writer.result());
}
+
+ protected void notifyCompactBefore(List<DataFileMeta> files) {}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index b37069fe4..ce4a27813 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -163,7 +163,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
List<DataFileMeta> restoreFiles,
@Nullable CommitIncrement restoreIncrement,
ExecutorService compactExecutor,
- @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
+ @Nullable DeletionVectorsMaintainer dvMaintainer) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Creating merge tree writer for partition {} bucket {}
from restored files {}",
@@ -188,12 +188,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
: universalCompaction;
CompactManager compactManager =
createCompactManager(
- partition,
- bucket,
- compactStrategy,
- compactExecutor,
- levels,
- deletionVectorsMaintainer);
+ partition, bucket, compactStrategy, compactExecutor,
levels, dvMaintainer);
return new MergeTreeWriter(
bufferSpillable(),
@@ -222,7 +217,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
CompactStrategy compactStrategy,
ExecutorService compactExecutor,
Levels levels,
- @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
+ @Nullable DeletionVectorsMaintainer dvMaintainer) {
if (options.writeOnly()) {
return new NoopCompactManager();
} else {
@@ -235,7 +230,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
keyComparator,
userDefinedSeqComparator,
levels,
- deletionVectorsMaintainer);
+ dvMaintainer);
return new MergeTreeCompactManager(
compactExecutor,
levels,
@@ -321,7 +316,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
mfFactory,
mergeSorter,
wrapperFactory,
- lookupStrategy,
+ lookupStrategy.produceChangelog,
deletionVectorsMaintainer);
} else {
return new MergeTreeCompactRewriter(
@@ -330,8 +325,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
keyComparator,
userDefinedSeqComparator,
mfFactory,
- mergeSorter,
- deletionVectorsMaintainer);
+ mergeSorter);
}
}