This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new cec1709c6 [core] Introduce DeletionVectorIndexFileWriter (#3402)
cec1709c6 is described below
commit cec1709c66434bc92de25762c7d6055a39a0dc3e
Author: Yann Byron <[email protected]>
AuthorDate: Thu May 30 15:04:01 2024 +0800
[core] Introduce DeletionVectorIndexFileWriter (#3402)
---
.../shortcodes/generated/core_configuration.html | 6 +
.../main/java/org/apache/paimon/CoreOptions.java | 10 ++
.../java/org/apache/paimon/AbstractFileStore.java | 8 +-
.../DeletionVectorIndexFileMaintainer.java | 17 ++-
.../DeletionVectorIndexFileWriter.java | 141 +++++++++++++++++++++
.../deletionvectors/DeletionVectorsIndexFile.java | 71 +++--------
.../deletionvectors/DeletionVectorsMaintainer.java | 3 +-
.../org/apache/paimon/index/IndexFileHandler.java | 31 ++---
.../DeletionVectorsIndexFileTest.java | 122 ++++++++++++------
9 files changed, 291 insertions(+), 118 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 34bf784f8..01f62e759 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -194,6 +194,12 @@ under the License.
<td>Boolean</td>
<td>Whether to enable deletion vectors mode. In this mode, index
files containing deletion vectors are generated when data is written, which
marks the data for deletion. During read operations, by applying these index
files, merging can be avoided.</td>
</tr>
+ <tr>
+ <td><h5>deletion-vector.index-file.target-size</h5></td>
+ <td style="word-wrap: break-word;">2 mb</td>
+ <td>MemorySize</td>
+ <td>The target size of deletion vector index file.</td>
+ </tr>
<tr>
<td><h5>dynamic-bucket.assigner-parallelism</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 76f6b7a08..e46946b4a 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1113,6 +1113,12 @@ public class CoreOptions implements Serializable {
+ " vectors are generated when data is
written, which marks the data for deletion."
+ " During read operations, by applying
these index files, merging can be avoided.");
+    /**
+     * Target size of a single deletion vector index file. Only bucket-unaware tables pass this
+     * value to the index writer; other bucket modes use {@code Long.MAX_VALUE} (see
+     * AbstractFileStore).
+     */
+    public static final ConfigOption<MemorySize> DELETION_VECTOR_INDEX_FILE_TARGET_SIZE =
+            key("deletion-vector.index-file.target-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(2))
+                    .withDescription("The target size of deletion vector index file.");
+
public static final ConfigOption<Boolean> DELETION_FORCE_PRODUCE_CHANGELOG
=
key("delete.force-produce-changelog")
.booleanType()
@@ -1816,6 +1822,10 @@ public class CoreOptions implements Serializable {
return options.get(DELETION_VECTORS_ENABLED);
}
+ public MemorySize deletionVectorIndexFileTargetSize() {
+ return options.get(DELETION_VECTOR_INDEX_FILE_TARGET_SIZE);
+ }
+
public FileIndexOptions indexColumnsOptions() {
return new FileIndexOptions(this);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index a5cba5c87..bfa0e8947 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -39,6 +39,7 @@ import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.stats.StatsFile;
import org.apache.paimon.stats.StatsFileHandler;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.sink.CallbackUtils;
import org.apache.paimon.table.sink.TagCallback;
@@ -146,7 +147,12 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
pathFactory().indexFileFactory(),
indexManifestFileFactory().create(),
new HashIndexFile(fileIO, pathFactory().indexFileFactory()),
- new DeletionVectorsIndexFile(fileIO,
pathFactory().indexFileFactory()));
+ new DeletionVectorsIndexFile(
+ fileIO,
+ pathFactory().indexFileFactory(),
+ bucketMode() == BucketMode.BUCKET_UNAWARE
+ ? options.deletionVectorIndexFileTargetSize()
+ : MemorySize.ofBytes(Long.MAX_VALUE)));
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
index 3daca576b..a9c52fb22 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
@@ -95,16 +95,19 @@ public class DeletionVectorIndexFileMaintainer {
Map<String, DeletionFile> dataFileToDeletionFiles =
indexFileToDeletionFiles.get(indexFile);
if (!dataFileToDeletionFiles.isEmpty()) {
- IndexFileMeta newIndexFile =
+ List<IndexFileMeta> newIndexFiles =
indexFileHandler.writeDeletionVectorsIndex(
deletionVectorsIndexFile.readDeletionVector(
dataFileToDeletionFiles));
- newIndexEntries.add(
- new IndexManifestEntry(
- FileKind.ADD,
- oldEntry.partition(),
- oldEntry.bucket(),
- newIndexFile));
+ newIndexFiles.forEach(
+ newIndexFile -> {
+ newIndexEntries.add(
+ new IndexManifestEntry(
+ FileKind.ADD,
+ oldEntry.partition(),
+ oldEntry.bucket(),
+ newIndexFile));
+ });
}
// mark the touched index file as removed.
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
new file mode 100644
index 000000000..f7fa7eb08
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.deletionvectors;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.PathFactory;
+import org.apache.paimon.utils.Preconditions;
+
+import java.io.Closeable;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
+import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.VERSION_ID_V1;
+import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.calculateChecksum;
+
+/**
+ * Writes deletion vectors into one or more deletion vector index files.
+ *
+ * <p>Every file starts with a one-byte version header ({@code VERSION_ID_V1}); each deletion
+ * vector is then stored as a 4-byte payload length, the serialized payload and a 4-byte CRC32
+ * checksum. Entries are packed into the current file until it is estimated to reach the target
+ * size, after which a new file is started.
+ */
+public class DeletionVectorIndexFileWriter {
+
+    private final PathFactory indexPathFactory;
+    private final FileIO fileIO;
+    private final long targetSizeInBytes;
+
+    public DeletionVectorIndexFileWriter(
+            FileIO fileIO, PathFactory pathFactory, MemorySize targetSizePerIndexFile) {
+        this.indexPathFactory = pathFactory;
+        this.fileIO = fileIO;
+        this.targetSizeInBytes = targetSizePerIndexFile.getBytes();
+    }
+
+    /**
+     * Writes {@code input} into as many index files as the target size requires.
+     *
+     * <p>Empty input still produces exactly one index file holding only the version header. If
+     * any write fails, the files created by this call are deleted on a best-effort basis before
+     * the exception is rethrown, because no metadata references them yet.
+     *
+     * @param input deletion vectors keyed by data file name, written in the map's iteration order
+     * @return metadata of every written index file, in write order
+     * @throws IOException if creating, writing, closing or inspecting an index file fails
+     */
+    public List<IndexFileMeta> write(Map<String, DeletionVector> input) throws IOException {
+        if (input.isEmpty()) {
+            return Collections.singletonList(writeSingleFile(Collections.emptyIterator()));
+        }
+        List<IndexFileMeta> result = new ArrayList<>();
+        Iterator<Map.Entry<String, DeletionVector>> iterator = input.entrySet().iterator();
+        try {
+            while (iterator.hasNext()) {
+                result.add(writeSingleFile(iterator));
+            }
+        } catch (IOException | RuntimeException e) {
+            for (IndexFileMeta written : result) {
+                deleteQuietly(indexPathFactory.toPath(written.fileName()), e);
+            }
+            throw e;
+        }
+        return result;
+    }
+
+    /**
+     * Drains entries from {@code iterator} into one new index file until the file is estimated to
+     * be full, then closes it and returns its metadata.
+     *
+     * <p>The roll check runs after each entry: the file is finished once the payload written so
+     * far plus the size of the entry just written exceeds the target, i.e. the last entry's size
+     * is used as an estimate for the next one. Because the check follows the write, a non-empty
+     * iterator always contributes at least one entry, so an oversized deletion vector still gets
+     * a file of its own. On failure the partially written file is deleted on a best-effort basis.
+     */
+    private IndexFileMeta writeSingleFile(Iterator<Map.Entry<String, DeletionVector>> iterator)
+            throws IOException {
+        SingleIndexFileWriter writer = new SingleIndexFileWriter();
+        try {
+            // try-with-resources keeps a write failure primary if close() also fails.
+            try (SingleIndexFileWriter closing = writer) {
+                while (iterator.hasNext()) {
+                    Map.Entry<String, DeletionVector> entry = iterator.next();
+                    long currentSize = writer.write(entry.getKey(), entry.getValue());
+                    if (writer.writtenSizeInBytes() + currentSize > targetSizeInBytes) {
+                        break;
+                    }
+                }
+            }
+            // Inspect the file only after close(): while open, the stream may still hold data
+            // not yet visible to the file system (some stores presumably expose nothing at all
+            // until close).
+            return writer.writtenIndexFile();
+        } catch (IOException | RuntimeException e) {
+            deleteQuietly(writer.path, e);
+            throw e;
+        }
+    }
+
+    /**
+     * Deletes an index file that no metadata references yet. A failure to delete is recorded as a
+     * suppressed exception on {@code cause} instead of replacing it.
+     */
+    private void deleteQuietly(Path path, Throwable cause) {
+        try {
+            fileIO.delete(path, false);
+        } catch (IOException | RuntimeException e) {
+            cause.addSuppressed(e);
+        }
+    }
+
+    /** Writes deletion vectors sequentially into exactly one new index file. */
+    private class SingleIndexFileWriter implements Closeable {
+
+        private final Path path;
+
+        private final DataOutputStream dataOutputStream;
+
+        /** Data file name -> (start offset, payload length) of its deletion vector. */
+        private final LinkedHashMap<String, Pair<Integer, Integer>> dvRanges;
+
+        /** Sum of serialized payload sizes; excludes the version byte and per-entry framing. */
+        private long writtenSizeInBytes = 0L;
+
+        /**
+         * Bytes written to the file so far. Tracked as a long because {@link
+         * DataOutputStream#size()} silently saturates at {@link Integer#MAX_VALUE}.
+         */
+        private long position = 0L;
+
+        public SingleIndexFileWriter() throws IOException {
+            this.path = indexPathFactory.newPath();
+            this.dataOutputStream = new DataOutputStream(fileIO.newOutputStream(path, true));
+            this.dvRanges = new LinkedHashMap<>();
+            try {
+                dataOutputStream.writeByte(VERSION_ID_V1);
+            } catch (IOException | RuntimeException e) {
+                // The caller never receives this writer, so release the stream and file here.
+                try {
+                    dataOutputStream.close();
+                } catch (IOException | RuntimeException closeError) {
+                    e.addSuppressed(closeError);
+                }
+                deleteQuietly(path, e);
+                throw e;
+            }
+            position = Byte.BYTES;
+        }
+
+        public long writtenSizeInBytes() {
+            return writtenSizeInBytes;
+        }
+
+        /**
+         * Appends one deletion vector and records its range.
+         *
+         * @return the serialized payload size in bytes (excluding length and checksum framing)
+         * @throws IOException if the write fails, or if the entry's start offset no longer fits
+         *     the int offsets stored in the index metadata
+         */
+        public long write(String key, DeletionVector deletionVector) throws IOException {
+            Preconditions.checkNotNull(deletionVector);
+            if (position > Integer.MAX_VALUE) {
+                throw new IOException(
+                        "Deletion vector index file "
+                                + path
+                                + " exceeds "
+                                + Integer.MAX_VALUE
+                                + " bytes; the offset of the deletion vector for "
+                                + key
+                                + " cannot be recorded.");
+            }
+            byte[] data = deletionVector.serializeToBytes();
+            int size = data.length;
+
+            dvRanges.put(key, Pair.of((int) position, size));
+            dataOutputStream.writeInt(size);
+            dataOutputStream.write(data);
+            dataOutputStream.writeInt(calculateChecksum(data));
+            position += Integer.BYTES + (long) size + Integer.BYTES;
+            writtenSizeInBytes += size;
+            return size;
+        }
+
+        /**
+         * Builds the metadata of this file. Call only after {@link #close()}, because the file
+         * size is read back from the file system.
+         */
+        public IndexFileMeta writtenIndexFile() throws IOException {
+            return new IndexFileMeta(
+                    DELETION_VECTORS_INDEX,
+                    path.getName(),
+                    fileIO.getFileSize(path),
+                    dvRanges.size(),
+                    dvRanges);
+        }
+
+        @Override
+        public void close() throws IOException {
+            dataOutputStream.close();
+        }
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
index 0207250d1..f21aeb5a7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
@@ -23,17 +23,18 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.index.IndexFile;
import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.options.MemorySize;
import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.PathFactory;
import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.zip.CRC32;
@@ -46,8 +47,12 @@ public class DeletionVectorsIndexFile extends IndexFile {
public static final String DELETION_VECTORS_INDEX = "DELETION_VECTORS";
public static final byte VERSION_ID_V1 = 1;
- public DeletionVectorsIndexFile(FileIO fileIO, PathFactory pathFactory) {
+ private final MemorySize targetSizePerIndexFile;
+
+ public DeletionVectorsIndexFile(
+ FileIO fileIO, PathFactory pathFactory, MemorySize
targetSizePerIndexFile) {
super(fileIO, pathFactory);
+ this.targetSizePerIndexFile = targetSizePerIndexFile;
}
/**
@@ -85,6 +90,12 @@ public class DeletionVectorsIndexFile extends IndexFile {
return deletionVectors;
}
+    /**
+     * Reads and merges all deletion vectors stored in the given index files.
+     *
+     * <p>If the same data file name appears in several index files, the vector read from the file
+     * that comes later in {@code indexFiles} replaces the earlier one.
+     */
+    public Map<String, DeletionVector> readAllDeletionVectors(List<IndexFileMeta> indexFiles) {
+        Map<String, DeletionVector> merged = new HashMap<>();
+        for (IndexFileMeta indexFile : indexFiles) {
+            merged.putAll(readAllDeletionVectors(indexFile));
+        }
+        return merged;
+    }
+
/** Reads deletion vectors from a list of DeletionFile which belong to a
same index file. */
public Map<String, DeletionVector> readDeletionVector(
Map<String, DeletionFile> dataFileToDeletionFiles) {
@@ -110,33 +121,6 @@ public class DeletionVectorsIndexFile extends IndexFile {
return deletionVectors;
}
- /**
- * Reads a single deletion vector from the specified file.
- *
- * @param fileName The name of the file from which to read the deletion
vector.
- * @param deletionVectorRange A Pair specifying the range (start position
and size) within the
- * file where the deletion vector data is located.
- * @return The DeletionVector object read from the specified range in the
file.
- * @throws UncheckedIOException If an I/O error occurs while reading from
the file.
- */
- public DeletionVector readDeletionVector(
- String fileName, Pair<Integer, Integer> deletionVectorRange) {
- Path filePath = pathFactory.toPath(fileName);
- try (SeekableInputStream inputStream =
fileIO.newInputStream(filePath)) {
- checkVersion(inputStream);
- inputStream.seek(deletionVectorRange.getLeft());
- DataInputStream dataInputStream = new DataInputStream(inputStream);
- return readDeletionVector(dataInputStream,
deletionVectorRange.getRight());
- } catch (Exception e) {
- throw new RuntimeException(
- "Unable to read deletion vector from file: "
- + filePath
- + ", deletionVectorRange: "
- + deletionVectorRange,
- e);
- }
- }
-
/**
* Write deletion vectors to a new file, the format of this file can be
referenced at: <a
* href="https://cwiki.apache.org/confluence/x/Tws4EQ">PIP-16</a>.
@@ -149,28 +133,15 @@ public class DeletionVectorsIndexFile extends IndexFile {
* data is located.
* @throws UncheckedIOException If an I/O error occurs while writing to
the file.
*/
- public Pair<String, LinkedHashMap<String, Pair<Integer, Integer>>> write(
- Map<String, DeletionVector> input) {
- int size = input.size();
- LinkedHashMap<String, Pair<Integer, Integer>> deletionVectorRanges =
- new LinkedHashMap<>(size);
- Path path = pathFactory.newPath();
- try (DataOutputStream dataOutputStream =
- new DataOutputStream(fileIO.newOutputStream(path, true))) {
- dataOutputStream.writeByte(VERSION_ID_V1);
- for (Map.Entry<String, DeletionVector> entry : input.entrySet()) {
- String key = entry.getKey();
- byte[] valueBytes = entry.getValue().serializeToBytes();
- deletionVectorRanges.put(key, Pair.of(dataOutputStream.size(),
valueBytes.length));
- dataOutputStream.writeInt(valueBytes.length);
- dataOutputStream.write(valueBytes);
- dataOutputStream.writeInt(calculateChecksum(valueBytes));
- }
+ public List<IndexFileMeta> write(Map<String, DeletionVector> input) {
+ try {
+ DeletionVectorIndexFileWriter writer =
+ new DeletionVectorIndexFileWriter(
+ this.fileIO, this.pathFactory,
this.targetSizePerIndexFile);
+ return writer.write(input);
} catch (IOException e) {
- throw new RuntimeException(
- "Unable to write deletion vectors to file: " +
path.getName(), e);
+ throw new RuntimeException("Failed to write deletion vectors.", e);
}
- return Pair.of(path.getName(), deletionVectorRanges);
}
private void checkVersion(InputStream in) throws IOException {
@@ -213,7 +184,7 @@ public class DeletionVectorsIndexFile extends IndexFile {
}
}
- private int calculateChecksum(byte[] bytes) {
+ public static int calculateChecksum(byte[] bytes) {
CRC32 crc = new CRC32();
crc.update(bytes);
return (int) crc.getValue();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
index 461e092e0..6e04f4922 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
@@ -97,9 +97,8 @@ public class DeletionVectorsMaintainer {
*/
public List<IndexFileMeta> prepareCommit() {
if (modified) {
- IndexFileMeta entry =
indexFileHandler.writeDeletionVectorsIndex(deletionVectors);
modified = false;
- return Collections.singletonList(entry);
+ return indexFileHandler.writeDeletionVectorsIndex(deletionVectors);
}
return Collections.emptyList();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index cdb10ae13..684f20952 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -28,7 +28,6 @@ import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.IndexManifestFile;
import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.utils.IntIterator;
-import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.PathFactory;
import org.apache.paimon.utils.SnapshotManager;
@@ -36,14 +35,13 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Handle index files. */
public class IndexFileHandler {
@@ -193,25 +191,16 @@ public class IndexFileHandler {
}
public Map<String, DeletionVector>
readAllDeletionVectors(List<IndexFileMeta> fileMetas) {
- Map<String, DeletionVector> deletionVectors = new HashMap<>();
for (IndexFileMeta indexFile : fileMetas) {
- if (!indexFile.indexType().equals(DELETION_VECTORS_INDEX)) {
- throw new IllegalArgumentException(
- "Input file is not deletion vectors index " +
indexFile.indexType());
- }
-
deletionVectors.putAll(deletionVectorsIndex.readAllDeletionVectors(indexFile));
+ checkArgument(
+ indexFile.indexType().equals(DELETION_VECTORS_INDEX),
+ "Input file is not deletion vectors index " +
indexFile.indexType());
}
- return deletionVectors;
- }
-
- public IndexFileMeta writeDeletionVectorsIndex(Map<String, DeletionVector>
deletionVectors) {
- Pair<String, LinkedHashMap<String, Pair<Integer, Integer>>> pair =
- deletionVectorsIndex.write(deletionVectors);
- return new IndexFileMeta(
- DELETION_VECTORS_INDEX,
- pair.getLeft(),
- deletionVectorsIndex.fileSize(pair.getLeft()),
- deletionVectors.size(),
- pair.getRight());
+ return deletionVectorsIndex.readAllDeletionVectors(fileMetas);
+ }
+
+ public List<IndexFileMeta> writeDeletionVectorsIndex(
+ Map<String, DeletionVector> deletionVectors) {
+ return deletionVectorsIndex.write(deletionVectors);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
index c43950a44..2b1c98550 100644
---
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
@@ -21,23 +21,23 @@ package org.apache.paimon.deletionvectors;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.index.IndexFileMeta;
-import org.apache.paimon.utils.Pair;
+import org.apache.paimon.options.MemorySize;
import org.apache.paimon.utils.PathFactory;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.util.HashMap;
-import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
-import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link DeletionVectorsIndexFile}. */
public class DeletionVectorsIndexFileTest {
+
@TempDir java.nio.file.Path tempPath;
@Test
@@ -45,7 +45,8 @@ public class DeletionVectorsIndexFileTest {
PathFactory pathFactory = getPathFactory();
DeletionVectorsIndexFile deletionVectorsIndexFile =
- new DeletionVectorsIndexFile(LocalFileIO.create(),
pathFactory);
+ new DeletionVectorsIndexFile(
+ LocalFileIO.create(), pathFactory,
MemorySize.ofBytes(Long.MAX_VALUE));
// write
HashMap<String, DeletionVector> deleteMap = new HashMap<>();
@@ -62,28 +63,19 @@ public class DeletionVectorsIndexFileTest {
index3.delete(3);
deleteMap.put("file33.parquet", index3);
- Pair<String, LinkedHashMap<String, Pair<Integer, Integer>>> pair =
- deletionVectorsIndexFile.write(deleteMap);
- String fileName = pair.getLeft();
- LinkedHashMap<String, Pair<Integer, Integer>> deletionVectorRanges =
pair.getRight();
+ List<IndexFileMeta> indexFiles =
deletionVectorsIndexFile.write(deleteMap);
+ assertThat(indexFiles.size()).isEqualTo(1);
// read
- IndexFileMeta indexFileMeta =
- new IndexFileMeta(DELETION_VECTORS_INDEX, fileName, 0L, 0L,
deletionVectorRanges);
+ String fileName = indexFiles.get(0).fileName();
Map<String, DeletionVector> actualDeleteMap =
- deletionVectorsIndexFile.readAllDeletionVectors(indexFileMeta);
+ deletionVectorsIndexFile.readAllDeletionVectors(indexFiles);
assertThat(actualDeleteMap.get("file1.parquet").isDeleted(1)).isTrue();
assertThat(actualDeleteMap.get("file1.parquet").isDeleted(2)).isFalse();
assertThat(actualDeleteMap.get("file2.parquet").isDeleted(2)).isTrue();
assertThat(actualDeleteMap.get("file2.parquet").isDeleted(3)).isTrue();
assertThat(actualDeleteMap.get("file33.parquet").isDeleted(3)).isTrue();
- DeletionVector file1DeletionVector =
- deletionVectorsIndexFile.readDeletionVector(
- fileName, deletionVectorRanges.get("file1.parquet"));
- assertThat(file1DeletionVector.isDeleted(1)).isTrue();
- assertThat(file1DeletionVector.isDeleted(2)).isFalse();
-
// delete
deletionVectorsIndexFile.delete(fileName);
assertThat(deletionVectorsIndexFile.exists(fileName)).isFalse();
@@ -93,7 +85,8 @@ public class DeletionVectorsIndexFileTest {
public void testReadDvIndexWithCopiousDv() {
PathFactory pathFactory = getPathFactory();
DeletionVectorsIndexFile deletionVectorsIndexFile =
- new DeletionVectorsIndexFile(LocalFileIO.create(),
pathFactory);
+ new DeletionVectorsIndexFile(
+ LocalFileIO.create(), pathFactory,
MemorySize.ofBytes(Long.MAX_VALUE));
// write
Random random = new Random();
@@ -104,14 +97,11 @@ public class DeletionVectorsIndexFileTest {
deleteMap.put(String.format("file%s.parquet", i), index);
}
- Pair<String, LinkedHashMap<String, Pair<Integer, Integer>>> pair =
- deletionVectorsIndexFile.write(deleteMap);
-
// read
- IndexFileMeta indexFileMeta =
- new IndexFileMeta(DELETION_VECTORS_INDEX, pair.getLeft(), 0L,
0L, pair.getRight());
+ List<IndexFileMeta> indexFiles =
deletionVectorsIndexFile.write(deleteMap);
+ assertThat(indexFiles.size()).isEqualTo(1);
Map<String, DeletionVector> dvs =
- deletionVectorsIndexFile.readAllDeletionVectors(indexFileMeta);
+ deletionVectorsIndexFile.readAllDeletionVectors(indexFiles);
assertThat(dvs.size()).isEqualTo(100000);
}
@@ -119,26 +109,84 @@ public class DeletionVectorsIndexFileTest {
public void testReadDvIndexWithEnormousDv() {
PathFactory pathFactory = getPathFactory();
DeletionVectorsIndexFile deletionVectorsIndexFile =
- new DeletionVectorsIndexFile(LocalFileIO.create(),
pathFactory);
+ new DeletionVectorsIndexFile(
+ LocalFileIO.create(), pathFactory,
MemorySize.ofBytes(Long.MAX_VALUE));
// write
Random random = new Random();
- HashMap<String, DeletionVector> deleteMap = new HashMap<>();
- BitmapDeletionVector index = new BitmapDeletionVector();
- // dv index's size is about 20M
- for (int i = 0; i < 10000000; i++) {
- index.delete(random.nextInt(Integer.MAX_VALUE));
+ Map<String, DeletionVector> fileToDV = new HashMap<>();
+ Map<String, Long> fileToCardinality = new HashMap<>();
+ for (int i = 0; i < 5; i++) {
+ BitmapDeletionVector index = new BitmapDeletionVector();
+ // the size of dv index file is about 20M
+ for (int j = 0; j < 10000000; j++) {
+ index.delete(random.nextInt(Integer.MAX_VALUE));
+ }
+ fileToCardinality.put("f" + i, index.getCardinality());
+ fileToDV.put("f" + i, index);
}
- deleteMap.put("largeFile.parquet", index);
- Pair<String, LinkedHashMap<String, Pair<Integer, Integer>>> pair =
- deletionVectorsIndexFile.write(deleteMap);
+ List<IndexFileMeta> indexFiles =
deletionVectorsIndexFile.write(fileToDV);
// read
- IndexFileMeta indexFileMeta =
- new IndexFileMeta(DELETION_VECTORS_INDEX, pair.getLeft(), 0L,
0L, pair.getRight());
+ assertThat(indexFiles.size()).isEqualTo(1);
+ Map<String, DeletionVector> dvs =
+ deletionVectorsIndexFile.readAllDeletionVectors(indexFiles);
+ assertThat(dvs.size()).isEqualTo(5);
+ for (String file : dvs.keySet()) {
+
assertThat(dvs.get(file).getCardinality()).isEqualTo(fileToCardinality.get(file));
+ }
+ }
+
+ @Test
+ public void testWriteDVIndexWithLimitedTargetSizePerIndexFile() {
+ PathFactory pathFactory = getPathFactory();
+ DeletionVectorsIndexFile deletionVectorsIndexFile =
+ new DeletionVectorsIndexFile(
+ LocalFileIO.create(), pathFactory,
MemorySize.parse("2MB"));
+
+ // write1
+ Random random = new Random();
+ Map<String, DeletionVector> fileToDV = new HashMap<>();
+ Map<String, Long> fileToCardinality = new HashMap<>();
+ for (int i = 0; i < 5; i++) {
+ BitmapDeletionVector index = new BitmapDeletionVector();
+ // the size of dv index file is about 1.7M
+ for (int j = 0; j < 750000; j++) {
+ index.delete(random.nextInt(Integer.MAX_VALUE));
+ }
+ fileToCardinality.put("f" + i, index.getCardinality());
+ fileToDV.put("f" + i, index);
+ }
+ List<IndexFileMeta> indexFiles =
deletionVectorsIndexFile.write(fileToDV);
+
+ // assert 1
+ assertThat(indexFiles.size()).isEqualTo(5);
Map<String, DeletionVector> dvs =
- deletionVectorsIndexFile.readAllDeletionVectors(indexFileMeta);
- assertThat(dvs.size()).isEqualTo(1);
+ deletionVectorsIndexFile.readAllDeletionVectors(indexFiles);
+ for (String file : dvs.keySet()) {
+
assertThat(dvs.get(file).getCardinality()).isEqualTo(fileToCardinality.get(file));
+ }
+
+ // write2
+ fileToDV.clear();
+ fileToCardinality.clear();
+ for (int i = 0; i < 10; i++) {
+ BitmapDeletionVector index = new BitmapDeletionVector();
+ // the size of dv index file is about 0.42M
+ for (int j = 0; j < 100000; j++) {
+ index.delete(random.nextInt(Integer.MAX_VALUE));
+ }
+ fileToCardinality.put("f" + i, index.getCardinality());
+ fileToDV.put("f" + i, index);
+ }
+ indexFiles = deletionVectorsIndexFile.write(fileToDV);
+
+ // assert 2
+ assertThat(indexFiles.size()).isGreaterThan(1);
+ dvs = deletionVectorsIndexFile.readAllDeletionVectors(indexFiles);
+ for (String file : dvs.keySet()) {
+
assertThat(dvs.get(file).getCardinality()).isEqualTo(fileToCardinality.get(file));
+ }
}
private PathFactory getPathFactory() {