This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c0a5f22d0 [core] Introduce DeletionVectorIndexFileMaintainer (#3387)
c0a5f22d0 is described below

commit c0a5f22d0fb2442a387a2f2f61491b4419e015d0
Author: Yann Byron <[email protected]>
AuthorDate: Fri May 24 13:05:31 2024 +0800

    [core] Introduce DeletionVectorIndexFileMaintainer (#3387)
---
 .../DeletionVectorIndexFileMaintainer.java         | 116 +++++++++++++++++++++
 .../deletionvectors/DeletionVectorsIndexFile.java  |  27 +++++
 .../org/apache/paimon/index/IndexFileHandler.java  |  12 +++
 .../org/apache/paimon/TestAppendFileStore.java     |   9 +-
 .../DeletionVectorIndexFileMaintainerTest.java     | 115 ++++++++++++++++++++
 .../paimon/operation/FileStoreCommitTest.java      |  16 +--
 6 files changed, 286 insertions(+), 9 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
new file mode 100644
index 000000000..3daca576b
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.deletionvectors;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.table.source.DeletionFile;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** DeletionVectorIndexFileMaintainer. */
+public class DeletionVectorIndexFileMaintainer {
+
+    private final IndexFileHandler indexFileHandler;
+
+    private final Map<String, IndexManifestEntry> indexNameToEntry = new 
HashMap<>();
+
+    private final Map<String, Map<String, DeletionFile>> 
indexFileToDeletionFiles = new HashMap<>();
+
+    private final Set<String> touchedIndexFiles = new HashSet<>();
+
+    public DeletionVectorIndexFileMaintainer(
+            IndexFileHandler indexFileHandler, Map<String, DeletionFile> 
dataFileToDeletionFiles) {
+        this.indexFileHandler = indexFileHandler;
+        List<String> touchedIndexFileNames =
+                dataFileToDeletionFiles.values().stream()
+                        .map(deletionFile -> new 
Path(deletionFile.path()).getName())
+                        .distinct()
+                        .collect(Collectors.toList());
+        indexFileHandler.scan().stream()
+                .filter(
+                        indexManifestEntry ->
+                                touchedIndexFileNames.contains(
+                                        
indexManifestEntry.indexFile().fileName()))
+                .forEach(entry -> 
indexNameToEntry.put(entry.indexFile().fileName(), entry));
+
+        for (String dataFile : dataFileToDeletionFiles.keySet()) {
+            DeletionFile deletionFile = dataFileToDeletionFiles.get(dataFile);
+            String indexFileName = new Path(deletionFile.path()).getName();
+            if (!indexFileToDeletionFiles.containsKey(indexFileName)) {
+                indexFileToDeletionFiles.put(indexFileName, new HashMap<>());
+            }
+            indexFileToDeletionFiles.get(indexFileName).put(dataFile, 
deletionFile);
+        }
+    }
+
+    public void notifyDeletionFiles(Map<String, DeletionFile> 
dataFileToDeletionFiles) {
+        for (String dataFile : dataFileToDeletionFiles.keySet()) {
+            DeletionFile deletionFile = dataFileToDeletionFiles.get(dataFile);
+            String indexFileName = new Path(deletionFile.path()).getName();
+            touchedIndexFiles.add(indexFileName);
+            if (indexFileToDeletionFiles.containsKey(indexFileName)) {
+                indexFileToDeletionFiles.get(indexFileName).remove(dataFile);
+                if (indexFileToDeletionFiles.get(indexFileName).isEmpty()) {
+                    indexFileToDeletionFiles.remove(indexFileName);
+                    indexNameToEntry.remove(indexFileName);
+                }
+            }
+        }
+    }
+
+    public List<IndexManifestEntry> writeUnchangedDeletionVector() {
+        DeletionVectorsIndexFile deletionVectorsIndexFile = 
indexFileHandler.deletionVectorsIndex();
+        List<IndexManifestEntry> newIndexEntries = new ArrayList<>();
+        for (String indexFile : indexFileToDeletionFiles.keySet()) {
+            if (touchedIndexFiles.contains(indexFile)) {
+                IndexManifestEntry oldEntry = indexNameToEntry.get(indexFile);
+
+                // write unchanged deletion vector.
+                Map<String, DeletionFile> dataFileToDeletionFiles =
+                        indexFileToDeletionFiles.get(indexFile);
+                if (!dataFileToDeletionFiles.isEmpty()) {
+                    IndexFileMeta newIndexFile =
+                            indexFileHandler.writeDeletionVectorsIndex(
+                                    
deletionVectorsIndexFile.readDeletionVector(
+                                            dataFileToDeletionFiles));
+                    newIndexEntries.add(
+                            new IndexManifestEntry(
+                                    FileKind.ADD,
+                                    oldEntry.partition(),
+                                    oldEntry.bucket(),
+                                    newIndexFile));
+                }
+
+                // mark the touched index file as removed.
+                newIndexEntries.add(oldEntry.toDeleteEntry());
+            }
+        }
+        return newIndexEntries;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
index 3d186cee2..0207250d1 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
@@ -23,6 +23,7 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.SeekableInputStream;
 import org.apache.paimon.index.IndexFile;
 import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.table.source.DeletionFile;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.PathFactory;
 
@@ -36,6 +37,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.zip.CRC32;
 
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /** DeletionVectors index file. */
@@ -83,6 +85,31 @@ public class DeletionVectorsIndexFile extends IndexFile {
         return deletionVectors;
     }
 
+    /** Reads deletion vectors from a list of DeletionFile which belong to a 
same index file. */
+    public Map<String, DeletionVector> readDeletionVector(
+            Map<String, DeletionFile> dataFileToDeletionFiles) {
+        Map<String, DeletionVector> deletionVectors = new HashMap<>();
+        if (dataFileToDeletionFiles.isEmpty()) {
+            return deletionVectors;
+        }
+
+        String indexFile = 
dataFileToDeletionFiles.values().stream().findAny().get().path();
+        try (SeekableInputStream inputStream = fileIO.newInputStream(new 
Path(indexFile))) {
+            checkVersion(inputStream);
+            for (String dataFile : dataFileToDeletionFiles.keySet()) {
+                DeletionFile deletionFile = 
dataFileToDeletionFiles.get(dataFile);
+                checkArgument(deletionFile.path().equals(indexFile));
+                inputStream.seek(deletionFile.offset());
+                DataInputStream dataInputStream = new 
DataInputStream(inputStream);
+                deletionVectors.put(
+                        dataFile, readDeletionVector(dataInputStream, (int) 
deletionFile.length()));
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to read deletion vector from 
file: " + indexFile, e);
+        }
+        return deletionVectors;
+    }
+
     /**
      * Reads a single deletion vector from the specified file.
      *
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index f3138cb02..2d3dee20e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -65,6 +65,18 @@ public class IndexFileHandler {
         this.deletionVectorsIndex = deletionVectorsIndex;
     }
 
+    public DeletionVectorsIndexFile deletionVectorsIndex() {
+        return this.deletionVectorsIndex;
+    }
+
+    public List<IndexManifestEntry> scan() {
+        Snapshot snapshot = snapshotManager.latestSnapshot();
+        if (snapshot == null || snapshot.indexManifest() == null) {
+            return Collections.emptyList();
+        }
+        return indexManifestFile.read(snapshot.indexManifest());
+    }
+
     public Optional<IndexFileMeta> scanHashIndex(long snapshotId, BinaryRow 
partition, int bucket) {
         List<IndexFileMeta> result = scan(snapshotId, HASH_INDEX, partition, 
bucket);
         if (result.size() > 1) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
index 11612ac1b..fd9f78473 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
@@ -19,6 +19,7 @@
 package org.apache.paimon;
 
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.DeletionVectorIndexFileMaintainer;
 import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileIOFinder;
@@ -39,6 +40,7 @@ import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.source.DeletionFile;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.TraceableFileIO;
 
@@ -117,6 +119,11 @@ public class TestAppendFileStore extends 
AppendOnlyFileStore {
         return fileHandler.scan(lastSnapshotId, DELETION_VECTORS_INDEX, 
partition, bucket);
     }
 
+    public DeletionVectorIndexFileMaintainer createDVIFMaintainer(
+            Map<String, DeletionFile> dataFileToDeletionFiles) {
+        return new DeletionVectorIndexFileMaintainer(fileHandler, 
dataFileToDeletionFiles);
+    }
+
     public DeletionVectorsMaintainer createOrRestoreDVMaintainer(BinaryRow 
partition, int bucket) {
         Long lastSnapshotId = snapshotManager().latestSnapshotId();
         DeletionVectorsMaintainer.Factory factory =
@@ -124,7 +131,7 @@ public class TestAppendFileStore extends 
AppendOnlyFileStore {
         return factory.createOrRestore(lastSnapshotId, partition, bucket);
     }
 
-    public CommitMessage writeDVIndexFiles(
+    public CommitMessageImpl writeDVIndexFiles(
             BinaryRow partition, int bucket, Map<String, List<Integer>> 
dataFileToPositions) {
         DeletionVectorsMaintainer dvMaintainer = 
createOrRestoreDVMaintainer(partition, bucket);
         for (Map.Entry<String, List<Integer>> entry : 
dataFileToPositions.entrySet()) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java
new file mode 100644
index 000000000..5da65c782
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.deletionvectors;
+
+import org.apache.paimon.TestAppendFileStore;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.source.DeletionFile;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.PathFactory;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for DeletionVectorIndexFileMaintainer. */
+public class DeletionVectorIndexFileMaintainerTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    @Test
+    public void test() throws Exception {
+        TestAppendFileStore store = 
TestAppendFileStore.createAppendStore(tempDir, new HashMap<>());
+
+        Map<String, List<Integer>> dvs = new HashMap<>();
+        dvs.put("f1", Arrays.asList(1, 3, 5));
+        dvs.put("f2", Arrays.asList(2, 4, 6));
+        CommitMessageImpl commitMessage1 = 
store.writeDVIndexFiles(BinaryRow.EMPTY_ROW, 0, dvs);
+        CommitMessageImpl commitMessage2 =
+                store.writeDVIndexFiles(
+                        BinaryRow.EMPTY_ROW,
+                        1,
+                        Collections.singletonMap("f3", Arrays.asList(1, 2, 
3)));
+        store.commit(commitMessage1, commitMessage2);
+
+        PathFactory indexPathFactory = store.pathFactory().indexFileFactory();
+        Map<String, DeletionFile> dataFileToDeletionFiles = new HashMap<>();
+        dataFileToDeletionFiles.putAll(
+                createDeletionFileMapFromIndexFileMetas(
+                        indexPathFactory, 
commitMessage1.indexIncrement().newIndexFiles()));
+        dataFileToDeletionFiles.putAll(
+                createDeletionFileMapFromIndexFileMetas(
+                        indexPathFactory, 
commitMessage2.indexIncrement().newIndexFiles()));
+
+        DeletionVectorIndexFileMaintainer dvIFMaintainer =
+                store.createDVIFMaintainer(dataFileToDeletionFiles);
+
+        // no dv should be rewritten, because nothing is changed.
+        List<IndexManifestEntry> res = 
dvIFMaintainer.writeUnchangedDeletionVector();
+        assertThat(res.size()).isEqualTo(0);
+
+        // no dv should be rewritten, because all the deletion vectors have 
been updated in the
+        // index file that contains the dv of f3.
+        dvIFMaintainer.notifyDeletionFiles(
+                Collections.singletonMap("f3", 
dataFileToDeletionFiles.get("f3")));
+        res = dvIFMaintainer.writeUnchangedDeletionVector();
+        assertThat(res.size()).isEqualTo(0);
+
+        // the dv of f1 and f2 are in one index file, and the dv of f1 is 
updated.
+        // the dv of f2 need to be rewritten, and this index file should be 
marked as REMOVE.
+        dvIFMaintainer.notifyDeletionFiles(
+                Collections.singletonMap("f1", 
dataFileToDeletionFiles.get("f1")));
+        res = dvIFMaintainer.writeUnchangedDeletionVector();
+        assertThat(res.size()).isEqualTo(2);
+        IndexManifestEntry entry =
+                res.stream().filter(file -> file.kind() == 
FileKind.ADD).findAny().get();
+        
assertThat(entry.indexFile().deletionVectorsRanges().containsKey("f2")).isTrue();
+        entry = res.stream().filter(file -> file.kind() == 
FileKind.DELETE).findAny().get();
+        assertThat(entry.indexFile())
+                
.isEqualTo(commitMessage1.indexIncrement().newIndexFiles().get(0));
+    }
+
+    private Map<String, DeletionFile> createDeletionFileMapFromIndexFileMetas(
+            PathFactory indexPathFactory, List<IndexFileMeta> fileMetas) {
+        Map<String, DeletionFile> dataFileToDeletionFiles = new HashMap<>();
+        for (IndexFileMeta indexFileMeta : fileMetas) {
+            for (Map.Entry<String, Pair<Integer, Integer>> range :
+                    indexFileMeta.deletionVectorsRanges().entrySet()) {
+                dataFileToDeletionFiles.put(
+                        range.getKey(),
+                        new DeletionFile(
+                                
indexPathFactory.toPath(indexFileMeta.fileName()).toString(),
+                                range.getValue().getLeft(),
+                                range.getValue().getRight()));
+            }
+        }
+        return dataFileToDeletionFiles;
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index 03630e71f..7953bd6c6 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -854,12 +854,12 @@ public class FileStoreCommitTest {
         TestAppendFileStore store = 
TestAppendFileStore.createAppendStore(tempDir, new HashMap<>());
 
         // commit 1
-        CommitMessage commitMessage1 =
+        CommitMessageImpl commitMessage1 =
                 store.writeDVIndexFiles(
                         BinaryRow.EMPTY_ROW,
                         0,
                         Collections.singletonMap("f1", Arrays.asList(1, 3)));
-        CommitMessage commitMessage2 =
+        CommitMessageImpl commitMessage2 =
                 store.writeDVIndexFiles(
                         BinaryRow.EMPTY_ROW,
                         0,
@@ -880,18 +880,18 @@ public class FileStoreCommitTest {
         CommitMessage commitMessage3 =
                 store.writeDVIndexFiles(
                         BinaryRow.EMPTY_ROW, 0, Collections.singletonMap("f2", 
Arrays.asList(3)));
-        CommitMessage commitMessage4 =
-                store.removeIndexFiles(
-                        BinaryRow.EMPTY_ROW,
-                        0,
-                        ((CommitMessageImpl) 
commitMessage1).indexIncrement().newIndexFiles());
+        List<IndexFileMeta> deleted =
+                new 
ArrayList<>(commitMessage1.indexIncrement().newIndexFiles());
+        deleted.addAll(commitMessage2.indexIncrement().newIndexFiles());
+        CommitMessage commitMessage4 = 
store.removeIndexFiles(BinaryRow.EMPTY_ROW, 0, deleted);
         store.commit(commitMessage3, commitMessage4);
 
         // assert 2
-        assertThat(store.scanDVIndexFiles(BinaryRow.EMPTY_ROW, 
0).size()).isEqualTo(2);
+        assertThat(store.scanDVIndexFiles(BinaryRow.EMPTY_ROW, 
0).size()).isEqualTo(1);
         maintainer = store.createOrRestoreDVMaintainer(BinaryRow.EMPTY_ROW, 0);
         dvs = maintainer.deletionVectors();
         assertThat(dvs.size()).isEqualTo(2);
+        assertThat(dvs.get("f1").isDeleted(3)).isTrue();
         assertThat(dvs.get("f2").isDeleted(3)).isTrue();
     }
 

Reply via email to