This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c0a5f22d0 [core] Introduce DeletionVectorIndexFileMaintainer (#3387)
c0a5f22d0 is described below
commit c0a5f22d0fb2442a387a2f2f61491b4419e015d0
Author: Yann Byron <[email protected]>
AuthorDate: Fri May 24 13:05:31 2024 +0800
[core] Introduce DeletionVectorIndexFileMaintainer (#3387)
---
.../DeletionVectorIndexFileMaintainer.java | 116 +++++++++++++++++++++
.../deletionvectors/DeletionVectorsIndexFile.java | 27 +++++
.../org/apache/paimon/index/IndexFileHandler.java | 12 +++
.../org/apache/paimon/TestAppendFileStore.java | 9 +-
.../DeletionVectorIndexFileMaintainerTest.java | 115 ++++++++++++++++++++
.../paimon/operation/FileStoreCommitTest.java | 16 +--
6 files changed, 286 insertions(+), 9 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
new file mode 100644
index 000000000..3daca576b
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.deletionvectors;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.table.source.DeletionFile;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** DeletionVectorIndexFileMaintainer. */
+public class DeletionVectorIndexFileMaintainer {
+
+ private final IndexFileHandler indexFileHandler;
+
+ private final Map<String, IndexManifestEntry> indexNameToEntry = new
HashMap<>();
+
+ private final Map<String, Map<String, DeletionFile>>
indexFileToDeletionFiles = new HashMap<>();
+
+ private final Set<String> touchedIndexFiles = new HashSet<>();
+
+ public DeletionVectorIndexFileMaintainer(
+ IndexFileHandler indexFileHandler, Map<String, DeletionFile>
dataFileToDeletionFiles) {
+ this.indexFileHandler = indexFileHandler;
+ List<String> touchedIndexFileNames =
+ dataFileToDeletionFiles.values().stream()
+ .map(deletionFile -> new
Path(deletionFile.path()).getName())
+ .distinct()
+ .collect(Collectors.toList());
+ indexFileHandler.scan().stream()
+ .filter(
+ indexManifestEntry ->
+ touchedIndexFileNames.contains(
+
indexManifestEntry.indexFile().fileName()))
+ .forEach(entry ->
indexNameToEntry.put(entry.indexFile().fileName(), entry));
+
+ for (String dataFile : dataFileToDeletionFiles.keySet()) {
+ DeletionFile deletionFile = dataFileToDeletionFiles.get(dataFile);
+ String indexFileName = new Path(deletionFile.path()).getName();
+ if (!indexFileToDeletionFiles.containsKey(indexFileName)) {
+ indexFileToDeletionFiles.put(indexFileName, new HashMap<>());
+ }
+ indexFileToDeletionFiles.get(indexFileName).put(dataFile,
deletionFile);
+ }
+ }
+
+ public void notifyDeletionFiles(Map<String, DeletionFile>
dataFileToDeletionFiles) {
+ for (String dataFile : dataFileToDeletionFiles.keySet()) {
+ DeletionFile deletionFile = dataFileToDeletionFiles.get(dataFile);
+ String indexFileName = new Path(deletionFile.path()).getName();
+ touchedIndexFiles.add(indexFileName);
+ if (indexFileToDeletionFiles.containsKey(indexFileName)) {
+ indexFileToDeletionFiles.get(indexFileName).remove(dataFile);
+ if (indexFileToDeletionFiles.get(indexFileName).isEmpty()) {
+ indexFileToDeletionFiles.remove(indexFileName);
+ indexNameToEntry.remove(indexFileName);
+ }
+ }
+ }
+ }
+
+ public List<IndexManifestEntry> writeUnchangedDeletionVector() {
+ DeletionVectorsIndexFile deletionVectorsIndexFile =
indexFileHandler.deletionVectorsIndex();
+ List<IndexManifestEntry> newIndexEntries = new ArrayList<>();
+ for (String indexFile : indexFileToDeletionFiles.keySet()) {
+ if (touchedIndexFiles.contains(indexFile)) {
+ IndexManifestEntry oldEntry = indexNameToEntry.get(indexFile);
+
+ // write unchanged deletion vector.
+ Map<String, DeletionFile> dataFileToDeletionFiles =
+ indexFileToDeletionFiles.get(indexFile);
+ if (!dataFileToDeletionFiles.isEmpty()) {
+ IndexFileMeta newIndexFile =
+ indexFileHandler.writeDeletionVectorsIndex(
+
deletionVectorsIndexFile.readDeletionVector(
+ dataFileToDeletionFiles));
+ newIndexEntries.add(
+ new IndexManifestEntry(
+ FileKind.ADD,
+ oldEntry.partition(),
+ oldEntry.bucket(),
+ newIndexFile));
+ }
+
+ // mark the touched index file as removed.
+ newIndexEntries.add(oldEntry.toDeleteEntry());
+ }
+ }
+ return newIndexEntries;
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
index 3d186cee2..0207250d1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
@@ -23,6 +23,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.index.IndexFile;
import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.PathFactory;
@@ -36,6 +37,7 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.zip.CRC32;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
/** DeletionVectors index file. */
@@ -83,6 +85,31 @@ public class DeletionVectorsIndexFile extends IndexFile {
return deletionVectors;
}
+ /** Reads deletion vectors from a list of DeletionFile which belong to a
same index file. */
+ public Map<String, DeletionVector> readDeletionVector(
+ Map<String, DeletionFile> dataFileToDeletionFiles) {
+ Map<String, DeletionVector> deletionVectors = new HashMap<>();
+ if (dataFileToDeletionFiles.isEmpty()) {
+ return deletionVectors;
+ }
+
+ String indexFile =
dataFileToDeletionFiles.values().stream().findAny().get().path();
+ try (SeekableInputStream inputStream = fileIO.newInputStream(new
Path(indexFile))) {
+ checkVersion(inputStream);
+ for (String dataFile : dataFileToDeletionFiles.keySet()) {
+ DeletionFile deletionFile =
dataFileToDeletionFiles.get(dataFile);
+ checkArgument(deletionFile.path().equals(indexFile));
+ inputStream.seek(deletionFile.offset());
+ DataInputStream dataInputStream = new
DataInputStream(inputStream);
+ deletionVectors.put(
+ dataFile, readDeletionVector(dataInputStream, (int)
deletionFile.length()));
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to read deletion vector from
file: " + indexFile, e);
+ }
+ return deletionVectors;
+ }
+
/**
* Reads a single deletion vector from the specified file.
*
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index f3138cb02..2d3dee20e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -65,6 +65,18 @@ public class IndexFileHandler {
this.deletionVectorsIndex = deletionVectorsIndex;
}
+ public DeletionVectorsIndexFile deletionVectorsIndex() {
+ return this.deletionVectorsIndex;
+ }
+
+ public List<IndexManifestEntry> scan() {
+ Snapshot snapshot = snapshotManager.latestSnapshot();
+ if (snapshot == null || snapshot.indexManifest() == null) {
+ return Collections.emptyList();
+ }
+ return indexManifestFile.read(snapshot.indexManifest());
+ }
+
public Optional<IndexFileMeta> scanHashIndex(long snapshotId, BinaryRow
partition, int bucket) {
List<IndexFileMeta> result = scan(snapshotId, HASH_INDEX, partition,
bucket);
if (result.size() > 1) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
index 11612ac1b..fd9f78473 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
@@ -19,6 +19,7 @@
package org.apache.paimon;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.DeletionVectorIndexFileMaintainer;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileIOFinder;
@@ -39,6 +40,7 @@ import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.TraceableFileIO;
@@ -117,6 +119,11 @@ public class TestAppendFileStore extends
AppendOnlyFileStore {
return fileHandler.scan(lastSnapshotId, DELETION_VECTORS_INDEX,
partition, bucket);
}
+ public DeletionVectorIndexFileMaintainer createDVIFMaintainer(
+ Map<String, DeletionFile> dataFileToDeletionFiles) {
+ return new DeletionVectorIndexFileMaintainer(fileHandler,
dataFileToDeletionFiles);
+ }
+
public DeletionVectorsMaintainer createOrRestoreDVMaintainer(BinaryRow
partition, int bucket) {
Long lastSnapshotId = snapshotManager().latestSnapshotId();
DeletionVectorsMaintainer.Factory factory =
@@ -124,7 +131,7 @@ public class TestAppendFileStore extends
AppendOnlyFileStore {
return factory.createOrRestore(lastSnapshotId, partition, bucket);
}
- public CommitMessage writeDVIndexFiles(
+ public CommitMessageImpl writeDVIndexFiles(
BinaryRow partition, int bucket, Map<String, List<Integer>>
dataFileToPositions) {
DeletionVectorsMaintainer dvMaintainer =
createOrRestoreDVMaintainer(partition, bucket);
for (Map.Entry<String, List<Integer>> entry :
dataFileToPositions.entrySet()) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java
new file mode 100644
index 000000000..5da65c782
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.deletionvectors;
+
+import org.apache.paimon.TestAppendFileStore;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.source.DeletionFile;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.PathFactory;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for DeletionVectorIndexFileMaintainer. */
+public class DeletionVectorIndexFileMaintainerTest {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ @Test
+ public void test() throws Exception {
+ TestAppendFileStore store =
TestAppendFileStore.createAppendStore(tempDir, new HashMap<>());
+
+ Map<String, List<Integer>> dvs = new HashMap<>();
+ dvs.put("f1", Arrays.asList(1, 3, 5));
+ dvs.put("f2", Arrays.asList(2, 4, 6));
+ CommitMessageImpl commitMessage1 =
store.writeDVIndexFiles(BinaryRow.EMPTY_ROW, 0, dvs);
+ CommitMessageImpl commitMessage2 =
+ store.writeDVIndexFiles(
+ BinaryRow.EMPTY_ROW,
+ 1,
+ Collections.singletonMap("f3", Arrays.asList(1, 2,
3)));
+ store.commit(commitMessage1, commitMessage2);
+
+ PathFactory indexPathFactory = store.pathFactory().indexFileFactory();
+ Map<String, DeletionFile> dataFileToDeletionFiles = new HashMap<>();
+ dataFileToDeletionFiles.putAll(
+ createDeletionFileMapFromIndexFileMetas(
+ indexPathFactory,
commitMessage1.indexIncrement().newIndexFiles()));
+ dataFileToDeletionFiles.putAll(
+ createDeletionFileMapFromIndexFileMetas(
+ indexPathFactory,
commitMessage2.indexIncrement().newIndexFiles()));
+
+ DeletionVectorIndexFileMaintainer dvIFMaintainer =
+ store.createDVIFMaintainer(dataFileToDeletionFiles);
+
+ // no dv should be rewritten, because nothing is changed.
+ List<IndexManifestEntry> res =
dvIFMaintainer.writeUnchangedDeletionVector();
+ assertThat(res.size()).isEqualTo(0);
+
+ // no dv should be rewritten, because all the deletion vectors have
been updated in the
+ // index file that contains the dv of f3.
+ dvIFMaintainer.notifyDeletionFiles(
+ Collections.singletonMap("f3",
dataFileToDeletionFiles.get("f3")));
+ res = dvIFMaintainer.writeUnchangedDeletionVector();
+ assertThat(res.size()).isEqualTo(0);
+
+ // the dv of f1 and f2 are in one index file, and the dv of f1 is
updated.
+ // the dv of f2 need to be rewritten, and this index file should be
marked as REMOVE.
+ dvIFMaintainer.notifyDeletionFiles(
+ Collections.singletonMap("f1",
dataFileToDeletionFiles.get("f1")));
+ res = dvIFMaintainer.writeUnchangedDeletionVector();
+ assertThat(res.size()).isEqualTo(2);
+ IndexManifestEntry entry =
+ res.stream().filter(file -> file.kind() ==
FileKind.ADD).findAny().get();
+
assertThat(entry.indexFile().deletionVectorsRanges().containsKey("f2")).isTrue();
+ entry = res.stream().filter(file -> file.kind() ==
FileKind.DELETE).findAny().get();
+ assertThat(entry.indexFile())
+
.isEqualTo(commitMessage1.indexIncrement().newIndexFiles().get(0));
+ }
+
+ private Map<String, DeletionFile> createDeletionFileMapFromIndexFileMetas(
+ PathFactory indexPathFactory, List<IndexFileMeta> fileMetas) {
+ Map<String, DeletionFile> dataFileToDeletionFiles = new HashMap<>();
+ for (IndexFileMeta indexFileMeta : fileMetas) {
+ for (Map.Entry<String, Pair<Integer, Integer>> range :
+ indexFileMeta.deletionVectorsRanges().entrySet()) {
+ dataFileToDeletionFiles.put(
+ range.getKey(),
+ new DeletionFile(
+
indexPathFactory.toPath(indexFileMeta.fileName()).toString(),
+ range.getValue().getLeft(),
+ range.getValue().getRight()));
+ }
+ }
+ return dataFileToDeletionFiles;
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index 03630e71f..7953bd6c6 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -854,12 +854,12 @@ public class FileStoreCommitTest {
TestAppendFileStore store =
TestAppendFileStore.createAppendStore(tempDir, new HashMap<>());
// commit 1
- CommitMessage commitMessage1 =
+ CommitMessageImpl commitMessage1 =
store.writeDVIndexFiles(
BinaryRow.EMPTY_ROW,
0,
Collections.singletonMap("f1", Arrays.asList(1, 3)));
- CommitMessage commitMessage2 =
+ CommitMessageImpl commitMessage2 =
store.writeDVIndexFiles(
BinaryRow.EMPTY_ROW,
0,
@@ -880,18 +880,18 @@ public class FileStoreCommitTest {
CommitMessage commitMessage3 =
store.writeDVIndexFiles(
BinaryRow.EMPTY_ROW, 0, Collections.singletonMap("f2",
Arrays.asList(3)));
- CommitMessage commitMessage4 =
- store.removeIndexFiles(
- BinaryRow.EMPTY_ROW,
- 0,
- ((CommitMessageImpl)
commitMessage1).indexIncrement().newIndexFiles());
+ List<IndexFileMeta> deleted =
+ new
ArrayList<>(commitMessage1.indexIncrement().newIndexFiles());
+ deleted.addAll(commitMessage2.indexIncrement().newIndexFiles());
+ CommitMessage commitMessage4 =
store.removeIndexFiles(BinaryRow.EMPTY_ROW, 0, deleted);
store.commit(commitMessage3, commitMessage4);
// assert 2
- assertThat(store.scanDVIndexFiles(BinaryRow.EMPTY_ROW,
0).size()).isEqualTo(2);
+ assertThat(store.scanDVIndexFiles(BinaryRow.EMPTY_ROW,
0).size()).isEqualTo(1);
maintainer = store.createOrRestoreDVMaintainer(BinaryRow.EMPTY_ROW, 0);
dvs = maintainer.deletionVectors();
assertThat(dvs.size()).isEqualTo(2);
+ assertThat(dvs.get("f1").isDeleted(3)).isTrue();
assertThat(dvs.get("f2").isDeleted(3)).isTrue();
}