This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new 1c269968c2 [core] Simplify AppendDeleteFileMaintainer to maintainer free (#6123) 1c269968c2 is described below commit 1c269968c2b32d2eb4987435b98dc143f76df1ca Author: Jingsong Lee <jingsongl...@gmail.com> AuthorDate: Fri Aug 22 19:57:32 2025 +0800 [core] Simplify AppendDeleteFileMaintainer to maintainer free (#6123) --- .../main/java/org/apache/paimon/CoreOptions.java | 2 +- .../java/org/apache/paimon/AbstractFileStore.java | 6 +- .../org/apache/paimon/AppendOnlyFileStore.java | 11 ++- .../java/org/apache/paimon/KeyValueFileStore.java | 9 +-- .../append/BucketedAppendCompactManager.java | 16 ++-- .../apache/paimon/compact/CompactDeletionFile.java | 21 ++--- ...rsMaintainer.java => BucketedDvMaintainer.java} | 27 +++---- .../paimon/deletionvectors/DeletionVector.java | 2 +- .../DeletionVectorIndexFileWriter.java | 38 +++++---- .../deletionvectors/DeletionVectorsIndexFile.java | 32 ++++---- .../append/AppendDeleteFileMaintainer.java | 90 ++++++++-------------- .../append/BaseAppendDeleteFileMaintainer.java | 36 +++++++-- .../append/BucketedAppendDeleteFileMaintainer.java | 15 ++-- .../org/apache/paimon/index/IndexFileHandler.java | 55 +------------ .../paimon/mergetree/compact/CompactStrategy.java | 4 +- .../LookupChangelogMergeFunctionWrapper.java | 6 +- .../compact/LookupMergeTreeCompactRewriter.java | 12 +-- .../mergetree/compact/MergeTreeCompactManager.java | 6 +- .../paimon/operation/AbstractFileStoreWrite.java | 14 ++-- .../paimon/operation/AppendFileStoreWrite.java | 7 +- .../paimon/operation/BaseAppendFileStoreWrite.java | 8 +- .../operation/BucketedAppendFileStoreWrite.java | 6 +- .../apache/paimon/operation/FileStoreWrite.java | 6 +- .../paimon/operation/KeyValueFileStoreWrite.java | 10 +-- .../paimon/operation/MemoryFileStoreWrite.java | 4 +- .../postpone/PostponeBucketFileStoreWrite.java | 4 +- .../org/apache/paimon/TestAppendFileStore.java | 14 ++-- ...inerTest.java => BucketedDvMaintainerTest.java} | 58 ++++++-------- .../DeletionVectorsIndexFileTest.java | 16 ++-- .../append/AppendDeletionFileMaintainerHelper.java | 19 ++++- .../paimon/operation/FileStoreCommitTest.java | 5 +- .../paimon/spark/sql/DeletionVectorTest.scala | 20 ++--- 32 files changed, 266 insertions(+), 313 deletions(-) diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index e0f9af088d..8d86219cfb 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -2863,7 +2863,7 @@ public class CoreOptions implements Serializable { return deletionVectorsEnabled() || mergeEngine() == FIRST_ROW; } - public MemorySize deletionVectorIndexFileTargetSize() { + public MemorySize dvIndexFileTargetSize() { return options.get(DELETION_VECTOR_INDEX_FILE_TARGET_SIZE); } diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 11f67a2c10..4d2b7726de 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -43,14 +43,12 @@ import org.apache.paimon.operation.ManifestsReader; import org.apache.paimon.operation.PartitionExpire; import org.apache.paimon.operation.SnapshotDeletion; import org.apache.paimon.operation.TagDeletion; -import org.apache.paimon.options.MemorySize; import org.apache.paimon.partition.PartitionExpireStrategy; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.service.ServiceManager; import org.apache.paimon.stats.StatsFile; import org.apache.paimon.stats.StatsFileHandler; -import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.CatalogEnvironment; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.PartitionHandler; @@ -230,9 +228,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> { new DeletionVectorsIndexFile( fileIO, pathFactory().indexFileFactory(), - bucketMode() == BucketMode.BUCKET_UNAWARE - ? options.deletionVectorIndexFileTargetSize() - : MemorySize.ofBytes(Long.MAX_VALUE), + options.dvIndexFileTargetSize(), options.deletionVectorBitmap64())); } diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java index 0d9ad3b121..2f552d799b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java @@ -19,7 +19,7 @@ package org.apache.paimon; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; +import org.apache.paimon.deletionvectors.BucketedDvMaintainer; import org.apache.paimon.format.FileFormatDiscover; import org.apache.paimon.fs.FileIO; import org.apache.paimon.operation.AppendFileStoreWrite; @@ -107,10 +107,6 @@ public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> { @Override public BaseAppendFileStoreWrite newWrite(String commitUser, @Nullable Integer writeId) { - DeletionVectorsMaintainer.Factory dvMaintainerFactory = - options.deletionVectorsEnabled() - ? DeletionVectorsMaintainer.factory(newIndexFileHandler()) - : null; if (bucketMode() == BucketMode.BUCKET_UNAWARE) { RawFileSplitRead readForCompact = newRead(); if (options.rowTrackingEnabled()) { @@ -126,9 +122,12 @@ public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> { snapshotManager(), newScan(), options, - dvMaintainerFactory, tableName); } else { + BucketedDvMaintainer.Factory dvMaintainerFactory = + options.deletionVectorsEnabled() + ? BucketedDvMaintainer.factory(newIndexFileHandler()) + : null; return new BucketedAppendFileStoreWrite( fileIO, newRead(), diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index f7a743ace1..5e5b354e65 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -20,7 +20,7 @@ package org.apache.paimon; import org.apache.paimon.codegen.RecordEqualiser; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; +import org.apache.paimon.deletionvectors.BucketedDvMaintainer; import org.apache.paimon.format.FileFormatDiscover; import org.apache.paimon.fs.FileIO; import org.apache.paimon.index.DynamicBucketIndexMaintainer; @@ -177,10 +177,9 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> { if (bucketMode() == BucketMode.HASH_DYNAMIC) { indexFactory = new DynamicBucketIndexMaintainer.Factory(newIndexFileHandler()); } - DeletionVectorsMaintainer.Factory deletionVectorsMaintainerFactory = null; + BucketedDvMaintainer.Factory dvMaintainerFactory = null; if (options.deletionVectorsEnabled()) { - deletionVectorsMaintainerFactory = - new DeletionVectorsMaintainer.Factory(newIndexFileHandler()); + dvMaintainerFactory = BucketedDvMaintainer.factory(newIndexFileHandler()); } return new KeyValueFileStoreWrite( fileIO, @@ -199,7 +198,7 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> { snapshotManager(), newScan(), indexFactory, - deletionVectorsMaintainerFactory, + dvMaintainerFactory, options, keyValueFieldsExtractor, tableName); diff --git a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java index 666704b836..4a057f9576 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java @@ -24,7 +24,7 @@ import org.apache.paimon.compact.CompactDeletionFile; import org.apache.paimon.compact.CompactFutureManager; import org.apache.paimon.compact.CompactResult; import org.apache.paimon.compact.CompactTask; -import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; +import org.apache.paimon.deletionvectors.BucketedDvMaintainer; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.operation.metrics.CompactionMetrics; import org.apache.paimon.operation.metrics.MetricUtils; @@ -56,7 +56,7 @@ public class BucketedAppendCompactManager extends CompactFutureManager { private static final int FULL_COMPACT_MIN_FILE = 3; private final ExecutorService executor; - private final DeletionVectorsMaintainer dvMaintainer; + private final BucketedDvMaintainer dvMaintainer; private final PriorityQueue<DataFileMeta> toCompact; private final int minFileNum; private final long targetFileSize; @@ -70,7 +70,7 @@ public class BucketedAppendCompactManager extends CompactFutureManager { public BucketedAppendCompactManager( ExecutorService executor, List<DataFileMeta> restored, - @Nullable DeletionVectorsMaintainer dvMaintainer, + @Nullable BucketedDvMaintainer dvMaintainer, int minFileNum, long targetFileSize, boolean forceRewriteAllFiles, @@ -241,14 +241,14 @@ public class BucketedAppendCompactManager extends CompactFutureManager { /** A {@link CompactTask} impl for full compaction of append-only table. */ public static class FullCompactTask extends CompactTask { - private final DeletionVectorsMaintainer dvMaintainer; + private final BucketedDvMaintainer dvMaintainer; private final LinkedList<DataFileMeta> toCompact; private final long targetFileSize; private final boolean forceRewriteAllFiles; private final CompactRewriter rewriter; public FullCompactTask( - DeletionVectorsMaintainer dvMaintainer, + BucketedDvMaintainer dvMaintainer, Collection<DataFileMeta> inputs, long targetFileSize, boolean forceRewriteAllFiles, @@ -314,12 +314,12 @@ public class BucketedAppendCompactManager extends CompactFutureManager { */ public static class AutoCompactTask extends CompactTask { - private final DeletionVectorsMaintainer dvMaintainer; + private final BucketedDvMaintainer dvMaintainer; private final List<DataFileMeta> toCompact; private final CompactRewriter rewriter; public AutoCompactTask( - DeletionVectorsMaintainer dvMaintainer, + BucketedDvMaintainer dvMaintainer, List<DataFileMeta> toCompact, CompactRewriter rewriter, @Nullable CompactionMetrics.Reporter metricsReporter) { @@ -336,7 +336,7 @@ public class BucketedAppendCompactManager extends CompactFutureManager { } private static CompactResult compact( - @Nullable DeletionVectorsMaintainer dvMaintainer, + @Nullable BucketedDvMaintainer dvMaintainer, List<DataFileMeta> toCompact, CompactRewriter rewriter) throws Exception { diff --git a/paimon-core/src/main/java/org/apache/paimon/compact/CompactDeletionFile.java b/paimon-core/src/main/java/org/apache/paimon/compact/CompactDeletionFile.java index 390ab7af90..f52c3a41d8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactDeletionFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactDeletionFile.java @@ -18,13 +18,12 @@ package org.apache.paimon.compact; -import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; +import org.apache.paimon.deletionvectors.BucketedDvMaintainer; import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.index.IndexFileMeta; import javax.annotation.Nullable; -import java.util.List; import java.util.Optional; /** Deletion File from compaction. */ @@ -41,19 +40,13 @@ public interface CompactDeletionFile { * immediately, so when updateCompactResult, we need to merge old deletion files (just delete * them). */ - static CompactDeletionFile generateFiles(DeletionVectorsMaintainer maintainer) { - List<IndexFileMeta> files = maintainer.writeDeletionVectorsIndex(); - if (files.size() > 1) { - throw new IllegalStateException( - "Should only generate one compact deletion file, this is a bug."); - } - - return new GeneratedDeletionFile( - files.isEmpty() ? null : files.get(0), maintainer.indexFileHandler()); + static CompactDeletionFile generateFiles(BucketedDvMaintainer maintainer) { + Optional<IndexFileMeta> file = maintainer.writeDeletionVectorsIndex(); + return new GeneratedDeletionFile(file.orElse(null), maintainer.indexFileHandler()); } /** For sync compaction, only create deletion files when prepareCommit. */ - static CompactDeletionFile lazyGeneration(DeletionVectorsMaintainer maintainer) { + static CompactDeletionFile lazyGeneration(BucketedDvMaintainer maintainer) { return new LazyCompactDeletionFile(maintainer); } @@ -107,11 +100,11 @@ public interface CompactDeletionFile { /** A lazy generation implementation of {@link CompactDeletionFile}. */ class LazyCompactDeletionFile implements CompactDeletionFile { - private final DeletionVectorsMaintainer maintainer; + private final BucketedDvMaintainer maintainer; private boolean generated = false; - public LazyCompactDeletionFile(DeletionVectorsMaintainer maintainer) { + public LazyCompactDeletionFile(BucketedDvMaintainer maintainer) { this.maintainer = maintainer; } diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java similarity index 86% rename from paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java rename to paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java index f19a236a14..1feb135efc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java @@ -31,14 +31,14 @@ import java.util.Map; import java.util.Optional; /** Maintainer of deletionVectors index. */ -public class DeletionVectorsMaintainer { +public class BucketedDvMaintainer { private final IndexFileHandler indexFileHandler; private final Map<String, DeletionVector> deletionVectors; protected final boolean bitmap64; private boolean modified; - private DeletionVectorsMaintainer( + private BucketedDvMaintainer( IndexFileHandler fileHandler, Map<String, DeletionVector> deletionVectors) { this.indexFileHandler = fileHandler; this.deletionVectors = deletionVectors; @@ -108,15 +108,16 @@ public class DeletionVectorsMaintainer { /** * Write new deletion vectors index file if any modifications have been made. * - * @return A list containing the metadata of the deletion vectors index file, or an empty list - * if no changes need to be committed. + * @return None if no modifications have been made, otherwise the new deletion vectors index + * file. */ - public List<IndexFileMeta> writeDeletionVectorsIndex() { + public Optional<IndexFileMeta> writeDeletionVectorsIndex() { if (modified) { modified = false; - return indexFileHandler.writeDeletionVectorsIndex(deletionVectors); + return Optional.of( + indexFileHandler.deletionVectorsIndex().writeSingleFile(deletionVectors)); } - return Collections.emptyList(); + return Optional.empty(); } /** @@ -147,12 +148,12 @@ public class DeletionVectorsMaintainer { return new Factory(handler); } - /** Factory to restore {@link DeletionVectorsMaintainer}. */ + /** Factory to restore {@link BucketedDvMaintainer}. */ public static class Factory { private final IndexFileHandler handler; - public Factory(IndexFileHandler handler) { + private Factory(IndexFileHandler handler) { this.handler = handler; } @@ -160,7 +161,7 @@ public class DeletionVectorsMaintainer { return handler; } - public DeletionVectorsMaintainer create(@Nullable List<IndexFileMeta> restoredFiles) { + public BucketedDvMaintainer create(@Nullable List<IndexFileMeta> restoredFiles) { if (restoredFiles == null) { restoredFiles = Collections.emptyList(); } @@ -169,12 +170,12 @@ public class DeletionVectorsMaintainer { return create(deletionVectors); } - public DeletionVectorsMaintainer create() { + public BucketedDvMaintainer create() { return create(new HashMap<>()); } - public DeletionVectorsMaintainer create(Map<String, DeletionVector> deletionVectors) { - return new DeletionVectorsMaintainer(handler, deletionVectors); + public BucketedDvMaintainer create(Map<String, DeletionVector> deletionVectors) { + return new BucketedDvMaintainer(handler, deletionVectors); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java index ff48c7d218..e06dc767a9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java @@ -149,7 +149,7 @@ public interface DeletionVector extends DeletionVectorJudger { return fileName -> Optional.empty(); } - static Factory factory(@Nullable DeletionVectorsMaintainer dvMaintainer) { + static Factory factory(@Nullable BucketedDvMaintainer dvMaintainer) { if (dvMaintainer == null) { return emptyFactory(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java index 4647b55fb1..4104305e05 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java @@ -45,12 +45,29 @@ public class DeletionVectorIndexFileWriter { } /** - * For unaware-bucket mode, this method will write out multiple index files, else, it will write - * out only one index file. + * The deletion file of the bucketed table is updated according to the bucket. If a compaction + * occurs and there is no longer a deletion file, an empty deletion file needs to be generated + * to overwrite the old file. + * + * <p>TODO: We can consider sending a message to delete the deletion file in the future. */ - public List<IndexFileMeta> write(Map<String, DeletionVector> input) throws IOException { + public IndexFileMeta writeSingleFile(Map<String, DeletionVector> input) throws IOException { + + DeletionFileWriter writer = new DeletionFileWriter(indexPathFactory.newPath(), fileIO); + try { + for (Map.Entry<String, DeletionVector> entry : input.entrySet()) { + writer.write(entry.getKey(), entry.getValue()); + } + } finally { + writer.close(); + } + return writer.result(); + } + + public List<IndexFileMeta> writeWithRolling(Map<String, DeletionVector> input) + throws IOException { if (input.isEmpty()) { - return emptyIndexFile(); + return Collections.emptyList(); } List<IndexFileMeta> result = new ArrayList<>(); Iterator<Map.Entry<String, DeletionVector>> iterator = input.entrySet().iterator(); @@ -76,17 +93,4 @@ public class DeletionVectorIndexFileWriter { } return writer.result(); } - - /** - * The deletion file of the bucketed table is updated according to the bucket. If a compaction - * occurs and there is no longer a deletion file, an empty deletion file needs to be generated - * to overwrite the old file. - * - * <p>TODO: We can consider sending a message to delete the deletion file in the future. - */ - private List<IndexFileMeta> emptyIndexFile() throws IOException { - DeletionFileWriter writer = new DeletionFileWriter(indexPathFactory.newPath(), fileIO); - writer.close(); - return Collections.singletonList(writer.result()); - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java index fa9ed56417..cc554b816a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java @@ -141,29 +141,27 @@ public class DeletionVectorsIndexFile extends IndexFile { } } - /** - * Write deletion vectors to a new file, the format of this file can be referenced at: <a - * href="https://cwiki.apache.org/confluence/x/Tws4EQ">PIP-16</a>. - * - * @param input A map where the key represents which file the DeletionVector belongs to, and the - * value is the corresponding DeletionVector object. - * @return A Pair object specifying the name of the written new file and a map where the key - * represents which file the DeletionVector belongs to and the value is a Pair object - * specifying the range (start position and size) within the file where the deletion vector - * data is located. - * @throws UncheckedIOException If an I/O error occurs while writing to the file. - */ - public List<IndexFileMeta> write(Map<String, DeletionVector> input) { + public IndexFileMeta writeSingleFile(Map<String, DeletionVector> input) { + try { + return createWriter().writeSingleFile(input); + } catch (IOException e) { + throw new RuntimeException("Failed to write deletion vectors.", e); + } + } + + public List<IndexFileMeta> writeWithRolling(Map<String, DeletionVector> input) { try { - DeletionVectorIndexFileWriter writer = - new DeletionVectorIndexFileWriter( - this.fileIO, this.pathFactory, this.targetSizePerIndexFile); - return writer.write(input); + return createWriter().writeWithRolling(input); } catch (IOException e) { throw new RuntimeException("Failed to write deletion vectors.", e); } } + private DeletionVectorIndexFileWriter createWriter() { + return new DeletionVectorIndexFileWriter( + this.fileIO, this.pathFactory, this.targetSizePerIndexFile); + } + private void checkVersion(InputStream in) throws IOException { int version = in.read(); if (version != VERSION_ID_V1) { diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeleteFileMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeleteFileMaintainer.java index 38e1176de7..70e00bc4ad 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeleteFileMaintainer.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeleteFileMaintainer.java @@ -22,9 +22,7 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile; -import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; import org.apache.paimon.fs.Path; -import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.IndexManifestEntry; @@ -36,58 +34,48 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; import static org.apache.paimon.table.BucketMode.UNAWARE_BUCKET; /** A {@link BaseAppendDeleteFileMaintainer} of unaware bucket append table. */ public class AppendDeleteFileMaintainer implements BaseAppendDeleteFileMaintainer { - private final IndexFileHandler indexFileHandler; + private final DeletionVectorsIndexFile dvIndexFile; private final BinaryRow partition; private final Map<String, DeletionFile> dataFileToDeletionFile; - private final Map<String, IndexManifestEntry> indexNameToEntry = new HashMap<>(); - - private final Map<String, Map<String, DeletionFile>> indexFileToDeletionFiles = new HashMap<>(); - private final Map<String, String> dataFileToIndexFile = new HashMap<>(); - - private final Set<String> touchedIndexFiles = new HashSet<>(); - - private final DeletionVectorsMaintainer maintainer; + private final Map<String, IndexManifestEntry> indexNameToEntry; + private final Map<String, Map<String, DeletionFile>> indexFileToDeletionFiles; + private final Map<String, String> dataFileToIndexFile; + private final Set<String> touchedIndexFiles; + private final Map<String, DeletionVector> deletionVectors; AppendDeleteFileMaintainer( - IndexFileHandler indexFileHandler, + DeletionVectorsIndexFile dvIndexFile, BinaryRow partition, + List<IndexManifestEntry> manifestEntries, Map<String, DeletionFile> deletionFiles) { - this.indexFileHandler = indexFileHandler; + this.dvIndexFile = dvIndexFile; this.partition = partition; this.dataFileToDeletionFile = new HashMap<>(deletionFiles); - // the deletion of data files is independent - // just create an empty maintainer - this.maintainer = new DeletionVectorsMaintainer.Factory(indexFileHandler).create(); - - List<String> touchedIndexFileNames = - deletionFiles.values().stream() - .map(deletionFile -> new Path(deletionFile.path()).getName()) - .distinct() - .collect(Collectors.toList()); - indexFileHandler.scanEntries().stream() - .filter( - indexManifestEntry -> - touchedIndexFileNames.contains( - indexManifestEntry.indexFile().fileName())) - .forEach(entry -> indexNameToEntry.put(entry.indexFile().fileName(), entry)); + this.deletionVectors = new HashMap<>(); + + this.indexNameToEntry = new HashMap<>(); + for (IndexManifestEntry entry : manifestEntries) { + indexNameToEntry.put(entry.indexFile().fileName(), entry); + } + this.indexFileToDeletionFiles = new HashMap<>(); + this.dataFileToIndexFile = new HashMap<>(); for (String dataFile : deletionFiles.keySet()) { DeletionFile deletionFile = deletionFiles.get(dataFile); String indexFileName = new Path(deletionFile.path()).getName(); - if (!indexFileToDeletionFiles.containsKey(indexFileName)) { - indexFileToDeletionFiles.put(indexFileName, new HashMap<>()); - } - indexFileToDeletionFiles.get(indexFileName).put(dataFile, deletionFile); + indexFileToDeletionFiles + .computeIfAbsent(indexFileName, k -> new HashMap<>()) + .put(dataFile, deletionFile); dataFileToIndexFile.put(dataFile, indexFileName); } + this.touchedIndexFiles = new HashSet<>(); } @Override @@ -111,7 +99,7 @@ public class AppendDeleteFileMaintainer implements BaseAppendDeleteFileMaintaine public DeletionVector getDeletionVector(String dataFile) { DeletionFile deletionFile = getDeletionFile(dataFile); if (deletionFile != null) { - return indexFileHandler.deletionVectorsIndex().readDeletionVector(deletionFile); + return dvIndexFile.readDeletionVector(deletionFile); } return null; } @@ -129,28 +117,26 @@ public class AppendDeleteFileMaintainer implements BaseAppendDeleteFileMaintaine @Override public void notifyNewDeletionVector(String dataFile, DeletionVector deletionVector) { - DeletionVectorsIndexFile deletionVectorsIndexFile = indexFileHandler.deletionVectorsIndex(); DeletionFile previous = notifyRemovedDeletionVector(dataFile); if (previous != null) { - deletionVector.merge(deletionVectorsIndexFile.readDeletionVector(previous)); + deletionVector.merge(dvIndexFile.readDeletionVector(previous)); } - maintainer.notifyNewDeletion(dataFile, deletionVector); + deletionVectors.put(dataFile, deletionVector); } @Override public List<IndexManifestEntry> persist() { List<IndexManifestEntry> result = writeUnchangedDeletionVector(); - List<IndexManifestEntry> newIndexFileEntries = - maintainer.writeDeletionVectorsIndex().stream() - .map( - fileMeta -> - new IndexManifestEntry( - FileKind.ADD, partition, UNAWARE_BUCKET, fileMeta)) - .collect(Collectors.toList()); - result.addAll(newIndexFileEntries); + dvIndexFile.writeWithRolling(deletionVectors).stream() + .map(this::toAddEntry) + .forEach(result::add); return result; } + private IndexManifestEntry toAddEntry(IndexFileMeta file) { + return new IndexManifestEntry(FileKind.ADD, partition, UNAWARE_BUCKET, file); + } + public String getIndexFilePath(String dataFile) { DeletionFile deletionFile = getDeletionFile(dataFile); return deletionFile == null ? null : deletionFile.path(); @@ -158,7 +144,6 @@ public class AppendDeleteFileMaintainer implements BaseAppendDeleteFileMaintaine @VisibleForTesting List<IndexManifestEntry> writeUnchangedDeletionVector() { - DeletionVectorsIndexFile deletionVectorsIndexFile = indexFileHandler.deletionVectorsIndex(); List<IndexManifestEntry> newIndexEntries = new ArrayList<>(); for (String indexFile : indexFileToDeletionFiles.keySet()) { if (touchedIndexFiles.contains(indexFile)) { @@ -169,17 +154,10 @@ public class AppendDeleteFileMaintainer implements BaseAppendDeleteFileMaintaine indexFileToDeletionFiles.get(indexFile); if (!dataFileToDeletionFiles.isEmpty()) { List<IndexFileMeta> newIndexFiles = - indexFileHandler.writeDeletionVectorsIndex( - deletionVectorsIndexFile.readDeletionVector( - dataFileToDeletionFiles)); + dvIndexFile.writeWithRolling( + dvIndexFile.readDeletionVector(dataFileToDeletionFiles)); newIndexFiles.forEach( - newIndexFile -> - newIndexEntries.add( - new IndexManifestEntry( - FileKind.ADD, - oldEntry.partition(), - oldEntry.bucket(), - newIndexFile))); + newIndexFile -> newIndexEntries.add(toAddEntry(newIndexFile))); } // mark the touched index file as removed. diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java index 6afffda970..ae3d3e8a97 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java @@ -20,8 +20,9 @@ package org.apache.paimon.deletionvectors.append; import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.deletionvectors.BucketedDvMaintainer; import org.apache.paimon.deletionvectors.DeletionVector; -import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; +import org.apache.paimon.index.DeletionVectorMeta; import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.manifest.IndexManifestEntry; @@ -29,11 +30,14 @@ import org.apache.paimon.table.source.DeletionFile; import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; -import static org.apache.paimon.table.BucketMode.UNAWARE_BUCKET; +import static org.apache.paimon.utils.Preconditions.checkNotNull; /** * A maintainer to maintain deletion files for append table, the core methods: @@ -63,15 +67,33 @@ public interface BaseAppendDeleteFileMaintainer { // overwrite the entire deletion file of the bucket when writing deletes. List<IndexFileMeta> indexFiles = indexFileHandler.scan(snapshot, DELETION_VECTORS_INDEX, partition, bucket); - DeletionVectorsMaintainer maintainer = - new DeletionVectorsMaintainer.Factory(indexFileHandler).create(indexFiles); + BucketedDvMaintainer maintainer = + BucketedDvMaintainer.factory(indexFileHandler).create(indexFiles); return new BucketedAppendDeleteFileMaintainer(partition, bucket, maintainer); } static AppendDeleteFileMaintainer forUnawareAppend( IndexFileHandler indexFileHandler, @Nullable Snapshot snapshot, BinaryRow partition) { - Map<String, DeletionFile> deletionFiles = - indexFileHandler.scanDVIndex(snapshot, partition, UNAWARE_BUCKET); - return new AppendDeleteFileMaintainer(indexFileHandler, partition, deletionFiles); + List<IndexManifestEntry> manifestEntries = + indexFileHandler.scan(snapshot, DELETION_VECTORS_INDEX).stream() + .filter(e -> e.partition().equals(partition)) + .collect(Collectors.toList()); + Map<String, DeletionFile> deletionFiles = new HashMap<>(); + for (IndexManifestEntry file : manifestEntries) { + IndexFileMeta meta = file.indexFile(); + LinkedHashMap<String, DeletionVectorMeta> dvMetas = meta.deletionVectorMetas(); + checkNotNull(dvMetas); + for (DeletionVectorMeta dvMeta : dvMetas.values()) { + deletionFiles.put( + dvMeta.dataFileName(), + new DeletionFile( + indexFileHandler.filePath(meta).toString(), + dvMeta.offset(), + dvMeta.length(), + dvMeta.cardinality())); + } + } + return new AppendDeleteFileMaintainer( + indexFileHandler.deletionVectorsIndex(), partition, manifestEntries, deletionFiles); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeleteFileMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeleteFileMaintainer.java index 847caae499..7245ebfe12 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeleteFileMaintainer.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeleteFileMaintainer.java @@ -19,23 +19,23 @@ package org.apache.paimon.deletionvectors.append; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.deletionvectors.BucketedDvMaintainer; import org.apache.paimon.deletionvectors.DeletionVector; -import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.IndexManifestEntry; +import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; /** A {@link BaseAppendDeleteFileMaintainer} of bucketed append table. */ public class BucketedAppendDeleteFileMaintainer implements BaseAppendDeleteFileMaintainer { private final BinaryRow partition; private final int bucket; - private final DeletionVectorsMaintainer maintainer; + private final BucketedDvMaintainer maintainer; BucketedAppendDeleteFileMaintainer( - BinaryRow partition, int bucket, DeletionVectorsMaintainer maintainer) { + BinaryRow partition, int bucket, BucketedDvMaintainer maintainer) { this.partition = partition; this.bucket = bucket; this.maintainer = maintainer; @@ -58,8 +58,11 @@ public class BucketedAppendDeleteFileMaintainer implements BaseAppendDeleteFileM @Override public List<IndexManifestEntry> persist() { - return maintainer.writeDeletionVectorsIndex().stream() + List<IndexManifestEntry> result = new ArrayList<>(); + maintainer + .writeDeletionVectorsIndex() .map(fileMeta -> new IndexManifestEntry(FileKind.ADD, partition, bucket, fileMeta)) - .collect(Collectors.toList()); + .ifPresent(result::add); + return result; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java index 1d18bde05f..38c4df0ae9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java @@ -25,20 +25,16 @@ import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile; import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.manifest.IndexManifestFile; -import org.apache.paimon.table.source.DeletionFile; import org.apache.paimon.utils.IntIterator; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.PathFactory; import org.apache.paimon.utils.SnapshotManager; -import javax.annotation.Nullable; - import java.io.IOException; import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -47,7 +43,6 @@ import java.util.Set; import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; import static org.apache.paimon.index.HashIndexFile.HASH_INDEX; import static org.apache.paimon.utils.Preconditions.checkArgument; -import static org.apache.paimon.utils.Preconditions.checkNotNull; /** Handle index files. */ public class IndexFileHandler { @@ -85,37 +80,6 @@ public class IndexFileHandler { return result.isEmpty() ? Optional.empty() : Optional.of(result.get(0)); } - public Map<String, DeletionFile> scanDVIndex( - @Nullable Snapshot snapshot, BinaryRow partition, int bucket) { - if (snapshot == null) { - return Collections.emptyMap(); - } - String indexManifest = snapshot.indexManifest(); - if (indexManifest == null) { - return Collections.emptyMap(); - } - Map<String, DeletionFile> result = new HashMap<>(); - for (IndexManifestEntry file : indexManifestFile.read(indexManifest)) { - IndexFileMeta meta = file.indexFile(); - if (meta.indexType().equals(DELETION_VECTORS_INDEX) - && file.partition().equals(partition) - && file.bucket() == bucket) { - LinkedHashMap<String, DeletionVectorMeta> dvMetas = meta.deletionVectorMetas(); - checkNotNull(dvMetas); - for (DeletionVectorMeta dvMeta : dvMetas.values()) { - result.put( - dvMeta.dataFileName(), - new DeletionFile( - filePath(meta).toString(), - dvMeta.offset(), - dvMeta.length(), - dvMeta.cardinality())); - } - } - } - return result; - } - public List<IndexManifestEntry> scan(String indexType) { return scan(snapshotManager.latestSnapshot(), indexType); } @@ -189,18 +153,10 @@ public class IndexFileHandler { public List<IndexManifestEntry> scanEntries( Snapshot snapshot, String indexType, Set<BinaryRow> partitions) { - if (snapshot == null) { - return Collections.emptyList(); - } - String indexManifest = snapshot.indexManifest(); - if (indexManifest == null) { - return Collections.emptyList(); - } - + List<IndexManifestEntry> manifestEntries = scan(snapshot, indexType); List<IndexManifestEntry> result = new ArrayList<>(); - for (IndexManifestEntry file : indexManifestFile.read(indexManifest)) { - if (file.indexFile().indexType().equals(indexType) - && partitions.contains(file.partition())) { + for (IndexManifestEntry file : manifestEntries) { + if (partitions.contains(file.partition())) { result.add(file); } } @@ -289,9 +245,4 @@ public class IndexFileHandler { } return deletionVectorsIndex.readAllDeletionVectors(fileMetas); } - - public List<IndexFileMeta> writeDeletionVectorsIndex( - Map<String, DeletionVector> deletionVectors) { - return deletionVectorsIndex.write(deletionVectors); - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java index 0ab0981963..48d0a0ca24 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java @@ -19,7 +19,7 @@ package org.apache.paimon.mergetree.compact; import org.apache.paimon.compact.CompactUnit; -import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; +import org.apache.paimon.deletionvectors.BucketedDvMaintainer; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.RecordLevelExpire; import org.apache.paimon.mergetree.LevelSortedRun; @@ -54,7 +54,7 @@ public interface CompactStrategy { int numLevels, List<LevelSortedRun> runs, @Nullable RecordLevelExpire recordLevelExpire, - @Nullable DeletionVectorsMaintainer dvMaintainer, + @Nullable BucketedDvMaintainer dvMaintainer, boolean forceRewriteAllFiles) { int maxLevel = numLevels - 1; if (runs.isEmpty()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java index ba427f7e92..6c9376ae33 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java @@ -21,7 +21,7 @@ package org.apache.paimon.mergetree.compact; import org.apache.paimon.KeyValue; import org.apache.paimon.codegen.RecordEqualiser; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; +import org.apache.paimon.deletionvectors.BucketedDvMaintainer; import org.apache.paimon.lookup.LookupStrategy; import org.apache.paimon.mergetree.LookupLevels.PositionedKeyValue; import org.apache.paimon.types.RowKind; @@ -64,7 +64,7 @@ public class LookupChangelogMergeFunctionWrapper<T> private final KeyValue reusedAfter = new KeyValue(); @Nullable private final RecordEqualiser valueEqualiser; private final LookupStrategy lookupStrategy; - private final @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer; + private final @Nullable BucketedDvMaintainer deletionVectorsMaintainer; private final Comparator<KeyValue> comparator; public LookupChangelogMergeFunctionWrapper( @@ -72,7 +72,7 @@ public class LookupChangelogMergeFunctionWrapper<T> Function<InternalRow, T> lookup, @Nullable RecordEqualiser valueEqualiser, LookupStrategy lookupStrategy, - @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer, + @Nullable BucketedDvMaintainer deletionVectorsMaintainer, @Nullable UserDefinedSeqComparator userDefinedSeqComparator) { MergeFunction<KeyValue> mergeFunction = mergeFunctionFactory.create(); checkArgument( diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java index feb6d0b10b..896fba0d60 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java @@ -23,7 +23,7 @@ import org.apache.paimon.CoreOptions.MergeEngine; import org.apache.paimon.KeyValue; import org.apache.paimon.codegen.RecordEqualiser; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; +import org.apache.paimon.deletionvectors.BucketedDvMaintainer; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.FileReaderFactory; import org.apache.paimon.io.KeyValueFileWriterFactory; @@ -56,7 +56,7 @@ public class LookupMergeTreeCompactRewriter<T> extends ChangelogMergeTreeRewrite private final LookupLevels<T> lookupLevels; private final MergeFunctionWrapperFactory<T> wrapperFactory; private final boolean noSequenceField; - @Nullable private final DeletionVectorsMaintainer dvMaintainer; + @Nullable private final BucketedDvMaintainer dvMaintainer; private final IntFunction<String> level2FileFormat; public LookupMergeTreeCompactRewriter( @@ -71,7 +71,7 @@ public class LookupMergeTreeCompactRewriter<T> extends ChangelogMergeTreeRewrite MergeSorter mergeSorter, MergeFunctionWrapperFactory<T> wrapperFactory, boolean produceChangelog, - @Nullable DeletionVectorsMaintainer dvMaintainer, + @Nullable BucketedDvMaintainer dvMaintainer, CoreOptions options) { super( maxLevel, @@ -156,7 +156,7 @@ public class LookupMergeTreeCompactRewriter<T> extends ChangelogMergeTreeRewrite MergeFunctionFactory<KeyValue> mfFactory, int outputLevel, LookupLevels<T> lookupLevels, - @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer); + @Nullable BucketedDvMaintainer deletionVectorsMaintainer); } /** A normal {@link MergeFunctionWrapperFactory} to create lookup wrapper. */ @@ -181,7 +181,7 @@ public class LookupMergeTreeCompactRewriter<T> extends ChangelogMergeTreeRewrite MergeFunctionFactory<KeyValue> mfFactory, int outputLevel, LookupLevels<T> lookupLevels, - @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) { + @Nullable BucketedDvMaintainer deletionVectorsMaintainer) { return new LookupChangelogMergeFunctionWrapper<>( mfFactory, key -> { @@ -207,7 +207,7 @@ public class LookupMergeTreeCompactRewriter<T> extends ChangelogMergeTreeRewrite MergeFunctionFactory<KeyValue> mfFactory, int outputLevel, LookupLevels<Boolean> lookupLevels, - @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) { + @Nullable BucketedDvMaintainer deletionVectorsMaintainer) { return new FirstRowMergeFunctionWrapper( mfFactory, key -> { diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java index 987fa06a30..f0909565ee 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java @@ -26,7 +26,7 @@ import org.apache.paimon.compact.CompactResult; import org.apache.paimon.compact.CompactTask; import org.apache.paimon.compact.CompactUnit; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; +import org.apache.paimon.deletionvectors.BucketedDvMaintainer; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.RecordLevelExpire; import org.apache.paimon.mergetree.LevelSortedRun; @@ -63,7 +63,7 @@ public class MergeTreeCompactManager extends CompactFutureManager { private final CompactRewriter rewriter; @Nullable private final CompactionMetrics.Reporter metricsReporter; - @Nullable private final DeletionVectorsMaintainer dvMaintainer; + @Nullable private final BucketedDvMaintainer dvMaintainer; private final boolean lazyGenDeletionFile; private final boolean needLookup; private final boolean forceRewriteAllFiles; @@ -79,7 +79,7 @@ public class MergeTreeCompactManager extends CompactFutureManager { int numSortedRunStopTrigger, CompactRewriter rewriter, @Nullable CompactionMetrics.Reporter metricsReporter, - @Nullable DeletionVectorsMaintainer dvMaintainer, + @Nullable BucketedDvMaintainer dvMaintainer, boolean lazyGenDeletionFile, boolean needLookup, @Nullable RecordLevelExpire recordLevelExpire, diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java index 2e61c3d2f1..15af7eb4fd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java @@ -24,7 +24,7 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.compact.CompactDeletionFile; import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; +import org.apache.paimon.deletionvectors.BucketedDvMaintainer; import org.apache.paimon.disk.IOManager; import org.apache.paimon.index.DynamicBucketIndexMaintainer; import org.apache.paimon.index.IndexFileHandler; @@ -73,7 +73,7 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> { private final int writerNumberMax; @Nullable private final DynamicBucketIndexMaintainer.Factory dbMaintainerFactory; - @Nullable private final DeletionVectorsMaintainer.Factory dvMaintainerFactory; + @Nullable private final BucketedDvMaintainer.Factory dvMaintainerFactory; private final int numBuckets; private final RowType partitionType; @@ -95,7 +95,7 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> { SnapshotManager snapshotManager, FileStoreScan scan, @Nullable DynamicBucketIndexMaintainer.Factory dbMaintainerFactory, - @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory, + @Nullable BucketedDvMaintainer.Factory dvMaintainerFactory, String tableName, CoreOptions options, RowType partitionType) { @@ -430,7 +430,7 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> { dbMaintainerFactory == null ? null : dbMaintainerFactory.create(restored.dynamicBucketIndex()); - DeletionVectorsMaintainer dvMaintainer = + BucketedDvMaintainer dvMaintainer = dvMaintainerFactory == null ? null : dvMaintainerFactory.create(restored.deleteVectorsIndex()); @@ -524,7 +524,7 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> { long restoredMaxSeqNumber, @Nullable CommitIncrement restoreIncrement, ExecutorService compactExecutor, - @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer); + @Nullable BucketedDvMaintainer deletionVectorsMaintainer); // force buffer spill to avoid out of memory in batch mode protected void forceBufferSpill() throws Exception {} @@ -538,7 +538,7 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> { public final RecordWriter<T> writer; public final int totalBuckets; @Nullable public final DynamicBucketIndexMaintainer dynamicBucketMaintainer; - @Nullable public final DeletionVectorsMaintainer deletionVectorsMaintainer; + @Nullable public final BucketedDvMaintainer deletionVectorsMaintainer; protected final long baseSnapshotId; protected long lastModifiedCommitIdentifier; @@ -546,7 +546,7 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> { RecordWriter<T> writer, int totalBuckets, @Nullable DynamicBucketIndexMaintainer dynamicBucketMaintainer, - @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer, + @Nullable BucketedDvMaintainer deletionVectorsMaintainer, Long baseSnapshotId) { this.writer = writer; this.totalBuckets = totalBuckets; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendFileStoreWrite.java index 3f1e701255..2df7c36934 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendFileStoreWrite.java @@ -23,7 +23,7 @@ import org.apache.paimon.compact.CompactManager; import org.apache.paimon.compact.NoopCompactManager; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; +import org.apache.paimon.deletionvectors.BucketedDvMaintainer; import org.apache.paimon.fs.FileIO; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.types.RowType; @@ -51,7 +51,6 @@ public class AppendFileStoreWrite extends BaseAppendFileStoreWrite { SnapshotManager snapshotManager, FileStoreScan scan, CoreOptions options, - @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory, String tableName) { super( fileIO, @@ -63,7 +62,7 @@ public class AppendFileStoreWrite extends BaseAppendFileStoreWrite { snapshotManager, scan, options, - dvMaintainerFactory, + null, tableName); super.withIgnorePreviousFiles(true); } @@ -74,7 +73,7 @@ public class AppendFileStoreWrite extends BaseAppendFileStoreWrite { int bucket, List<DataFileMeta> restoredFiles, ExecutorService compactExecutor, - @Nullable DeletionVectorsMaintainer dvMaintainer) { + @Nullable BucketedDvMaintainer dvMaintainer) { return new NoopCompactManager(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java index dca05a6522..3bd8264831 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java @@ -24,8 +24,8 @@ import org.apache.paimon.append.AppendOnlyWriter; import org.apache.paimon.compact.CompactManager; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.deletionvectors.BucketedDvMaintainer; import org.apache.paimon.deletionvectors.DeletionVector; -import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; import org.apache.paimon.fileindex.FileIndexOptions; import org.apache.paimon.format.FileFormat; import org.apache.paimon.fs.FileIO; @@ -88,7 +88,7 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte SnapshotManager snapshotManager, FileStoreScan scan, CoreOptions options, - @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory, + @Nullable BucketedDvMaintainer.Factory dvMaintainerFactory, String tableName) { super(snapshotManager, scan, options, partitionType, null, dvMaintainerFactory, tableName); this.fileIO = fileIO; @@ -111,7 +111,7 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte long restoredMaxSeqNumber, @Nullable CommitIncrement restoreIncrement, ExecutorService compactExecutor, - @Nullable DeletionVectorsMaintainer dvMaintainer) { + @Nullable BucketedDvMaintainer dvMaintainer) { return new AppendOnlyWriter( fileIO, ioManager, @@ -161,7 +161,7 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte int bucket, List<DataFileMeta> restoredFiles, ExecutorService compactExecutor, - @Nullable DeletionVectorsMaintainer dvMaintainer); + @Nullable BucketedDvMaintainer dvMaintainer); public List<DataFileMeta> compactRewrite( BinaryRow partition, diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java index 9fffc0ce7b..71f10d4b3d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java @@ -24,8 +24,8 @@ import org.apache.paimon.compact.CompactManager; import org.apache.paimon.compact.NoopCompactManager; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.deletionvectors.BucketedDvMaintainer; import org.apache.paimon.deletionvectors.DeletionVector; -import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; import org.apache.paimon.fs.FileIO; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.types.RowType; @@ -54,7 +54,7 @@ public class BucketedAppendFileStoreWrite extends BaseAppendFileStoreWrite { SnapshotManager snapshotManager, FileStoreScan scan, CoreOptions options, - @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory, + @Nullable BucketedDvMaintainer.Factory dvMaintainerFactory, String tableName) { super( fileIO, @@ -77,7 +77,7 @@ public class BucketedAppendFileStoreWrite extends BaseAppendFileStoreWrite { int bucket, List<DataFileMeta> restoredFiles, ExecutorService compactExecutor, - @Nullable DeletionVectorsMaintainer dvMaintainer) { + @Nullable BucketedDvMaintainer dvMaintainer) { if (options.writeOnly()) { return new NoopCompactManager(); } else { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java index 4c75fdbb73..b268f7b7a5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java @@ -20,7 +20,7 @@ package org.apache.paimon.operation; import org.apache.paimon.FileStore; import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; +import org.apache.paimon.deletionvectors.BucketedDvMaintainer; import org.apache.paimon.disk.IOManager; import org.apache.paimon.index.DynamicBucketIndexMaintainer; import org.apache.paimon.io.DataFileMeta; @@ -156,7 +156,7 @@ public interface FileStoreWrite<T> extends Restorable<List<FileStoreWrite.State< protected final List<DataFileMeta> dataFiles; protected final long maxSequenceNumber; @Nullable protected final DynamicBucketIndexMaintainer indexMaintainer; - @Nullable protected final DeletionVectorsMaintainer deletionVectorsMaintainer; + @Nullable protected final BucketedDvMaintainer deletionVectorsMaintainer; protected final CommitIncrement commitIncrement; protected State( @@ -168,7 +168,7 @@ public interface FileStoreWrite<T> extends Restorable<List<FileStoreWrite.State< Collection<DataFileMeta> dataFiles, long maxSequenceNumber, @Nullable DynamicBucketIndexMaintainer indexMaintainer, - @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer, + @Nullable BucketedDvMaintainer deletionVectorsMaintainer, CommitIncrement commitIncrement) { this.partition = partition; this.bucket = bucket; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index 4e9e558d04..afd3fb906f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -29,8 +29,8 @@ import org.apache.paimon.compact.NoopCompactManager; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.serializer.RowCompactedSerializer; +import org.apache.paimon.deletionvectors.BucketedDvMaintainer; import org.apache.paimon.deletionvectors.DeletionVector; -import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; import org.apache.paimon.format.FileFormatDiscover; import org.apache.paimon.fs.FileIO; import org.apache.paimon.index.DynamicBucketIndexMaintainer; @@ -131,7 +131,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> { SnapshotManager snapshotManager, FileStoreScan scan, @Nullable DynamicBucketIndexMaintainer.Factory dbMaintainerFactory, - @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory, + @Nullable BucketedDvMaintainer.Factory dvMaintainerFactory, CoreOptions options, KeyValueFieldsExtractor extractor, String tableName) { @@ -185,7 +185,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> { long restoredMaxSeqNumber, @Nullable CommitIncrement restoreIncrement, ExecutorService compactExecutor, - @Nullable DeletionVectorsMaintainer dvMaintainer) { + @Nullable BucketedDvMaintainer dvMaintainer) { if (LOG.isDebugEnabled()) { LOG.debug( "Creating merge tree writer for partition {} bucket {} from restored files {}", @@ -260,7 +260,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> { CompactStrategy compactStrategy, ExecutorService compactExecutor, Levels levels, - @Nullable DeletionVectorsMaintainer dvMaintainer) { + @Nullable BucketedDvMaintainer dvMaintainer) { if (options.writeOnly()) { return new NoopCompactManager(); } else { @@ -299,7 +299,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> { Comparator<InternalRow> keyComparator, @Nullable FieldsComparator userDefinedSeqComparator, Levels levels, - @Nullable DeletionVectorsMaintainer dvMaintainer) { + @Nullable BucketedDvMaintainer dvMaintainer) { DeletionVector.Factory dvFactory = DeletionVector.factory(dvMaintainer); FileReaderFactory<KeyValue> readerFactory = readerFactoryBuilder.build(partition, bucket, dvFactory); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java index c91a664fad..06263d4ec6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java @@ -19,7 +19,7 @@ package org.apache.paimon.operation; import org.apache.paimon.CoreOptions; -import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; +import org.apache.paimon.deletionvectors.BucketedDvMaintainer; import org.apache.paimon.index.DynamicBucketIndexMaintainer; import org.apache.paimon.io.cache.CacheManager; import org.apache.paimon.memory.HeapMemorySegmentPool; @@ -65,7 +65,7 @@ public abstract class MemoryFileStoreWrite<T> extends AbstractFileStoreWrite<T> CoreOptions options, RowType partitionType, @Nullable DynamicBucketIndexMaintainer.Factory dbMaintainerFactory, - @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory, + @Nullable BucketedDvMaintainer.Factory dvMaintainerFactory, String tableName) { super( snapshotManager, diff --git a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java index da7287ad96..ac9a851cb2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java @@ -21,7 +21,7 @@ package org.apache.paimon.postpone; import org.apache.paimon.CoreOptions; import org.apache.paimon.KeyValue; import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; +import org.apache.paimon.deletionvectors.BucketedDvMaintainer; import org.apache.paimon.format.FileFormat; import org.apache.paimon.format.avro.AvroSchemaConverter; import org.apache.paimon.fs.FileIO; @@ -180,7 +180,7 @@ public class PostponeBucketFileStoreWrite extends MemoryFileStoreWrite<KeyValue> long restoredMaxSeqNumber, @Nullable CommitIncrement restoreIncrement, ExecutorService compactExecutor, - @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) { + @Nullable BucketedDvMaintainer deletionVectorsMaintainer) { Preconditions.checkArgument(bucket == BucketMode.POSTPONE_BUCKET); Preconditions.checkArgument( restoreFiles.isEmpty(), diff --git a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java index a67b5c0a97..55ffdcbe53 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java @@ -19,7 +19,7 @@ package org.apache.paimon; import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; +import org.apache.paimon.deletionvectors.BucketedDvMaintainer; import org.apache.paimon.deletionvectors.append.AppendDeleteFileMaintainer; import org.apache.paimon.deletionvectors.append.AppendDeletionFileMaintainerHelper; import org.apache.paimon.fs.FileIO; @@ -44,6 +44,7 @@ import org.apache.paimon.table.source.DeletionFile; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.TraceableFileIO; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -126,10 +127,9 @@ public class TestAppendFileStore extends AppendOnlyFileStore { fileHandler, partition, dataFileToDeletionFiles); } - public DeletionVectorsMaintainer createOrRestoreDVMaintainer(BinaryRow partition, int bucket) { + public BucketedDvMaintainer createOrRestoreDVMaintainer(BinaryRow partition, int bucket) { Snapshot latestSnapshot = snapshotManager().latestSnapshot(); - DeletionVectorsMaintainer.Factory factory = - new DeletionVectorsMaintainer.Factory(fileHandler); + BucketedDvMaintainer.Factory factory = BucketedDvMaintainer.factory(fileHandler); List<IndexFileMeta> indexFiles = fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX, partition, bucket); return factory.create(indexFiles); @@ -137,19 +137,21 @@ public class TestAppendFileStore extends AppendOnlyFileStore { public CommitMessageImpl writeDVIndexFiles( BinaryRow partition, int bucket, Map<String, List<Integer>> dataFileToPositions) { - DeletionVectorsMaintainer dvMaintainer = createOrRestoreDVMaintainer(partition, bucket); + BucketedDvMaintainer dvMaintainer = createOrRestoreDVMaintainer(partition, bucket); for (Map.Entry<String, List<Integer>> entry : dataFileToPositions.entrySet()) { for (Integer pos : entry.getValue()) { dvMaintainer.notifyNewDeletion(entry.getKey(), pos); } } + List<IndexFileMeta> indexFiles = new ArrayList<>(); + dvMaintainer.writeDeletionVectorsIndex().ifPresent(indexFiles::add); return new CommitMessageImpl( partition, bucket, options().bucket(), DataIncrement.emptyIncrement(), CompactIncrement.emptyIncrement(), - new IndexIncrement(dvMaintainer.writeDeletionVectorsIndex())); + new IndexIncrement(indexFiles)); } public static TestAppendFileStore createAppendStore( diff --git a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java similarity index 83% rename from paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java rename to paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java index 7e82ad8d65..2128ae8714 100644 --- a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java @@ -51,8 +51,8 @@ import static java.util.Collections.emptyList; import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; import static org.assertj.core.api.Assertions.assertThat; -/** Test for {@link DeletionVectorsMaintainer}. */ -public class DeletionVectorsMaintainerTest extends PrimaryKeyTableTestBase { +/** Test for {@link BucketedDvMaintainer}. */ +public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { private IndexFileHandler fileHandler; @ParameterizedTest @@ -60,9 +60,8 @@ public class DeletionVectorsMaintainerTest extends PrimaryKeyTableTestBase { public void test0(boolean bitmap64) { initIndexHandler(bitmap64); - DeletionVectorsMaintainer.Factory factory = - new DeletionVectorsMaintainer.Factory(fileHandler); - DeletionVectorsMaintainer dvMaintainer = factory.create(emptyList()); + BucketedDvMaintainer.Factory factory = BucketedDvMaintainer.factory(fileHandler); + BucketedDvMaintainer dvMaintainer = factory.create(emptyList()); assertThat(dvMaintainer.bitmap64).isEqualTo(bitmap64); dvMaintainer.notifyNewDeletion("f1", 1); @@ -72,9 +71,10 @@ public class DeletionVectorsMaintainerTest extends PrimaryKeyTableTestBase { assertThat(dvMaintainer.deletionVectorOf("f1")).isPresent(); assertThat(dvMaintainer.deletionVectorOf("f3")).isEmpty(); - List<IndexFileMeta> fileMetas = dvMaintainer.writeDeletionVectorsIndex(); + IndexFileMeta file = dvMaintainer.writeDeletionVectorsIndex().get(); - Map<String, DeletionVector> deletionVectors = fileHandler.readAllDeletionVectors(fileMetas); + Map<String, DeletionVector> deletionVectors = + fileHandler.readAllDeletionVectors(Collections.singletonList(file)); assertThat(deletionVectors.get("f1").isDeleted(1)).isTrue(); assertThat(deletionVectors.get("f1").isDeleted(2)).isFalse(); assertThat(deletionVectors.get("f2").isDeleted(1)).isFalse(); @@ -87,10 +87,9 @@ public class DeletionVectorsMaintainerTest extends PrimaryKeyTableTestBase { public void test1(boolean bitmap64) { initIndexHandler(bitmap64); - DeletionVectorsMaintainer.Factory factory = - new DeletionVectorsMaintainer.Factory(fileHandler); + BucketedDvMaintainer.Factory factory = BucketedDvMaintainer.factory(fileHandler); - DeletionVectorsMaintainer dvMaintainer = factory.create(); + BucketedDvMaintainer dvMaintainer = factory.create(); DeletionVector deletionVector1 = createDeletionVector(bitmap64); deletionVector1.delete(1); deletionVector1.delete(3); @@ -98,8 +97,7 @@ public class DeletionVectorsMaintainerTest extends PrimaryKeyTableTestBase { dvMaintainer.notifyNewDeletion("f1", deletionVector1); assertThat(dvMaintainer.bitmap64()).isEqualTo(bitmap64); - List<IndexFileMeta> fileMetas1 = dvMaintainer.writeDeletionVectorsIndex(); - assertThat(fileMetas1.size()).isEqualTo(1); + IndexFileMeta file = dvMaintainer.writeDeletionVectorsIndex().get(); CommitMessage commitMessage = new CommitMessageImpl( BinaryRow.EMPTY_ROW, @@ -107,7 +105,7 @@ public class DeletionVectorsMaintainerTest extends PrimaryKeyTableTestBase { 1, DataIncrement.emptyIncrement(), CompactIncrement.emptyIncrement(), - new IndexIncrement(fileMetas1)); + new IndexIncrement(Collections.singletonList(file))); BatchTableCommit commit = table.newBatchWriteBuilder().newCommit(); commit.commit(Collections.singletonList(commitMessage)); @@ -122,8 +120,7 @@ public class DeletionVectorsMaintainerTest extends PrimaryKeyTableTestBase { deletionVector2.delete(2); dvMaintainer.notifyNewDeletion("f1", deletionVector2); - List<IndexFileMeta> fileMetas2 = dvMaintainer.writeDeletionVectorsIndex(); - assertThat(fileMetas2.size()).isEqualTo(1); + file = dvMaintainer.writeDeletionVectorsIndex().get(); commitMessage = new CommitMessageImpl( BinaryRow.EMPTY_ROW, @@ -131,7 +128,7 @@ public class DeletionVectorsMaintainerTest extends PrimaryKeyTableTestBase { 1, DataIncrement.emptyIncrement(), CompactIncrement.emptyIncrement(), - new IndexIncrement(fileMetas2)); + new IndexIncrement(Collections.singletonList(file))); commit = table.newBatchWriteBuilder().newCommit(); commit.commit(Collections.singletonList(commitMessage)); @@ -149,9 +146,8 @@ public class DeletionVectorsMaintainerTest extends PrimaryKeyTableTestBase { public void testCompactDeletion(boolean bitmap64) throws IOException { initIndexHandler(bitmap64); - DeletionVectorsMaintainer.Factory factory = - new DeletionVectorsMaintainer.Factory(fileHandler); - DeletionVectorsMaintainer dvMaintainer = factory.create(emptyList()); + BucketedDvMaintainer.Factory factory = BucketedDvMaintainer.factory(fileHandler); + BucketedDvMaintainer dvMaintainer = factory.create(emptyList()); File indexDir = new File(tempPath.toFile(), "/default.db/T/index"); @@ -191,17 +187,15 @@ public class DeletionVectorsMaintainerTest extends PrimaryKeyTableTestBase { public void testReadAndWriteMixedDv(boolean bitmap64) { // write first kind dv initIndexHandler(bitmap64); - DeletionVectorsMaintainer.Factory factory1 = - new DeletionVectorsMaintainer.Factory(fileHandler); - DeletionVectorsMaintainer dvMaintainer1 = factory1.create(); + BucketedDvMaintainer.Factory factory1 = BucketedDvMaintainer.factory(fileHandler); + BucketedDvMaintainer dvMaintainer1 = factory1.create(); dvMaintainer1.notifyNewDeletion("f1", 1); dvMaintainer1.notifyNewDeletion("f1", 3); dvMaintainer1.notifyNewDeletion("f2", 1); dvMaintainer1.notifyNewDeletion("f2", 3); assertThat(dvMaintainer1.bitmap64()).isEqualTo(bitmap64); - List<IndexFileMeta> fileMetas1 = dvMaintainer1.writeDeletionVectorsIndex(); - assertThat(fileMetas1.size()).isEqualTo(1); + IndexFileMeta file = dvMaintainer1.writeDeletionVectorsIndex().get(); CommitMessage commitMessage1 = new CommitMessageImpl( BinaryRow.EMPTY_ROW, @@ -209,21 +203,20 @@ public class DeletionVectorsMaintainerTest extends PrimaryKeyTableTestBase { 1, DataIncrement.emptyIncrement(), CompactIncrement.emptyIncrement(), - new IndexIncrement(fileMetas1)); + new IndexIncrement(Collections.singletonList(file))); BatchTableCommit commit1 = table.newBatchWriteBuilder().newCommit(); commit1.commit(Collections.singletonList(commitMessage1)); // write second kind dv initIndexHandler(!bitmap64); - DeletionVectorsMaintainer.Factory factory2 = - new DeletionVectorsMaintainer.Factory(fileHandler); + BucketedDvMaintainer.Factory factory2 = BucketedDvMaintainer.factory(fileHandler); List<IndexFileMeta> indexFiles = fileHandler.scan( table.latestSnapshot().get(), DELETION_VECTORS_INDEX, BinaryRow.EMPTY_ROW, 0); - DeletionVectorsMaintainer dvMaintainer2 = factory2.create(indexFiles); + BucketedDvMaintainer dvMaintainer2 = factory2.create(indexFiles); dvMaintainer2.notifyNewDeletion("f1", 10); dvMaintainer2.notifyNewDeletion("f3", 1); dvMaintainer2.notifyNewDeletion("f3", 3); @@ -238,8 +231,7 @@ public class DeletionVectorsMaintainerTest extends PrimaryKeyTableTestBase { assertThat(dvs.get("f3")) .isInstanceOf(bitmap64 ? BitmapDeletionVector.class : Bitmap64DeletionVector.class); - List<IndexFileMeta> fileMetas2 = dvMaintainer2.writeDeletionVectorsIndex(); - assertThat(fileMetas2.size()).isEqualTo(1); + file = dvMaintainer2.writeDeletionVectorsIndex().get(); CommitMessage commitMessage2 = new CommitMessageImpl( BinaryRow.EMPTY_ROW, @@ -247,7 +239,7 @@ public class DeletionVectorsMaintainerTest extends PrimaryKeyTableTestBase { 1, DataIncrement.emptyIncrement(), CompactIncrement.emptyIncrement(), - new IndexIncrement(fileMetas2)); + new IndexIncrement(Collections.singletonList(file))); BatchTableCommit commit2 = table.newBatchWriteBuilder().newCommit(); commit2.commit(Collections.singletonList(commitMessage2)); @@ -278,8 +270,8 @@ public class DeletionVectorsMaintainerTest extends PrimaryKeyTableTestBase { fileHandler = table.store().newIndexFileHandler(); } - public static DeletionVectorsMaintainer createOrRestore( - DeletionVectorsMaintainer.Factory factory, + public static BucketedDvMaintainer createOrRestore( + BucketedDvMaintainer.Factory factory, @Nullable Snapshot snapshot, BinaryRow partition) { IndexFileHandler handler = factory.indexFileHandler(); diff --git a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java index 2b55cba306..d30462c25d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java @@ -72,7 +72,7 @@ public class DeletionVectorsIndexFileTest { index3.delete(3); deleteMap.put("file33.parquet", index3); - List<IndexFileMeta> indexFiles = deletionVectorsIndexFile.write(deleteMap); + List<IndexFileMeta> indexFiles = deletionVectorsIndexFile.writeWithRolling(deleteMap); assertThat(indexFiles.size()).isEqualTo(1); // read @@ -110,7 +110,7 @@ public class DeletionVectorsIndexFileTest { } // read - List<IndexFileMeta> indexFiles = deletionVectorsIndexFile.write(deleteMap); + List<IndexFileMeta> indexFiles = deletionVectorsIndexFile.writeWithRolling(deleteMap); assertThat(indexFiles.size()).isEqualTo(1); Map<String, DeletionVector> dvs = deletionVectorsIndexFile.readAllDeletionVectors(indexFiles); @@ -142,7 +142,7 @@ public class DeletionVectorsIndexFileTest { fileToCardinality.put("f" + i, index.getCardinality()); fileToDV.put("f" + i, index); } - List<IndexFileMeta> indexFiles = deletionVectorsIndexFile.write(fileToDV); + List<IndexFileMeta> indexFiles = deletionVectorsIndexFile.writeWithRolling(fileToDV); // read assertThat(indexFiles.size()).isEqualTo(1); @@ -174,7 +174,7 @@ public class DeletionVectorsIndexFileTest { fileToCardinality.put("f" + i, index.getCardinality()); fileToDV.put("f" + i, index); } - List<IndexFileMeta> indexFiles = deletionVectorsIndexFile.write(fileToDV); + List<IndexFileMeta> indexFiles = deletionVectorsIndexFile.writeWithRolling(fileToDV); // assert 1 assertThat(indexFiles.size()).isEqualTo(3); @@ -196,7 +196,7 @@ public class DeletionVectorsIndexFileTest { fileToCardinality.put("f" + i, index.getCardinality()); fileToDV.put("f" + i, index); } - indexFiles = deletionVectorsIndexFile.write(fileToDV); + indexFiles = deletionVectorsIndexFile.writeWithRolling(fileToDV); // assert 2 assertThat(indexFiles.size()).isGreaterThan(1); @@ -226,7 +226,7 @@ public class DeletionVectorsIndexFileTest { deleteMap1.put(String.format("file%s.parquet", i), index); deleteInteger.put(String.format("file%s.parquet", i), num); } - List<IndexFileMeta> indexFiles1 = v1DeletionVectorsIndexFile.write(deleteMap1); + List<IndexFileMeta> indexFiles1 = v1DeletionVectorsIndexFile.writeWithRolling(deleteMap1); assertThat(indexFiles1.size()).isEqualTo(1); // write v2 dv @@ -238,7 +238,7 @@ public class DeletionVectorsIndexFileTest { deleteMap2.put(String.format("file%s.parquet", i), index); deleteInteger.put(String.format("file%s.parquet", i), num); } - List<IndexFileMeta> indexFiles2 = v2DeletionVectorsIndexFile.write(deleteMap2); + List<IndexFileMeta> indexFiles2 = v2DeletionVectorsIndexFile.writeWithRolling(deleteMap2); assertThat(indexFiles2.size()).isEqualTo(1); List<IndexFileMeta> totalIndexFiles = @@ -274,7 +274,7 @@ public class DeletionVectorsIndexFileTest { index1.delete(100); deleteMap.put("file1.parquet", index1); - List<IndexFileMeta> indexFiles = deletionVectorsIndexFile.write(deleteMap); + List<IndexFileMeta> indexFiles = deletionVectorsIndexFile.writeWithRolling(deleteMap); assertThat(indexFiles.size()).isEqualTo(1); IndexFileMeta indexFileMeta = indexFiles.get(0); diff --git a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java index c6048b95f4..892d219278 100644 --- a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java +++ b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java @@ -19,10 +19,14 @@ package org.apache.paimon.deletionvectors.append; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.fs.Path; import org.apache.paimon.index.IndexFileHandler; +import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.table.source.DeletionFile; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** Helper for {@link BaseAppendDeleteFileMaintainer}. */ public class AppendDeletionFileMaintainerHelper { @@ -31,6 +35,19 @@ public class AppendDeletionFileMaintainerHelper { IndexFileHandler indexFileHandler, BinaryRow partition, Map<String, DeletionFile> deletionFiles) { - return new AppendDeleteFileMaintainer(indexFileHandler, partition, deletionFiles); + List<String> touchedIndexFileNames = + deletionFiles.values().stream() + .map(deletionFile -> new Path(deletionFile.path()).getName()) + .distinct() + .collect(Collectors.toList()); + List<IndexManifestEntry> manifests = + indexFileHandler.scanEntries().stream() + .filter( + indexManifestEntry -> + touchedIndexFileNames.contains( + indexManifestEntry.indexFile().fileName())) + .collect(Collectors.toList()); + return new AppendDeleteFileMaintainer( + indexFileHandler.deletionVectorsIndex(), partition, manifests, deletionFiles); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index 23e942d3f4..5701106be4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -25,8 +25,8 @@ import org.apache.paimon.TestAppendFileStore; import org.apache.paimon.TestFileStore; import org.apache.paimon.TestKeyValueGenerator; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.deletionvectors.BucketedDvMaintainer; import org.apache.paimon.deletionvectors.DeletionVector; -import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.index.IndexFileHandler; @@ -890,8 +890,7 @@ public class FileStoreCommitTest { // assert 1 assertThat(store.scanDVIndexFiles(BinaryRow.EMPTY_ROW, 0).size()).isEqualTo(2); - DeletionVectorsMaintainer maintainer = - store.createOrRestoreDVMaintainer(BinaryRow.EMPTY_ROW, 0); + BucketedDvMaintainer maintainer = store.createOrRestoreDVMaintainer(BinaryRow.EMPTY_ROW, 0); Map<String, DeletionVector> dvs = maintainer.deletionVectors(); assertThat(dvs.size()).isEqualTo(2); assertThat(dvs.get("f2").isDeleted(2)).isTrue(); diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala index f85213da7c..445d1507d9 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala @@ -19,7 +19,7 @@ package org.apache.paimon.spark.sql import org.apache.paimon.data.BinaryRow -import org.apache.paimon.deletionvectors.{DeletionVector, DeletionVectorsMaintainer, DeletionVectorsMaintainerTest} +import org.apache.paimon.deletionvectors.{BucketedDvMaintainer, BucketedDvMaintainerTest, DeletionVector} import org.apache.paimon.fs.Path import org.apache.paimon.spark.{PaimonSparkTestBase, PaimonSplitScan} import org.apache.paimon.spark.schema.PaimonMetadataColumn @@ -117,7 +117,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe val table = loadTable("target") val dvMaintainerFactory = - new DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler()) + BucketedDvMaintainer.factory(table.store().newIndexFileHandler()) runAndCheckSplitScan(s""" |MERGE INTO target |USING source @@ -167,7 +167,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe val table = loadTable("T") val dvMaintainerFactory = - new DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler()) + BucketedDvMaintainer.factory(table.store().newIndexFileHandler()) spark.sql("INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c')") val deletionVectors1 = getAllLatestDeletionVectors(table, dvMaintainerFactory) @@ -239,7 +239,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe val table = loadTable("T") val dvMaintainerFactory = - new DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler()) + BucketedDvMaintainer.factory(table.store().newIndexFileHandler()) spark.sql( "INSERT INTO T VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'), (4, 'd', '2025')") @@ -326,7 +326,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe val table = loadTable("T") val dvMaintainerFactory = - new DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler()) + BucketedDvMaintainer.factory(table.store().newIndexFileHandler()) spark.sql("INSERT INTO T VALUES (1, 'a'), (2, 'b')") val deletionVectors1 = getAllLatestDeletionVectors(table, dvMaintainerFactory) @@ -383,7 +383,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe val table = loadTable("T") val dvMaintainerFactory = - new DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler()) + BucketedDvMaintainer.factory(table.store().newIndexFileHandler()) def getDeletionVectors(ptValues: Seq[String]): Map[String, DeletionVector] = { getLatestDeletionVectors( @@ -467,7 +467,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe Row(1, "a_new1") :: Row(2, "b") :: Row(3, "c_new1") :: Nil) val dvMaintainerFactory = - new DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler()) + BucketedDvMaintainer.factory(table.store().newIndexFileHandler()) val deletionVectors1 = getAllLatestDeletionVectors(table, dvMaintainerFactory) // 1, 3 deleted, their row positions are 0, 2 @@ -705,17 +705,17 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe private def getAllLatestDeletionVectors( table: FileStoreTable, - dvMaintainerFactory: DeletionVectorsMaintainer.Factory): Map[String, DeletionVector] = { + dvMaintainerFactory: BucketedDvMaintainer.Factory): Map[String, DeletionVector] = { getLatestDeletionVectors(table, dvMaintainerFactory, Seq(BinaryRow.EMPTY_ROW)) } private def getLatestDeletionVectors( table: FileStoreTable, - dvMaintainerFactory: DeletionVectorsMaintainer.Factory, + dvMaintainerFactory: BucketedDvMaintainer.Factory, partitions: Seq[BinaryRow]): Map[String, DeletionVector] = { partitions.flatMap { partition => - DeletionVectorsMaintainerTest + BucketedDvMaintainerTest .createOrRestore( dvMaintainerFactory, table.snapshotManager().latestSnapshot(),