This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f9d04c147 [core] Introduce deletion vector (#2923)
f9d04c147 is described below
commit f9d04c14737a17492409d47d683c8c970d52db4a
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Mar 1 11:32:11 2024 +0800
[core] Introduce deletion vector (#2923)
---
paimon-core/pom.xml | 7 +
.../java/org/apache/paimon/AbstractFileStore.java | 4 +-
.../deletionvectors/BitmapDeletionVector.java | 94 +++++++++++
.../paimon/deletionvectors/DeletionVector.java | 94 +++++++++++
.../deletionvectors/DeletionVectorsIndexFile.java | 188 +++++++++++++++++++++
.../deletionvectors/DeletionVectorsMaintainer.java | 132 +++++++++++++++
.../org/apache/paimon/index/HashIndexFile.java | 29 +---
.../index/{HashIndexFile.java => IndexFile.java} | 26 +--
.../org/apache/paimon/index/IndexFileHandler.java | 47 +++++-
.../org/apache/paimon/index/IndexFileMeta.java | 41 ++++-
.../paimon/index/IndexFileMetaSerializer.java | 37 +++-
.../apache/paimon/manifest/IndexManifestEntry.java | 11 ++
.../manifest/IndexManifestEntrySerializer.java | 10 +-
.../paimon/deletionvectors/DeletionVectorTest.java | 65 +++++++
.../DeletionVectorsIndexFileTest.java | 97 +++++++++++
.../DeletionVectorsMaintainerTest.java | 67 ++++++++
.../paimon/index/IndexFileMetaSerializerTest.java | 26 ++-
.../manifest/IndexManifestEntrySerializerTest.java | 9 +-
18 files changed, 916 insertions(+), 68 deletions(-)
diff --git a/paimon-core/pom.xml b/paimon-core/pom.xml
index 0a0effb15..0829c7da2 100644
--- a/paimon-core/pom.xml
+++ b/paimon-core/pom.xml
@@ -33,6 +33,7 @@ under the License.
<properties>
<frocksdbjni.version>6.20.3-ververica-2.0</frocksdbjni.version>
+ <RoaringBitmap.version>1.0.1</RoaringBitmap.version>
</properties>
<dependencies>
@@ -63,6 +64,12 @@ under the License.
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.roaringbitmap</groupId>
+ <artifactId>RoaringBitmap</artifactId>
+ <version>${RoaringBitmap.version}</version>
+ </dependency>
+
<!-- test dependencies -->
<dependency>
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 896b53794..f0dc46301 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -19,6 +19,7 @@
package org.apache.paimon;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.index.HashIndexFile;
import org.apache.paimon.index.IndexFileHandler;
@@ -143,7 +144,8 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
return new IndexFileHandler(
snapshotManager(),
indexManifestFileFactory().create(),
- new HashIndexFile(fileIO, pathFactory().indexFileFactory()));
+ new HashIndexFile(fileIO, pathFactory().indexFileFactory()),
+ new DeletionVectorsIndexFile(fileIO,
pathFactory().indexFileFactory()));
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
new file mode 100644
index 000000000..9ef2182e1
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.deletionvectors;
+
+import org.roaringbitmap.RoaringBitmap;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * A {@link DeletionVector} based on {@link RoaringBitmap}, it only supports files with row count
+ * not exceeding {@link Integer#MAX_VALUE}.
+ *
+ * <p>Positions are narrowed from {@code long} to {@code int} before being stored, so every position
+ * is validated to lie within {@code [0, Integer.MAX_VALUE]} first. Without the lower bound check a
+ * negative position would be silently stored under whatever int value the cast produces.
+ */
+public class BitmapDeletionVector implements DeletionVector {
+
+    /** Written as a 4-byte int in front of the serialized bitmap by {@link #serializeToBytes()}. */
+    public static final int MAGIC_NUMBER = 1581511376;
+
+    private final RoaringBitmap roaringBitmap;
+
+    BitmapDeletionVector() {
+        roaringBitmap = new RoaringBitmap();
+    }
+
+    private BitmapDeletionVector(RoaringBitmap roaringBitmap) {
+        this.roaringBitmap = roaringBitmap;
+    }
+
+    @Override
+    public void delete(long position) {
+        checkPosition(position);
+        roaringBitmap.add((int) position);
+    }
+
+    @Override
+    public boolean checkedDelete(long position) {
+        checkPosition(position);
+        return roaringBitmap.checkedAdd((int) position);
+    }
+
+    @Override
+    public boolean isDeleted(long position) {
+        checkPosition(position);
+        return roaringBitmap.contains((int) position);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return roaringBitmap.isEmpty();
+    }
+
+    /**
+     * Serializes this deletion vector as {@link #MAGIC_NUMBER} followed by the bitmap's own
+     * serialization.
+     *
+     * @return the serialized bytes.
+     * @throws RuntimeException if serialization fails.
+     */
+    @Override
+    public byte[] serializeToBytes() {
+        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                DataOutputStream dos = new DataOutputStream(bos)) {
+            dos.writeInt(MAGIC_NUMBER);
+            // Compacts the in-memory containers before writing; the set of positions is unchanged.
+            roaringBitmap.runOptimize();
+            roaringBitmap.serialize(dos);
+            return bos.toByteArray();
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to serialize deletion vector", e);
+        }
+    }
+
+    /**
+     * Reads the bitmap part of a serialized {@link BitmapDeletionVector}.
+     *
+     * <p>The magic number must already have been consumed by the caller, as {@link
+     * DeletionVector#deserializeFromBytes(byte[])} does.
+     *
+     * @param bis input positioned right after the magic number.
+     * @return the deserialized deletion vector.
+     * @throws IOException if the bitmap cannot be read.
+     */
+    public static DeletionVector deserializeFromDataInput(DataInput bis) throws IOException {
+        RoaringBitmap roaringBitmap = new RoaringBitmap();
+        roaringBitmap.deserialize(bis);
+        return new BitmapDeletionVector(roaringBitmap);
+    }
+
+    /** Rejects positions that cannot be represented losslessly as a non-negative int. */
+    private void checkPosition(long position) {
+        if (position < 0) {
+            throw new IllegalArgumentException(
+                    "The position must not be negative, but is: " + position);
+        }
+        if (position > Integer.MAX_VALUE) {
+            throw new IllegalArgumentException(
+                    "The file has too many rows, RoaringBitmap32 only supports files with row count not exceeding 2147483647.");
+        }
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
new file mode 100644
index 000000000..c30df42ab
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.deletionvectors;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+/**
+ * Records which row positions of a single data file have been deleted, so that readers of that file
+ * can skip those rows.
+ */
+public interface DeletionVector {
+
+    /**
+     * Marks one row as deleted.
+     *
+     * @param position position of the row inside its file.
+     */
+    void delete(long position);
+
+    /**
+     * Marks one row as deleted and reports whether this call changed anything.
+     *
+     * @param position position of the row inside its file.
+     * @return {@code true} if the row was not deleted before this call, {@code false} if it already
+     *     was.
+     */
+    default boolean checkedDelete(long position) {
+        boolean alreadyDeleted = isDeleted(position);
+        if (!alreadyDeleted) {
+            delete(position);
+        }
+        return !alreadyDeleted;
+    }
+
+    /**
+     * Tells whether a row has been marked as deleted.
+     *
+     * @param position position of the row inside its file.
+     * @return {@code true} if the row is deleted.
+     */
+    boolean isDeleted(long position);
+
+    /**
+     * Tells whether no row has been marked as deleted.
+     *
+     * @return {@code true} if this vector holds no deletions.
+     */
+    boolean isEmpty();
+
+    /**
+     * Encodes this vector as bytes so it can be stored or sent elsewhere.
+     *
+     * @return the encoded form.
+     */
+    byte[] serializeToBytes();
+
+    /**
+     * Decodes a vector produced by {@link #serializeToBytes()}. The leading int selects the
+     * implementation; only {@link BitmapDeletionVector#MAGIC_NUMBER} is recognized.
+     *
+     * @param bytes the encoded form.
+     * @return the decoded deletion vector.
+     * @throws RuntimeException if the magic number is unknown or the bytes cannot be read.
+     */
+    static DeletionVector deserializeFromBytes(byte[] bytes) {
+        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
+            int magic = in.readInt();
+            if (magic != BitmapDeletionVector.MAGIC_NUMBER) {
+                throw new RuntimeException("Invalid magic number: " + magic);
+            }
+            return BitmapDeletionVector.deserializeFromDataInput(in);
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to deserialize deletion vector", e);
+        }
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
new file mode 100644
index 000000000..a82cc9be8
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.deletionvectors;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.index.IndexFile;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.PathFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.zip.CRC32;
+
+/**
+ * DeletionVectors index file.
+ *
+ * <p>Layout written by {@link #write(Map)}: one version byte ({@link #VERSION_ID_V1}), then for each
+ * deletion vector a 4-byte length, the serialized vector bytes and a 4-byte CRC32 of those bytes.
+ */
+public class DeletionVectorsIndexFile extends IndexFile {
+
+    public static final String DELETION_VECTORS_INDEX = "DELETION_VECTORS";
+    public static final byte VERSION_ID_V1 = 1;
+
+    public DeletionVectorsIndexFile(FileIO fileIO, PathFactory pathFactory) {
+        super(fileIO, pathFactory);
+    }
+
+    /**
+     * Reads all deletion vectors from a specified file.
+     *
+     * <p>Each vector is located by seeking to the start position recorded in its range, so the
+     * iteration order of {@code deletionVectorRanges} does not have to match the write order.
+     *
+     * @param fileName The name of the file from which to read the deletion vectors.
+     * @param deletionVectorRanges A map where the key represents which file the DeletionVector
+     *     belongs to and the value is a Pair object specifying the range (start position and size)
+     *     within the file where the deletion vector data is located.
+     * @return A map where the key represents which file the DeletionVector belongs to, and the
+     *     value is the corresponding DeletionVector object.
+     * @throws UncheckedIOException If an I/O error occurs while reading from the file.
+     */
+    public Map<String, DeletionVector> readAllDeletionVectors(
+            String fileName, Map<String, Pair<Integer, Integer>> deletionVectorRanges) {
+        Map<String, DeletionVector> deletionVectors = new HashMap<>();
+        try (SeekableInputStream inputStream =
+                fileIO.newInputStream(pathFactory.toPath(fileName))) {
+            checkVersion(inputStream);
+            for (Map.Entry<String, Pair<Integer, Integer>> entry :
+                    deletionVectorRanges.entrySet()) {
+                Pair<Integer, Integer> range = entry.getValue();
+                inputStream.seek(range.getLeft());
+                deletionVectors.put(
+                        entry.getKey(),
+                        readDeletionVector(new DataInputStream(inputStream), range.getRight()));
+            }
+        } catch (IOException e) {
+            throw new UncheckedIOException(
+                    "Unable to read deletion vectors from file: " + fileName, e);
+        }
+        return deletionVectors;
+    }
+
+    /**
+     * Reads a single deletion vector from the specified file.
+     *
+     * @param fileName The name of the file from which to read the deletion vector.
+     * @param deletionVectorRange A Pair specifying the range (start position and size) within the
+     *     file where the deletion vector data is located.
+     * @return The DeletionVector object read from the specified range in the file.
+     * @throws UncheckedIOException If an I/O error occurs while reading from the file.
+     */
+    public DeletionVector readDeletionVector(
+            String fileName, Pair<Integer, Integer> deletionVectorRange) {
+        try (SeekableInputStream inputStream =
+                fileIO.newInputStream(pathFactory.toPath(fileName))) {
+            checkVersion(inputStream);
+            inputStream.seek(deletionVectorRange.getLeft());
+            DataInputStream dataInputStream = new DataInputStream(inputStream);
+            return readDeletionVector(dataInputStream, deletionVectorRange.getRight());
+        } catch (IOException e) {
+            throw new UncheckedIOException(
+                    "Unable to read deletion vector from file: " + fileName, e);
+        }
+    }
+
+    /**
+     * Write deletion vectors to a new file, the format of this file can be referenced at: <a
+     * href="https://cwiki.apache.org/confluence/x/Tws4EQ">PIP-16</a>.
+     *
+     * @param input A map where the key represents which file the DeletionVector belongs to, and the
+     *     value is the corresponding DeletionVector object.
+     * @return A Pair object specifying the name of the written new file and a map where the key
+     *     represents which file the DeletionVector belongs to and the value is a Pair object
+     *     specifying the range (start position and size) within the file where the deletion vector
+     *     data is located.
+     * @throws UncheckedIOException If an I/O error occurs while writing to the file.
+     */
+    public Pair<String, Map<String, Pair<Integer, Integer>>> write(
+            Map<String, DeletionVector> input) {
+        int size = input.size();
+        // use LinkedHashMap to ensure that the order of DeletionVectorRanges and the written
+        // DeletionVectors is consistent.
+        Map<String, Pair<Integer, Integer>> deletionVectorRanges = new LinkedHashMap<>(size);
+        Path path = pathFactory.newPath();
+        try (DataOutputStream dataOutputStream =
+                new DataOutputStream(fileIO.newOutputStream(path, true))) {
+            dataOutputStream.writeByte(VERSION_ID_V1);
+            for (Map.Entry<String, DeletionVector> entry : input.entrySet()) {
+                String key = entry.getKey();
+                byte[] valueBytes = entry.getValue().serializeToBytes();
+                // The recorded start offset points at the length prefix written just below.
+                deletionVectorRanges.put(key, Pair.of(dataOutputStream.size(), valueBytes.length));
+                dataOutputStream.writeInt(valueBytes.length);
+                dataOutputStream.write(valueBytes);
+                dataOutputStream.writeInt(calculateChecksum(valueBytes));
+            }
+        } catch (IOException e) {
+            throw new UncheckedIOException(
+                    "Unable to write deletion vectors to file: " + path.getName(), e);
+        }
+        return Pair.of(path.getName(), deletionVectorRanges);
+    }
+
+    /** Reads the leading version byte and fails unless it is {@link #VERSION_ID_V1}. */
+    private void checkVersion(InputStream in) throws IOException {
+        int version = in.read();
+        if (version != VERSION_ID_V1) {
+            throw new RuntimeException(
+                    "Version not match, actual version: "
+                            + version
+                            + ", expected version: "
+                            + VERSION_ID_V1);
+        }
+    }
+
+    /**
+     * Reads one length-prefixed, checksummed deletion vector starting at the stream's current
+     * position.
+     */
+    private DeletionVector readDeletionVector(DataInputStream inputStream, int size) {
+        try {
+            // check size
+            int actualSize = inputStream.readInt();
+            if (actualSize != size) {
+                throw new RuntimeException(
+                        "Size not match, actual size: " + actualSize + ", expected size: " + size);
+            }
+
+            // read DeletionVector bytes; unlike read(byte[]), readFully keeps reading until the
+            // buffer is full and throws EOFException if the file ends first.
+            byte[] bytes = new byte[size];
+            inputStream.readFully(bytes);
+
+            // check checksum
+            int checkSum = calculateChecksum(bytes);
+            int actualCheckSum = inputStream.readInt();
+            if (actualCheckSum != checkSum) {
+                throw new RuntimeException(
+                        "Checksum not match, actual checksum: "
+                                + actualCheckSum
+                                + ", expected checksum: "
+                                + checkSum);
+            }
+            return DeletionVector.deserializeFromBytes(bytes);
+        } catch (IOException e) {
+            throw new UncheckedIOException("Unable to read deletion vector", e);
+        }
+    }
+
+    /** CRC32 of {@code bytes}, truncated to an int as it is stored in the file. */
+    private static int calculateChecksum(byte[] bytes) {
+        CRC32 crc = new CRC32();
+        crc.update(bytes);
+        return (int) crc.getValue();
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
new file mode 100644
index 000000000..7c88edc6c
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.deletionvectors;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Maintainer of deletionVectors index for one partition and bucket: it collects deletions in memory
+ * and writes a new deletion vectors index file on {@link #prepareCommit()} when something changed.
+ */
+public class DeletionVectorsMaintainer {
+
+    private final IndexFileHandler indexFileHandler;
+    private final Map<String, DeletionVector> deletionVectors;
+    private boolean modified;
+
+    private DeletionVectorsMaintainer(
+            IndexFileHandler fileHandler,
+            @Nullable Long snapshotId,
+            BinaryRow partition,
+            int bucket) {
+        this.indexFileHandler = fileHandler;
+        IndexFileMeta indexFile =
+                snapshotId == null
+                        ? null
+                        : fileHandler
+                                .scan(
+                                        snapshotId,
+                                        DeletionVectorsIndexFile.DELETION_VECTORS_INDEX,
+                                        partition,
+                                        bucket)
+                                .orElse(null);
+        // Always keep a private mutable copy: IndexFileHandler#readAllDeletionVectors returns
+        // Collections.emptyMap() for an index file without ranges, and this map is mutated by
+        // notifyNewDeletion and removeDeletionVectorOf.
+        this.deletionVectors =
+                indexFile == null
+                        ? new HashMap<>()
+                        : new HashMap<>(indexFileHandler.readAllDeletionVectors(indexFile));
+        this.modified = false;
+    }
+
+    /**
+     * Notifies a new deletion which marks the specified row position as deleted with the given file
+     * name.
+     *
+     * @param fileName The name of the file where the deletion occurred.
+     * @param position The row position within the file that has been deleted.
+     */
+    public void notifyNewDeletion(String fileName, long position) {
+        DeletionVector deletionVector =
+                deletionVectors.computeIfAbsent(fileName, k -> new BitmapDeletionVector());
+        // Only a position that was not deleted before counts as a modification.
+        if (deletionVector.checkedDelete(position)) {
+            modified = true;
+        }
+    }
+
+    /**
+     * Removes the specified file's deletion vector, this method is typically used for remove before
+     * files' deletion vector in compaction.
+     *
+     * @param fileName The name of the file whose deletion vector should be removed.
+     */
+    public void removeDeletionVectorOf(String fileName) {
+        // Stored vectors are never null, so a non-null result means an entry was really removed.
+        if (deletionVectors.remove(fileName) != null) {
+            modified = true;
+        }
+    }
+
+    /**
+     * Prepares to commit: write new deletion vectors index file if any modifications have been
+     * made. A file is written whenever something changed, even if all vectors have been removed.
+     *
+     * @return A list containing the metadata of the deletion vectors index file, or an empty list
+     *     if no changes need to be committed.
+     */
+    public List<IndexFileMeta> prepareCommit() {
+        if (modified) {
+            IndexFileMeta entry = indexFileHandler.writeDeletionVectorsIndex(deletionVectors);
+            modified = false;
+            return Collections.singletonList(entry);
+        }
+        return Collections.emptyList();
+    }
+
+    /**
+     * Retrieves the deletion vector associated with the specified file name.
+     *
+     * @param fileName The name of the file for which the deletion vector is requested.
+     * @return An {@code Optional} containing the deletion vector if it exists, or an empty {@code
+     *     Optional} if not.
+     */
+    public Optional<DeletionVector> deletionVectorOf(String fileName) {
+        return Optional.ofNullable(deletionVectors.get(fileName));
+    }
+
+    /** Factory to restore {@link DeletionVectorsMaintainer}. */
+    public static class DeletionVectorsMaintainerFactory {
+
+        private final IndexFileHandler handler;
+
+        public DeletionVectorsMaintainerFactory(IndexFileHandler handler) {
+            this.handler = handler;
+        }
+
+        /**
+         * Creates a maintainer, restoring existing deletion vectors from the given snapshot when
+         * {@code snapshotId} is not null.
+         */
+        public DeletionVectorsMaintainer createOrRestore(
+                @Nullable Long snapshotId, BinaryRow partition, int bucket) {
+            return new DeletionVectorsMaintainer(handler, snapshotId, partition, bucket);
+        }
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java
b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java
index f8ecff62d..4919f08f3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java
@@ -24,30 +24,17 @@ import org.apache.paimon.utils.IntIterator;
import org.apache.paimon.utils.PathFactory;
import java.io.IOException;
-import java.io.UncheckedIOException;
import static org.apache.paimon.utils.IntFileUtils.readInts;
import static org.apache.paimon.utils.IntFileUtils.writeInts;
/** Hash index file contains ints. */
-public class HashIndexFile {
+public class HashIndexFile extends IndexFile {
public static final String HASH_INDEX = "HASH";
- private final FileIO fileIO;
- private final PathFactory pathFactory;
-
public HashIndexFile(FileIO fileIO, PathFactory pathFactory) {
- this.fileIO = fileIO;
- this.pathFactory = pathFactory;
- }
-
- public long fileSize(String fileName) {
- try {
- return fileIO.getFileSize(pathFactory.toPath(fileName));
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
+ super(fileIO, pathFactory);
}
public IntIterator read(String fileName) throws IOException {
@@ -59,16 +46,4 @@ public class HashIndexFile {
writeInts(fileIO, path, input);
return path.getName();
}
-
- public void delete(String fileName) {
- fileIO.deleteQuietly(pathFactory.toPath(fileName));
- }
-
- public boolean exists(String fileName) {
- try {
- return fileIO.exists(pathFactory.toPath(fileName));
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFile.java
similarity index 67%
copy from paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java
copy to paimon-core/src/main/java/org/apache/paimon/index/IndexFile.java
index f8ecff62d..2106113eb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFile.java
@@ -19,25 +19,19 @@
package org.apache.paimon.index;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.utils.IntIterator;
import org.apache.paimon.utils.PathFactory;
import java.io.IOException;
import java.io.UncheckedIOException;
-import static org.apache.paimon.utils.IntFileUtils.readInts;
-import static org.apache.paimon.utils.IntFileUtils.writeInts;
+/** Base index file. */
+public abstract class IndexFile {
-/** Hash index file contains ints. */
-public class HashIndexFile {
+ protected final FileIO fileIO;
- public static final String HASH_INDEX = "HASH";
+ protected final PathFactory pathFactory;
- private final FileIO fileIO;
- private final PathFactory pathFactory;
-
- public HashIndexFile(FileIO fileIO, PathFactory pathFactory) {
+ public IndexFile(FileIO fileIO, PathFactory pathFactory) {
this.fileIO = fileIO;
this.pathFactory = pathFactory;
}
@@ -50,16 +44,6 @@ public class HashIndexFile {
}
}
- public IntIterator read(String fileName) throws IOException {
- return readInts(fileIO, pathFactory.toPath(fileName));
- }
-
- public String write(IntIterator input) throws IOException {
- Path path = pathFactory.newPath();
- writeInts(fileIO, path, input);
- return path.getName();
- }
-
public void delete(String fileName) {
fileIO.deleteQuietly(pathFactory.toPath(fileName));
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index 73b0d3416..d460097f0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -20,9 +20,12 @@ package org.apache.paimon.index;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.IndexManifestFile;
import org.apache.paimon.utils.IntIterator;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
import java.io.IOException;
@@ -30,8 +33,10 @@ import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
/** Handle index files. */
@@ -40,14 +45,17 @@ public class IndexFileHandler {
private final SnapshotManager snapshotManager;
private final IndexManifestFile indexManifestFile;
private final HashIndexFile hashIndex;
+ private final DeletionVectorsIndexFile deletionVectorsIndex;
public IndexFileHandler(
SnapshotManager snapshotManager,
IndexManifestFile indexManifestFile,
- HashIndexFile hashIndex) {
+ HashIndexFile hashIndex,
+ DeletionVectorsIndexFile deletionVectorsIndex) {
this.snapshotManager = snapshotManager;
this.indexManifestFile = indexManifestFile;
this.hashIndex = hashIndex;
+ this.deletionVectorsIndex = deletionVectorsIndex;
}
public Optional<IndexFileMeta> scan(
@@ -148,4 +156,41 @@ public class IndexFileHandler {
public void deleteManifest(String indexManifest) {
indexManifestFile.delete(indexManifest);
}
+
+    /**
+     * Reads all deletion vectors recorded in a deletion vectors index file.
+     *
+     * @param fileMeta metadata of an index file of type {@code DELETION_VECTORS_INDEX}.
+     * @return a map from data file name to its deletion vector. When the metadata records no
+     *     ranges, {@link Collections#emptyMap()} is returned, which is immutable.
+     * @throws IllegalArgumentException if {@code fileMeta} is not a deletion vectors index.
+     */
+    public Map<String, DeletionVector> readAllDeletionVectors(IndexFileMeta fileMeta) {
+        checkDeletionVectorsIndex(fileMeta);
+        Map<String, Pair<Integer, Integer>> deleteIndexRange = fileMeta.deletionVectorsRanges();
+        if (deleteIndexRange == null || deleteIndexRange.isEmpty()) {
+            return Collections.emptyMap();
+        }
+        return deletionVectorsIndex.readAllDeletionVectors(fileMeta.fileName(), deleteIndexRange);
+    }
+
+    /**
+     * Reads the deletion vector of a single data file from a deletion vectors index file.
+     *
+     * @param fileMeta metadata of an index file of type {@code DELETION_VECTORS_INDEX}.
+     * @param fileName name of the data file whose deletion vector is requested.
+     * @return the deletion vector, or empty if the index file has no range for {@code fileName}.
+     * @throws IllegalArgumentException if {@code fileMeta} is not a deletion vectors index.
+     */
+    public Optional<DeletionVector> readDeletionVector(IndexFileMeta fileMeta, String fileName) {
+        checkDeletionVectorsIndex(fileMeta);
+        Map<String, Pair<Integer, Integer>> deleteIndexRange = fileMeta.deletionVectorsRanges();
+        Pair<Integer, Integer> range =
+                deleteIndexRange == null ? null : deleteIndexRange.get(fileName);
+        if (range == null) {
+            return Optional.empty();
+        }
+        return Optional.of(deletionVectorsIndex.readDeletionVector(fileMeta.fileName(), range));
+    }
+
+    /**
+     * Writes the given deletion vectors into a new deletion vectors index file.
+     *
+     * @param deletionVectors map from data file name to its deletion vector.
+     * @return metadata of the written file: its row count is the number of deletion vectors and it
+     *     carries the range of every vector inside the file.
+     */
+    public IndexFileMeta writeDeletionVectorsIndex(Map<String, DeletionVector> deletionVectors) {
+        Pair<String, Map<String, Pair<Integer, Integer>>> pair =
+                deletionVectorsIndex.write(deletionVectors);
+        return new IndexFileMeta(
+                DELETION_VECTORS_INDEX,
+                pair.getLeft(),
+                deletionVectorsIndex.fileSize(pair.getLeft()),
+                deletionVectors.size(),
+                pair.getRight());
+    }
+
+    /** Ensures {@code fileMeta} describes a deletion vectors index file. */
+    private static void checkDeletionVectorsIndex(IndexFileMeta fileMeta) {
+        if (!fileMeta.indexType().equals(DELETION_VECTORS_INDEX)) {
+            throw new IllegalArgumentException(
+                    "Input file is not deletion vectors index " + fileMeta.indexType());
+        }
+    }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java
index 6fb3e176b..1b3f405fe 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java
@@ -18,12 +18,19 @@
package org.apache.paimon.index;
+import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
+import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
+
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import static org.apache.paimon.utils.SerializationUtils.newStringType;
@@ -36,11 +43,24 @@ public class IndexFileMeta {
private final long fileSize;
private final long rowCount;
+ /** Metadata only used by {@link DeletionVectorsIndexFile}. */
+ private final @Nullable Map<String, Pair<Integer, Integer>>
deletionVectorsRanges;
+
public IndexFileMeta(String indexType, String fileName, long fileSize,
long rowCount) {
+ this(indexType, fileName, fileSize, rowCount, null);
+ }
+
+ public IndexFileMeta(
+ String indexType,
+ String fileName,
+ long fileSize,
+ long rowCount,
+ @Nullable Map<String, Pair<Integer, Integer>>
deletionVectorsRanges) {
this.indexType = indexType;
this.fileName = fileName;
this.fileSize = fileSize;
this.rowCount = rowCount;
+ this.deletionVectorsRanges = deletionVectorsRanges;
}
public String indexType() {
@@ -59,6 +79,10 @@ public class IndexFileMeta {
return rowCount;
}
+ public @Nullable Map<String, Pair<Integer, Integer>>
deletionVectorsRanges() {
+ return deletionVectorsRanges;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -71,12 +95,13 @@ public class IndexFileMeta {
return Objects.equals(indexType, that.indexType)
&& Objects.equals(fileName, that.fileName)
&& fileSize == that.fileSize
- && rowCount == that.rowCount;
+ && rowCount == that.rowCount
+ && Objects.equals(deletionVectorsRanges,
that.deletionVectorsRanges);
}
@Override
public int hashCode() {
- return Objects.hash(indexType, fileName, fileSize, rowCount);
+ return Objects.hash(indexType, fileName, fileSize, rowCount,
deletionVectorsRanges);
}
@Override
@@ -91,6 +116,8 @@ public class IndexFileMeta {
+ fileSize
+ ", rowCount="
+ rowCount
+ + ", deletionVectorsRanges="
+ + deletionVectorsRanges
+ '}';
}
@@ -100,6 +127,16 @@ public class IndexFileMeta {
fields.add(new DataField(1, "_FILE_NAME", newStringType(false)));
fields.add(new DataField(2, "_FILE_SIZE", new BigIntType(false)));
fields.add(new DataField(3, "_ROW_COUNT", new BigIntType(false)));
+ fields.add(
+ new DataField(
+ 4,
+ "_DELETION_VECTORS_RANGES",
+ new ArrayType(
+ true,
+ RowType.of(
+ newStringType(false),
+ new IntType(false),
+ new IntType(false)))));
return new RowType(fields);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
index 2813dd48b..005868540 100644
---
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
@@ -19,11 +19,17 @@
package org.apache.paimon.index;
import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.utils.ObjectSerializer;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.VersionedObjectSerializer;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
/** A {@link VersionedObjectSerializer} for {@link IndexFileMeta}. */
public class IndexFileMetaSerializer extends ObjectSerializer<IndexFileMeta> {
@@ -37,7 +43,10 @@ public class IndexFileMetaSerializer extends
ObjectSerializer<IndexFileMeta> {
BinaryString.fromString(record.indexType()),
BinaryString.fromString(record.fileName()),
record.fileSize(),
- record.rowCount());
+ record.rowCount(),
+ record.deletionVectorsRanges() == null
+ ? null
+ :
dvRangesToRowArrayData(record.deletionVectorsRanges()));
}
@Override
@@ -46,6 +55,30 @@ public class IndexFileMetaSerializer extends
ObjectSerializer<IndexFileMeta> {
row.getString(0).toString(),
row.getString(1).toString(),
row.getLong(2),
- row.getLong(3));
+ row.getLong(3),
+ row.isNullAt(4) ? null :
rowArrayDataToDvRanges(row.getArray(4)));
+ }
+
+ public static InternalArray dvRangesToRowArrayData(
+ Map<String, Pair<Integer, Integer>> dvRanges) {
+ return new GenericArray(
+ dvRanges.entrySet().stream()
+ .map(
+ entry ->
+ GenericRow.of(
+
BinaryString.fromString(entry.getKey()),
+ entry.getValue().getLeft(),
+ entry.getValue().getRight()))
+ .toArray(GenericRow[]::new));
+ }
+
+ public static Map<String, Pair<Integer, Integer>> rowArrayDataToDvRanges(
+ InternalArray arrayData) {
+ Map<String, Pair<Integer, Integer>> dvRanges = new
LinkedHashMap<>(arrayData.size());
+ for (int i = 0; i < arrayData.size(); i++) {
+ InternalRow row = arrayData.getRow(i, 3);
+ dvRanges.put(row.getString(0).toString(), Pair.of(row.getInt(1),
row.getInt(2)));
+ }
+ return dvRanges;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java
index e2b136855..53f3fb82d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java
+++
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java
@@ -20,6 +20,7 @@ package org.apache.paimon.manifest;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.IntType;
@@ -80,6 +81,16 @@ public class IndexManifestEntry {
fields.add(new DataField(4, "_FILE_NAME", newStringType(false)));
fields.add(new DataField(5, "_FILE_SIZE", new BigIntType(false)));
fields.add(new DataField(6, "_ROW_COUNT", new BigIntType(false)));
+ fields.add(
+ new DataField(
+ 7,
+ "_DELETIONS_VECTORS_RANGES",
+ new ArrayType(
+ true,
+ RowType.of(
+ newStringType(false),
+ new IntType(false),
+ new IntType(false)))));
return new RowType(fields);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
index 40c4cb5a8..bdd711442 100644
---
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
@@ -24,6 +24,8 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.utils.VersionedObjectSerializer;
+import static
org.apache.paimon.index.IndexFileMetaSerializer.dvRangesToRowArrayData;
+import static
org.apache.paimon.index.IndexFileMetaSerializer.rowArrayDataToDvRanges;
import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
@@ -49,7 +51,10 @@ public class IndexManifestEntrySerializer extends
VersionedObjectSerializer<Inde
BinaryString.fromString(indexFile.indexType()),
BinaryString.fromString(indexFile.fileName()),
indexFile.fileSize(),
- indexFile.rowCount());
+ indexFile.rowCount(),
+ record.indexFile().deletionVectorsRanges() == null
+ ? null
+ :
dvRangesToRowArrayData(record.indexFile().deletionVectorsRanges()));
}
@Override
@@ -66,6 +71,7 @@ public class IndexManifestEntrySerializer extends
VersionedObjectSerializer<Inde
row.getString(3).toString(),
row.getString(4).toString(),
row.getLong(5),
- row.getLong(6)));
+ row.getLong(6),
+ row.isNullAt(7) ? null :
rowArrayDataToDvRanges(row.getArray(7))));
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorTest.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorTest.java
new file mode 100644
index 000000000..d2b42b798
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.deletionvectors;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashSet;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DeletionVector}. */
+public class DeletionVectorTest {
+ @Test
+ public void testBitmapDeletionVector() {
+ HashSet<Integer> toDelete = new HashSet<>();
+ Random random = new Random();
+ for (int i = 0; i < 10000; i++) {
+ toDelete.add(random.nextInt());
+ }
+ HashSet<Integer> notDelete = new HashSet<>();
+ for (int i = 0; i < 10000; i++) {
+ if (!toDelete.contains(i)) {
+ notDelete.add(i);
+ }
+ }
+
+ DeletionVector deletionVector = new BitmapDeletionVector();
+ assertThat(deletionVector.isEmpty()).isTrue();
+
+ for (Integer i : toDelete) {
+ assertThat(deletionVector.checkedDelete(i)).isTrue();
+ assertThat(deletionVector.checkedDelete(i)).isFalse();
+ }
+ DeletionVector deserializedDeletionVector =
+
DeletionVector.deserializeFromBytes(deletionVector.serializeToBytes());
+
+ assertThat(deletionVector.isEmpty()).isFalse();
+ assertThat(deserializedDeletionVector.isEmpty()).isFalse();
+ for (Integer i : toDelete) {
+ assertThat(deletionVector.isDeleted(i)).isTrue();
+ assertThat(deserializedDeletionVector.isDeleted(i)).isTrue();
+ }
+ for (Integer i : notDelete) {
+ assertThat(deletionVector.isDeleted(i)).isFalse();
+ assertThat(deserializedDeletionVector.isDeleted(i)).isFalse();
+ }
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
new file mode 100644
index 000000000..228508f33
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.deletionvectors;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.PathFactory;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DeletionVectorsIndexFile}. */
+public class DeletionVectorsIndexFileTest {
+ @TempDir java.nio.file.Path tempPath;
+
+ @Test
+ public void test0() {
+ Path dir = new Path(tempPath.toUri());
+ PathFactory pathFactory =
+ new PathFactory() {
+ @Override
+ public Path newPath() {
+ return new Path(dir, UUID.randomUUID().toString());
+ }
+
+ @Override
+ public Path toPath(String fileName) {
+ return new Path(dir, fileName);
+ }
+ };
+
+ DeletionVectorsIndexFile deletionVectorsIndexFile =
+ new DeletionVectorsIndexFile(LocalFileIO.create(),
pathFactory);
+
+ // write
+ HashMap<String, DeletionVector> deleteMap = new HashMap<>();
+ BitmapDeletionVector index1 = new BitmapDeletionVector();
+ index1.delete(1);
+ deleteMap.put("file1.parquet", index1);
+
+ BitmapDeletionVector index2 = new BitmapDeletionVector();
+ index2.delete(2);
+ index2.delete(3);
+ deleteMap.put("file2.parquet", index2);
+
+ BitmapDeletionVector index3 = new BitmapDeletionVector();
+ index3.delete(3);
+ deleteMap.put("file33.parquet", index3);
+
+ Pair<String, Map<String, Pair<Integer, Integer>>> pair =
+ deletionVectorsIndexFile.write(deleteMap);
+ String fileName = pair.getLeft();
+ Map<String, Pair<Integer, Integer>> deletionVectorRanges =
pair.getRight();
+
+ // read
+ Map<String, DeletionVector> actualDeleteMap =
+ deletionVectorsIndexFile.readAllDeletionVectors(fileName,
deletionVectorRanges);
+ assertThat(actualDeleteMap.get("file1.parquet").isDeleted(1)).isTrue();
+
assertThat(actualDeleteMap.get("file1.parquet").isDeleted(2)).isFalse();
+ assertThat(actualDeleteMap.get("file2.parquet").isDeleted(2)).isTrue();
+ assertThat(actualDeleteMap.get("file2.parquet").isDeleted(3)).isTrue();
+
assertThat(actualDeleteMap.get("file33.parquet").isDeleted(3)).isTrue();
+
+ DeletionVector file1DeletionVector =
+ deletionVectorsIndexFile.readDeletionVector(
+ fileName, deletionVectorRanges.get("file1.parquet"));
+ assertThat(file1DeletionVector.isDeleted(1)).isTrue();
+ assertThat(file1DeletionVector.isDeleted(2)).isFalse();
+
+ // delete
+ deletionVectorsIndexFile.delete(fileName);
+ assertThat(deletionVectorsIndexFile.exists(fileName)).isFalse();
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
new file mode 100644
index 000000000..6749c6a93
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.deletionvectors;
+
+import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DeletionVectorsMaintainer}. */
+public class DeletionVectorsMaintainerTest extends PrimaryKeyTableTestBase {
+ private IndexFileHandler fileHandler;
+
+ @BeforeEach
+ public void beforeEach() throws Exception {
+ fileHandler = table.store().newIndexFileHandler();
+ }
+
+ @Test
+ public void test0() {
+ DeletionVectorsMaintainer.DeletionVectorsMaintainerFactory factory =
+ new
DeletionVectorsMaintainer.DeletionVectorsMaintainerFactory(fileHandler);
+ DeletionVectorsMaintainer dvMaintainer =
+ factory.createOrRestore(null, BinaryRow.EMPTY_ROW, 0);
+
+ dvMaintainer.notifyNewDeletion("f1", 1);
+ dvMaintainer.notifyNewDeletion("f2", 2);
+ dvMaintainer.notifyNewDeletion("f3", 3);
+ dvMaintainer.removeDeletionVectorOf("f3");
+
+ assertThat(dvMaintainer.deletionVectorOf("f1")).isPresent();
+ assertThat(dvMaintainer.deletionVectorOf("f3")).isEmpty();
+ List<IndexFileMeta> fileMetas = dvMaintainer.prepareCommit();
+
+ Map<String, DeletionVector> deletionVectors =
+ fileHandler.readAllDeletionVectors(fileMetas.get(0));
+ assertThat(deletionVectors.get("f1").isDeleted(1)).isTrue();
+ assertThat(deletionVectors.get("f1").isDeleted(2)).isFalse();
+ assertThat(deletionVectors.get("f2").isDeleted(1)).isFalse();
+ assertThat(deletionVectors.get("f2").isDeleted(2)).isTrue();
+ assertThat(deletionVectors.containsKey("f3")).isFalse();
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
b/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
index 8f1ef392a..0e8cfc430 100644
---
a/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
@@ -18,9 +18,13 @@
package org.apache.paimon.index;
+import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
import org.apache.paimon.utils.ObjectSerializer;
import org.apache.paimon.utils.ObjectSerializerTestBase;
+import org.apache.paimon.utils.Pair;
+import java.util.LinkedHashMap;
+import java.util.Map;
import java.util.Random;
/** Test for {@link org.apache.paimon.index.IndexFileMetaSerializer}. */
@@ -38,10 +42,22 @@ public class IndexFileMetaSerializerTest extends
ObjectSerializerTestBase<IndexF
public static IndexFileMeta randomIndexFile() {
Random rnd = new Random();
- return new IndexFileMeta(
- HashIndexFile.HASH_INDEX,
- "my_file_name" + rnd.nextLong(),
- rnd.nextInt(),
- rnd.nextInt());
+ if (rnd.nextBoolean()) {
+ return new IndexFileMeta(
+ HashIndexFile.HASH_INDEX,
+ "my_file_name" + rnd.nextLong(),
+ rnd.nextInt(),
+ rnd.nextInt());
+ } else {
+ Map<String, Pair<Integer, Integer>> deletionVectorsRanges = new
LinkedHashMap<>();
+ deletionVectorsRanges.put("my_file_name1", Pair.of(rnd.nextInt(),
rnd.nextInt()));
+ deletionVectorsRanges.put("my_file_name2", Pair.of(rnd.nextInt(),
rnd.nextInt()));
+ return new IndexFileMeta(
+ DeletionVectorsIndexFile.DELETION_VECTORS_INDEX,
+ "deletion_vectors_index_file_name" + rnd.nextLong(),
+ rnd.nextInt(),
+ rnd.nextInt(),
+ deletionVectorsRanges);
+ }
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestEntrySerializerTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestEntrySerializerTest.java
index 1ee3d6b5d..0429b8dae 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestEntrySerializerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestEntrySerializerTest.java
@@ -18,13 +18,12 @@
package org.apache.paimon.manifest;
-import org.apache.paimon.index.HashIndexFile;
-import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.utils.ObjectSerializer;
import org.apache.paimon.utils.ObjectSerializerTestBase;
import java.util.Random;
+import static
org.apache.paimon.index.IndexFileMetaSerializerTest.randomIndexFile;
import static org.apache.paimon.io.DataFileTestUtils.row;
/** Test for {@link IndexManifestEntrySerializer}. */
@@ -46,10 +45,6 @@ public class IndexManifestEntrySerializerTest extends
ObjectSerializerTestBase<I
rnd.nextBoolean() ? FileKind.ADD : FileKind.DELETE,
row(rnd.nextInt()),
rnd.nextInt(),
- new IndexFileMeta(
- HashIndexFile.HASH_INDEX,
- "my_file_name" + rnd.nextLong(),
- rnd.nextInt(),
- rnd.nextInt()));
+ randomIndexFile());
}
}