This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new dfb5278de [core] Integrate deletion vector to reader and writer (#2958)
dfb5278de is described below

commit dfb5278de0f08ea3cf4669a4c676fc124a8527bc
Author: Zouxxyy <[email protected]>
AuthorDate: Sat Mar 9 15:56:34 2024 +0800

    [core] Integrate deletion vector to reader and writer (#2958)
---
 .../shortcodes/generated/core_configuration.html   |   6 +
 .../main/java/org/apache/paimon/CoreOptions.java   |  24 +++
 .../org/apache/paimon/lookup/LookupStrategy.java   |  56 ++++++
 .../java/org/apache/paimon/KeyValueFileStore.java  |  11 +-
 .../deletionvectors/ApplyDeletionVectorReader.java |  65 +++++++
 .../deletionvectors/DeletionVectorsIndexFile.java  |   4 +-
 .../deletionvectors/DeletionVectorsMaintainer.java |  12 +-
 .../paimon/io/KeyValueFileReaderFactory.java       |  53 ++++--
 .../org/apache/paimon/mergetree/LookupLevels.java  | 139 +++++++++++++--
 .../compact/ChangelogMergeTreeRewriter.java        |  37 +++-
 .../FullChangelogMergeTreeCompactRewriter.java     |   5 +-
 .../LookupChangelogMergeFunctionWrapper.java       |  41 ++++-
 .../compact/LookupMergeTreeCompactRewriter.java    |  51 ++++--
 .../mergetree/compact/MergeTreeCompactManager.java |   8 +-
 .../compact/MergeTreeCompactRewriter.java          |  14 +-
 .../paimon/operation/AbstractFileStoreWrite.java   |  42 ++++-
 .../paimon/operation/AppendOnlyFileStoreWrite.java |   6 +-
 .../apache/paimon/operation/FileStoreWrite.java    |   7 +-
 .../paimon/operation/KeyValueFileStoreRead.java    |  62 +++++--
 .../paimon/operation/KeyValueFileStoreWrite.java   | 168 +++++++++++-------
 .../paimon/operation/MemoryFileStoreWrite.java     |   3 +
 .../org/apache/paimon/schema/SchemaValidation.java |  27 +++
 .../paimon/table/PrimaryKeyFileStoreTable.java     |   3 +-
 .../apache/paimon/table/query/LocalTableQuery.java |   2 +-
 .../DeletionVectorsMaintainerTest.java             |   4 +-
 .../apache/paimon/mergetree/MergeTreeTestBase.java |   6 +-
 .../LookupChangelogMergeFunctionWrapperTest.java   |  13 +-
 .../compact/MergeTreeCompactManagerTest.java       |   3 +-
 .../org/apache/paimon/flink/sink/FlinkSink.java    |   7 +-
 .../sink/MultiTablesStoreCompactOperator.java      |   8 +-
 .../flink/PrimaryKeyFileStoreTableITCase.java      |  27 ++-
 .../flink/source/TestChangelogDataReadWrite.java   |   5 +-
 .../paimon/spark/sql/DeletionVectorTest.scala      | 189 +++++++++++++++++++++
 33 files changed, 937 insertions(+), 171 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index a1bf28c62..52645cf8c 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -164,6 +164,12 @@ under the License.
             <td>Boolean</td>
             <td>Whether to ignore delete records in deduplicate mode.</td>
         </tr>
+        <tr>
+            <td><h5>deletion-vectors.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable deletion vectors mode. In this mode, index files containing deletion vectors are generated when data is written, which marks the data for deletion. During read operations, by applying these index files, merging can be avoided.</td>
+        </tr>
         <tr>
             <td><h5>dynamic-bucket.assigner-parallelism</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index f8eec84fb..faea53997 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -24,6 +24,7 @@ import org.apache.paimon.annotation.Documentation.Immutable;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.lookup.LookupStrategy;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
@@ -1075,6 +1076,15 @@ public class CoreOptions implements Serializable {
                     .defaultValue(false)
                     .withDescription("Whether to force create snapshot on commit.");
 
+    public static final ConfigOption<Boolean> DELETION_VECTORS_ENABLED =
+            key("deletion-vectors.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to enable deletion vectors mode. In this mode, index files containing deletion"
+                                    + " vectors are generated when data is written, which marks the data for deletion."
+                                    + " During read operations, by applying these index files, merging can be avoided.");
+
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {
@@ -1377,6 +1387,16 @@ public class CoreOptions implements Serializable {
         return options.get(CHANGELOG_PRODUCER);
     }
 
+    public boolean needLookup() {
+        return lookupStrategy().needLookup;
+    }
+
+    public LookupStrategy lookupStrategy() {
+        return LookupStrategy.from(
+                options.get(CHANGELOG_PRODUCER).equals(ChangelogProducer.LOOKUP),
+                deletionVectorsEnabled());
+    }
+
     public boolean changelogRowDeduplicate() {
         return options.get(CHANGELOG_PRODUCER_ROW_DEDUPLICATE);
     }
@@ -1634,6 +1654,10 @@ public class CoreOptions implements Serializable {
         return options.get(ZORDER_VAR_LENGTH_CONTRIBUTION);
     }
 
+    public boolean deletionVectorsEnabled() {
+        return options.get(DELETION_VECTORS_ENABLED);
+    }
+
     /** Specifies the merge engine for table with primary key. */
     public enum MergeEngine implements DescribedEnum {
         DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStrategy.java b/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStrategy.java
new file mode 100644
index 000000000..6c709bcae
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStrategy.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup;
+
+/** Strategy for lookup. */
+public enum LookupStrategy {
+    NO_LOOKUP(false, false),
+
+    CHANGELOG_ONLY(true, false),
+
+    DELETION_VECTOR_ONLY(false, true),
+
+    CHANGELOG_AND_DELETION_VECTOR(true, true);
+
+    public final boolean needLookup;
+
+    public final boolean produceChangelog;
+
+    public final boolean deletionVector;
+
+    LookupStrategy(boolean produceChangelog, boolean deletionVector) {
+        this.produceChangelog = produceChangelog;
+        this.deletionVector = deletionVector;
+        this.needLookup = produceChangelog || deletionVector;
+    }
+
+    public static LookupStrategy from(boolean produceChangelog, boolean deletionVector) {
+        for (LookupStrategy strategy : values()) {
+            if (strategy.produceChangelog == produceChangelog
+                    && strategy.deletionVector == deletionVector) {
+                return strategy;
+            }
+        }
+        throw new IllegalArgumentException(
+                "Invalid combination of produceChangelog : "
+                        + produceChangelog
+                        + " and deletionVector : "
+                        + deletionVector);
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 727bf8f82..21e6ac60e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -20,6 +20,7 @@ package org.apache.paimon;
 
 import org.apache.paimon.codegen.RecordEqualiser;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.index.HashIndexMaintainer;
@@ -129,7 +130,9 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
                 newKeyComparator(),
                 userDefinedSeqComparator(),
                 mfFactory,
-                newReaderFactoryBuilder());
+                newReaderFactoryBuilder(),
+                options,
+                newIndexFileHandler());
     }
 
     public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() {
@@ -161,6 +164,11 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
         if (bucketMode() == BucketMode.DYNAMIC) {
             indexFactory = new HashIndexMaintainer.Factory(newIndexFileHandler());
         }
+        DeletionVectorsMaintainer.Factory deletionVectorsMaintainerFactory = null;
+        if (options.deletionVectorsEnabled()) {
+            deletionVectorsMaintainerFactory =
+                    new DeletionVectorsMaintainer.Factory(newIndexFileHandler());
+        }
         return new KeyValueFileStoreWrite(
                 fileIO,
                 schemaManager,
@@ -177,6 +185,7 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
                 snapshotManager(),
                 newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter),
                 indexFactory,
+                deletionVectorsMaintainerFactory,
                 options,
                 keyValueFieldsExtractor,
                 tableName);
diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
new file mode 100644
index 000000000..32d6da861
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.deletionvectors;
+
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.RecordWithPositionIterator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** A {@link RecordReader} which apply {@link DeletionVector} to filter record. */
+public class ApplyDeletionVectorReader<T> implements RecordReader<T> {
+
+    private final RecordReader<T> reader;
+
+    private final DeletionVector deletionVector;
+
+    public ApplyDeletionVectorReader(RecordReader<T> reader, DeletionVector deletionVector) {
+        this.reader = reader;
+        this.deletionVector = deletionVector;
+    }
+
+    @Nullable
+    @Override
+    public RecordIterator<T> readBatch() throws IOException {
+        RecordIterator<T> batch = reader.readBatch();
+
+        if (batch == null) {
+            return null;
+        }
+
+        checkArgument(
+                batch instanceof RecordWithPositionIterator,
+                "There is a bug, RecordIterator in ApplyDeletionVectorReader must be RecordWithPositionIterator");
+
+        RecordWithPositionIterator<T> batchWithPosition = (RecordWithPositionIterator<T>) batch;
+
+        return batchWithPosition.filter(
+                a -> !deletionVector.isDeleted(batchWithPosition.returnedPosition()));
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
index a82cc9be8..313435928 100644
--- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
@@ -140,9 +140,9 @@ public class DeletionVectorsIndexFile extends IndexFile {
         int version = in.read();
         if (version != VERSION_ID_V1) {
             throw new RuntimeException(
-                    "Version not match, actual size: "
+                    "Version not match, actual version: "
                             + version
-                            + ", expert size: "
+                            + ", expert version: "
                             + VERSION_ID_V1);
         }
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
index 7c88edc6c..878c76841 100644
--- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.deletionvectors;
 
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
@@ -56,7 +57,7 @@ public class DeletionVectorsMaintainer {
         this.deletionVectors =
                 indexFile == null
                         ? new HashMap<>()
-                        : indexFileHandler.readAllDeletionVectors(indexFile);
+                        : new HashMap<>(indexFileHandler.readAllDeletionVectors(indexFile));
         this.modified = false;
     }
 
@@ -115,12 +116,17 @@ public class DeletionVectorsMaintainer {
         return Optional.ofNullable(deletionVectors.get(fileName));
     }
 
+    @VisibleForTesting
+    public Map<String, DeletionVector> deletionVectors() {
+        return deletionVectors;
+    }
+
     /** Factory to restore {@link DeletionVectorsMaintainer}. */
-    public static class DeletionVectorsMaintainerFactory {
+    public static class Factory {
 
         private final IndexFileHandler handler;
 
-        public DeletionVectorsMaintainerFactory(IndexFileHandler handler) {
+        public Factory(IndexFileHandler handler) {
             this.handler = handler;
         }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 780149498..3fa19681e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -21,6 +21,8 @@ package org.apache.paimon.io;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader;
+import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.format.FormatKey;
 import org.apache.paimon.fs.FileIO;
@@ -42,6 +44,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 /** Factory to create {@link RecordReader}s for reading {@link KeyValue} files. */
@@ -60,6 +64,9 @@ public class KeyValueFileReaderFactory {
     private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
     private final BinaryRow partition;
 
+    // FileName to its corresponding deletion vector
+    private final @Nullable Function<String, Optional<DeletionVector>> deletionVectorSupplier;
+
     private KeyValueFileReaderFactory(
             FileIO fileIO,
             SchemaManager schemaManager,
@@ -69,7 +76,8 @@ public class KeyValueFileReaderFactory {
             BulkFormatMapping.BulkFormatMappingBuilder bulkFormatMappingBuilder,
             DataFilePathFactory pathFactory,
             long asyncThreshold,
-            BinaryRow partition) {
+            BinaryRow partition,
+            @Nullable Function<String, Optional<DeletionVector>> deletionVectorSupplier) {
         this.fileIO = fileIO;
         this.schemaManager = schemaManager;
         this.schemaId = schemaId;
@@ -80,6 +88,7 @@ public class KeyValueFileReaderFactory {
         this.asyncThreshold = asyncThreshold;
         this.partition = partition;
         this.bulkFormatMappings = new HashMap<>();
+        this.deletionVectorSupplier = deletionVectorSupplier;
     }
 
     public RecordReader<KeyValue> createRecordReader(
@@ -113,17 +122,27 @@ public class KeyValueFileReaderFactory {
                                 new FormatKey(schemaId, formatIdentifier),
                                 key -> formatSupplier.get())
                         : formatSupplier.get();
-        return new KeyValueDataFileRecordReader(
-                fileIO,
-                bulkFormatMapping.getReaderFactory(),
-                pathFactory.toPath(fileName),
-                keyType,
-                valueType,
-                level,
-                poolSize,
-                bulkFormatMapping.getIndexMapping(),
-                bulkFormatMapping.getCastMapping(),
-                PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));
+        RecordReader<KeyValue> recordReader =
+                new KeyValueDataFileRecordReader(
+                        fileIO,
+                        bulkFormatMapping.getReaderFactory(),
+                        pathFactory.toPath(fileName),
+                        keyType,
+                        valueType,
+                        level,
+                        poolSize,
+                        bulkFormatMapping.getIndexMapping(),
+                        bulkFormatMapping.getCastMapping(),
+                        PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));
+        if (deletionVectorSupplier != null) {
+            Optional<DeletionVector> optionalDeletionVector =
+                    deletionVectorSupplier.apply(fileName);
+            if (optionalDeletionVector.isPresent() && !optionalDeletionVector.get().isEmpty()) {
+                recordReader =
+                        new ApplyDeletionVectorReader<>(recordReader, optionalDeletionVector.get());
+            }
+        }
+        return recordReader;
     }
 
     public static Builder builder(
@@ -166,6 +185,7 @@ public class KeyValueFileReaderFactory {
         private int[][] valueProjection;
         private RowType projectedKeyType;
         private RowType projectedValueType;
+        private @Nullable Function<String, Optional<DeletionVector>> deletionVectorSupplier;
 
         private Builder(
                 FileIO fileIO,
@@ -218,6 +238,12 @@ public class KeyValueFileReaderFactory {
             return this;
         }
 
+        public Builder withDeletionVectorSupplier(
+                Function<String, Optional<DeletionVector>> deletionVectorSupplier) {
+            this.deletionVectorSupplier = deletionVectorSupplier;
+            return this;
+        }
+
         public RowType keyType() {
             return keyType;
         }
@@ -248,7 +274,8 @@ public class KeyValueFileReaderFactory {
                             formatDiscover, extractor, keyProjection, valueProjection, filters),
                     pathFactory.createDataFilePathFactory(partition, bucket),
                     options.fileReaderAsyncThreshold().getBytes(),
-                    partition);
+                    partition,
+                    deletionVectorSupplier);
         }
 
         private void applyProjection() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
index 4869055ad..3e7e12702 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
@@ -29,6 +29,7 @@ import org.apache.paimon.lookup.LookupStoreWriter;
 import org.apache.paimon.memory.MemorySegment;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.RecordWithPositionIterator;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.BloomFilter;
@@ -142,7 +143,8 @@ public class LookupLevels<T> implements Levels.DropFileCallback, Closeable {
             return null;
         }
 
-        return valueProcessor.readFromDisk(key, lookupFile.remoteFile().level(), valueBytes);
+        return valueProcessor.readFromDisk(
+                key, lookupFile.remoteFile().level(), valueBytes, file.fileName());
     }
 
     private int fileWeigh(String file, LookupFile lookupFile) {
@@ -168,15 +170,29 @@ public class LookupLevels<T> implements Levels.DropFileCallback, Closeable {
                 lookupStoreFactory.createWriter(localFile, bfGenerator.apply(file.rowCount()));
         LookupStoreFactory.Context context;
         try (RecordReader<KeyValue> reader = fileReaderFactory.apply(file)) {
-            RecordReader.RecordIterator<KeyValue> batch;
             KeyValue kv;
-            while ((batch = reader.readBatch()) != null) {
-                while ((kv = batch.next()) != null) {
-                    byte[] keyBytes = keySerializer.serializeToBytes(kv.key());
-                    byte[] valueBytes = valueProcessor.persistToDisk(kv);
-                    kvWriter.put(keyBytes, valueBytes);
+            if (valueProcessor.withPosition()) {
+                RecordWithPositionIterator<KeyValue> batch;
+                while ((batch = (RecordWithPositionIterator<KeyValue>) reader.readBatch())
+                        != null) {
+                    while ((kv = batch.next()) != null) {
+                        byte[] keyBytes = keySerializer.serializeToBytes(kv.key());
+                        byte[] valueBytes =
+                                valueProcessor.persistToDisk(kv, batch.returnedPosition());
+                        kvWriter.put(keyBytes, valueBytes);
+                    }
+                    batch.releaseBatch();
+                }
+            } else {
+                RecordReader.RecordIterator<KeyValue> batch;
+                while ((batch = reader.readBatch()) != null) {
+                    while ((kv = batch.next()) != null) {
+                        byte[] keyBytes = keySerializer.serializeToBytes(kv.key());
+                        byte[] valueBytes = valueProcessor.persistToDisk(kv);
+                        kvWriter.put(keyBytes, valueBytes);
+                    }
+                    batch.releaseBatch();
                 }
-                batch.releaseBatch();
             }
         } catch (IOException e) {
             FileIOUtils.deleteFileOrDirectory(localFile);
@@ -228,9 +244,15 @@ public class LookupLevels<T> implements Levels.DropFileCallback, Closeable {
     /** Processor to process value. */
     public interface ValueProcessor<T> {
 
+        boolean withPosition();
+
         byte[] persistToDisk(KeyValue kv);
 
-        T readFromDisk(InternalRow key, int level, byte[] valueBytes);
+        default byte[] persistToDisk(KeyValue kv, long rowPosition) {
+            throw new UnsupportedOperationException();
+        }
+
+        T readFromDisk(InternalRow key, int level, byte[] valueBytes, String fileName);
     }
 
     /** A {@link ValueProcessor} to return {@link KeyValue}. */
@@ -242,6 +264,11 @@ public class LookupLevels<T> implements Levels.DropFileCallback, Closeable {
             this.valueSerializer = new RowCompactedSerializer(valueType);
         }
 
+        @Override
+        public boolean withPosition() {
+            return false;
+        }
+
         @Override
         public byte[] persistToDisk(KeyValue kv) {
             byte[] vBytes = valueSerializer.serializeToBytes(kv.value());
@@ -254,7 +281,7 @@ public class LookupLevels<T> implements Levels.DropFileCallback, Closeable {
         }
 
         @Override
-        public KeyValue readFromDisk(InternalRow key, int level, byte[] bytes) {
+        public KeyValue readFromDisk(InternalRow key, int level, byte[] bytes, String fileName) {
             InternalRow value = valueSerializer.deserialize(bytes);
             long sequenceNumber = MemorySegment.wrap(bytes).getLong(bytes.length - 9);
             RowKind rowKind = RowKind.fromByteValue(bytes[bytes.length - 1]);
@@ -267,14 +294,104 @@ public class LookupLevels<T> implements Levels.DropFileCallback, Closeable {
 
         private static final byte[] EMPTY_BYTES = new byte[0];
 
+        @Override
+        public boolean withPosition() {
+            return false;
+        }
+
         @Override
         public byte[] persistToDisk(KeyValue kv) {
             return EMPTY_BYTES;
         }
 
         @Override
-        public Boolean readFromDisk(InternalRow key, int level, byte[] bytes) {
+        public Boolean readFromDisk(InternalRow key, int level, byte[] bytes, String fileName) {
             return Boolean.TRUE;
         }
     }
+
+    /** A {@link ValueProcessor} to return {@link PositionedKeyValue}. */
+    public static class PositionedKeyValueProcessor implements ValueProcessor<PositionedKeyValue> {
+        private final boolean persistValue;
+        private final RowCompactedSerializer valueSerializer;
+
+        public PositionedKeyValueProcessor(RowType valueType, boolean persistValue) {
+            this.persistValue = persistValue;
+            this.valueSerializer = persistValue ? new RowCompactedSerializer(valueType) : null;
+        }
+
+        @Override
+        public boolean withPosition() {
+            return true;
+        }
+
+        @Override
+        public byte[] persistToDisk(KeyValue kv) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public byte[] persistToDisk(KeyValue kv, long rowPosition) {
+            if (persistValue) {
+                byte[] vBytes = valueSerializer.serializeToBytes(kv.value());
+                byte[] bytes = new byte[vBytes.length + 8 + 8 + 1];
+                MemorySegment segment = MemorySegment.wrap(bytes);
+                segment.put(0, vBytes);
+                segment.putLong(bytes.length - 17, rowPosition);
+                segment.putLong(bytes.length - 9, kv.sequenceNumber());
+                segment.put(bytes.length - 1, kv.valueKind().toByteValue());
+                return bytes;
+            } else {
+                byte[] bytes = new byte[8];
+                MemorySegment segment = MemorySegment.wrap(bytes);
+                segment.putLong(0, rowPosition);
+                return bytes;
+            }
+        }
+
+        @Override
+        public PositionedKeyValue readFromDisk(
+                InternalRow key, int level, byte[] bytes, String fileName) {
+            if (persistValue) {
+                InternalRow value = valueSerializer.deserialize(bytes);
+                MemorySegment segment = MemorySegment.wrap(bytes);
+                long rowPosition = segment.getLong(bytes.length - 17);
+                long sequenceNumber = segment.getLong(bytes.length - 9);
+                RowKind rowKind = RowKind.fromByteValue(bytes[bytes.length - 1]);
+                return new PositionedKeyValue(
+                        new KeyValue().replace(key, sequenceNumber, rowKind, value).setLevel(level),
+                        fileName,
+                        rowPosition);
+            } else {
+                MemorySegment segment = MemorySegment.wrap(bytes);
+                return new PositionedKeyValue(null, fileName, segment.getLong(0));
+            }
+        }
+    }
+
+    /** {@link KeyValue} with file name and row position for DeletionVector. */
+    public static class PositionedKeyValue {
+        private final @Nullable KeyValue keyValue;
+        private final String fileName;
+        private final long rowPosition;
+
+        public PositionedKeyValue(@Nullable KeyValue keyValue, String fileName, long rowPosition) {
+            this.keyValue = keyValue;
+            this.fileName = fileName;
+            this.rowPosition = rowPosition;
+        }
+
+        public String fileName() {
+            return fileName;
+        }
+
+        public long rowPosition() {
+            return rowPosition;
+        }
+
+        @Nullable
+        public KeyValue keyValue() {
+            return keyValue;
+        }
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
index f338ee056..c807fca23 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
@@ -22,10 +22,12 @@ import org.apache.paimon.CoreOptions.MergeEngine;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.compact.CompactResult;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
 import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.lookup.LookupStrategy;
 import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.MergeTreeReaders;
 import org.apache.paimon.mergetree.SortedRun;
@@ -47,6 +49,7 @@ public abstract class ChangelogMergeTreeRewriter extends MergeTreeCompactRewrite
 
     protected final int maxLevel;
     protected final MergeEngine mergeEngine;
+    protected final LookupStrategy lookupStrategy;
 
     public ChangelogMergeTreeRewriter(
             int maxLevel,
@@ -56,16 +59,20 @@ public abstract class ChangelogMergeTreeRewriter extends 
MergeTreeCompactRewrite
             Comparator<InternalRow> keyComparator,
             @Nullable FieldsComparator userDefinedSeqComparator,
             MergeFunctionFactory<KeyValue> mfFactory,
-            MergeSorter mergeSorter) {
+            MergeSorter mergeSorter,
+            LookupStrategy lookupStrategy,
+            @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
         super(
                 readerFactory,
                 writerFactory,
                 keyComparator,
                 userDefinedSeqComparator,
                 mfFactory,
-                mergeSorter);
+                mergeSorter,
+                deletionVectorsMaintainer);
         this.maxLevel = maxLevel;
         this.mergeEngine = mergeEngine;
+        this.lookupStrategy = lookupStrategy;
     }
 
     protected abstract boolean rewriteChangelog(
@@ -136,7 +143,9 @@ public abstract class ChangelogMergeTreeRewriter extends 
MergeTreeCompactRewrite
             if (rewriteCompactFile) {
                 compactFileWriter = 
writerFactory.createRollingMergeTreeFileWriter(outputLevel);
             }
-            changelogFileWriter = 
writerFactory.createRollingChangelogFileWriter(outputLevel);
+            if (lookupStrategy.produceChangelog) {
+                changelogFileWriter = 
writerFactory.createRollingChangelogFileWriter(outputLevel);
+            }
 
             while (iterator.hasNext()) {
                 ChangelogResult result = iterator.next();
@@ -144,8 +153,10 @@ public abstract class ChangelogMergeTreeRewriter extends 
MergeTreeCompactRewrite
                 if (rewriteCompactFile && keyValue != null && (!dropDelete || 
keyValue.isAdd())) {
                     compactFileWriter.write(keyValue);
                 }
-                for (KeyValue kv : result.changelogs()) {
-                    changelogFileWriter.write(kv);
+                if (lookupStrategy.produceChangelog) {
+                    for (KeyValue kv : result.changelogs()) {
+                        changelogFileWriter.write(kv);
+                    }
                 }
             }
         } finally {
@@ -168,7 +179,18 @@ public abstract class ChangelogMergeTreeRewriter extends 
MergeTreeCompactRewrite
                                 .map(x -> x.upgrade(outputLevel))
                                 .collect(Collectors.toList());
 
-        return new CompactResult(before, after, changelogFileWriter.result());
+        if (deletionVectorsMaintainer != null) {
+            for (DataFileMeta dataFileMeta : before) {
+                
deletionVectorsMaintainer.removeDeletionVectorOf(dataFileMeta.fileName());
+            }
+        }
+
+        return new CompactResult(
+                before,
+                after,
+                lookupStrategy.produceChangelog
+                        ? changelogFileWriter.result()
+                        : Collections.emptyList());
     }
 
     @Override
@@ -179,7 +201,8 @@ public abstract class ChangelogMergeTreeRewriter extends 
MergeTreeCompactRewrite
                     outputLevel,
                     Collections.singletonList(
                             
Collections.singletonList(SortedRun.fromSingle(file))),
-                    false,
+                    // In deletion vector mode, delete records are always dropped
+                    lookupStrategy.deletionVector,
                     strategy.rewrite);
         } else {
             return super.upgrade(outputLevel, file);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
index 3ed7a5d69..1eaa76cf4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
@@ -25,6 +25,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
+import org.apache.paimon.lookup.LookupStrategy;
 import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.SortedRun;
 import org.apache.paimon.utils.FieldsComparator;
@@ -64,7 +65,9 @@ public class FullChangelogMergeTreeCompactRewriter extends 
ChangelogMergeTreeRew
                 keyComparator,
                 userDefinedSeqComparator,
                 mfFactory,
-                mergeSorter);
+                mergeSorter,
+                LookupStrategy.CHANGELOG_ONLY,
+                null);
         this.valueEqualiser = valueEqualiser;
         this.changelogRowDeduplicate = changelogRowDeduplicate;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
index 319f5055a..aa5555fd5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
@@ -21,8 +21,13 @@ package org.apache.paimon.mergetree.compact;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.codegen.RecordEqualiser;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.lookup.LookupStrategy;
+import org.apache.paimon.mergetree.LookupLevels.PositionedKeyValue;
 import org.apache.paimon.types.RowKind;
 
+import javax.annotation.Nullable;
+
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.function.Function;
@@ -44,33 +49,45 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
  *       level as BEFORE.
  * </ul>
  */
-public class LookupChangelogMergeFunctionWrapper implements 
MergeFunctionWrapper<ChangelogResult> {
+public class LookupChangelogMergeFunctionWrapper<T>
+        implements MergeFunctionWrapper<ChangelogResult> {
 
     private final LookupMergeFunction mergeFunction;
     private final MergeFunction<KeyValue> mergeFunction2;
-    private final Function<InternalRow, KeyValue> lookup;
+    private final Function<InternalRow, T> lookup;
 
     private final ChangelogResult reusedResult = new ChangelogResult();
     private final KeyValue reusedBefore = new KeyValue();
     private final KeyValue reusedAfter = new KeyValue();
     private final RecordEqualiser valueEqualiser;
     private final boolean changelogRowDeduplicate;
+    private final LookupStrategy lookupStrategy;
+    private final @Nullable DeletionVectorsMaintainer 
deletionVectorsMaintainer;
 
     public LookupChangelogMergeFunctionWrapper(
             MergeFunctionFactory<KeyValue> mergeFunctionFactory,
-            Function<InternalRow, KeyValue> lookup,
+            Function<InternalRow, T> lookup,
             RecordEqualiser valueEqualiser,
-            boolean changelogRowDeduplicate) {
+            boolean changelogRowDeduplicate,
+            LookupStrategy lookupStrategy,
+            @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
         MergeFunction<KeyValue> mergeFunction = mergeFunctionFactory.create();
         checkArgument(
                 mergeFunction instanceof LookupMergeFunction,
                 "Merge function should be a LookupMergeFunction, but is %s, 
there is a bug.",
                 mergeFunction.getClass().getName());
+        if (lookupStrategy.deletionVector) {
+            checkArgument(
+                    deletionVectorsMaintainer != null,
+                    "deletionVectorsMaintainer should not be null, there is a 
bug.");
+        }
         this.mergeFunction = (LookupMergeFunction) mergeFunction;
         this.mergeFunction2 = mergeFunctionFactory.create();
         this.lookup = lookup;
         this.valueEqualiser = valueEqualiser;
         this.changelogRowDeduplicate = changelogRowDeduplicate;
+        this.lookupStrategy = lookupStrategy;
+        this.deletionVectorsMaintainer = deletionVectorsMaintainer;
     }
 
     @Override
@@ -105,7 +122,17 @@ public class LookupChangelogMergeFunctionWrapper 
implements MergeFunctionWrapper
         // 2. Lookup if latest high level record is absent
         if (highLevel == null) {
             InternalRow lookupKey = candidates.get(0).key();
-            highLevel = lookup.apply(lookupKey);
+            T lookupResult = lookup.apply(lookupKey);
+            if (lookupResult != null) {
+                if (lookupStrategy.deletionVector) {
+                    PositionedKeyValue positionedKeyValue = 
(PositionedKeyValue) lookupResult;
+                    highLevel = positionedKeyValue.keyValue();
+                    deletionVectorsMaintainer.notifyNewDeletion(
+                            positionedKeyValue.fileName(), 
positionedKeyValue.rowPosition());
+                } else {
+                    highLevel = (KeyValue) lookupResult;
+                }
+            }
         }
 
         // 3. Calculate result
@@ -118,14 +145,14 @@ public class LookupChangelogMergeFunctionWrapper 
implements MergeFunctionWrapper
 
         // 4. Set changelog when there's level-0 records
         reusedResult.reset();
-        if (containLevel0) {
+        if (containLevel0 && lookupStrategy.produceChangelog) {
             setChangelog(highLevel, result);
         }
 
         return reusedResult.setResult(result);
     }
 
-    private void setChangelog(KeyValue before, KeyValue after) {
+    private void setChangelog(@Nullable KeyValue before, KeyValue after) {
         if (before == null || !before.isAdd()) {
             if (after.isAdd()) {
                 reusedResult.addChangelog(replaceAfter(RowKind.INSERT, after));
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
index 5335ba82a..95b7ab78c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
@@ -22,9 +22,11 @@ import org.apache.paimon.CoreOptions.MergeEngine;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.codegen.RecordEqualiser;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
+import org.apache.paimon.lookup.LookupStrategy;
 import org.apache.paimon.mergetree.LookupLevels;
 import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.SortedRun;
@@ -40,6 +42,7 @@ import java.util.List;
 import static 
org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_NO_REWRITE;
 import static 
org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_WITH_REWRITE;
 import static 
org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.NO_CHANGELOG;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
  * A {@link MergeTreeCompactRewriter} which produces changelog files by lookup 
for the compaction
@@ -60,7 +63,9 @@ public class LookupMergeTreeCompactRewriter<T> extends 
ChangelogMergeTreeRewrite
             @Nullable FieldsComparator userDefinedSeqComparator,
             MergeFunctionFactory<KeyValue> mfFactory,
             MergeSorter mergeSorter,
-            MergeFunctionWrapperFactory<T> wrapperFactory) {
+            MergeFunctionWrapperFactory<T> wrapperFactory,
+            LookupStrategy lookupStrategy,
+            @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
         super(
                 maxLevel,
                 mergeEngine,
@@ -69,7 +74,14 @@ public class LookupMergeTreeCompactRewriter<T> extends 
ChangelogMergeTreeRewrite
                 keyComparator,
                 userDefinedSeqComparator,
                 mfFactory,
-                mergeSorter);
+                mergeSorter,
+                lookupStrategy,
+                deletionVectorsMaintainer);
+        if (lookupStrategy.deletionVector) {
+            checkArgument(
+                    deletionVectorsMaintainer != null,
+                    "deletionVectorsMaintainer should not be null, there is a 
bug.");
+        }
         this.lookupLevels = lookupLevels;
         this.wrapperFactory = wrapperFactory;
     }
@@ -86,12 +98,17 @@ public class LookupMergeTreeCompactRewriter<T> extends 
ChangelogMergeTreeRewrite
             return NO_CHANGELOG;
         }
 
+        // In deletion vector mode, deletes must be dropped, so always rewrite.
+        if (lookupStrategy.deletionVector) {
+            return CHANGELOG_WITH_REWRITE;
+        }
+
         if (outputLevel == maxLevel) {
             return CHANGELOG_NO_REWRITE;
         }
 
         // DEDUPLICATE retains the latest records as the final result, so 
merging has no impact on
-        // it at all
+        // it at all.
         if (mergeEngine == MergeEngine.DEDUPLICATE) {
             return CHANGELOG_NO_REWRITE;
         }
@@ -104,7 +121,8 @@ public class LookupMergeTreeCompactRewriter<T> extends 
ChangelogMergeTreeRewrite
 
     @Override
     protected MergeFunctionWrapper<ChangelogResult> createMergeWrapper(int 
outputLevel) {
-        return wrapperFactory.create(mfFactory, outputLevel, lookupLevels);
+        return wrapperFactory.create(
+                mfFactory, outputLevel, lookupLevels, 
deletionVectorsMaintainer);
     }
 
     @Override
@@ -118,28 +136,34 @@ public class LookupMergeTreeCompactRewriter<T> extends 
ChangelogMergeTreeRewrite
         MergeFunctionWrapper<ChangelogResult> create(
                 MergeFunctionFactory<KeyValue> mfFactory,
                 int outputLevel,
-                LookupLevels<T> lookupLevels);
+                LookupLevels<T> lookupLevels,
+                @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer);
     }
 
     /** A normal {@link MergeFunctionWrapperFactory} to create lookup wrapper. 
*/
-    public static class LookupMergeFunctionWrapperFactory
-            implements MergeFunctionWrapperFactory<KeyValue> {
+    public static class LookupMergeFunctionWrapperFactory<T>
+            implements MergeFunctionWrapperFactory<T> {
 
         private final RecordEqualiser valueEqualiser;
         private final boolean changelogRowDeduplicate;
+        private final LookupStrategy lookupStrategy;
 
         public LookupMergeFunctionWrapperFactory(
-                RecordEqualiser valueEqualiser, boolean 
changelogRowDeduplicate) {
+                RecordEqualiser valueEqualiser,
+                boolean changelogRowDeduplicate,
+                LookupStrategy lookupStrategy) {
             this.valueEqualiser = valueEqualiser;
             this.changelogRowDeduplicate = changelogRowDeduplicate;
+            this.lookupStrategy = lookupStrategy;
         }
 
         @Override
         public MergeFunctionWrapper<ChangelogResult> create(
                 MergeFunctionFactory<KeyValue> mfFactory,
                 int outputLevel,
-                LookupLevels<KeyValue> lookupLevels) {
-            return new LookupChangelogMergeFunctionWrapper(
+                LookupLevels<T> lookupLevels,
+                @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) 
{
+            return new LookupChangelogMergeFunctionWrapper<>(
                     mfFactory,
                     key -> {
                         try {
@@ -149,7 +173,9 @@ public class LookupMergeTreeCompactRewriter<T> extends 
ChangelogMergeTreeRewrite
                         }
                     },
                     valueEqualiser,
-                    changelogRowDeduplicate);
+                    changelogRowDeduplicate,
+                    lookupStrategy,
+                    deletionVectorsMaintainer);
         }
     }
 
@@ -161,7 +187,8 @@ public class LookupMergeTreeCompactRewriter<T> extends 
ChangelogMergeTreeRewrite
         public MergeFunctionWrapper<ChangelogResult> create(
                 MergeFunctionFactory<KeyValue> mfFactory,
                 int outputLevel,
-                LookupLevels<Boolean> lookupLevels) {
+                LookupLevels<Boolean> lookupLevels,
+                @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) 
{
             return new FistRowMergeFunctionWrapper(
                     mfFactory,
                     key -> {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
index 6d94ad574..7bdb44118 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
@@ -58,6 +58,7 @@ public class MergeTreeCompactManager extends 
CompactFutureManager {
     private final CompactRewriter rewriter;
 
     @Nullable private final CompactionMetrics.Reporter metricsReporter;
+    private final boolean deletionVectorsEnabled;
 
     public MergeTreeCompactManager(
             ExecutorService executor,
@@ -67,7 +68,8 @@ public class MergeTreeCompactManager extends 
CompactFutureManager {
             long compactionFileSize,
             int numSortedRunStopTrigger,
             CompactRewriter rewriter,
-            @Nullable CompactionMetrics.Reporter metricsReporter) {
+            @Nullable CompactionMetrics.Reporter metricsReporter,
+            boolean deletionVectorsEnabled) {
         this.executor = executor;
         this.levels = levels;
         this.strategy = strategy;
@@ -76,6 +78,7 @@ public class MergeTreeCompactManager extends 
CompactFutureManager {
         this.keyComparator = keyComparator;
         this.rewriter = rewriter;
         this.metricsReporter = metricsReporter;
+        this.deletionVectorsEnabled = deletionVectorsEnabled;
 
         MetricUtils.safeCall(this::reportLevel0FileCount, LOG);
     }
@@ -145,7 +148,8 @@ public class MergeTreeCompactManager extends 
CompactFutureManager {
                      */
                     boolean dropDelete =
                             unit.outputLevel() != 0
-                                    && unit.outputLevel() >= 
levels.nonEmptyHighestLevel();
+                                    && (unit.outputLevel() >= 
levels.nonEmptyHighestLevel()
+                                            || deletionVectorsEnabled);
 
                     if (LOG.isDebugEnabled()) {
                         LOG.debug(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
index bd3338bf7..8e8f67207 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
@@ -21,6 +21,7 @@ package org.apache.paimon.mergetree.compact;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.compact.CompactResult;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
@@ -46,6 +47,7 @@ public class MergeTreeCompactRewriter extends 
AbstractCompactRewriter {
     @Nullable protected final FieldsComparator userDefinedSeqComparator;
     protected final MergeFunctionFactory<KeyValue> mfFactory;
     protected final MergeSorter mergeSorter;
+    @Nullable protected final DeletionVectorsMaintainer 
deletionVectorsMaintainer;
 
     public MergeTreeCompactRewriter(
             KeyValueFileReaderFactory readerFactory,
@@ -53,13 +55,15 @@ public class MergeTreeCompactRewriter extends 
AbstractCompactRewriter {
             Comparator<InternalRow> keyComparator,
             @Nullable FieldsComparator userDefinedSeqComparator,
             MergeFunctionFactory<KeyValue> mfFactory,
-            MergeSorter mergeSorter) {
+            MergeSorter mergeSorter,
+            @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
         this.readerFactory = readerFactory;
         this.writerFactory = writerFactory;
         this.keyComparator = keyComparator;
         this.userDefinedSeqComparator = userDefinedSeqComparator;
         this.mfFactory = mfFactory;
         this.mergeSorter = mergeSorter;
+        this.deletionVectorsMaintainer = deletionVectorsMaintainer;
     }
 
     @Override
@@ -83,6 +87,12 @@ public class MergeTreeCompactRewriter extends 
AbstractCompactRewriter {
                         mergeSorter);
         writer.write(new RecordReaderIterator<>(sectionsReader));
         writer.close();
-        return new CompactResult(extractFilesFromSections(sections), 
writer.result());
+        List<DataFileMeta> before = extractFilesFromSections(sections);
+        if (deletionVectorsMaintainer != null) {
+            for (DataFileMeta dataFileMeta : before) {
+                
deletionVectorsMaintainer.removeDeletionVectorOf(dataFileMeta.fileName());
+            }
+        }
+        return new CompactResult(before, writer.result());
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index af78e3463..30ee9c6c1 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -21,6 +21,7 @@ package org.apache.paimon.operation;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.index.IndexMaintainer;
@@ -65,6 +66,7 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
     private final FileStoreScan scan;
     private final int writerNumberMax;
     @Nullable private final IndexMaintainer.Factory<T> indexFactory;
+    @Nullable private final DeletionVectorsMaintainer.Factory 
deletionVectorsMaintainerFactory;
 
     @Nullable protected IOManager ioManager;
 
@@ -83,13 +85,14 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
             SnapshotManager snapshotManager,
             FileStoreScan scan,
             @Nullable IndexMaintainer.Factory<T> indexFactory,
+            @Nullable DeletionVectorsMaintainer.Factory 
deletionVectorsMaintainerFactory,
             String tableName,
             int writerNumberMax) {
         this.commitUser = commitUser;
         this.snapshotManager = snapshotManager;
         this.scan = scan;
         this.indexFactory = indexFactory;
-
+        this.deletionVectorsMaintainerFactory = 
deletionVectorsMaintainerFactory;
         this.writers = new HashMap<>();
         this.tableName = tableName;
         this.writerNumberMax = writerNumberMax;
@@ -193,7 +196,10 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
                 CommitIncrement increment = 
writerContainer.writer.prepareCommit(waitCompaction);
                 List<IndexFileMeta> newIndexFiles = new ArrayList<>();
                 if (writerContainer.indexMaintainer != null) {
-                    newIndexFiles = 
writerContainer.indexMaintainer.prepareCommit();
+                    
newIndexFiles.addAll(writerContainer.indexMaintainer.prepareCommit());
+                }
+                if (writerContainer.deletionVectorsMaintainer != null) {
+                    
newIndexFiles.addAll(writerContainer.deletionVectorsMaintainer.prepareCommit());
                 }
                 CommitMessageImpl committable =
                         new CommitMessageImpl(
@@ -292,6 +298,7 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
                                 writerContainer.lastModifiedCommitIdentifier,
                                 dataFiles,
                                 writerContainer.indexMaintainer,
+                                writerContainer.deletionVectorsMaintainer,
                                 increment));
             }
         }
@@ -311,10 +318,15 @@ public abstract class AbstractFileStoreWrite<T> 
implements FileStoreWrite<T> {
                             state.bucket,
                             state.dataFiles,
                             state.commitIncrement,
-                            compactExecutor());
+                            compactExecutor(),
+                            state.deletionVectorsMaintainer);
             notifyNewWriter(writer);
             WriterContainer<T> writerContainer =
-                    new WriterContainer<>(writer, state.indexMaintainer, 
state.baseSnapshotId);
+                    new WriterContainer<>(
+                            writer,
+                            state.indexMaintainer,
+                            state.deletionVectorsMaintainer,
+                            state.baseSnapshotId);
             writerContainer.lastModifiedCommitIdentifier = 
state.lastModifiedCommitIdentifier;
             writers.computeIfAbsent(state.partition, k -> new HashMap<>())
                     .put(state.bucket, writerContainer);
@@ -360,10 +372,22 @@ public abstract class AbstractFileStoreWrite<T> 
implements FileStoreWrite<T> {
                         ? null
                         : indexFactory.createOrRestore(
                                 ignorePreviousFiles ? null : latestSnapshotId, 
partition, bucket);
+        DeletionVectorsMaintainer deletionVectorsMaintainer =
+                deletionVectorsMaintainerFactory == null
+                        ? null
+                        : deletionVectorsMaintainerFactory.createOrRestore(
+                                ignorePreviousFiles ? null : latestSnapshotId, 
partition, bucket);
         RecordWriter<T> writer =
-                createWriter(partition.copy(), bucket, restoreFiles, null, 
compactExecutor());
+                createWriter(
+                        partition.copy(),
+                        bucket,
+                        restoreFiles,
+                        null,
+                        compactExecutor(),
+                        deletionVectorsMaintainer);
         notifyNewWriter(writer);
-        return new WriterContainer<>(writer, indexMaintainer, 
latestSnapshotId);
+        return new WriterContainer<>(
+                writer, indexMaintainer, deletionVectorsMaintainer, 
latestSnapshotId);
     }
 
     @Override
@@ -409,7 +433,8 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
             int bucket,
             List<DataFileMeta> restoreFiles,
             @Nullable CommitIncrement restoreIncrement,
-            ExecutorService compactExecutor);
+            ExecutorService compactExecutor,
+            @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer);
 
     // force buffer spill to avoid out of memory in batch mode
     protected void forceBufferSpill() throws Exception {}
@@ -422,15 +447,18 @@ public abstract class AbstractFileStoreWrite<T> 
implements FileStoreWrite<T> {
     public static class WriterContainer<T> {
         public final RecordWriter<T> writer;
         @Nullable public final IndexMaintainer<T> indexMaintainer;
+        @Nullable public final DeletionVectorsMaintainer 
deletionVectorsMaintainer;
         protected final long baseSnapshotId;
         protected long lastModifiedCommitIdentifier;
 
         protected WriterContainer(
                 RecordWriter<T> writer,
                 @Nullable IndexMaintainer<T> indexMaintainer,
+                @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer,
                 Long baseSnapshotId) {
             this.writer = writer;
             this.indexMaintainer = indexMaintainer;
+            this.deletionVectorsMaintainer = deletionVectorsMaintainer;
             this.baseSnapshotId =
                     baseSnapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID - 1 : 
baseSnapshotId;
             this.lastModifiedCommitIdentifier = Long.MIN_VALUE;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index 283df7b07..fbb51960a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -26,6 +26,7 @@ import org.apache.paimon.compact.CompactManager;
 import org.apache.paimon.compact.NoopCompactManager;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.io.DataFileMeta;
@@ -85,7 +86,7 @@ public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
             FileStoreScan scan,
             CoreOptions options,
             String tableName) {
-        super(commitUser, snapshotManager, scan, options, null, tableName);
+        super(commitUser, snapshotManager, scan, options, null, null, 
tableName);
         this.fileIO = fileIO;
         this.read = read;
         this.schemaId = schemaId;
@@ -110,7 +111,8 @@ public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
             int bucket,
             List<DataFileMeta> restoredFiles,
             @Nullable CommitIncrement restoreIncrement,
-            ExecutorService compactExecutor) {
+            ExecutorService compactExecutor,
+            @Nullable DeletionVectorsMaintainer ignore) {
         // let writer and compact manager hold the same reference
         // and make restore files mutable to update
         long maxSequenceNumber = getMaxSequenceNumber(restoredFiles);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
index e6c5ed50f..1391b6916 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
@@ -20,6 +20,7 @@ package org.apache.paimon.operation;
 
 import org.apache.paimon.FileStore;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.index.IndexMaintainer;
 import org.apache.paimon.io.DataFileMeta;
@@ -146,6 +147,7 @@ public interface FileStoreWrite<T> extends 
Restorable<List<FileStoreWrite.State<
         protected final long lastModifiedCommitIdentifier;
         protected final List<DataFileMeta> dataFiles;
         @Nullable protected final IndexMaintainer<T> indexMaintainer;
+        @Nullable protected final DeletionVectorsMaintainer 
deletionVectorsMaintainer;
         protected final CommitIncrement commitIncrement;
 
         protected State(
@@ -155,6 +157,7 @@ public interface FileStoreWrite<T> extends 
Restorable<List<FileStoreWrite.State<
                 long lastModifiedCommitIdentifier,
                 Collection<DataFileMeta> dataFiles,
                 @Nullable IndexMaintainer<T> indexMaintainer,
+                @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer,
                 CommitIncrement commitIncrement) {
             this.partition = partition;
             this.bucket = bucket;
@@ -162,19 +165,21 @@ public interface FileStoreWrite<T> extends 
Restorable<List<FileStoreWrite.State<
             this.lastModifiedCommitIdentifier = lastModifiedCommitIdentifier;
             this.dataFiles = new ArrayList<>(dataFiles);
             this.indexMaintainer = indexMaintainer;
+            this.deletionVectorsMaintainer = deletionVectorsMaintainer;
             this.commitIncrement = commitIncrement;
         }
 
         @Override
         public String toString() {
             return String.format(
-                    "{%s, %d, %d, %d, %s, %s, %s}",
+                    "{%s, %d, %d, %d, %s, %s,  %s, %s}",
                     partition,
                     bucket,
                     baseSnapshotId,
                     lastModifiedCommitIdentifier,
                     dataFiles,
                     indexMaintainer,
+                    deletionVectorsMaintainer,
                     commitIncrement);
         }
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
index 8852e38a3..e7fbbea45 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
@@ -23,7 +23,9 @@ import org.apache.paimon.KeyValue;
 import org.apache.paimon.KeyValueFileStore;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
 import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.mergetree.DropDeleteReader;
@@ -79,6 +81,8 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
 
     @Nullable private int[][] pushdownProjection;
     @Nullable private int[][] outerProjection;
+    private final CoreOptions options;
+    private final IndexFileHandler indexFileHandler;
 
     private boolean forceKeepDelete = false;
 
@@ -90,7 +94,9 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
             Comparator<InternalRow> keyComparator,
             @Nullable FieldsComparator userDefinedSeqComparator,
             MergeFunctionFactory<KeyValue> mfFactory,
-            KeyValueFileReaderFactory.Builder readerFactoryBuilder) {
+            KeyValueFileReaderFactory.Builder readerFactoryBuilder,
+            CoreOptions options,
+            IndexFileHandler indexFileHandler) {
         this.tableSchema = schemaManager.schema(schemaId);
         this.readerFactoryBuilder = readerFactoryBuilder;
         this.keyComparator = keyComparator;
@@ -99,6 +105,8 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
         this.mergeSorter =
                 new MergeSorter(
                         CoreOptions.fromMap(tableSchema.options()), keyType, 
valueType, null);
+        this.options = options;
+        this.indexFileHandler = indexFileHandler;
     }
 
     public KeyValueFileStoreRead withKeyProjection(int[][] projectedFields) {
@@ -175,6 +183,20 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
 
     private RecordReader<KeyValue> 
createReaderWithoutOuterProjection(DataSplit split)
             throws IOException {
+        if (options.deletionVectorsEnabled()) {
+            indexFileHandler
+                    .scan(
+                            split.snapshotId(),
+                            DeletionVectorsIndexFile.DELETION_VECTORS_INDEX,
+                            split.partition(),
+                            split.bucket())
+                    .ifPresent(
+                            fileMeta ->
+                                    
readerFactoryBuilder.withDeletionVectorSupplier(
+                                            filename ->
+                                                    
indexFileHandler.readDeletionVector(
+                                                            fileMeta, 
filename)));
+        }
         if (split.isStreaming()) {
             KeyValueFileReaderFactory readerFactory =
                     readerFactoryBuilder.build(
@@ -213,24 +235,30 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
                 readerFactoryBuilder.build(
                         partition, bucket, false, 
filtersForNonOverlappedSection);
 
-        List<ReaderSupplier<KeyValue>> sectionReaders = new ArrayList<>();
-        MergeFunctionWrapper<KeyValue> mergeFuncWrapper =
-                new 
ReducerMergeFunctionWrapper(mfFactory.create(pushdownProjection));
-        for (List<SortedRun> section : new IntervalPartition(files, 
keyComparator).partition()) {
-            sectionReaders.add(
-                    () ->
-                            MergeTreeReaders.readerForSection(
-                                    section,
-                                    section.size() > 1
-                                            ? overlappedSectionFactory
-                                            : nonOverlappedSectionFactory,
-                                    keyComparator,
-                                    userDefinedSeqComparator,
-                                    mergeFuncWrapper,
-                                    mergeSorter));
+        RecordReader<KeyValue> reader;
+        if (options.deletionVectorsEnabled()) {
+            reader = streamingConcat(files, nonOverlappedSectionFactory);
+        } else {
+            List<ReaderSupplier<KeyValue>> sectionReaders = new ArrayList<>();
+            MergeFunctionWrapper<KeyValue> mergeFuncWrapper =
+                    new 
ReducerMergeFunctionWrapper(mfFactory.create(pushdownProjection));
+            for (List<SortedRun> section :
+                    new IntervalPartition(files, keyComparator).partition()) {
+                sectionReaders.add(
+                        () ->
+                                MergeTreeReaders.readerForSection(
+                                        section,
+                                        section.size() > 1
+                                                ? overlappedSectionFactory
+                                                : nonOverlappedSectionFactory,
+                                        keyComparator,
+                                        userDefinedSeqComparator,
+                                        mergeFuncWrapper,
+                                        mergeSorter));
+            }
+            reader = ConcatRecordReader.create(sectionReaders);
         }
 
-        RecordReader<KeyValue> reader = 
ConcatRecordReader.create(sectionReaders);
         if (!keepDelete) {
             reader = new DropDeleteReader(reader);
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 8fb99645b..b37069fe4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -20,6 +20,7 @@ package org.apache.paimon.operation;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.ChangelogProducer;
+import org.apache.paimon.CoreOptions.MergeEngine;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.KeyValueFileStore;
 import org.apache.paimon.annotation.VisibleForTesting;
@@ -28,17 +29,20 @@ import org.apache.paimon.compact.CompactManager;
 import org.apache.paimon.compact.NoopCompactManager;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.index.IndexMaintainer;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
+import org.apache.paimon.lookup.LookupStrategy;
 import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
 import org.apache.paimon.mergetree.Levels;
 import org.apache.paimon.mergetree.LookupLevels;
 import org.apache.paimon.mergetree.LookupLevels.ContainsValueProcessor;
 import org.apache.paimon.mergetree.LookupLevels.KeyValueProcessor;
+import org.apache.paimon.mergetree.LookupLevels.PositionedKeyValueProcessor;
 import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.MergeTreeWriter;
 import org.apache.paimon.mergetree.compact.CompactRewriter;
@@ -73,6 +77,8 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Supplier;
 
+import static org.apache.paimon.CoreOptions.ChangelogProducer.FULL_COMPACTION;
+import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
 import static org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber;
 import static org.apache.paimon.lookup.LookupStoreFactory.bfGenerator;
 
@@ -108,10 +114,18 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
             SnapshotManager snapshotManager,
             FileStoreScan scan,
             @Nullable IndexMaintainer.Factory<KeyValue> indexFactory,
+            @Nullable DeletionVectorsMaintainer.Factory 
deletionVectorsMaintainerFactory,
             CoreOptions options,
             KeyValueFieldsExtractor extractor,
             String tableName) {
-        super(commitUser, snapshotManager, scan, options, indexFactory, 
tableName);
+        super(
+                commitUser,
+                snapshotManager,
+                scan,
+                options,
+                indexFactory,
+                deletionVectorsMaintainerFactory,
+                tableName);
         this.fileIO = fileIO;
         this.keyType = keyType;
         this.valueType = valueType;
@@ -148,7 +162,8 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
             int bucket,
             List<DataFileMeta> restoreFiles,
             @Nullable CommitIncrement restoreIncrement,
-            ExecutorService compactExecutor) {
+            ExecutorService compactExecutor,
+            @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
         if (LOG.isDebugEnabled()) {
             LOG.debug(
                     "Creating merge tree writer for partition {} bucket {} 
from restored files {}",
@@ -168,11 +183,17 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                         options.numSortedRunCompactionTrigger(),
                         options.optimizedCompactionInterval());
         CompactStrategy compactStrategy =
-                options.changelogProducer() == ChangelogProducer.LOOKUP
+                options.needLookup()
                         ? new ForceUpLevel0Compaction(universalCompaction)
                         : universalCompaction;
         CompactManager compactManager =
-                createCompactManager(partition, bucket, compactStrategy, 
compactExecutor, levels);
+                createCompactManager(
+                        partition,
+                        bucket,
+                        compactStrategy,
+                        compactExecutor,
+                        levels,
+                        deletionVectorsMaintainer);
 
         return new MergeTreeWriter(
                 bufferSpillable(),
@@ -200,7 +221,8 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
             int bucket,
             CompactStrategy compactStrategy,
             ExecutorService compactExecutor,
-            Levels levels) {
+            Levels levels,
+            @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
         if (options.writeOnly()) {
             return new NoopCompactManager();
         } else {
@@ -208,7 +230,12 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
             @Nullable FieldsComparator userDefinedSeqComparator = 
udsComparatorSupplier.get();
             CompactRewriter rewriter =
                     createRewriter(
-                            partition, bucket, keyComparator, 
userDefinedSeqComparator, levels);
+                            partition,
+                            bucket,
+                            keyComparator,
+                            userDefinedSeqComparator,
+                            levels,
+                            deletionVectorsMaintainer);
             return new MergeTreeCompactManager(
                     compactExecutor,
                     levels,
@@ -219,7 +246,8 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                     rewriter,
                     compactionMetrics == null
                             ? null
-                            : compactionMetrics.createReporter(partition, 
bucket));
+                            : compactionMetrics.createReporter(partition, 
bucket),
+                    options.deletionVectorsEnabled());
         }
     }
 
@@ -228,68 +256,82 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
             int bucket,
             Comparator<InternalRow> keyComparator,
             @Nullable FieldsComparator userDefinedSeqComparator,
-            Levels levels) {
+            Levels levels,
+            @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
+        if (deletionVectorsMaintainer != null) {
+            readerFactoryBuilder.withDeletionVectorSupplier(
+                    deletionVectorsMaintainer::deletionVectorOf);
+        }
         KeyValueFileReaderFactory readerFactory = 
readerFactoryBuilder.build(partition, bucket);
         KeyValueFileWriterFactory writerFactory =
                 writerFactoryBuilder.build(partition, bucket, options);
         MergeSorter mergeSorter = new MergeSorter(options, keyType, valueType, 
ioManager);
         int maxLevel = options.numLevels() - 1;
-        CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
-        switch (options.changelogProducer()) {
-            case FULL_COMPACTION:
-                return new FullChangelogMergeTreeCompactRewriter(
-                        maxLevel,
-                        mergeEngine,
-                        readerFactory,
-                        writerFactory,
-                        keyComparator,
-                        userDefinedSeqComparator,
-                        mfFactory,
-                        mergeSorter,
-                        valueEqualiserSupplier.get(),
-                        options.changelogRowDeduplicate());
-            case LOOKUP:
-                if (mergeEngine == CoreOptions.MergeEngine.FIRST_ROW) {
-                    KeyValueFileReaderFactory keyOnlyReader =
-                            readerFactoryBuilder
-                                    .copyWithoutProjection()
-                                    .withValueProjection(new int[0][])
-                                    .build(partition, bucket);
-                    return new LookupMergeTreeCompactRewriter<>(
-                            maxLevel,
-                            mergeEngine,
-                            createLookupLevels(levels, new 
ContainsValueProcessor(), keyOnlyReader),
-                            readerFactory,
-                            writerFactory,
-                            keyComparator,
-                            userDefinedSeqComparator,
-                            mfFactory,
-                            mergeSorter,
-                            new FirstRowMergeFunctionWrapperFactory());
-                } else {
-                    return new LookupMergeTreeCompactRewriter<>(
-                            maxLevel,
-                            mergeEngine,
-                            createLookupLevels(
-                                    levels, new KeyValueProcessor(valueType), 
readerFactory),
-                            readerFactory,
-                            writerFactory,
-                            keyComparator,
-                            userDefinedSeqComparator,
-                            mfFactory,
-                            mergeSorter,
-                            new LookupMergeFunctionWrapperFactory(
-                                    valueEqualiserSupplier.get(),
-                                    options.changelogRowDeduplicate()));
+        MergeEngine mergeEngine = options.mergeEngine();
+        ChangelogProducer changelogProducer = options.changelogProducer();
+        if (changelogProducer.equals(FULL_COMPACTION)) {
+            return new FullChangelogMergeTreeCompactRewriter(
+                    maxLevel,
+                    mergeEngine,
+                    readerFactory,
+                    writerFactory,
+                    keyComparator,
+                    userDefinedSeqComparator,
+                    mfFactory,
+                    mergeSorter,
+                    valueEqualiserSupplier.get(),
+                    options.changelogRowDeduplicate());
+        } else if (options.needLookup()) {
+            LookupStrategy lookupStrategy = options.lookupStrategy();
+            LookupLevels.ValueProcessor<?> processor;
+            LookupMergeTreeCompactRewriter.MergeFunctionWrapperFactory<?> 
wrapperFactory;
+            KeyValueFileReaderFactory lookupReaderFactory = readerFactory;
+            if (mergeEngine == FIRST_ROW) {
+                if (options.deletionVectorsEnabled()) {
+                    throw new UnsupportedOperationException(
+                            "Deletion vectors mode is not supported for first 
row merge engine now.");
                 }
-            default:
-                return new MergeTreeCompactRewriter(
-                        readerFactory,
-                        writerFactory,
-                        keyComparator,
-                        userDefinedSeqComparator,
-                        mfFactory,
-                        mergeSorter);
+                lookupReaderFactory =
+                        readerFactoryBuilder
+                                .copyWithoutProjection()
+                                .withValueProjection(new int[0][])
+                                .build(partition, bucket);
+                processor = new ContainsValueProcessor();
+                wrapperFactory = new FirstRowMergeFunctionWrapperFactory();
+            } else {
+                processor =
+                        lookupStrategy.deletionVector
+                                ? new PositionedKeyValueProcessor(
+                                        valueType, 
lookupStrategy.produceChangelog)
+                                : new KeyValueProcessor(valueType);
+                wrapperFactory =
+                        new LookupMergeFunctionWrapperFactory<>(
+                                valueEqualiserSupplier.get(),
+                                options.changelogRowDeduplicate(),
+                                lookupStrategy);
+            }
+            return new LookupMergeTreeCompactRewriter(
+                    maxLevel,
+                    mergeEngine,
+                    createLookupLevels(levels, processor, lookupReaderFactory),
+                    readerFactory,
+                    writerFactory,
+                    keyComparator,
+                    userDefinedSeqComparator,
+                    mfFactory,
+                    mergeSorter,
+                    wrapperFactory,
+                    lookupStrategy,
+                    deletionVectorsMaintainer);
+        } else {
+            return new MergeTreeCompactRewriter(
+                    readerFactory,
+                    writerFactory,
+                    keyComparator,
+                    userDefinedSeqComparator,
+                    mfFactory,
+                    mergeSorter,
+                    deletionVectorsMaintainer);
         }
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
index 9159965d9..0b2bd719f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.operation;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.index.IndexMaintainer;
 import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
@@ -60,12 +61,14 @@ public abstract class MemoryFileStoreWrite<T> extends 
AbstractFileStoreWrite<T>
             FileStoreScan scan,
             CoreOptions options,
             @Nullable IndexMaintainer.Factory<T> indexFactory,
+            @Nullable DeletionVectorsMaintainer.Factory 
deletionVectorsMaintainerFactory,
             String tableName) {
         super(
                 commitUser,
                 snapshotManager,
                 scan,
                 indexFactory,
+                deletionVectorsMaintainerFactory,
                 tableName,
                 options.writeMaxWritersToSpill());
         this.options = options;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 91ccf9c11..68f009c41 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -20,6 +20,7 @@ package org.apache.paimon.schema;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.ChangelogProducer;
+import org.apache.paimon.CoreOptions.MergeEngine;
 import org.apache.paimon.casting.CastExecutor;
 import org.apache.paimon.casting.CastExecutors;
 import org.apache.paimon.data.BinaryString;
@@ -49,6 +50,8 @@ import static org.apache.paimon.CoreOptions.BUCKET_KEY;
 import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
 import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
+import static org.apache.paimon.CoreOptions.FileFormatType.ORC;
+import static org.apache.paimon.CoreOptions.FileFormatType.PARQUET;
 import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
 import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP;
 import static org.apache.paimon.CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS;
@@ -219,6 +222,10 @@ public class SchemaValidation {
                                 schema.primaryKeys(), schema.partitionKeys()));
             }
         }
+
+        if (options.deletionVectorsEnabled()) {
+            validateForDeletionVectors(schema, options);
+        }
     }
 
     private static void validateOnlyContainPrimitiveType(
@@ -471,4 +478,24 @@ public class SchemaValidation {
             }
         }
     }
+
+    private static void validateForDeletionVectors(TableSchema schema, 
CoreOptions options) {
+        checkArgument(
+                !schema.primaryKeys().isEmpty(),
+                "Deletion vectors mode is only supported for tables with 
primary keys.");
+
+        checkArgument(
+                options.formatType().equals(ORC) || 
options.formatType().equals(PARQUET),
+                "Deletion vectors mode is only supported for orc or parquet 
file format now.");
+
+        checkArgument(
+                options.changelogProducer() == ChangelogProducer.NONE
+                        || options.changelogProducer() == 
ChangelogProducer.LOOKUP,
+                "Deletion vectors mode is only supported for none or lookup 
changelog producer now.");
+
+        // todo: implement it
+        checkArgument(
+                !options.mergeEngine().equals(MergeEngine.FIRST_ROW),
+                "Deletion vectors mode is not supported for first row merge 
engine now.");
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 11e557f17..781e71e86 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.table;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.KeyValueFileStore;
 import org.apache.paimon.data.InternalRow;
@@ -89,7 +88,7 @@ class PrimaryKeyFileStoreTable extends AbstractFileStoreTable 
{
 
             MergeFunctionFactory<KeyValue> mfFactory =
                     
PrimaryKeyTableUtils.createMergeFunctionFactory(tableSchema, extractor);
-            if (options.changelogProducer() == ChangelogProducer.LOOKUP) {
+            if (options.needLookup()) {
                 mfFactory =
                         LookupMergeFunction.wrap(
                                 mfFactory, new 
RowType(extractor.keyFields(tableSchema)), rowType);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java 
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
index 3ed9a3ac7..e10184fef 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
@@ -86,7 +86,7 @@ public class LocalTableQuery implements TableQuery {
                         
options.toConfiguration().get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR),
                         
options.toConfiguration().get(CoreOptions.LOOKUP_CACHE_SPILL_COMPRESSION));
 
-        if (options.changelogProducer() == 
CoreOptions.ChangelogProducer.LOOKUP) {
+        if (options.needLookup()) {
             startLevel = 1;
         } else {
             if (options.sequenceField().isPresent()) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
index 6749c6a93..9bfb4f81a 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
@@ -42,8 +42,8 @@ public class DeletionVectorsMaintainerTest extends 
PrimaryKeyTableTestBase {
 
     @Test
     public void test0() {
-        DeletionVectorsMaintainer.DeletionVectorsMaintainerFactory factory =
-                new 
DeletionVectorsMaintainer.DeletionVectorsMaintainerFactory(fileHandler);
+        DeletionVectorsMaintainer.Factory factory =
+                new DeletionVectorsMaintainer.Factory(fileHandler);
         DeletionVectorsMaintainer dvMaintainer =
                 factory.createOrRestore(null, BinaryRow.EMPTY_ROW, 0);
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 6e718a085..8e8f315c9 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -447,7 +447,8 @@ public abstract class MergeTreeTestBase {
                 options.compactionFileSize(),
                 options.numSortedRunStopTrigger(),
                 new TestRewriter(),
-                null);
+                null,
+                false);
     }
 
     static class MockFailResultCompactionManager extends 
MergeTreeCompactManager {
@@ -467,7 +468,8 @@ public abstract class MergeTreeTestBase {
                     minFileSize,
                     numSortedRunStopTrigger,
                     rewriter,
-                    null);
+                    null,
+                    false);
         }
 
         protected CompactResult obtainCompactResult()
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index 7564ca07d..4c377af17 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -23,6 +23,7 @@ import org.apache.paimon.KeyValue;
 import org.apache.paimon.codegen.RecordEqualiser;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.InternalRow.FieldGetter;
+import org.apache.paimon.lookup.LookupStrategy;
 import org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction;
 import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
 import org.apache.paimon.mergetree.compact.aggregate.FieldSumAgg;
@@ -69,7 +70,9 @@ public class LookupChangelogMergeFunctionWrapperTest {
                                 RowType.of(DataTypes.INT())),
                         highLevel::get,
                         EQUALISER,
-                        changelogRowDeduplicate);
+                        changelogRowDeduplicate,
+                        LookupStrategy.CHANGELOG_ONLY,
+                        null);
 
         // Without level-0
         function.reset();
@@ -225,7 +228,9 @@ public class LookupChangelogMergeFunctionWrapperTest {
                                 RowType.of(DataTypes.INT())),
                         key -> null,
                         EQUALISER,
-                        changelogRowDeduplicate);
+                        changelogRowDeduplicate,
+                        LookupStrategy.CHANGELOG_ONLY,
+                        null);
 
         // Without level-0
         function.reset();
@@ -384,7 +389,9 @@ public class LookupChangelogMergeFunctionWrapperTest {
                                 RowType.of(DataTypes.INT())),
                         key -> null,
                         EQUALISER,
-                        false);
+                        false,
+                        LookupStrategy.CHANGELOG_ONLY,
+                        null);
 
         function.reset();
         function.add(new KeyValue().replace(row(1), 1, DELETE, 
row(1)).setLevel(2));
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
index 338013d32..d5aee3ccc 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
@@ -205,7 +205,8 @@ public class MergeTreeCompactManagerTest {
                         2,
                         Integer.MAX_VALUE,
                         new TestRewriter(expectedDropDelete),
-                        null);
+                        null,
+                        false);
         manager.triggerCompaction(false);
         manager.getCompactionResult(true);
         List<LevelMinMax> outputs =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 5f0c4d662..40cd90e3b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -54,6 +54,7 @@ import java.util.Set;
 import java.util.UUID;
 
 import static 
org.apache.flink.configuration.ClusterOptions.ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT;
+import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
 import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_LOOKUP_WAIT;
@@ -101,9 +102,11 @@ public abstract class FlinkSink<T> implements Serializable {
         } else {
             Options options = table.coreOptions().toConfiguration();
             ChangelogProducer changelogProducer = 
table.coreOptions().changelogProducer();
+            // todo: deletion vectors support lookup wait
             waitCompaction =
-                    changelogProducer == ChangelogProducer.LOOKUP
-                            && options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT);
+                    (changelogProducer == ChangelogProducer.LOOKUP
+                                    && 
options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT))
+                            || options.get(DELETION_VECTORS_ENABLED);
 
             int deltaCommits = -1;
             if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index 46a7931b9..dbb220bfd 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -43,6 +43,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
 import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_LOOKUP_WAIT;
@@ -240,10 +241,11 @@ public class MultiTablesStoreCompactOperator
             Options options = fileStoreTable.coreOptions().toConfiguration();
             CoreOptions.ChangelogProducer changelogProducer =
                     fileStoreTable.coreOptions().changelogProducer();
+            // todo: deletion vectors support lookup wait
             waitCompaction =
-                    changelogProducer == CoreOptions.ChangelogProducer.LOOKUP
-                            && options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT);
-
+                    (changelogProducer == CoreOptions.ChangelogProducer.LOOKUP
+                                    && 
options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT))
+                            || options.get(DELETION_VECTORS_ENABLED);
             int deltaCommits = -1;
             if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
                 deltaCommits = options.get(FULL_COMPACTION_DELTA_COMMITS);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index 713bcc341..8872cc333 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -338,7 +338,22 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
 
     private void testNoChangelogProducerRandom(
             TableEnvironment tEnv, int numProducers, boolean enableFailure) 
throws Exception {
-        List<TableResult> results = testRandom(tEnv, numProducers, 
enableFailure, "'bucket' = '4'");
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        boolean enableDeletionVectors = random.nextBoolean();
+        if (enableDeletionVectors) {
+            // Deletion vectors mode not support concurrent write
+            numProducers = 1;
+        }
+        List<TableResult> results =
+                testRandom(
+                        tEnv,
+                        numProducers,
+                        enableFailure,
+                        "'bucket' = '4',"
+                                + String.format(
+                                        "'deletion-vectors.enabled' = '%s'",
+                                        enableDeletionVectors));
+
         for (TableResult result : results) {
             result.await();
         }
@@ -370,7 +385,11 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
     private void testLookupChangelogProducerRandom(
             TableEnvironment tEnv, int numProducers, boolean enableFailure) 
throws Exception {
         ThreadLocalRandom random = ThreadLocalRandom.current();
-
+        boolean enableDeletionVectors = random.nextBoolean();
+        if (enableDeletionVectors) {
+            // Deletion vectors mode not support concurrent write
+            numProducers = 1;
+        }
         testRandom(
                 tEnv,
                 numProducers,
@@ -381,7 +400,9 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
                                 random.nextBoolean() ? "512kb" : "1mb")
                         + "'changelog-producer' = 'lookup',"
                         + String.format(
-                                "'changelog-producer.lookup-wait' = '%s'", 
random.nextBoolean()));
+                                "'changelog-producer.lookup-wait' = '%s',", 
random.nextBoolean())
+                        + String.format(
+                                "'deletion-vectors.enabled' = '%s'", 
enableDeletionVectors));
 
         // sleep for a random amount of time to check
         // if we can first read complete records then read incremental records 
correctly
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index 745e92b68..244449f9b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -138,7 +138,9 @@ public class TestChangelogDataReadWrite {
                                 ignore -> avro,
                                 pathFactory,
                                 EXTRACTOR,
-                                new CoreOptions(new HashMap<>())));
+                                new CoreOptions(new HashMap<>())),
+                        new CoreOptions(new HashMap<>()),
+                        null);
         return new KeyValueTableRead(read, null) {
 
             @Override
@@ -193,6 +195,7 @@ public class TestChangelogDataReadWrite {
                                 snapshotManager,
                                 null, // not used, we only create an empty 
writer
                                 null,
+                                null,
                                 options,
                                 EXTRACTOR,
                                 tablePath.getName())
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
new file mode 100644
index 000000000..887b6c8df
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.data.BinaryRow
+import org.apache.paimon.deletionvectors.{DeletionVector, 
DeletionVectorsMaintainer}
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+import org.junit.jupiter.api.Assertions
+
+import scala.util.Random
+
+class DeletionVectorTest extends PaimonSparkTestBase {
+
+  test("Paimon deletionVector: deletion vector write verification") {
+    withTable("T") {
+      spark.sql(s"""
+                   |CREATE TABLE T (id INT, name STRING)
+                   |TBLPROPERTIES (
+                   | 'bucket' = '1',
+                   | 'primary-key' = 'id',
+                   | 'file.format' = 'parquet',
+                   | 'deletion-vectors.enabled' = 'true'
+                   |)
+                   |""".stripMargin)
+      val table = loadTable("T")
+
+      // Insert1
+      // f1 (1, 2, 3): the rows at positions 0 and 2 of f1 are marked deleted
+      // f2 (1, 3)
+      spark.sql("INSERT INTO T VALUES (1, 'aaaaaaaaaaaaaaaaaaa'), (2, 'b'), (3, 'c')")
+      spark.sql("INSERT INTO T VALUES (1, 'a_new1'), (3, 'c_new1')")
+      checkAnswer(
+        spark.sql(s"SELECT * from T ORDER BY id"),
+        Row(1, "a_new1") :: Row(2, "b") :: Row(3, "c_new1") :: Nil)
+
+      val dvMaintainerFactory =
+        new DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
+
+      def restoreDeletionVectors(): java.util.Map[String, DeletionVector] = {
+        dvMaintainerFactory
+          .createOrRestore(table.snapshotManager().latestSnapshotId(), BinaryRow.EMPTY_ROW, 0)
+          .deletionVectors()
+      }
+
+      val deletionVectors1 = restoreDeletionVectors()
+      // Keys 1 and 3 are deleted; their row positions in f1 are 0 and 2
+      Assertions.assertEquals(1, deletionVectors1.size())
+      deletionVectors1
+        .entrySet()
+        .forEach(
+          e => {
+            Assertions.assertTrue(e.getValue.isDeleted(0))
+            Assertions.assertTrue(e.getValue.isDeleted(2))
+          })
+
+      // Compact
+      // f3 (1, 2, 3), no deletion
+      spark.sql("CALL sys.compact('T')")
+      val deletionVectors2 = restoreDeletionVectors()
+      // After compaction, deletionVectors should be empty
+      Assertions.assertTrue(deletionVectors2.isEmpty)
+
+      // Insert2
+      // f3 (1, 2, 3): the row at position 1 of f3 is marked deleted
+      // f4 (2)
+      spark.sql("INSERT INTO T VALUES (2, 'b_new2')")
+      checkAnswer(
+        spark.sql(s"SELECT * from T ORDER BY id"),
+        Row(1, "a_new1") :: Row(2, "b_new2") :: Row(3, "c_new1") :: Nil)
+
+      val deletionVectors3 = restoreDeletionVectors()
+      // Key 2 is deleted; its row position in f3 is 1
+      Assertions.assertEquals(1, deletionVectors3.size())
+      deletionVectors3
+        .entrySet()
+        .forEach(
+          e => {
+            Assertions.assertTrue(e.getValue.isDeleted(1))
+          })
+    }
+  }
+
+  test("Paimon deletionVector: e2e random write") {
+    val bucket = Random.shuffle(Seq("-1", "1", "3")).head
+    val changelogProducer = Random.shuffle(Seq("none", "lookup")).head
+    val format = Random.shuffle(Seq("orc", "parquet")).head
+    val batchSize = Random.nextInt(1024) + 1
+
+    val dvTbl = "deletion_vector_tbl"
+    val resultTbl = "result_tbl"
+    spark.sql(s"drop table if exists $dvTbl")
+    spark.sql(s"""
+                 |CREATE TABLE $dvTbl (id INT, name STRING, pt STRING)
+                 |TBLPROPERTIES (
+                 | 'primary-key' = 'id, pt',
+                 | 'deletion-vectors.enabled' = 'true',
+                 | 'bucket' = '$bucket',
+                 | 'changelog-producer' = '$changelogProducer',
+                 | 'file.format' = '$format',
+                 | 'read.batch-size' = '$batchSize'
+                 |)
+                 |PARTITIONED BY (pt)
+                 |""".stripMargin)
+
+    spark.sql(s"drop table if exists $resultTbl")
+    spark.sql(s"""
+                 |CREATE TABLE $resultTbl (id INT, name STRING, pt STRING)
+                 |TBLPROPERTIES (
+                 | 'primary-key' = 'id, pt',
+                 | 'deletion-vectors.enabled' = 'false'
+                 |)
+                 |PARTITIONED BY (pt)
+                 |""".stripMargin)
+
+    def insert(t1: String, t2: String, count: Int): Unit = {
+      val ids = (1 to count).map(_ => Random.nextInt(10000))
+      val names = (1 to count).map(_ => (Random.nextInt(26) + 'a'.toInt).toChar.toString)
+      val pts = (1 to count).map(_ => s"p${Random.nextInt(3)}")
+      val values = ids
+        .zip(names)
+        .zip(pts)
+        .map { case ((id, name), pt) => s"($id, '$name', '$pt')" }
+        .mkString(", ")
+      spark.sql(s"INSERT INTO $t1 VALUES $values")
+      spark.sql(s"INSERT INTO $t2 VALUES $values")
+    }
+
+    def delete(t1: String, t2: String, count: Int): Unit = {
+      val ids = (1 to count).map(_ => Random.nextInt(10000)).toList
+      val idsString = ids.mkString(", ")
+      spark.sql(s"DELETE FROM $t1 WHERE id IN ($idsString)")
+      spark.sql(s"DELETE FROM $t2 WHERE id IN ($idsString)")
+    }
+
+    def update(t1: String, t2: String, count: Int): Unit = {
+      val ids = (1 to count).map(_ => Random.nextInt(10000)).toList
+      val idsString = ids.mkString(", ")
+      val randomName = (Random.nextInt(26) + 'a'.toInt).toChar.toString
+      spark.sql(s"UPDATE $t1 SET name = '$randomName' WHERE id IN ($idsString)")
+      spark.sql(s"UPDATE $t2 SET name = '$randomName' WHERE id IN ($idsString)")
+    }
+
+    def checkResult(t1: String, t2: String): Unit = {
+      try {
+        checkAnswer(
+          spark.sql(s"SELECT * FROM $t1 ORDER BY id, pt"),
+          spark.sql(s"SELECT * FROM $t2 ORDER BY id, pt"))
+      } catch {
+        case e: Throwable =>
+          val tableParams = loadTable(t1).options()
+          throw new RuntimeException(s"test error, table params: $tableParams", e)
+      }
+    }
+
+    val operations = Seq(
+      () => insert(dvTbl, resultTbl, 1000),
+      () => update(dvTbl, resultTbl, 100),
+      () => delete(dvTbl, resultTbl, 100)
+    )
+
+    // Seed both tables with an initial insert
+    operations.head()
+    checkResult(dvTbl, resultTbl)
+
+    for (_ <- 1 to 20) {
+      // Randomly select an operation
+      operations(Random.nextInt(operations.size))()
+      checkResult(dvTbl, resultTbl)
+    }
+  }
+}

Reply via email to