This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new f970245100 [core] Add partition and bucket arguements to FileStorePathFactory.indexFileFactory f970245100 is described below commit f9702451006b876b5f3455eeb86bed782bd97f74 Author: JingsongLi <jingsongl...@gmail.com> AuthorDate: Sat Aug 23 11:50:53 2025 +0800 [core] Add partition and bucket arguements to FileStorePathFactory.indexFileFactory --- .../java/org/apache/paimon/AbstractFileStore.java | 14 ++--- .../deletionvectors/BucketedDvMaintainer.java | 13 +++-- .../append/BaseAppendDeleteFileMaintainer.java | 9 ++- .../paimon/index/DynamicBucketIndexMaintainer.java | 6 +- .../org/apache/paimon/index/IndexFileHandler.java | 65 ++++++++++++---------- .../org/apache/paimon/index/PartitionIndex.java | 6 +- .../paimon/operation/AbstractFileStoreWrite.java | 6 +- .../apache/paimon/table/sink/TableCommitImpl.java | 5 +- .../table/source/snapshot/SnapshotReaderImpl.java | 6 +- .../apache/paimon/utils/FileStorePathFactory.java | 2 +- .../paimon/utils/IndexFilePathFactories.java | 41 ++++++++++++++ .../org/apache/paimon/TestAppendFileStore.java | 2 +- .../deletionvectors/BucketedDvMaintainerTest.java | 46 +++++++-------- .../append/AppendDeletionFileMaintainerHelper.java | 7 ++- .../append/AppendDeletionFileMaintainerTest.java | 2 +- .../index/DynamicBucketIndexMaintainerTest.java | 3 +- .../paimon/index/HashBucketAssignerTest.java | 50 ++++++++++++----- .../paimon/operation/FileStoreCommitTest.java | 56 +++++++++++++++---- .../apache/paimon/flink/BatchFileStoreITCase.java | 4 +- 19 files changed, 230 insertions(+), 113 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 4d2b7726de..1c12877e75 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -22,13 +22,11 @@ import org.apache.paimon.CoreOptions.ExternalPathStrategy; import org.apache.paimon.catalog.RenamingSnapshotCommit; import org.apache.paimon.catalog.SnapshotCommit; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile; import org.apache.paimon.format.FileFormat; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.iceberg.IcebergCommitCallback; import org.apache.paimon.iceberg.IcebergOptions; -import org.apache.paimon.index.HashIndexFile; import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.manifest.IndexManifestFile; import org.apache.paimon.manifest.ManifestFile; @@ -61,6 +59,7 @@ import org.apache.paimon.tag.TagPreview; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.IndexFilePathFactories; import org.apache.paimon.utils.InternalRowPartitionComputer; import org.apache.paimon.utils.SegmentsCache; import org.apache.paimon.utils.SnapshotManager; @@ -221,15 +220,12 @@ abstract class AbstractFileStore<T> implements FileStore<T> { @Override public IndexFileHandler newIndexFileHandler() { return new IndexFileHandler( + fileIO, snapshotManager(), - pathFactory().indexFileFactory(), indexManifestFileFactory().create(), - new HashIndexFile(fileIO, pathFactory().indexFileFactory()), - new DeletionVectorsIndexFile( - fileIO, - pathFactory().indexFileFactory(), - options.dvIndexFileTargetSize(), - options.deletionVectorBitmap64())); + new IndexFilePathFactories(pathFactory()), + options.dvIndexFileTargetSize(), + options.deletionVectorBitmap64()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java index eef21f3b54..788ea1cce8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java @@ -19,6 +19,7 @@ package org.apache.paimon.deletionvectors; import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.index.IndexFileMeta; @@ -160,17 +161,19 @@ public class BucketedDvMaintainer { return handler; } - public BucketedDvMaintainer create(@Nullable List<IndexFileMeta> restoredFiles) { + public BucketedDvMaintainer create( + BinaryRow partition, int bucket, @Nullable List<IndexFileMeta> restoredFiles) { if (restoredFiles == null) { restoredFiles = Collections.emptyList(); } Map<String, DeletionVector> deletionVectors = - new HashMap<>(handler.readAllDeletionVectors(restoredFiles)); - return create(deletionVectors); + new HashMap<>(handler.readAllDeletionVectors(partition, bucket, restoredFiles)); + return create(partition, bucket, deletionVectors); } - public BucketedDvMaintainer create(Map<String, DeletionVector> deletionVectors) { - return new BucketedDvMaintainer(handler.dvIndex(), deletionVectors); + public BucketedDvMaintainer create( + BinaryRow partition, int bucket, Map<String, DeletionVector> deletionVectors) { + return new BucketedDvMaintainer(handler.dvIndex(partition, bucket), deletionVectors); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java index 729f660177..21d1870b76 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java @@ -37,6 +37,7 @@ import java.util.Map; import java.util.stream.Collectors; import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; +import static org.apache.paimon.table.BucketMode.UNAWARE_BUCKET; import static org.apache.paimon.utils.Preconditions.checkNotNull; /** @@ -68,7 +69,8 @@ public interface BaseAppendDeleteFileMaintainer { List<IndexFileMeta> indexFiles = indexFileHandler.scan(snapshot, DELETION_VECTORS_INDEX, partition, bucket); BucketedDvMaintainer maintainer = - BucketedDvMaintainer.factory(indexFileHandler).create(indexFiles); + BucketedDvMaintainer.factory(indexFileHandler) + .create(partition, bucket, indexFiles); return new BucketedAppendDeleteFileMaintainer(partition, bucket, maintainer); } @@ -94,6 +96,9 @@ public interface BaseAppendDeleteFileMaintainer { } } return new AppendDeleteFileMaintainer( - indexFileHandler.dvIndex(), partition, manifestEntries, deletionFiles); + indexFileHandler.dvIndex(partition, UNAWARE_BUCKET), + partition, + manifestEntries, + deletionFiles); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/index/DynamicBucketIndexMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/index/DynamicBucketIndexMaintainer.java index fbf95d7a14..50397a7a85 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/DynamicBucketIndexMaintainer.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/DynamicBucketIndexMaintainer.java @@ -110,8 +110,10 @@ public class DynamicBucketIndexMaintainer { return handler; } - public DynamicBucketIndexMaintainer create(@Nullable IndexFileMeta restoredFile) { - return new DynamicBucketIndexMaintainer(handler.hashIndex(), restoredFile); + public DynamicBucketIndexMaintainer create( + BinaryRow partition, int bucket, @Nullable IndexFileMeta restoredFile) { + return new DynamicBucketIndexMaintainer( + handler.hashIndex(partition, bucket), restoredFile); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java index 188efe9348..186f375633 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java @@ -22,11 +22,13 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile; +import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.manifest.IndexManifestFile; +import org.apache.paimon.options.MemorySize; +import org.apache.paimon.utils.IndexFilePathFactories; import org.apache.paimon.utils.Pair; -import org.apache.paimon.utils.PathFactory; import org.apache.paimon.utils.SnapshotManager; import java.io.IOException; @@ -40,36 +42,39 @@ import java.util.Set; import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; import static org.apache.paimon.index.HashIndexFile.HASH_INDEX; -import static org.apache.paimon.utils.Preconditions.checkArgument; /** Handle index files. */ public class IndexFileHandler { + private final FileIO fileIO; private final SnapshotManager snapshotManager; - private final PathFactory pathFactory; private final IndexManifestFile indexManifestFile; - private final HashIndexFile hashIndex; - private final DeletionVectorsIndexFile dvIndex; + private final IndexFilePathFactories pathFactories; + private final MemorySize dvTargetFileSize; + private final boolean dvBitmap64; public IndexFileHandler( + FileIO fileIO, SnapshotManager snapshotManager, - PathFactory pathFactory, IndexManifestFile indexManifestFile, - HashIndexFile hashIndex, - DeletionVectorsIndexFile dvIndex) { + IndexFilePathFactories pathFactories, + MemorySize dvTargetFileSize, + boolean dvBitmap64) { + this.fileIO = fileIO; this.snapshotManager = snapshotManager; - this.pathFactory = pathFactory; + this.pathFactories = pathFactories; this.indexManifestFile = indexManifestFile; - this.hashIndex = hashIndex; - this.dvIndex = dvIndex; + this.dvTargetFileSize = dvTargetFileSize; + this.dvBitmap64 = dvBitmap64; } - public HashIndexFile hashIndex() { - return this.hashIndex; + public HashIndexFile hashIndex(BinaryRow partition, int bucket) { + return new HashIndexFile(fileIO, pathFactories.get(partition, bucket)); } - public DeletionVectorsIndexFile dvIndex() { - return this.dvIndex; + public DeletionVectorsIndexFile dvIndex(BinaryRow partition, int bucket) { + return new DeletionVectorsIndexFile( + fileIO, pathFactories.get(partition, bucket), dvTargetFileSize, dvBitmap64); } public Optional<IndexFileMeta> scanHashIndex( @@ -166,7 +171,9 @@ public class IndexFileHandler { } public Path filePath(IndexManifestEntry entry) { - return pathFactory.toPath(entry.indexFile().fileName()); + return pathFactories + .get(entry.partition(), entry.bucket()) + .toPath(entry.indexFile().fileName()); } public boolean existsManifest(String indexManifest) { @@ -182,36 +189,36 @@ public class IndexFileHandler { return indexManifestFile.readWithIOException(indexManifest); } - private IndexFile indexFile(IndexFileMeta file) { + private IndexFile indexFile(IndexManifestEntry entry) { + IndexFileMeta file = entry.indexFile(); switch (file.indexType()) { case HASH_INDEX: - return hashIndex; + return hashIndex(entry.partition(), entry.bucket()); case DELETION_VECTORS_INDEX: - return dvIndex; + return dvIndex(entry.partition(), entry.bucket()); default: throw new IllegalArgumentException("Unknown index type: " + file.indexType()); } } public boolean existsIndexFile(IndexManifestEntry file) { - return indexFile(file.indexFile()).exists(file.indexFile().fileName()); + return indexFile(file).exists(file.indexFile().fileName()); } public void deleteIndexFile(IndexManifestEntry entry) { - IndexFileMeta file = entry.indexFile(); - indexFile(file).delete(file.fileName()); + indexFile(entry).delete(entry.indexFile().fileName()); } public void deleteManifest(String indexManifest) { indexManifestFile.delete(indexManifest); } - public Map<String, DeletionVector> readAllDeletionVectors(List<IndexFileMeta> fileMetas) { - for (IndexFileMeta indexFile : fileMetas) { - checkArgument( - indexFile.indexType().equals(DELETION_VECTORS_INDEX), - "Input file is not deletion vectors index " + indexFile.indexType()); - } - return dvIndex.readAllDeletionVectors(fileMetas); + public Map<String, DeletionVector> readAllDeletionVectors( + BinaryRow partition, int bucket, List<IndexFileMeta> fileMetas) { + return dvIndex(partition, bucket).readAllDeletionVectors(fileMetas); + } + + public Map<String, DeletionVector> readAllDeletionVectors(IndexManifestEntry entry) { + return dvIndex(entry.partition(), entry.bucket()).readAllDeletionVectors(entry.indexFile()); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java b/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java index 958dcfcdee..1e525ea4e0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java @@ -124,11 +124,13 @@ public class PartitionIndex { IntPredicate loadFilter, IntPredicate bucketFilter) { List<IndexManifestEntry> files = indexFileHandler.scanEntries(HASH_INDEX, partition); - HashIndexFile hashIndex = indexFileHandler.hashIndex(); Int2ShortHashMap.Builder mapBuilder = Int2ShortHashMap.builder(); Map<Integer, Long> buckets = new HashMap<>(); for (IndexManifestEntry file : files) { - try (IntIterator iterator = hashIndex.read(file.indexFile().fileName())) { + try (IntIterator iterator = + indexFileHandler + .hashIndex(file.partition(), file.bucket()) + .read(file.indexFile().fileName())) { while (true) { try { int hash = iterator.next(); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java index 15af7eb4fd..7b77fb179f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java @@ -429,11 +429,13 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> { DynamicBucketIndexMaintainer indexMaintainer = dbMaintainerFactory == null ? null - : dbMaintainerFactory.create(restored.dynamicBucketIndex()); + : dbMaintainerFactory.create( + partition, bucket, restored.dynamicBucketIndex()); BucketedDvMaintainer dvMaintainer = dvMaintainerFactory == null ? null - : dvMaintainerFactory.create(restored.deleteVectorsIndex()); + : dvMaintainerFactory.create( + partition, bucket, restored.deleteVectorsIndex()); List<DataFileMeta> restoreFiles = restored.dataFiles(); if (restoreFiles == null) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java index 40c69289a5..fd2aa13f36 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java @@ -35,6 +35,7 @@ import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.tag.TagTimeExpire; import org.apache.paimon.utils.DataFilePathFactories; import org.apache.paimon.utils.ExecutorThreadFactory; +import org.apache.paimon.utils.IndexFilePathFactories; import org.apache.paimon.utils.PathFactory; import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; @@ -278,12 +279,14 @@ public class TableCommitImpl implements InnerTableCommit { private void checkFilesExistence(List<ManifestCommittable> committables) { List<Path> files = new ArrayList<>(); DataFilePathFactories factories = new DataFilePathFactories(commit.pathFactory()); - PathFactory indexFileFactory = commit.pathFactory().indexFileFactory(); + IndexFilePathFactories indexFactories = new IndexFilePathFactories(commit.pathFactory()); for (ManifestCommittable committable : committables) { for (CommitMessage message : committable.fileCommittables()) { CommitMessageImpl msg = (CommitMessageImpl) message; DataFilePathFactory pathFactory = factories.get(message.partition(), message.bucket()); + PathFactory indexFileFactory = + indexFactories.get(message.partition(), message.bucket()); Consumer<DataFileMeta> collector = f -> files.addAll(f.collectFiles(pathFactory)); msg.newFilesIncrement().newFiles().forEach(collector); msg.newFilesIncrement().changelogFiles().forEach(collector); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index ce3f2e9c06..e7c8366372 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -393,7 +393,7 @@ public class SnapshotReaderImpl implements SnapshotReader { if (deletionVectors && deletionIndexFilesMap != null) { builder.withDataDeletionFiles( getDeletionFiles( - indexFileHandler.dvIndex(), + indexFileHandler.dvIndex(partition, bucket), dataFiles, deletionIndexFilesMap.getOrDefault( Pair.of(partition, bucket), @@ -520,13 +520,13 @@ public class SnapshotReaderImpl implements SnapshotReader { && deletionIndexFilesMap != null) { builder.withBeforeDeletionFiles( getDeletionFiles( - indexFileHandler.dvIndex(), + indexFileHandler.dvIndex(part, bucket), before, beforDeletionIndexFilesMap.getOrDefault( Pair.of(part, bucket), Collections.emptyList()))); builder.withDataDeletionFiles( getDeletionFiles( - indexFileHandler.dvIndex(), + indexFileHandler.dvIndex(part, bucket), data, deletionIndexFilesMap.getOrDefault( Pair.of(part, bucket), Collections.emptyList()))); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java index b63023653a..63d2b06dc0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java @@ -268,7 +268,7 @@ public class FileStorePathFactory { }; } - public PathFactory indexFileFactory() { + public PathFactory indexFileFactory(BinaryRow partition, int bucket) { return new PathFactory() { @Override public Path newPath() { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/IndexFilePathFactories.java b/paimon-core/src/main/java/org/apache/paimon/utils/IndexFilePathFactories.java new file mode 100644 index 0000000000..934e6b01f2 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/utils/IndexFilePathFactories.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.data.BinaryRow; + +import java.util.HashMap; +import java.util.Map; + +/** Cache for index {@link PathFactory}s. */ +public class IndexFilePathFactories { + + private final Map<Pair<BinaryRow, Integer>, PathFactory> cache = new HashMap<>(); + private final FileStorePathFactory pathFactory; + + public IndexFilePathFactories(FileStorePathFactory pathFactory) { + this.pathFactory = pathFactory; + } + + public PathFactory get(BinaryRow partition, int bucket) { + return cache.computeIfAbsent( + Pair.of(partition, bucket), + k -> pathFactory.indexFileFactory(k.getKey(), k.getValue())); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java index 55ffdcbe53..9caa379884 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java @@ -132,7 +132,7 @@ public class TestAppendFileStore extends AppendOnlyFileStore { BucketedDvMaintainer.Factory factory = BucketedDvMaintainer.factory(fileHandler); List<IndexFileMeta> indexFiles = fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX, partition, bucket); - return factory.create(indexFiles); + return factory.create(partition, bucket, indexFiles); } public CommitMessageImpl writeDVIndexFiles( diff --git a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java index 5d3b1594ee..c195517731 100644 --- a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java @@ -48,6 +48,7 @@ import java.util.Map; import java.util.stream.Collectors; import static java.util.Collections.emptyList; +import static org.apache.paimon.data.BinaryRow.EMPTY_ROW; import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; import static org.assertj.core.api.Assertions.assertThat; @@ -61,7 +62,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { initIndexHandler(bitmap64); BucketedDvMaintainer.Factory factory = BucketedDvMaintainer.factory(fileHandler); - BucketedDvMaintainer dvMaintainer = factory.create(emptyList()); + BucketedDvMaintainer dvMaintainer = factory.create(EMPTY_ROW, 0, emptyList()); assertThat(dvMaintainer.bitmap64).isEqualTo(bitmap64); dvMaintainer.notifyNewDeletion("f1", 1); @@ -74,7 +75,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { IndexFileMeta file = dvMaintainer.writeDeletionVectorsIndex().get(); Map<String, DeletionVector> deletionVectors = - fileHandler.readAllDeletionVectors(Collections.singletonList(file)); + fileHandler.readAllDeletionVectors(EMPTY_ROW, 0, Collections.singletonList(file)); assertThat(deletionVectors.get("f1").isDeleted(1)).isTrue(); assertThat(deletionVectors.get("f1").isDeleted(2)).isFalse(); assertThat(deletionVectors.get("f2").isDeleted(1)).isFalse(); @@ -89,7 +90,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { BucketedDvMaintainer.Factory factory = BucketedDvMaintainer.factory(fileHandler); - BucketedDvMaintainer dvMaintainer = factory.create(new HashMap<>()); + BucketedDvMaintainer dvMaintainer = factory.create(EMPTY_ROW, 0, new HashMap<>()); DeletionVector deletionVector1 = createDeletionVector(bitmap64); deletionVector1.delete(1); deletionVector1.delete(3); @@ -100,7 +101,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { IndexFileMeta file = dvMaintainer.writeDeletionVectorsIndex().get(); CommitMessage commitMessage = new CommitMessageImpl( - BinaryRow.EMPTY_ROW, + EMPTY_ROW, 0, 1, DataIncrement.emptyIncrement(), @@ -111,8 +112,8 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { Snapshot latestSnapshot = table.snapshotManager().latestSnapshot(); List<IndexFileMeta> indexFiles = - fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX, BinaryRow.EMPTY_ROW, 0); - dvMaintainer = factory.create(indexFiles); + fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX, EMPTY_ROW, 0); + dvMaintainer = factory.create(EMPTY_ROW, 0, indexFiles); DeletionVector deletionVector2 = dvMaintainer.deletionVectorOf("f1").get(); assertThat(deletionVector2.isDeleted(1)).isTrue(); assertThat(deletionVector2.isDeleted(2)).isFalse(); @@ -123,7 +124,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { file = dvMaintainer.writeDeletionVectorsIndex().get(); commitMessage = new CommitMessageImpl( - BinaryRow.EMPTY_ROW, + EMPTY_ROW, 0, 1, DataIncrement.emptyIncrement(), @@ -133,9 +134,8 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { commit.commit(Collections.singletonList(commitMessage)); latestSnapshot = table.snapshotManager().latestSnapshot(); - indexFiles = - fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX, BinaryRow.EMPTY_ROW, 0); - dvMaintainer = factory.create(indexFiles); + indexFiles = fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX, EMPTY_ROW, 0); + dvMaintainer = factory.create(EMPTY_ROW, 0, indexFiles); DeletionVector deletionVector3 = dvMaintainer.deletionVectorOf("f1").get(); assertThat(deletionVector3.isDeleted(1)).isTrue(); assertThat(deletionVector3.isDeleted(2)).isTrue(); @@ -147,7 +147,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { initIndexHandler(bitmap64); BucketedDvMaintainer.Factory factory = BucketedDvMaintainer.factory(fileHandler); - BucketedDvMaintainer dvMaintainer = factory.create(emptyList()); + BucketedDvMaintainer dvMaintainer = factory.create(EMPTY_ROW, 0, emptyList()); File indexDir = new File(tempPath.toFile(), "/default.db/T/index"); @@ -188,7 +188,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { // write first kind dv initIndexHandler(bitmap64); BucketedDvMaintainer.Factory factory1 = BucketedDvMaintainer.factory(fileHandler); - BucketedDvMaintainer dvMaintainer1 = factory1.create(new HashMap<>()); + BucketedDvMaintainer dvMaintainer1 = factory1.create(EMPTY_ROW, 0, new HashMap<>()); dvMaintainer1.notifyNewDeletion("f1", 1); dvMaintainer1.notifyNewDeletion("f1", 3); dvMaintainer1.notifyNewDeletion("f2", 1); @@ -198,7 +198,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { IndexFileMeta file = dvMaintainer1.writeDeletionVectorsIndex().get(); CommitMessage commitMessage1 = new CommitMessageImpl( - BinaryRow.EMPTY_ROW, + EMPTY_ROW, 0, 1, DataIncrement.emptyIncrement(), @@ -212,11 +212,8 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { BucketedDvMaintainer.Factory factory2 = BucketedDvMaintainer.factory(fileHandler); List<IndexFileMeta> indexFiles = fileHandler.scan( - table.latestSnapshot().get(), - DELETION_VECTORS_INDEX, - BinaryRow.EMPTY_ROW, - 0); - BucketedDvMaintainer dvMaintainer2 = factory2.create(indexFiles); + table.latestSnapshot().get(), DELETION_VECTORS_INDEX, EMPTY_ROW, 0); + BucketedDvMaintainer dvMaintainer2 = factory2.create(EMPTY_ROW, 0, indexFiles); dvMaintainer2.notifyNewDeletion("f1", 10); dvMaintainer2.notifyNewDeletion("f3", 1); dvMaintainer2.notifyNewDeletion("f3", 3); @@ -234,7 +231,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { file = dvMaintainer2.writeDeletionVectorsIndex().get(); CommitMessage commitMessage2 = new CommitMessageImpl( - BinaryRow.EMPTY_ROW, + EMPTY_ROW, 0, 1, DataIncrement.emptyIncrement(), @@ -246,11 +243,10 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { // test read dv index file which contains two kinds of dv Map<String, DeletionVector> readDvs = fileHandler.readAllDeletionVectors( + EMPTY_ROW, + 0, fileHandler.scan( - table.latestSnapshot().get(), - "DELETION_VECTORS", - BinaryRow.EMPTY_ROW, - 0)); + table.latestSnapshot().get(), "DELETION_VECTORS", EMPTY_ROW, 0)); assertThat(readDvs.size()).isEqualTo(3); assertThat(dvs.get("f1").getCardinality()).isEqualTo(3); assertThat(dvs.get("f2").getCardinality()).isEqualTo(2); @@ -282,7 +278,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { .map(IndexManifestEntry::indexFile) .collect(Collectors.toList()); Map<String, DeletionVector> deletionVectors = - new HashMap<>(handler.readAllDeletionVectors(indexFiles)); - return factory.create(deletionVectors); + new HashMap<>(handler.readAllDeletionVectors(EMPTY_ROW, 0, indexFiles)); + return factory.create(EMPTY_ROW, 0, deletionVectors); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java index 98adf5e8fb..e84d09ae72 100644 --- a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java +++ b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java @@ -28,6 +28,8 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import static org.apache.paimon.table.BucketMode.UNAWARE_BUCKET; + /** Helper for {@link BaseAppendDeleteFileMaintainer}. */ public class AppendDeletionFileMaintainerHelper { @@ -48,6 +50,9 @@ public class AppendDeletionFileMaintainerHelper { indexManifestEntry.indexFile().fileName())) .collect(Collectors.toList()); return new AppendDeleteFileMaintainer( - indexFileHandler.dvIndex(), partition, manifests, deletionFiles); + indexFileHandler.dvIndex(partition, UNAWARE_BUCKET), + partition, + manifests, + deletionFiles); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java index 3b81c8478e..ab0376095c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java @@ -66,7 +66,7 @@ class AppendDeletionFileMaintainerTest { Collections.singletonMap("f3", Arrays.asList(1, 2, 3))); store.commit(commitMessage1, commitMessage2); - PathFactory indexPathFactory = store.pathFactory().indexFileFactory(); + PathFactory indexPathFactory = store.pathFactory().indexFileFactory(BinaryRow.EMPTY_ROW, 0); Map<String, DeletionFile> dataFileToDeletionFiles = new HashMap<>(); dataFileToDeletionFiles.putAll( createDeletionFileMapFromIndexFileMetas( diff --git a/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java index 4c6c79d17e..769817d63b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java @@ -80,7 +80,8 @@ public class DynamicBucketIndexMaintainerTest extends PrimaryKeyTableTestBase { continue; } int[] ints = - fileHandler.hashIndex().readList(files.get(0).fileName()).stream() + fileHandler.hashIndex(message.partition(), message.bucket()) + .readList(files.get(0).fileName()).stream() .mapToInt(Integer::intValue) .toArray(); index.computeIfAbsent(message.partition(), k -> new HashMap<>()) diff --git a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java index 6ece642aa7..01eca82c2e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java @@ -45,13 +45,11 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; public class HashBucketAssignerTest extends PrimaryKeyTableTestBase { private IndexFileHandler fileHandler; - private HashIndexFile indexFile; private StreamTableCommit commit; @BeforeEach public void beforeEach() throws Exception { fileHandler = table.store().newIndexFileHandler(); - indexFile = fileHandler.hashIndex(); commit = table.newStreamWriteBuilder().withCommitUser(commitUser).newCommit(); } @@ -226,13 +224,19 @@ public class HashBucketAssignerTest extends PrimaryKeyTableTestBase { @Test public void testAssignRestore() throws IOException { - IndexFileMeta bucket0 = indexFile.write(new int[] {2, 5}); - IndexFileMeta bucket2 = indexFile.write(new int[] {4, 7}); commit.commit( 0, Arrays.asList( - createCommitMessage(row(1), 0, 3, bucket0), - createCommitMessage(row(1), 2, 3, bucket2))); + createCommitMessage( + row(1), + 0, + 3, + fileHandler.hashIndex(row(1), 0).write(new int[] {2, 5})), + createCommitMessage( + row(1), + 2, + 3, + fileHandler.hashIndex(row(1), 2).write(new int[] {4, 7})))); HashBucketAssigner assigner0 = createAssigner(3, 3, 0); HashBucketAssigner assigner2 = createAssigner(3, 3, 2); @@ -252,13 +256,19 @@ public class HashBucketAssignerTest extends PrimaryKeyTableTestBase { @Test public void testAssignRestoreWithUpperBound() throws IOException { - IndexFileMeta bucket0 = indexFile.write(new int[] {2, 5}); - IndexFileMeta bucket2 = indexFile.write(new int[] {4, 7}); commit.commit( 0, Arrays.asList( - createCommitMessage(row(1), 0, 3, bucket0), - createCommitMessage(row(1), 2, 3, bucket2))); + createCommitMessage( + row(1), + 0, + 3, + fileHandler.hashIndex(row(1), 0).write(new int[] {2, 5})), + createCommitMessage( + row(1), + 2, + 3, + fileHandler.hashIndex(row(1), 2).write(new int[] {4, 7})))); HashBucketAssigner assigner0 = createAssigner(3, 3, 0, 1); HashBucketAssigner assigner2 = createAssigner(3, 3, 2, 1); @@ -316,8 +326,17 @@ public class HashBucketAssignerTest extends PrimaryKeyTableTestBase { commit.commit( 0, Arrays.asList( - createCommitMessage(row(1), 0, 1, indexFile.write(new int[] {0})), - createCommitMessage(row(2), 0, 1, indexFile.write(new int[] {0})))); + createCommitMessage( + row(1), + 0, + 1, + fileHandler.hashIndex(row(1), 0).write(new int[] {0})), + createCommitMessage( + row(2), + 0, + 1, + fileHandler.hashIndex(row(2), 0).write(new int[] {0})))); + assertThat(assigner.currentPartitions()).containsExactlyInAnyOrder(row(1), row(2)); // checkpoint 1, but no commit @@ -333,7 +352,12 @@ public class HashBucketAssignerTest extends PrimaryKeyTableTestBase { commit.commit( 1, Collections.singletonList( - createCommitMessage(row(1), 0, 1, indexFile.write(new int[] {1})))); + createCommitMessage( + row(1), + 0, + 1, + fileHandler.hashIndex(row(1), 0).write(new int[] {1})))); + assigner.prepareCommit(3); assertThat(assigner.currentPartitions()).isEmpty(); } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index f9a584b508..f8a9309a09 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -29,7 +29,6 @@ import org.apache.paimon.deletionvectors.BucketedDvMaintainer; import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; -import org.apache.paimon.index.HashIndexFile; import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.manifest.FileKind; @@ -715,7 +714,6 @@ public class FileStoreCommitTest { public void testIndexFiles() throws Exception { TestFileStore store = createStore(false, 2); IndexFileHandler indexFileHandler = store.newIndexFileHandler(); - HashIndexFile hashIndex = indexFileHandler.hashIndex(); KeyValue record1 = gen.next(); BinaryRow part1 = gen.getPartition(record1); @@ -727,9 +725,23 @@ public class FileStoreCommitTest { } // init write - store.commitDataIndex(record1, gen::getPartition, 0, hashIndex.write(new int[] {1, 2, 5})); - store.commitDataIndex(record1, gen::getPartition, 1, hashIndex.write(new int[] {6, 8})); - store.commitDataIndex(record2, gen::getPartition, 2, hashIndex.write(new int[] {3, 5})); + store.commitDataIndex( + record1, + gen::getPartition, + 0, + indexFileHandler + .hashIndex(gen.getPartition(record1), 0) + .write(new int[] {1, 2, 5})); + store.commitDataIndex( + record1, + gen::getPartition, + 1, + indexFileHandler.hashIndex(gen.getPartition(record1), 1).write(new int[] {6, 8})); + store.commitDataIndex( + record2, + gen::getPartition, + 2, + indexFileHandler.hashIndex(gen.getPartition(record2), 2).write(new int[] {3, 5})); Snapshot snapshot = store.snapshotManager().latestSnapshot(); @@ -740,12 +752,18 @@ public class FileStoreCommitTest { IndexManifestEntry indexManifestEntry = part1Index.stream().filter(entry -> entry.bucket() == 0).findAny().get(); - assertThat(hashIndex.readList(indexManifestEntry.indexFile().fileName())) + assertThat( + indexFileHandler + .hashIndex(part1, 0) + .readList(indexManifestEntry.indexFile().fileName())) .containsExactlyInAnyOrder(1, 2, 5); indexManifestEntry = part1Index.stream().filter(entry -> entry.bucket() == 1).findAny().get(); - assertThat(hashIndex.readList(indexManifestEntry.indexFile().fileName())) + assertThat( + indexFileHandler + .hashIndex(part1, 1) + .readList(indexManifestEntry.indexFile().fileName())) .containsExactlyInAnyOrder(6, 8); // assert part2 @@ -753,11 +771,18 @@ public class FileStoreCommitTest { indexFileHandler.scanEntries(snapshot, HASH_INDEX, part2); assertThat(part2Index.size()).isEqualTo(1); assertThat(part2Index.get(0).bucket()).isEqualTo(2); - assertThat(hashIndex.readList(part2Index.get(0).indexFile().fileName())) + assertThat( + indexFileHandler + .hashIndex(part2, 2) + .readList(part2Index.get(0).indexFile().fileName())) .containsExactlyInAnyOrder(3, 5); // update part1 - store.commitDataIndex(record1, gen::getPartition, 0, hashIndex.write(new int[] {1, 4})); + store.commitDataIndex( + record1, + gen::getPartition, + 0, + indexFileHandler.hashIndex(gen.getPartition(record1), 0).write(new int[] {1, 4})); snapshot = store.snapshotManager().latestSnapshot(); // assert update part1 @@ -766,18 +791,25 @@ public class FileStoreCommitTest { indexManifestEntry = part1Index.stream().filter(entry -> entry.bucket() == 0).findAny().get(); - assertThat(hashIndex.readList(indexManifestEntry.indexFile().fileName())) + assertThat( + indexFileHandler + .hashIndex(part1, 0) + .readList(indexManifestEntry.indexFile().fileName())) .containsExactlyInAnyOrder(1, 4); indexManifestEntry = part1Index.stream().filter(entry -> entry.bucket() == 1).findAny().get(); - assertThat(hashIndex.readList(indexManifestEntry.indexFile().fileName())) + assertThat( + indexFileHandler + .hashIndex(part1, 1) + .readList(indexManifestEntry.indexFile().fileName())) .containsExactlyInAnyOrder(6, 8); // assert scan one bucket Optional<IndexFileMeta> file = indexFileHandler.scanHashIndex(snapshot, part1, 0); assertThat(file).isPresent(); - assertThat(hashIndex.readList(file.get().fileName())).containsExactlyInAnyOrder(1, 4); + assertThat(indexFileHandler.hashIndex(part1, 0).readList(file.get().fileName())) + .containsExactlyInAnyOrder(1, 4); // overwrite one partition store.options().toConfiguration().set(CoreOptions.DYNAMIC_PARTITION_OVERWRITE, true); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index 44c7ebc754..f61f1fa1d7 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -23,7 +23,6 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.flink.util.AbstractTestBase; -import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; @@ -121,10 +120,9 @@ public class BatchFileStoreITCase extends CatalogITCaseBase { List<IndexManifestEntry> indexManifestEntries = table.indexManifestFileReader().read(snapshot.indexManifest()); assertThat(indexManifestEntries.size()).isEqualTo(1); - IndexFileMeta indexFileMeta = indexManifestEntries.get(0).indexFile(); return table.store() .newIndexFileHandler() - .readAllDeletionVectors(singletonList(indexFileMeta)); + .readAllDeletionVectors(indexManifestEntries.get(0)); } @Test