This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e8acd22b9 [core] Introduce deletedIndexFiles in IndexIncrement and allow multi index files in same bucket (#3368)
e8acd22b9 is described below
commit e8acd22b995d21ac37cc492fa8c1c66abcb5d44d
Author: Yann Byron <[email protected]>
AuthorDate: Wed May 22 21:59:56 2024 +0800
[core] Introduce deletedIndexFiles in IndexIncrement and allow multi index files in same bucket (#3368)
---
.../java/org/apache/paimon/AbstractFileStore.java | 3 +-
.../deletionvectors/DeletionVectorsIndexFile.java | 13 +-
.../deletionvectors/DeletionVectorsMaintainer.java | 11 +-
.../apache/paimon/index/HashIndexMaintainer.java | 2 +-
.../org/apache/paimon/index/IndexFileHandler.java | 55 +++---
.../org/apache/paimon/io/CompactIncrement.java | 6 +
.../java/org/apache/paimon/io/IndexIncrement.java | 31 ++-
.../apache/paimon/manifest/IndexManifestEntry.java | 55 ------
.../apache/paimon/manifest/IndexManifestFile.java | 33 +---
.../paimon/manifest/IndexManifestFileHandler.java | 211 +++++++++++++++++++++
.../paimon/operation/FileStoreCommitImpl.java | 29 ++-
.../paimon/table/sink/CommitMessageSerializer.java | 5 +-
.../apache/paimon/table/sink/TableCommitImpl.java | 4 +
.../table/source/snapshot/SnapshotReaderImpl.java | 52 ++---
.../org/apache/paimon/TestAppendFileStore.java | 170 +++++++++++++++++
.../DeletionVectorsIndexFileTest.java | 14 +-
.../DeletionVectorsMaintainerTest.java | 62 +++++-
.../paimon/index/IndexFileMetaSerializerTest.java | 39 ++--
.../manifest/IndexManifestFileHandlerTest.java | 112 +++++++++++
.../paimon/operation/FileStoreCommitTest.java | 60 +++++-
.../table/sink/CommitMessageSerializerTest.java | 4 +-
21 files changed, 789 insertions(+), 182 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 8cbf433f6..7528b393d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -191,7 +191,8 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
partitionType.getFieldCount() > 0 &&
options.dynamicPartitionOverwrite(),
newKeyComparator(),
options.branch(),
- newStatsFileHandler());
+ newStatsFileHandler(),
+ bucketMode());
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
index 733fb7825..c0cc09039 100644
--- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
@@ -22,6 +22,7 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.index.IndexFile;
+import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.PathFactory;
@@ -48,18 +49,16 @@ public class DeletionVectorsIndexFile extends IndexFile {
/**
* Reads all deletion vectors from a specified file.
*
- * @param fileName The name of the file from which to read the deletion
vectors.
- * @param deletionVectorRanges A map where the key represents which file
the DeletionVector
- * belongs to and the value is a Pair object specifying the range
(start position and size)
- * within the file where the deletion vector data is located.
* @return A map where the key represents which file the DeletionVector
belongs to, and the
* value is the corresponding DeletionVector object.
* @throws UncheckedIOException If an I/O error occurs while reading from
the file.
*/
- public Map<String, DeletionVector> readAllDeletionVectors(
- String fileName, LinkedHashMap<String, Pair<Integer, Integer>>
deletionVectorRanges) {
+ public Map<String, DeletionVector> readAllDeletionVectors(IndexFileMeta
fileMeta) {
+ LinkedHashMap<String, Pair<Integer, Integer>> deletionVectorRanges =
+ fileMeta.deletionVectorsRanges();
+ String indexFileName = fileMeta.fileName();
Map<String, DeletionVector> deletionVectors = new HashMap<>();
- Path filePath = pathFactory.toPath(fileName);
+ Path filePath = pathFactory.toPath(indexFileName);
try (SeekableInputStream inputStream =
fileIO.newInputStream(filePath)) {
checkVersion(inputStream);
DataInputStream dataInputStream = new DataInputStream(inputStream);
diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
index 3beb99623..ce9ad40a1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
@@ -129,15 +129,12 @@ public class DeletionVectorsMaintainer {
public DeletionVectorsMaintainer createOrRestore(
@Nullable Long snapshotId, BinaryRow partition, int bucket) {
- IndexFileMeta indexFile =
+ List<IndexFileMeta> indexFiles =
snapshotId == null
- ? null
- : handler.scan(snapshotId, DELETION_VECTORS_INDEX,
partition, bucket)
- .orElse(null);
+ ? Collections.emptyList()
+ : handler.scan(snapshotId, DELETION_VECTORS_INDEX,
partition, bucket);
Map<String, DeletionVector> deletionVectors =
- indexFile == null
- ? new HashMap<>()
- : new
HashMap<>(handler.readAllDeletionVectors(indexFile));
+ new HashMap<>(handler.readAllDeletionVectors(indexFiles));
return createOrRestore(deletionVectors);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java
index 66a8d6409..41e486525 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java
@@ -51,7 +51,7 @@ public class HashIndexMaintainer implements
IndexMaintainer<KeyValue> {
IntHashSet hashcode = new IntHashSet();
if (snapshotId != null) {
Optional<IndexFileMeta> indexFile =
- fileHandler.scan(snapshotId, HashIndexFile.HASH_INDEX,
partition, bucket);
+ fileHandler.scanHashIndex(snapshotId, partition, bucket);
if (indexFile.isPresent()) {
IndexFileMeta file = indexFile.get();
hashcode = new IntHashSet((int) file.rowCount());
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index fdc2fe754..f3138cb02 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -34,6 +34,7 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -64,20 +65,25 @@ public class IndexFileHandler {
this.deletionVectorsIndex = deletionVectorsIndex;
}
- public Optional<IndexFileMeta> scan(
+ public Optional<IndexFileMeta> scanHashIndex(long snapshotId, BinaryRow
partition, int bucket) {
+ List<IndexFileMeta> result = scan(snapshotId, HASH_INDEX, partition,
bucket);
+ if (result.size() > 1) {
+ throw new IllegalArgumentException(
+ "Find multiple hash index files for one bucket: " +
result);
+ }
+ return result.isEmpty() ? Optional.empty() :
Optional.of(result.get(0));
+ }
+
+ public List<IndexFileMeta> scan(
long snapshotId, String indexType, BinaryRow partition, int
bucket) {
List<IndexManifestEntry> entries = scan(snapshotId, indexType,
partition);
- List<IndexManifestEntry> result = new ArrayList<>();
+ List<IndexFileMeta> result = new ArrayList<>();
for (IndexManifestEntry file : entries) {
if (file.bucket() == bucket) {
- result.add(file);
+ result.add(file.indexFile());
}
}
- if (result.size() > 1) {
- throw new IllegalArgumentException(
- "Find multiple index files for one bucket: " + result);
- }
- return result.isEmpty() ? Optional.empty() :
Optional.of(result.get(0).indexFile());
+ return result;
}
public List<IndexManifestEntry> scan(String indexType, BinaryRow
partition) {
@@ -167,31 +173,16 @@ public class IndexFileHandler {
indexManifestFile.delete(indexManifest);
}
- public Map<String, DeletionVector> readAllDeletionVectors(IndexFileMeta
fileMeta) {
- if (!fileMeta.indexType().equals(DELETION_VECTORS_INDEX)) {
- throw new IllegalArgumentException(
- "Input file is not deletion vectors index " +
fileMeta.indexType());
- }
- LinkedHashMap<String, Pair<Integer, Integer>> deleteIndexRange =
- fileMeta.deletionVectorsRanges();
- if (deleteIndexRange == null || deleteIndexRange.isEmpty()) {
- return Collections.emptyMap();
- }
- return
deletionVectorsIndex.readAllDeletionVectors(fileMeta.fileName(),
deleteIndexRange);
- }
-
- public Optional<DeletionVector> readDeletionVector(IndexFileMeta fileMeta,
String fileName) {
- if (!fileMeta.indexType().equals(DELETION_VECTORS_INDEX)) {
- throw new IllegalArgumentException(
- "Input file is not deletion vectors index " +
fileMeta.indexType());
- }
- Map<String, Pair<Integer, Integer>> deleteIndexRange =
fileMeta.deletionVectorsRanges();
- if (deleteIndexRange == null ||
!deleteIndexRange.containsKey(fileName)) {
- return Optional.empty();
+ public Map<String, DeletionVector>
readAllDeletionVectors(List<IndexFileMeta> fileMetas) {
+ Map<String, DeletionVector> deletionVectors = new HashMap<>();
+ for (IndexFileMeta indexFile : fileMetas) {
+ if (!indexFile.indexType().equals(DELETION_VECTORS_INDEX)) {
+ throw new IllegalArgumentException(
+ "Input file is not deletion vectors index " +
indexFile.indexType());
+ }
+
deletionVectors.putAll(deletionVectorsIndex.readAllDeletionVectors(indexFile));
}
- return Optional.of(
- deletionVectorsIndex.readDeletionVector(
- fileMeta.fileName(), deleteIndexRange.get(fileName)));
+ return deletionVectors;
}
public IndexFileMeta writeDeletionVectorsIndex(Map<String, DeletionVector>
deletionVectors) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/CompactIncrement.java b/paimon-core/src/main/java/org/apache/paimon/io/CompactIncrement.java
index cc3e79f01..23f64b5cd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/CompactIncrement.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/CompactIncrement.java
@@ -18,6 +18,7 @@
package org.apache.paimon.io;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
@@ -88,4 +89,9 @@ public class CompactIncrement {
.map(DataFileMeta::fileName)
.collect(Collectors.joining(",\n")));
}
+
+ public static CompactIncrement emptyIncrement() {
+ return new CompactIncrement(
+ Collections.emptyList(), Collections.emptyList(),
Collections.emptyList());
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java b/paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java
index a62d24109..8e53adfbf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java
@@ -20,6 +20,8 @@ package org.apache.paimon.io;
import org.apache.paimon.index.IndexFileMeta;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -28,16 +30,29 @@ public class IndexIncrement {
private final List<IndexFileMeta> newIndexFiles;
+ private final List<IndexFileMeta> deletedIndexFiles;
+
public IndexIncrement(List<IndexFileMeta> newIndexFiles) {
this.newIndexFiles = newIndexFiles;
+ this.deletedIndexFiles = Collections.emptyList();
+ }
+
+ public IndexIncrement(
+ List<IndexFileMeta> newIndexFiles, List<IndexFileMeta>
deletedIndexFiles) {
+ this.newIndexFiles = newIndexFiles;
+ this.deletedIndexFiles = deletedIndexFiles;
}
public List<IndexFileMeta> newIndexFiles() {
return newIndexFiles;
}
+ public List<IndexFileMeta> deletedIndexFiles() {
+ return deletedIndexFiles;
+ }
+
public boolean isEmpty() {
- return newIndexFiles.isEmpty();
+ return newIndexFiles.isEmpty() && deletedIndexFiles.isEmpty();
}
@Override
@@ -49,16 +64,24 @@ public class IndexIncrement {
return false;
}
IndexIncrement that = (IndexIncrement) o;
- return Objects.equals(newIndexFiles, that.newIndexFiles);
+ return Objects.equals(newIndexFiles, that.newIndexFiles)
+ && Objects.equals(deletedIndexFiles, that.deletedIndexFiles);
}
@Override
public int hashCode() {
- return Objects.hash(newIndexFiles);
+ List<IndexFileMeta> all = new ArrayList<>(newIndexFiles);
+ all.addAll(deletedIndexFiles);
+ return Objects.hash(all);
}
@Override
public String toString() {
- return "IndexIncrement{" + "newIndexFiles=" + newIndexFiles + '}';
+ return "IndexIncrement{"
+ + "newIndexFiles="
+ + newIndexFiles
+ + ",deletedIndexFiles="
+ + deletedIndexFiles
+ + "}";
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java
index 53f3fb82d..fc4fd03e4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java
@@ -94,10 +94,6 @@ public class IndexManifestEntry {
return new RowType(fields);
}
- public Identifier identifier() {
- return new Identifier(partition, bucket, indexFile.indexType());
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -131,55 +127,4 @@ public class IndexManifestEntry {
+ indexFile
+ '}';
}
-
- /** The {@link Identifier} of a {@link IndexFileMeta}. */
- public static class Identifier {
-
- public final BinaryRow partition;
- public final int bucket;
- public final String indexType;
-
- private Integer hash;
-
- private Identifier(BinaryRow partition, int bucket, String indexType) {
- this.partition = partition;
- this.bucket = bucket;
- this.indexType = indexType;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- Identifier that = (Identifier) o;
- return bucket == that.bucket
- && Objects.equals(partition, that.partition)
- && Objects.equals(indexType, that.indexType);
- }
-
- @Override
- public int hashCode() {
- if (hash == null) {
- hash = Objects.hash(partition, bucket, indexType);
- }
- return hash;
- }
-
- @Override
- public String toString() {
- return "Identifier{"
- + "partition="
- + partition
- + ", bucket="
- + bucket
- + ", indexType='"
- + indexType
- + '\''
- + '}';
- }
- }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
index 235f8c509..8a91f3483 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
@@ -22,7 +22,7 @@ import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.manifest.IndexManifestEntry.Identifier;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.ObjectsFile;
@@ -31,10 +31,7 @@ import org.apache.paimon.utils.VersionedObjectSerializer;
import javax.annotation.Nullable;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
/** Index manifest file. */
public class IndexManifestFile extends ObjectsFile<IndexManifestEntry> {
@@ -53,27 +50,17 @@ public class IndexManifestFile extends
ObjectsFile<IndexManifestEntry> {
null);
}
- /** Merge new index files to index manifest. */
+ /** Write new index files to index manifest. */
@Nullable
- public String merge(
- @Nullable String previousIndexManifest, List<IndexManifestEntry>
newIndexFiles) {
- String indexManifest = previousIndexManifest;
- if (newIndexFiles.size() > 0) {
- Map<Identifier, IndexManifestEntry> indexEntries = new
LinkedHashMap<>();
- List<IndexManifestEntry> entries =
- indexManifest == null ? new ArrayList<>() :
read(indexManifest);
- entries.addAll(newIndexFiles);
- for (IndexManifestEntry file : entries) {
- if (file.kind() == FileKind.ADD) {
- indexEntries.put(file.identifier(), file);
- } else {
- indexEntries.remove(file.identifier());
- }
- }
- indexManifest = writeWithoutRolling(indexEntries.values());
+ public String writeIndexFiles(
+ @Nullable String previousIndexManifest,
+ List<IndexManifestEntry> newIndexFiles,
+ BucketMode bucketMode) {
+ if (newIndexFiles.isEmpty()) {
+ return previousIndexManifest;
}
-
- return indexManifest;
+ IndexManifestFileHandler handler = new IndexManifestFileHandler(this,
bucketMode);
+ return handler.write(previousIndexManifest, newIndexFiles);
}
/** Creator of {@link IndexManifestFile}. */
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java
new file mode 100644
index 000000000..2c62e3ef4
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.manifest;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
+import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
+
+/** IndexManifestFile Handler. */
+public class IndexManifestFileHandler {
+
+ private final IndexManifestFile indexManifestFile;
+
+ private final BucketMode bucketMode;
+
+ IndexManifestFileHandler(IndexManifestFile indexManifestFile, BucketMode
bucketMode) {
+ this.indexManifestFile = indexManifestFile;
+ this.bucketMode = bucketMode;
+ }
+
+ String write(@Nullable String previousIndexManifest,
List<IndexManifestEntry> newIndexFiles) {
+ List<IndexManifestEntry> entries =
+ previousIndexManifest == null
+ ? new ArrayList<>()
+ : indexManifestFile.read(previousIndexManifest);
+ for (IndexManifestEntry entry : entries) {
+ Preconditions.checkArgument(entry.kind() == FileKind.ADD);
+ }
+
+ Pair<List<IndexManifestEntry>, List<IndexManifestEntry>> previous =
+ separateIndexEntries(entries);
+ Pair<List<IndexManifestEntry>, List<IndexManifestEntry>> current =
+ separateIndexEntries(newIndexFiles);
+
+ // Step1: get the hash index files;
+ List<IndexManifestEntry> indexEntries =
+ getIndexManifestFileCombine(HASH_INDEX)
+ .combine(previous.getLeft(), current.getLeft());
+
+ // Step2: get the dv index files;
+ indexEntries.addAll(
+ getIndexManifestFileCombine(DELETION_VECTORS_INDEX)
+ .combine(previous.getRight(), current.getRight()));
+
+ return indexManifestFile.writeWithoutRolling(indexEntries);
+ }
+
+ private Pair<List<IndexManifestEntry>, List<IndexManifestEntry>>
separateIndexEntries(
+ List<IndexManifestEntry> indexFiles) {
+ List<IndexManifestEntry> hashEntries = new ArrayList<>();
+ List<IndexManifestEntry> dvEntries = new ArrayList<>();
+ for (IndexManifestEntry entry : indexFiles) {
+ String indexType = entry.indexFile().indexType();
+ if (indexType.equals(DELETION_VECTORS_INDEX)) {
+ dvEntries.add(entry);
+ } else if (indexType.equals(HASH_INDEX)) {
+ hashEntries.add(entry);
+ } else {
+ throw new IllegalArgumentException("Can't recognize this index type: " + indexType);
+ }
+ }
+ return Pair.of(hashEntries, dvEntries);
+ }
+
+ private IndexManifestFileCombiner getIndexManifestFileCombine(String
indexType) {
+ if (DELETION_VECTORS_INDEX.equals(indexType) &&
BucketMode.BUCKET_UNAWARE == bucketMode) {
+ return new UnawareBucketCombiner();
+ } else {
+ return new CommonBucketCombiner();
+ }
+ }
+
+ interface IndexManifestFileCombiner {
+ List<IndexManifestEntry> combine(
+ List<IndexManifestEntry> prevIndexFiles,
List<IndexManifestEntry> newIndexFiles);
+ }
+
+ /**
+ * We combine the previous and new index files by the file name. This is
only used for tables
+ * with UnawareBucket.
+ */
+ static class UnawareBucketCombiner implements IndexManifestFileCombiner {
+
+ @Override
+ public List<IndexManifestEntry> combine(
+ List<IndexManifestEntry> prevIndexFiles,
List<IndexManifestEntry> newIndexFiles) {
+ Map<String, IndexManifestEntry> indexEntries = new HashMap<>();
+ for (IndexManifestEntry entry : prevIndexFiles) {
+ indexEntries.put(entry.indexFile().fileName(), entry);
+ }
+
+ for (IndexManifestEntry entry : newIndexFiles) {
+ if (entry.kind() == FileKind.ADD) {
+ indexEntries.put(entry.indexFile().fileName(), entry);
+ } else {
+ indexEntries.remove(entry.indexFile().fileName());
+ }
+ }
+ return new ArrayList<>(indexEntries.values());
+ }
+ }
+
+ /** We combine the previous and new index files by {@link Identifier}. */
+ static class CommonBucketCombiner implements IndexManifestFileCombiner {
+
+ @Override
+ public List<IndexManifestEntry> combine(
+ List<IndexManifestEntry> prevIndexFiles,
List<IndexManifestEntry> newIndexFiles) {
+ Map<Identifier, IndexManifestEntry> indexEntries = new HashMap<>();
+ for (IndexManifestEntry entry : prevIndexFiles) {
+ indexEntries.put(identifier(entry), entry);
+ }
+
+ for (IndexManifestEntry entry : newIndexFiles) {
+ if (entry.kind() == FileKind.ADD) {
+ indexEntries.put(identifier(entry), entry);
+ } else {
+ indexEntries.remove(identifier(entry));
+ }
+ }
+ return new ArrayList<>(indexEntries.values());
+ }
+ }
+
+ private static Identifier identifier(IndexManifestEntry
indexManifestEntry) {
+ return new Identifier(
+ indexManifestEntry.partition(),
+ indexManifestEntry.bucket(),
+ indexManifestEntry.indexFile().indexType());
+ }
+
+ /** The {@link Identifier} of a {@link IndexFileMeta}. */
+ public static class Identifier {
+
+ public final BinaryRow partition;
+ public final int bucket;
+ public final String indexType;
+
+ private Integer hash;
+
+ private Identifier(BinaryRow partition, int bucket, String indexType) {
+ this.partition = partition;
+ this.bucket = bucket;
+ this.indexType = indexType;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Identifier that = (Identifier) o;
+ return bucket == that.bucket
+ && Objects.equals(partition, that.partition)
+ && Objects.equals(indexType, that.indexType);
+ }
+
+ @Override
+ public int hashCode() {
+ if (hash == null) {
+ hash = Objects.hash(partition, bucket, indexType);
+ }
+ return hash;
+ }
+
+ @Override
+ public String toString() {
+ return "Identifier{"
+ + "partition="
+ + partition
+ + ", bucket="
+ + bucket
+ + ", indexType='"
+ + indexType
+ + '\''
+ + '}';
+ }
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 7ddf8c210..0001d5b19 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -44,6 +44,7 @@ import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.stats.StatsFileHandler;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.types.RowType;
@@ -125,6 +126,8 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
private final StatsFileHandler statsFileHandler;
+ private final BucketMode bucketMode;
+
public FileStoreCommitImpl(
FileIO fileIO,
SchemaManager schemaManager,
@@ -143,7 +146,8 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
boolean dynamicPartitionOverwrite,
@Nullable Comparator<InternalRow> keyComparator,
String branchName,
- StatsFileHandler statsFileHandler) {
+ StatsFileHandler statsFileHandler,
+ BucketMode bucketMode) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.commitUser = commitUser;
@@ -166,6 +170,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
this.ignoreEmptyCommit = true;
this.commitMetrics = null;
this.statsFileHandler = statsFileHandler;
+ this.bucketMode = bucketMode;
}
@Override
@@ -633,6 +638,24 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
"Unknown index type: " +
f.indexType());
}
});
+ commitMessage
+ .indexIncrement()
+ .deletedIndexFiles()
+ .forEach(
+ f -> {
+ if
(f.indexType().equals(DELETION_VECTORS_INDEX)) {
+ compactDvIndexFiles.add(
+ new IndexManifestEntry(
+ FileKind.DELETE,
+ commitMessage.partition(),
+ commitMessage.bucket(),
+ f));
+ } else {
+ throw new RuntimeException(
+ "This index type is not supported to delete: "
+ + f.indexType());
+ }
+ });
}
}
@@ -834,7 +857,9 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
}
// write new index manifest
- String indexManifest =
indexManifestFile.merge(previousIndexManifest, indexFiles);
+ String indexManifest =
+ indexManifestFile.writeIndexFiles(
+ previousIndexManifest, indexFiles, bucketMode);
if (!Objects.equals(indexManifest, previousIndexManifest)) {
newIndexManifest = indexManifest;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
index 24c9fc892..a7b566b32 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
@@ -81,6 +81,7 @@ public class CommitMessageSerializer implements
VersionedSerializer<CommitMessag
dataFileSerializer.serializeList(message.compactIncrement().compactAfter(),
view);
dataFileSerializer.serializeList(message.compactIncrement().changelogFiles(),
view);
indexEntrySerializer.serializeList(message.indexIncrement().newIndexFiles(),
view);
+
indexEntrySerializer.serializeList(message.indexIncrement().deletedIndexFiles(),
view);
}
@Override
@@ -124,6 +125,8 @@ public class CommitMessageSerializer implements
VersionedSerializer<CommitMessag
dataFileSerializer.deserializeList(view),
dataFileSerializer.deserializeList(view),
dataFileSerializer.deserializeList(view)),
- new
IndexIncrement(indexEntrySerializer.deserializeList(view)));
+ new IndexIncrement(
+ indexEntrySerializer.deserializeList(view),
+ indexEntrySerializer.deserializeList(view)));
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index eab7a0c6d..d244c2a94 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -281,6 +281,10 @@ public class TableCommitImpl implements InnerTableCommit {
.map(IndexFileMeta::fileName)
.map(indexFileFactory::toPath)
.forEach(files::add);
+ msg.indexIncrement().deletedIndexFiles().stream()
+ .map(IndexFileMeta::fileName)
+ .map(indexFileFactory::toPath)
+ .forEach(files::add);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 2ed258e85..ffb52ce6f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -48,8 +48,6 @@ import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TypeUtils;
-import javax.annotation.Nullable;
-
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -284,12 +282,11 @@ public class SnapshotReaderImpl implements SnapshotReader
{
? splitGenerator.splitForStreaming(bucketFiles)
: splitGenerator.splitForBatch(bucketFiles);
- IndexFileMeta deletionIndexFile =
+ List<IndexFileMeta> deletionIndexFiles =
deletionVectors
- ? indexFileHandler
- .scan(snapshotId,
DELETION_VECTORS_INDEX, partition, bucket)
- .orElse(null)
- : null;
+ ? indexFileHandler.scan(
+ snapshotId, DELETION_VECTORS_INDEX,
partition, bucket)
+ : Collections.emptyList();
for (SplitGenerator.SplitGroup splitGroup : splitGroups) {
List<DataFileMeta> dataFiles = splitGroup.files;
String bucketPath = pathFactory.bucketPath(partition,
bucket).toString();
@@ -298,7 +295,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
.withBucketPath(bucketPath);
if (deletionVectors) {
builder.withDataDeletionFiles(
- getDeletionFiles(dataFiles,
deletionIndexFile));
+ getDeletionFiles(dataFiles,
deletionIndexFiles));
}
splits.add(builder.build());
@@ -373,16 +370,15 @@ public class SnapshotReaderImpl implements SnapshotReader
{
.isStreaming(isStreaming)
.withBucketPath(pathFactory.bucketPath(part,
bucket).toString());
if (deletionVectors) {
- IndexFileMeta beforeDeletionIndex =
- indexFileHandler
- .scan(beforeSnapshotId,
DELETION_VECTORS_INDEX, part, bucket)
- .orElse(null);
- IndexFileMeta deletionIndex =
- indexFileHandler
- .scan(plan.snapshotId(),
DELETION_VECTORS_INDEX, part, bucket)
- .orElse(null);
- builder.withBeforeDeletionFiles(getDeletionFiles(before,
beforeDeletionIndex));
- builder.withDataDeletionFiles(getDeletionFiles(data,
deletionIndex));
+ List<IndexFileMeta> beforeDeletionIndexes =
+ indexFileHandler.scan(
+ beforeSnapshotId, DELETION_VECTORS_INDEX,
part, bucket);
+ List<IndexFileMeta> deletionIndexes =
+ indexFileHandler.scan(
+ plan.snapshotId(), DELETION_VECTORS_INDEX,
part, bucket);
+ builder.withBeforeDeletionFiles(
+ getDeletionFiles(before, beforeDeletionIndexes));
+ builder.withDataDeletionFiles(getDeletionFiles(data,
deletionIndexes));
}
splits.add(builder.build());
}
@@ -412,14 +408,22 @@ public class SnapshotReaderImpl implements SnapshotReader
{
}
private List<DeletionFile> getDeletionFiles(
- List<DataFileMeta> dataFiles, @Nullable IndexFileMeta indexFileMeta) {
+ List<DataFileMeta> dataFiles, List<IndexFileMeta> indexFileMetas) {
List<DeletionFile> deletionFiles = new ArrayList<>(dataFiles.size());
- Map<String, Pair<Integer, Integer>> deletionRanges =
- indexFileMeta == null ? null : indexFileMeta.deletionVectorsRanges();
+ Map<String, IndexFileMeta> dataFileToIndexFileMeta = new HashMap<>();
+ for (IndexFileMeta indexFileMeta : indexFileMetas) {
+ if (indexFileMeta.deletionVectorsRanges() != null) {
+ for (String dataFileName :
indexFileMeta.deletionVectorsRanges().keySet()) {
+ dataFileToIndexFileMeta.put(dataFileName, indexFileMeta);
+ }
+ }
+ }
for (DataFileMeta file : dataFiles) {
- if (deletionRanges != null) {
- Pair<Integer, Integer> range = deletionRanges.get(file.fileName());
- if (range != null) {
+ IndexFileMeta indexFileMeta =
dataFileToIndexFileMeta.get(file.fileName());
+ if (indexFileMeta != null) {
+ Map<String, Pair<Integer, Integer>> ranges =
indexFileMeta.deletionVectorsRanges();
+ if (ranges != null && ranges.containsKey(file.fileName())) {
+ Pair<Integer, Integer> range = ranges.get(file.fileName());
deletionFiles.add(
new DeletionFile(
indexFileHandler.filePath(indexFileMeta).toString(),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
new file mode 100644
index 000000000..11612ac1b
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileIOFinder;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.operation.FileStoreCommitImpl;
+import org.apache.paimon.operation.Lock;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
+
+/** Wrapper of AppendOnlyFileStore. */
+public class TestAppendFileStore extends AppendOnlyFileStore {
+
+ private final String commitUser;
+
+ private final IndexFileHandler fileHandler;
+
+ private long commitIdentifier;
+
+ private FileIO fileIO;
+
+ public TestAppendFileStore(
+ FileIO fileIO,
+ SchemaManager schemaManage,
+ CoreOptions options,
+ TableSchema tableSchema,
+ RowType partitionType,
+ RowType bucketType,
+ RowType rowType,
+ String tableName) {
+ super(
+ fileIO,
+ schemaManage,
+ tableSchema,
+ options,
+ partitionType,
+ bucketType,
+ rowType,
+ tableName,
+ new CatalogEnvironment(Lock.emptyFactory(), null, null));
+
+ this.fileIO = fileIO;
+ this.commitUser = UUID.randomUUID().toString();
+ this.fileHandler = this.newIndexFileHandler();
+ this.commitIdentifier = 0L;
+ }
+
+ public FileIO fileIO() {
+ return this.fileIO;
+ }
+
+ public FileStoreCommitImpl newCommit() {
+ return super.newCommit(commitUser);
+ }
+
+ public void commit(CommitMessage... commitMessages) {
+ ManifestCommittable committable = new
ManifestCommittable(commitIdentifier++);
+ for (CommitMessage commitMessage : commitMessages) {
+ committable.addFileCommittable(commitMessage);
+ }
+ newCommit().commit(committable, Collections.emptyMap());
+ }
+
+ public CommitMessage removeIndexFiles(
+ BinaryRow partition, int bucket, List<IndexFileMeta>
indexFileMetas) {
+ return new CommitMessageImpl(
+ partition,
+ bucket,
+ DataIncrement.emptyIncrement(),
+ CompactIncrement.emptyIncrement(),
+ new IndexIncrement(Collections.emptyList(), indexFileMetas));
+ }
+
+ public List<IndexFileMeta> scanDVIndexFiles(BinaryRow partition, int
bucket) {
+ Long lastSnapshotId = snapshotManager().latestSnapshotId();
+ return fileHandler.scan(lastSnapshotId, DELETION_VECTORS_INDEX,
partition, bucket);
+ }
+
+ public DeletionVectorsMaintainer createOrRestoreDVMaintainer(BinaryRow
partition, int bucket) {
+ Long lastSnapshotId = snapshotManager().latestSnapshotId();
+ DeletionVectorsMaintainer.Factory factory =
+ new DeletionVectorsMaintainer.Factory(fileHandler);
+ return factory.createOrRestore(lastSnapshotId, partition, bucket);
+ }
+
+ public CommitMessage writeDVIndexFiles(
+ BinaryRow partition, int bucket, Map<String, List<Integer>>
dataFileToPositions) {
+ DeletionVectorsMaintainer dvMaintainer =
createOrRestoreDVMaintainer(partition, bucket);
+ for (Map.Entry<String, List<Integer>> entry :
dataFileToPositions.entrySet()) {
+ for (Integer pos : entry.getValue()) {
+ dvMaintainer.notifyNewDeletion(entry.getKey(), pos);
+ }
+ }
+ return new CommitMessageImpl(
+ partition,
+ bucket,
+ DataIncrement.emptyIncrement(),
+ CompactIncrement.emptyIncrement(),
+ new IndexIncrement(dvMaintainer.prepareCommit()));
+ }
+
+ public static TestAppendFileStore createAppendStore(
+ java.nio.file.Path tempDir, Map<String, String> options) throws
Exception {
+ String root = TraceableFileIO.SCHEME + "://" + tempDir.toString();
+ Path path = new Path(tempDir.toUri());
+ FileIO fileIO = FileIOFinder.find(new Path(root));
+ SchemaManager schemaManage = new SchemaManager(new LocalFileIO(),
path);
+
+ options.put(CoreOptions.PATH.key(), root);
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ schemaManage,
+ new Schema(
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(),
+ TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
+ Collections.emptyList(),
+ options,
+ null));
+ return new TestAppendFileStore(
+ fileIO,
+ schemaManage,
+ new CoreOptions(options),
+ tableSchema,
+ TestKeyValueGenerator.DEFAULT_PART_TYPE,
+ RowType.of(),
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+ (new Path(root)).getName());
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
index fbaa407d2..c43950a44 100644
---
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.deletionvectors;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.PathFactory;
@@ -32,6 +33,7 @@ import java.util.Map;
import java.util.Random;
import java.util.UUID;
+import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link DeletionVectorsIndexFile}. */
@@ -66,8 +68,10 @@ public class DeletionVectorsIndexFileTest {
LinkedHashMap<String, Pair<Integer, Integer>> deletionVectorRanges =
pair.getRight();
// read
+ IndexFileMeta indexFileMeta =
+ new IndexFileMeta(DELETION_VECTORS_INDEX, fileName, 0L, 0L,
deletionVectorRanges);
Map<String, DeletionVector> actualDeleteMap =
- deletionVectorsIndexFile.readAllDeletionVectors(fileName, deletionVectorRanges);
+ deletionVectorsIndexFile.readAllDeletionVectors(indexFileMeta);
assertThat(actualDeleteMap.get("file1.parquet").isDeleted(1)).isTrue();
assertThat(actualDeleteMap.get("file1.parquet").isDeleted(2)).isFalse();
assertThat(actualDeleteMap.get("file2.parquet").isDeleted(2)).isTrue();
@@ -104,8 +108,10 @@ public class DeletionVectorsIndexFileTest {
deletionVectorsIndexFile.write(deleteMap);
// read
+ IndexFileMeta indexFileMeta =
+ new IndexFileMeta(DELETION_VECTORS_INDEX, pair.getLeft(), 0L,
0L, pair.getRight());
Map<String, DeletionVector> dvs =
- deletionVectorsIndexFile.readAllDeletionVectors(pair.getLeft(), pair.getRight());
+ deletionVectorsIndexFile.readAllDeletionVectors(indexFileMeta);
assertThat(dvs.size()).isEqualTo(100000);
}
@@ -128,8 +134,10 @@ public class DeletionVectorsIndexFileTest {
deletionVectorsIndexFile.write(deleteMap);
// read
+ IndexFileMeta indexFileMeta =
+ new IndexFileMeta(DELETION_VECTORS_INDEX, pair.getLeft(), 0L,
0L, pair.getRight());
Map<String, DeletionVector> dvs =
- deletionVectorsIndexFile.readAllDeletionVectors(pair.getLeft(), pair.getRight());
+ deletionVectorsIndexFile.readAllDeletionVectors(indexFileMeta);
assertThat(dvs.size()).isEqualTo(1);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
index 9bfb4f81a..dd6d3102c 100644
---
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
@@ -22,10 +22,17 @@ import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -56,12 +63,63 @@ public class DeletionVectorsMaintainerTest extends
PrimaryKeyTableTestBase {
assertThat(dvMaintainer.deletionVectorOf("f3")).isEmpty();
List<IndexFileMeta> fileMetas = dvMaintainer.prepareCommit();
- Map<String, DeletionVector> deletionVectors =
- fileHandler.readAllDeletionVectors(fileMetas.get(0));
+ Map<String, DeletionVector> deletionVectors = fileHandler.readAllDeletionVectors(fileMetas);
assertThat(deletionVectors.get("f1").isDeleted(1)).isTrue();
assertThat(deletionVectors.get("f1").isDeleted(2)).isFalse();
assertThat(deletionVectors.get("f2").isDeleted(1)).isFalse();
assertThat(deletionVectors.get("f2").isDeleted(2)).isTrue();
assertThat(deletionVectors.containsKey("f3")).isFalse();
}
+
+ @Test
+ public void test1() {
+ DeletionVectorsMaintainer.Factory factory =
+ new DeletionVectorsMaintainer.Factory(fileHandler);
+
+ DeletionVectorsMaintainer dvMaintainer = factory.create();
+ BitmapDeletionVector deletionVector1 = new BitmapDeletionVector();
+ deletionVector1.delete(1);
+ deletionVector1.delete(3);
+ deletionVector1.delete(5);
+ dvMaintainer.notifyNewDeletion("f1", deletionVector1);
+
+ List<IndexFileMeta> fileMetas1 = dvMaintainer.prepareCommit();
+ assertThat(fileMetas1.size()).isEqualTo(1);
+ CommitMessage commitMessage =
+ new CommitMessageImpl(
+ BinaryRow.EMPTY_ROW,
+ 0,
+ DataIncrement.emptyIncrement(),
+ CompactIncrement.emptyIncrement(),
+ new IndexIncrement(fileMetas1));
+ BatchTableCommit commit = table.newBatchWriteBuilder().newCommit();
+ commit.commit(Collections.singletonList(commitMessage));
+
+ Long lastSnapshotId = table.snapshotManager().latestSnapshotId();
+ dvMaintainer = factory.createOrRestore(lastSnapshotId,
BinaryRow.EMPTY_ROW, 0);
+ DeletionVector deletionVector2 =
dvMaintainer.deletionVectorOf("f1").get();
+ assertThat(deletionVector2.isDeleted(1)).isTrue();
+ assertThat(deletionVector2.isDeleted(2)).isFalse();
+
+ deletionVector2.delete(2);
+ dvMaintainer.notifyNewDeletion("f1", deletionVector2);
+
+ List<IndexFileMeta> fileMetas2 = dvMaintainer.prepareCommit();
+ assertThat(fileMetas2.size()).isEqualTo(1);
+ commitMessage =
+ new CommitMessageImpl(
+ BinaryRow.EMPTY_ROW,
+ 0,
+ DataIncrement.emptyIncrement(),
+ CompactIncrement.emptyIncrement(),
+ new IndexIncrement(fileMetas2));
+ commit = table.newBatchWriteBuilder().newCommit();
+ commit.commit(Collections.singletonList(commitMessage));
+
+ lastSnapshotId = table.snapshotManager().latestSnapshotId();
+ dvMaintainer = factory.createOrRestore(lastSnapshotId,
BinaryRow.EMPTY_ROW, 0);
+ DeletionVector deletionVector3 =
dvMaintainer.deletionVectorOf("f1").get();
+ assertThat(deletionVector3.isDeleted(1)).isTrue();
+ assertThat(deletionVector3.isDeleted(2)).isTrue();
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
b/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
index 53058a05f..724d5b416 100644
---
a/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
@@ -42,22 +42,31 @@ public class IndexFileMetaSerializerTest extends
ObjectSerializerTestBase<IndexF
public static IndexFileMeta randomIndexFile() {
Random rnd = new Random();
if (rnd.nextBoolean()) {
- return new IndexFileMeta(
- HashIndexFile.HASH_INDEX,
- "my_file_name" + rnd.nextLong(),
- rnd.nextInt(),
- rnd.nextInt());
+ return randomHashIndexFile();
} else {
- LinkedHashMap<String, Pair<Integer, Integer>>
deletionVectorsRanges =
- new LinkedHashMap<>();
- deletionVectorsRanges.put("my_file_name1", Pair.of(rnd.nextInt(),
rnd.nextInt()));
- deletionVectorsRanges.put("my_file_name2", Pair.of(rnd.nextInt(),
rnd.nextInt()));
- return new IndexFileMeta(
- DeletionVectorsIndexFile.DELETION_VECTORS_INDEX,
- "deletion_vectors_index_file_name" + rnd.nextLong(),
- rnd.nextInt(),
- rnd.nextInt(),
- deletionVectorsRanges);
+ return randomDeletionVectorIndexFile();
}
}
+
+ public static IndexFileMeta randomHashIndexFile() {
+ Random rnd = new Random();
+ return new IndexFileMeta(
+ HashIndexFile.HASH_INDEX,
+ "my_file_name" + rnd.nextLong(),
+ rnd.nextInt(),
+ rnd.nextInt());
+ }
+
+ public static IndexFileMeta randomDeletionVectorIndexFile() {
+ Random rnd = new Random();
+ LinkedHashMap<String, Pair<Integer, Integer>> deletionVectorsRanges =
new LinkedHashMap<>();
+ deletionVectorsRanges.put("my_file_name1", Pair.of(rnd.nextInt(),
rnd.nextInt()));
+ deletionVectorsRanges.put("my_file_name2", Pair.of(rnd.nextInt(),
rnd.nextInt()));
+ return new IndexFileMeta(
+ DeletionVectorsIndexFile.DELETION_VECTORS_INDEX,
+ "deletion_vectors_index_file_name" + rnd.nextLong(),
+ rnd.nextInt(),
+ rnd.nextInt(),
+ deletionVectorsRanges);
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java
new file mode 100644
index 000000000..4ca59a75c
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.manifest;
+
+import org.apache.paimon.TestAppendFileStore;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.table.BucketMode;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.apache.paimon.index.IndexFileMetaSerializerTest.randomDeletionVectorIndexFile;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for IndexManifestFileHandler. */
+public class IndexManifestFileHandlerTest {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ @Test
+ public void testUnawareMode() throws Exception {
+ TestAppendFileStore fileStore =
+ TestAppendFileStore.createAppendStore(tempDir, new
HashMap<>());
+
+ IndexManifestFile indexManifestFile =
+ new IndexManifestFile.Factory(
+ fileStore.fileIO(),
+ fileStore.options().manifestFormat(),
+ fileStore.pathFactory())
+ .create();
+ IndexManifestFileHandler indexManifestFileHandler =
+ new IndexManifestFileHandler(indexManifestFile,
BucketMode.BUCKET_UNAWARE);
+
+ IndexManifestEntry entry1 =
+ new IndexManifestEntry(
+ FileKind.ADD, BinaryRow.EMPTY_ROW, 0,
randomDeletionVectorIndexFile());
+ String indexManifestFile1 = indexManifestFileHandler.write(null,
Arrays.asList(entry1));
+
+ IndexManifestEntry entry2 = entry1.toDeleteEntry();
+ IndexManifestEntry entry3 =
+ new IndexManifestEntry(
+ FileKind.ADD, BinaryRow.EMPTY_ROW, 0,
randomDeletionVectorIndexFile());
+ String indexManifestFile2 =
+ indexManifestFileHandler.write(indexManifestFile1,
Arrays.asList(entry2, entry3));
+
+ List<IndexManifestEntry> entries =
indexManifestFile.read(indexManifestFile2);
+ assertThat(entries.size()).isEqualTo(1);
+ assertThat(entries.contains(entry1)).isFalse();
+ assertThat(entries.contains(entry2)).isFalse();
+ assertThat(entries.contains(entry3)).isTrue();
+ }
+
+ @Test
+ public void testHashFixedBucket() throws Exception {
+ TestAppendFileStore fileStore =
+ TestAppendFileStore.createAppendStore(tempDir, new
HashMap<>());
+
+ IndexManifestFile indexManifestFile =
+ new IndexManifestFile.Factory(
+ fileStore.fileIO(),
+ fileStore.options().manifestFormat(),
+ fileStore.pathFactory())
+ .create();
+ IndexManifestFileHandler indexManifestFileHandler =
+ new IndexManifestFileHandler(indexManifestFile,
BucketMode.HASH_FIXED);
+
+ IndexManifestEntry entry1 =
+ new IndexManifestEntry(
+ FileKind.ADD, BinaryRow.EMPTY_ROW, 0,
randomDeletionVectorIndexFile());
+ IndexManifestEntry entry2 =
+ new IndexManifestEntry(
+ FileKind.ADD, BinaryRow.EMPTY_ROW, 1,
randomDeletionVectorIndexFile());
+ String indexManifestFile1 =
+ indexManifestFileHandler.write(null, Arrays.asList(entry1,
entry2));
+
+ IndexManifestEntry entry3 =
+ new IndexManifestEntry(
+ FileKind.ADD, BinaryRow.EMPTY_ROW, 1,
randomDeletionVectorIndexFile());
+ IndexManifestEntry entry4 =
+ new IndexManifestEntry(
+ FileKind.ADD, BinaryRow.EMPTY_ROW, 2,
randomDeletionVectorIndexFile());
+ String indexManifestFile2 =
+ indexManifestFileHandler.write(indexManifestFile1,
Arrays.asList(entry3, entry4));
+
+ List<IndexManifestEntry> entries =
indexManifestFile.read(indexManifestFile2);
+ assertThat(entries.size()).isEqualTo(3);
+ assertThat(entries.contains(entry1)).isTrue();
+ assertThat(entries.contains(entry2)).isFalse();
+ assertThat(entries.contains(entry3)).isTrue();
+ assertThat(entries.contains(entry4)).isTrue();
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index d603eda41..03630e71f 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -21,9 +21,12 @@ package org.apache.paimon.operation;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.TestAppendFileStore;
import org.apache.paimon.TestFileStore;
import org.apache.paimon.TestKeyValueGenerator;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.index.IndexFileHandler;
@@ -39,6 +42,8 @@ import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.ColStats;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.stats.StatsFileHandler;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
@@ -57,6 +62,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -757,7 +763,7 @@ public class FileStoreCommitTest {
.containsExactlyInAnyOrder(6, 8);
// assert scan one bucket
- Optional<IndexFileMeta> file = indexFileHandler.scan(snapshot.id(), HASH_INDEX, part1, 0);
+ Optional<IndexFileMeta> file = indexFileHandler.scanHashIndex(snapshot.id(), part1, 0);
assertThat(file).isPresent();
assertThat(indexFileHandler.readHashIndexList(file.get())).containsExactlyInAnyOrder(1,
4);
@@ -766,9 +772,9 @@ public class FileStoreCommitTest {
store.overwriteData(
Collections.singletonList(record1), gen::getPartition, kv ->
0, new HashMap<>());
snapshot = store.snapshotManager().latestSnapshot();
- file = indexFileHandler.scan(snapshot.id(), HASH_INDEX, part1, 0);
+ file = indexFileHandler.scanHashIndex(snapshot.id(), part1, 0);
assertThat(file).isEmpty();
- file = indexFileHandler.scan(snapshot.id(), HASH_INDEX, part2, 2);
+ file = indexFileHandler.scanHashIndex(snapshot.id(), part2, 2);
assertThat(file).isPresent();
// overwrite all partitions
@@ -776,7 +782,7 @@ public class FileStoreCommitTest {
store.overwriteData(
Collections.singletonList(record1), gen::getPartition, kv ->
0, new HashMap<>());
snapshot = store.snapshotManager().latestSnapshot();
- file = indexFileHandler.scan(snapshot.id(), HASH_INDEX, part2, 2);
+ file = indexFileHandler.scanHashIndex(snapshot.id(), part2, 2);
assertThat(file).isEmpty();
}
@@ -843,6 +849,52 @@ public class FileStoreCommitTest {
assertThat(readStats.get()).isEqualTo(fakeStats);
}
+ @Test
+ public void testDVIndexFiles() throws Exception {
+ TestAppendFileStore store =
TestAppendFileStore.createAppendStore(tempDir, new HashMap<>());
+
+ // commit 1
+ CommitMessage commitMessage1 =
+ store.writeDVIndexFiles(
+ BinaryRow.EMPTY_ROW,
+ 0,
+ Collections.singletonMap("f1", Arrays.asList(1, 3)));
+ CommitMessage commitMessage2 =
+ store.writeDVIndexFiles(
+ BinaryRow.EMPTY_ROW,
+ 0,
+ Collections.singletonMap("f2", Arrays.asList(2, 4)));
+ store.commit(commitMessage1, commitMessage2);
+
+ // assert 1
+ assertThat(store.scanDVIndexFiles(BinaryRow.EMPTY_ROW,
0).size()).isEqualTo(2);
+ DeletionVectorsMaintainer maintainer =
+ store.createOrRestoreDVMaintainer(BinaryRow.EMPTY_ROW, 0);
+ Map<String, DeletionVector> dvs = maintainer.deletionVectors();
+ assertThat(dvs.size()).isEqualTo(2);
+ assertThat(dvs.get("f2").isDeleted(2)).isTrue();
+ assertThat(dvs.get("f2").isDeleted(3)).isFalse();
+ assertThat(dvs.get("f2").isDeleted(4)).isTrue();
+
+ // commit 2
+ CommitMessage commitMessage3 =
+ store.writeDVIndexFiles(
+ BinaryRow.EMPTY_ROW, 0, Collections.singletonMap("f2",
Arrays.asList(3)));
+ CommitMessage commitMessage4 =
+ store.removeIndexFiles(
+ BinaryRow.EMPTY_ROW,
+ 0,
+ ((CommitMessageImpl)
commitMessage1).indexIncrement().newIndexFiles());
+ store.commit(commitMessage3, commitMessage4);
+
+ // assert 2
+ assertThat(store.scanDVIndexFiles(BinaryRow.EMPTY_ROW,
0).size()).isEqualTo(2);
+ maintainer = store.createOrRestoreDVMaintainer(BinaryRow.EMPTY_ROW, 0);
+ dvs = maintainer.deletionVectors();
+ assertThat(dvs.size()).isEqualTo(2);
+ assertThat(dvs.get("f2").isDeleted(3)).isTrue();
+ }
+
private TestFileStore createStore(boolean failing) throws Exception {
return createStore(failing, 1);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
index 190027d36..47a210742 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
@@ -42,7 +42,9 @@ public class CommitMessageSerializerTest {
DataIncrement dataIncrement = randomNewFilesIncrement();
CompactIncrement compactIncrement = randomCompactIncrement();
IndexIncrement indexIncrement =
- new IndexIncrement(Arrays.asList(randomIndexFile(),
randomIndexFile()));
+ new IndexIncrement(
+ Arrays.asList(randomIndexFile(), randomIndexFile()),
+ Arrays.asList(randomIndexFile(), randomIndexFile()));
CommitMessageImpl committable =
new CommitMessageImpl(row(0), 1, dataIncrement,
compactIncrement, indexIncrement);
CommitMessageImpl newCommittable =