This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ba8b8c125 [core] Refactor ChangelogMergeTreeRewriter to lookup free (#2986)
ba8b8c125 is described below

commit ba8b8c1258f66e6fd6b5f93d0b87cd443a38f426
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Mar 11 17:01:29 2024 +0800

    [core] Refactor ChangelogMergeTreeRewriter to lookup free (#2986)
---
 .../compact/ChangelogMergeTreeRewriter.java        | 43 ++++++++++------------
 .../FullChangelogMergeTreeCompactRewriter.java     |  5 +--
 .../compact/LookupMergeTreeCompactRewriter.java    | 29 ++++++++-------
 .../compact/MergeTreeCompactRewriter.java          | 14 ++-----
 .../paimon/operation/KeyValueFileStoreWrite.java   | 18 +++------
 5 files changed, 47 insertions(+), 62 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
index c807fca23..0024d79ce 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
@@ -22,12 +22,10 @@ import org.apache.paimon.CoreOptions.MergeEngine;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.compact.CompactResult;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
 import org.apache.paimon.io.RollingFileWriter;
-import org.apache.paimon.lookup.LookupStrategy;
 import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.MergeTreeReaders;
 import org.apache.paimon.mergetree.SortedRun;
@@ -49,7 +47,8 @@ public abstract class ChangelogMergeTreeRewriter extends MergeTreeCompactRewrite
 
     protected final int maxLevel;
     protected final MergeEngine mergeEngine;
-    protected final LookupStrategy lookupStrategy;
+    private final boolean produceChangelog;
+    private final boolean forceDropDelete;
 
     public ChangelogMergeTreeRewriter(
             int maxLevel,
@@ -60,19 +59,19 @@ public abstract class ChangelogMergeTreeRewriter extends MergeTreeCompactRewrite
             @Nullable FieldsComparator userDefinedSeqComparator,
             MergeFunctionFactory<KeyValue> mfFactory,
             MergeSorter mergeSorter,
-            LookupStrategy lookupStrategy,
-            @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
+            boolean produceChangelog,
+            boolean forceDropDelete) {
         super(
                 readerFactory,
                 writerFactory,
                 keyComparator,
                 userDefinedSeqComparator,
                 mfFactory,
-                mergeSorter,
-                deletionVectorsMaintainer);
+                mergeSorter);
         this.maxLevel = maxLevel;
         this.mergeEngine = mergeEngine;
-        this.lookupStrategy = lookupStrategy;
+        this.produceChangelog = produceChangelog;
+        this.forceDropDelete = forceDropDelete;
     }
 
     protected abstract boolean rewriteChangelog(
@@ -143,17 +142,19 @@ public abstract class ChangelogMergeTreeRewriter extends MergeTreeCompactRewrite
             if (rewriteCompactFile) {
                 compactFileWriter = writerFactory.createRollingMergeTreeFileWriter(outputLevel);
             }
-            if (lookupStrategy.produceChangelog) {
+            if (produceChangelog) {
                 changelogFileWriter = writerFactory.createRollingChangelogFileWriter(outputLevel);
             }
 
             while (iterator.hasNext()) {
                 ChangelogResult result = iterator.next();
                 KeyValue keyValue = result.result();
-                if (rewriteCompactFile && keyValue != null && (!dropDelete || keyValue.isAdd())) {
+                if (compactFileWriter != null
+                        && keyValue != null
+                        && (!dropDelete || keyValue.isAdd())) {
                     compactFileWriter.write(keyValue);
                 }
-                if (lookupStrategy.produceChangelog) {
+                if (produceChangelog) {
                     for (KeyValue kv : result.changelogs()) {
                         changelogFileWriter.write(kv);
                     }
@@ -173,24 +174,19 @@ public abstract class ChangelogMergeTreeRewriter extends MergeTreeCompactRewrite
 
         List<DataFileMeta> before = extractFilesFromSections(sections);
         List<DataFileMeta> after =
-                rewriteCompactFile
+                compactFileWriter != null
                         ? compactFileWriter.result()
                         : before.stream()
                                 .map(x -> x.upgrade(outputLevel))
                                 .collect(Collectors.toList());
 
-        if (deletionVectorsMaintainer != null) {
-            for (DataFileMeta dataFileMeta : before) {
-                deletionVectorsMaintainer.removeDeletionVectorOf(dataFileMeta.fileName());
-            }
-        }
+        notifyCompactBefore(before);
 
-        return new CompactResult(
-                before,
-                after,
-                lookupStrategy.produceChangelog
+        List<DataFileMeta> changelogFiles =
+                changelogFileWriter != null
                         ? changelogFileWriter.result()
-                        : Collections.emptyList());
+                        : Collections.emptyList();
+        return new CompactResult(before, after, changelogFiles);
     }
 
     @Override
@@ -201,8 +197,7 @@ public abstract class ChangelogMergeTreeRewriter extends MergeTreeCompactRewrite
                     outputLevel,
                     Collections.singletonList(
                             Collections.singletonList(SortedRun.fromSingle(file))),
-                    // In deletion vector mode, we always drop deletion
-                    lookupStrategy.deletionVector,
+                    forceDropDelete,
                     strategy.rewrite);
         } else {
             return super.upgrade(outputLevel, file);
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
index 1eaa76cf4..5415427f4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
@@ -25,7 +25,6 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
-import org.apache.paimon.lookup.LookupStrategy;
 import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.SortedRun;
 import org.apache.paimon.utils.FieldsComparator;
@@ -66,8 +65,8 @@ public class FullChangelogMergeTreeCompactRewriter extends ChangelogMergeTreeRew
                 userDefinedSeqComparator,
                 mfFactory,
                 mergeSorter,
-                LookupStrategy.CHANGELOG_ONLY,
-                null);
+                true,
+                false);
         this.valueEqualiser = valueEqualiser;
         this.changelogRowDeduplicate = changelogRowDeduplicate;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
index 95b7ab78c..23d2cdb26 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
@@ -42,7 +42,6 @@ import java.util.List;
 import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_NO_REWRITE;
 import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_WITH_REWRITE;
 import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.NO_CHANGELOG;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
  * A {@link MergeTreeCompactRewriter} which produces changelog files by lookup for the compaction
@@ -52,6 +51,7 @@ public class LookupMergeTreeCompactRewriter<T> extends ChangelogMergeTreeRewrite
 
     private final LookupLevels<T> lookupLevels;
     private final MergeFunctionWrapperFactory<T> wrapperFactory;
+    @Nullable private final DeletionVectorsMaintainer dvMaintainer;
 
     public LookupMergeTreeCompactRewriter(
             int maxLevel,
@@ -64,8 +64,8 @@ public class LookupMergeTreeCompactRewriter<T> extends ChangelogMergeTreeRewrite
             MergeFunctionFactory<KeyValue> mfFactory,
             MergeSorter mergeSorter,
             MergeFunctionWrapperFactory<T> wrapperFactory,
-            LookupStrategy lookupStrategy,
-            @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
+            boolean produceChangelog,
+            @Nullable DeletionVectorsMaintainer dvMaintainer) {
         super(
                 maxLevel,
                 mergeEngine,
@@ -75,17 +75,20 @@ public class LookupMergeTreeCompactRewriter<T> extends ChangelogMergeTreeRewrite
                 userDefinedSeqComparator,
                 mfFactory,
                 mergeSorter,
-                lookupStrategy,
-                deletionVectorsMaintainer);
-        if (lookupStrategy.deletionVector) {
-            checkArgument(
-                    deletionVectorsMaintainer != null,
-                    "deletionVectorsMaintainer should not be null, there is a bug.");
-        }
+                produceChangelog,
+                dvMaintainer != null);
+        this.dvMaintainer = dvMaintainer;
         this.lookupLevels = lookupLevels;
         this.wrapperFactory = wrapperFactory;
     }
 
+    @Override
+    protected void notifyCompactBefore(List<DataFileMeta> files) {
+        if (dvMaintainer != null) {
+            files.forEach(file -> dvMaintainer.removeDeletionVectorOf(file.fileName()));
+        }
+    }
+
     @Override
     protected boolean rewriteChangelog(
             int outputLevel, boolean dropDelete, List<List<SortedRun>> sections) {
@@ -99,7 +102,8 @@ public class LookupMergeTreeCompactRewriter<T> extends ChangelogMergeTreeRewrite
         }
 
         // In deletionVector mode, since drop delete is required, rewrite is always required.
-        if (lookupStrategy.deletionVector) {
+        // TODO wait https://github.com/apache/incubator-paimon/pull/2962
+        if (dvMaintainer != null) {
             return CHANGELOG_WITH_REWRITE;
         }
 
@@ -121,8 +125,7 @@ public class LookupMergeTreeCompactRewriter<T> extends ChangelogMergeTreeRewrite
 
     @Override
     protected MergeFunctionWrapper<ChangelogResult> createMergeWrapper(int outputLevel) {
-        return wrapperFactory.create(
-                mfFactory, outputLevel, lookupLevels, deletionVectorsMaintainer);
+        return wrapperFactory.create(mfFactory, outputLevel, lookupLevels, dvMaintainer);
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
index 8e8f67207..b4659db48 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
@@ -21,7 +21,6 @@ package org.apache.paimon.mergetree.compact;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.compact.CompactResult;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
@@ -47,7 +46,6 @@ public class MergeTreeCompactRewriter extends AbstractCompactRewriter {
     @Nullable protected final FieldsComparator userDefinedSeqComparator;
     protected final MergeFunctionFactory<KeyValue> mfFactory;
     protected final MergeSorter mergeSorter;
-    @Nullable protected final DeletionVectorsMaintainer deletionVectorsMaintainer;
 
     public MergeTreeCompactRewriter(
             KeyValueFileReaderFactory readerFactory,
@@ -55,15 +53,13 @@ public class MergeTreeCompactRewriter extends AbstractCompactRewriter {
             Comparator<InternalRow> keyComparator,
             @Nullable FieldsComparator userDefinedSeqComparator,
             MergeFunctionFactory<KeyValue> mfFactory,
-            MergeSorter mergeSorter,
-            @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
+            MergeSorter mergeSorter) {
         this.readerFactory = readerFactory;
         this.writerFactory = writerFactory;
         this.keyComparator = keyComparator;
         this.userDefinedSeqComparator = userDefinedSeqComparator;
         this.mfFactory = mfFactory;
         this.mergeSorter = mergeSorter;
-        this.deletionVectorsMaintainer = deletionVectorsMaintainer;
     }
 
     @Override
@@ -88,11 +84,9 @@ public class MergeTreeCompactRewriter extends AbstractCompactRewriter {
         writer.write(new RecordReaderIterator<>(sectionsReader));
         writer.close();
         List<DataFileMeta> before = extractFilesFromSections(sections);
-        if (deletionVectorsMaintainer != null) {
-            for (DataFileMeta dataFileMeta : before) {
-                deletionVectorsMaintainer.removeDeletionVectorOf(dataFileMeta.fileName());
-            }
-        }
+        notifyCompactBefore(before);
         return new CompactResult(before, writer.result());
     }
+
+    protected void notifyCompactBefore(List<DataFileMeta> files) {}
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index b37069fe4..ce4a27813 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -163,7 +163,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
             List<DataFileMeta> restoreFiles,
             @Nullable CommitIncrement restoreIncrement,
             ExecutorService compactExecutor,
-            @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
+            @Nullable DeletionVectorsMaintainer dvMaintainer) {
         if (LOG.isDebugEnabled()) {
             LOG.debug(
                     "Creating merge tree writer for partition {} bucket {} from restored files {}",
@@ -188,12 +188,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
                         : universalCompaction;
         CompactManager compactManager =
                 createCompactManager(
-                        partition,
-                        bucket,
-                        compactStrategy,
-                        compactExecutor,
-                        levels,
-                        deletionVectorsMaintainer);
+                        partition, bucket, compactStrategy, compactExecutor, levels, dvMaintainer);
 
         return new MergeTreeWriter(
                 bufferSpillable(),
@@ -222,7 +217,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
             CompactStrategy compactStrategy,
             ExecutorService compactExecutor,
             Levels levels,
-            @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
+            @Nullable DeletionVectorsMaintainer dvMaintainer) {
         if (options.writeOnly()) {
             return new NoopCompactManager();
         } else {
@@ -235,7 +230,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
                             keyComparator,
                             userDefinedSeqComparator,
                             levels,
-                            deletionVectorsMaintainer);
+                            dvMaintainer);
             return new MergeTreeCompactManager(
                     compactExecutor,
                     levels,
@@ -321,7 +316,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
                     mfFactory,
                     mergeSorter,
                     wrapperFactory,
-                    lookupStrategy,
+                    lookupStrategy.produceChangelog,
                     deletionVectorsMaintainer);
         } else {
             return new MergeTreeCompactRewriter(
@@ -330,8 +325,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
                     keyComparator,
                     userDefinedSeqComparator,
                     mfFactory,
-                    mergeSorter,
-                    deletionVectorsMaintainer);
+                    mergeSorter);
         }
     }
 

Reply via email to