This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.3 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 5caa6f6fd78b12762ac68fe5f8fed7b4c1470793 Author: Zouxxyy <[email protected]> AuthorDate: Thu Oct 9 12:58:30 2025 +0800 [core] Add dv conflict detection during commit (#6303) --- .../java/org/apache/paimon/AbstractFileStore.java | 5 +- .../java/org/apache/paimon/manifest/FileEntry.java | 6 +- .../apache/paimon/manifest/SimpleFileEntry.java | 20 ++ .../paimon/manifest/SimpleFileEntryWithDV.java | 124 ++++++++ .../paimon/operation/FileStoreCommitImpl.java | 154 +++++++--- .../apache/paimon/utils/ConflictDeletionUtils.java | 149 +++++++++ .../org/apache/paimon/TestAppendFileStore.java | 35 +++ .../deletionvectors/BucketedDvMaintainerTest.java | 87 ++++-- .../paimon/operation/FileStoreCommitTest.java | 26 +- .../paimon/utils/ConflictDeletionUtilsTest.java | 342 +++++++++++++++++++++ .../commands/DeleteFromPaimonTableCommand.scala | 3 +- .../spark/commands/MergeIntoPaimonTable.scala | 4 +- .../paimon/spark/commands/PaimonSparkWriter.scala | 7 +- .../spark/commands/UpdatePaimonTableCommand.scala | 3 +- .../paimon/spark/sql/MergeIntoTableTestBase.scala | 71 +++-- .../paimon/spark/sql/UpdateTableTestBase.scala | 39 +++ 16 files changed, 955 insertions(+), 120 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index f4a77f877d..bbeb0aa150 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -298,7 +298,10 @@ abstract class AbstractFileStore<T> implements FileStore<T> { options.commitMinRetryWait(), options.commitMaxRetryWait(), options.commitStrictModeLastSafeSnapshot().orElse(null), - options.rowTrackingEnabled()); + options.rowTrackingEnabled(), + !schema.primaryKeys().isEmpty(), + options.deletionVectorsEnabled(), + newIndexFileHandler()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java index 52b029563e..b400258b84 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java @@ -22,7 +22,6 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Filter; -import org.apache.paimon.utils.Preconditions; import javax.annotation.Nullable; @@ -40,6 +39,7 @@ import java.util.stream.Collectors; import static org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn; import static org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute; +import static org.apache.paimon.utils.Preconditions.checkState; /** Entry representing a file. 
*/ public interface FileEntry { @@ -77,7 +77,7 @@ public interface FileEntry { public final int level; public final String fileName; public final List<String> extraFiles; - @Nullable private final byte[] embeddedIndex; + @Nullable public final byte[] embeddedIndex; @Nullable public final String externalPath; /* Cache the hash code for the string */ @@ -190,7 +190,7 @@ public interface FileEntry { Identifier identifier = entry.identifier(); switch (entry.kind()) { case ADD: - Preconditions.checkState( + checkState( !map.containsKey(identifier), "Trying to add file %s which is already added.", identifier); diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java index 14ae43c349..e0da3c8d53 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java @@ -81,6 +81,21 @@ public class SimpleFileEntry implements FileEntry { entry.externalPath()); } + public SimpleFileEntry toDelete() { + return new SimpleFileEntry( + FileKind.DELETE, + partition, + bucket, + totalBuckets, + level, + fileName, + extraFiles, + embeddedIndex, + minKey, + maxKey, + externalPath); + } + public static List<SimpleFileEntry> from(List<ManifestEntry> entries) { return entries.stream().map(SimpleFileEntry::from).collect(Collectors.toList()); } @@ -115,6 +130,11 @@ public class SimpleFileEntry implements FileEntry { return fileName; } + @Nullable + public byte[] embeddedIndex() { + return embeddedIndex; + } + @Nullable @Override public String externalPath() { diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntryWithDV.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntryWithDV.java new file mode 100644 index 0000000000..75d73f345f --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntryWithDV.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.manifest; + +import javax.annotation.Nullable; + +import java.util.Objects; + +/** A {@link FileEntry} that wraps a {@link SimpleFileEntry} together with its dv file name.
*/ +public class SimpleFileEntryWithDV extends SimpleFileEntry { + + @Nullable private final String dvFileName; + + public SimpleFileEntryWithDV(SimpleFileEntry entry, @Nullable String dvFileName) { + super( + entry.kind(), + entry.partition(), + entry.bucket(), + entry.totalBuckets(), + entry.level(), + entry.fileName(), + entry.extraFiles(), + entry.embeddedIndex(), + entry.minKey(), + entry.maxKey(), + entry.externalPath()); + this.dvFileName = dvFileName; + } + + public Identifier identifier() { + return new IdentifierWithDv(super.identifier(), dvFileName); + } + + @Nullable + public String dvFileName() { + return dvFileName; + } + + public SimpleFileEntry toDelete() { + return new SimpleFileEntryWithDV(super.toDelete(), dvFileName); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + SimpleFileEntryWithDV that = (SimpleFileEntryWithDV) o; + return Objects.equals(dvFileName, that.dvFileName); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), dvFileName); + } + + @Override + public String toString() { + return super.toString() + ", {dvFileName=" + dvFileName + '}'; + } + + /** + * The same {@link Identifier} indicates that the {@link ManifestEntry} refers to the same data + * file and the same dv file. + */ + static class IdentifierWithDv extends Identifier { + + private final String dvFileName; + + public IdentifierWithDv(Identifier identifier, String dvFileName) { + super( + identifier.partition, + identifier.bucket, + identifier.level, + identifier.fileName, + identifier.extraFiles, + identifier.embeddedIndex, + identifier.externalPath); + this.dvFileName = dvFileName; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + IdentifierWithDv that = (IdentifierWithDv) o; + return Objects.equals(dvFileName, that.dvFileName); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), dvFileName); + } + } +}
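Why extending the identifier with the dv file name is enough: FileEntry.mergeEntries cancels an ADD against a DELETE only when their identifiers are equal, so once the dv name participates in equality, a concurrent dv rewrite surfaces as an unmatched DELETE whenever base and delta disagree about which dv a file carries. A minimal sketch of that behavior (imports elided; the names "f1", "dv1" and "dv2" and the entry values are illustrative, mirroring the helpers in ConflictDeletionUtilsTest later in this patch):

    // Base committed <ADD f1, dv1>; a concurrent commit rewrites the dv as dv2.
    SimpleFileEntry f1 =
            new SimpleFileEntry(
                    FileKind.ADD, BinaryRow.EMPTY_ROW, 0, 1, 0, "f1",
                    Collections.emptyList(), null, BinaryRow.EMPTY_ROW, BinaryRow.EMPTY_ROW, null);
    SimpleFileEntryWithDV baseAdd = new SimpleFileEntryWithDV(f1, "dv1");
    SimpleFileEntryWithDV deltaDelete = new SimpleFileEntryWithDV(f1.toDelete(), "dv1");
    SimpleFileEntryWithDV deltaAdd = new SimpleFileEntryWithDV(f1, "dv2");
    // <ADD f1, dv1> and <DELETE f1, dv1> share an IdentifierWithDv and cancel out,
    // leaving only <ADD f1, dv2>. A DELETE whose dv name the base never added would
    // instead survive the merge and fail the commit.
    Collection<SimpleFileEntry> merged =
            FileEntry.mergeEntries(Arrays.asList(baseAdd, deltaDelete, deltaAdd));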
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 466df201a3..7df1589781 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -26,6 +26,7 @@ import org.apache.paimon.catalog.SnapshotCommit; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.fs.FileIO; +import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.manifest.FileEntry; @@ -61,7 +62,6 @@ import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.IOUtils; import org.apache.paimon.utils.InternalRowPartitionComputer; import org.apache.paimon.utils.Pair; -import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SnapshotManager; import org.slf4j.Logger; @@ -70,7 +70,6 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -93,8 +92,11 @@ import static
org.apache.paimon.manifest.ManifestEntry.recordCountAdd; import static org.apache.paimon.manifest.ManifestEntry.recordCountDelete; import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions; import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate; +import static org.apache.paimon.utils.ConflictDeletionUtils.buildBaseEntriesWithDV; +import static org.apache.paimon.utils.ConflictDeletionUtils.buildDeltaEntriesWithDV; import static org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString; import static org.apache.paimon.utils.Preconditions.checkArgument; +import static org.apache.paimon.utils.Preconditions.checkState; /** * Default implementation of {@link FileStoreCommit}. @@ -151,6 +153,9 @@ public class FileStoreCommitImpl implements FileStoreCommit { @Nullable private Long strictModeLastSafeSnapshot; private final InternalRowPartitionComputer partitionComputer; private final boolean rowTrackingEnabled; + private final boolean isPkTable; + private final boolean deletionVectorsEnabled; + private final IndexFileHandler indexFileHandler; private boolean ignoreEmptyCommit; private CommitMetrics commitMetrics; @@ -187,7 +192,10 @@ public class FileStoreCommitImpl implements FileStoreCommit { long commitMinRetryWait, long commitMaxRetryWait, @Nullable Long strictModeLastSafeSnapshot, - boolean rowTrackingEnabled) { + boolean rowTrackingEnabled, + boolean isPkTable, + boolean deletionVectorsEnabled, + IndexFileHandler indexFileHandler) { this.snapshotCommit = snapshotCommit; this.fileIO = fileIO; this.schemaManager = schemaManager; @@ -231,6 +239,9 @@ public class FileStoreCommitImpl implements FileStoreCommit { this.statsFileHandler = statsFileHandler; this.bucketMode = bucketMode; this.rowTrackingEnabled = rowTrackingEnabled; + this.isPkTable = isPkTable; + this.deletionVectorsEnabled = deletionVectorsEnabled; + this.indexFileHandler = indexFileHandler; } @Override @@ -333,11 +344,16 @@ public class FileStoreCommitImpl implements FileStoreCommit { // so we need to contain all changes baseEntries.addAll( readAllEntriesFromChangedPartitions( - latestSnapshot, appendTableFiles, compactTableFiles)); + latestSnapshot, + changedPartitions( + appendTableFiles, + compactTableFiles, + appendIndexFiles))); noConflictsOrFail( - latestSnapshot.commitUser(), + latestSnapshot, baseEntries, appendSimpleEntries, + appendIndexFiles, commitKind); safeLatestSnapshotId = latestSnapshot.id(); } @@ -370,9 +386,10 @@ public class FileStoreCommitImpl implements FileStoreCommit { if (safeLatestSnapshotId != null) { baseEntries.addAll(appendSimpleEntries); noConflictsOrFail( - latestSnapshot.commitUser(), + latestSnapshot, baseEntries, SimpleFileEntry.from(compactTableFiles), + compactIndexFiles, CommitKind.COMPACT); // assume this compact commit follows just after the append commit created above safeLatestSnapshotId += 1; } @@ -1003,10 +1020,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { // latestSnapshotId is different from the snapshot id we've checked for conflicts, // so we have to check again List<BinaryRow> changedPartitions = - deltaFiles.stream() - .map(ManifestEntry::partition) - .distinct() - .collect(Collectors.toList()); + changedPartitions(deltaFiles, Collections.emptyList(), indexFiles); if (retryResult != null && retryResult.latestSnapshot != null) { baseDataFiles = new ArrayList<>(retryResult.baseDataFiles); List<SimpleFileEntry> incremental = @@ -1021,9 +1035,10 @@ public class FileStoreCommitImpl implements
FileStoreCommit { readAllEntriesFromChangedPartitions(latestSnapshot, changedPartitions); } noConflictsOrFail( - latestSnapshot.commitUser(), + latestSnapshot, baseDataFiles, SimpleFileEntry.from(deltaFiles), + indexFiles, commitKind); } @@ -1351,16 +1366,23 @@ public class FileStoreCommitImpl implements FileStoreCommit { return entries; } - @SafeVarargs - private final List<SimpleFileEntry> readAllEntriesFromChangedPartitions( - Snapshot snapshot, List<ManifestEntry>... changes) { - List<BinaryRow> changedPartitions = - Arrays.stream(changes) - .flatMap(Collection::stream) - .map(ManifestEntry::partition) - .distinct() - .collect(Collectors.toList()); - return readAllEntriesFromChangedPartitions(snapshot, changedPartitions); + private List<BinaryRow> changedPartitions( + List<ManifestEntry> appendTableFiles, + List<ManifestEntry> compactTableFiles, + List<IndexManifestEntry> appendIndexFiles) { + Set<BinaryRow> changedPartitions = new HashSet<>(); + for (ManifestEntry appendTableFile : appendTableFiles) { + changedPartitions.add(appendTableFile.partition()); + } + for (ManifestEntry compactTableFile : compactTableFiles) { + changedPartitions.add(compactTableFile.partition()); + } + for (IndexManifestEntry appendIndexFile : appendIndexFiles) { + if (appendIndexFile.indexFile().indexType().equals(DELETION_VECTORS_INDEX)) { + changedPartitions.add(appendIndexFile.partition()); + } + } + return new ArrayList<>(changedPartitions); } private List<SimpleFileEntry> readAllEntriesFromChangedPartitions( @@ -1376,12 +1398,35 @@ public class FileStoreCommitImpl implements FileStoreCommit { } private void noConflictsOrFail( - String baseCommitUser, + Snapshot snapshot, List<SimpleFileEntry> baseEntries, - List<SimpleFileEntry> changes, + List<SimpleFileEntry> deltaEntries, + List<IndexManifestEntry> deltaIndexEntries, CommitKind commitKind) { + String baseCommitUser = snapshot.commitUser(); + if (checkForDeletionVector(commitKind)) { + // Enrich dvName in fileEntry to check for base ADD dv and delta DELETE dv. + // For example: + // If the base file is <ADD baseFile1, ADD dv1>, + // then the delta file must be <DELETE deltaFile1, DELETE dv1>; and vice versa, + // If the delta file is <DELETE deltaFile2, DELETE dv2>, + // then the base file must be <ADD baseFile2, ADD dv2>. + try { + baseEntries = + buildBaseEntriesWithDV( + baseEntries, + snapshot.indexManifest() == null ? Collections.emptyList() : indexFileHandler.readManifest(snapshot.indexManifest())); + deltaEntries = + buildDeltaEntriesWithDV(baseEntries, deltaEntries, deltaIndexEntries); + } catch (Throwable e) { + throw conflictException(commitUser, baseEntries, deltaEntries).apply(e); + } + } + List<SimpleFileEntry> allEntries = new ArrayList<>(baseEntries); - allEntries.addAll(changes); + allEntries.addAll(deltaEntries); if (commitKind != CommitKind.OVERWRITE) { // total buckets within the same partition should remain the same @@ -1412,37 +1457,22 @@ public class FileStoreCommitImpl implements FileStoreCommit { + " without overwrite. Give up committing.", baseCommitUser, baseEntries, - changes, + deltaEntries, null); LOG.warn("", conflictException.getLeft()); throw conflictException.getRight(); } } - Function<Throwable, RuntimeException> exceptionFunction = - e -> { - Pair<RuntimeException, RuntimeException> conflictException = - createConflictException( - "File deletion conflicts detected! Give up committing.", - baseCommitUser, - baseEntries, - changes, - e); - LOG.warn("", conflictException.getLeft()); - return conflictException.getRight(); - }; - Collection<SimpleFileEntry> mergedEntries; try { // merge manifest entries and also check if the files we want to delete are still there mergedEntries = FileEntry.mergeEntries(allEntries); } catch (Throwable e) { - throw exceptionFunction.apply(e); + throw conflictException(commitUser, baseEntries, deltaEntries).apply(e); } - assertNoDelete(mergedEntries, exceptionFunction); - - // TODO check for deletion vectors + assertNoDelete(mergedEntries, conflictException(commitUser, baseEntries, deltaEntries)); // fast exit for file store without keys if (keyComparator == null) { @@ -1476,7 +1506,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { + b.identifier().toString(pathFactory), baseCommitUser, baseEntries, - changes, + deltaEntries, null); LOG.warn("", conflictException.getLeft()); @@ -1486,12 +1516,48 @@ public class FileStoreCommitImpl implements FileStoreCommit { } } + private Function<Throwable, RuntimeException> conflictException( + String baseCommitUser, + List<SimpleFileEntry> baseEntries, + List<SimpleFileEntry> deltaEntries) { + return e -> { + Pair<RuntimeException, RuntimeException> conflictException = + createConflictException( + "File deletion conflicts detected! Give up committing.", + baseCommitUser, + baseEntries, + deltaEntries, + e); + LOG.warn("", conflictException.getLeft()); + return conflictException.getRight(); + }; + } + + private boolean checkForDeletionVector(CommitKind commitKind) { + if (!deletionVectorsEnabled) { + return false; + } + + // TODO: add conflict detection for these cases once they can contain DELETE type entries. + // PK table's compact dv index only contains ADD type, skip conflict detection. + if (isPkTable && commitKind == CommitKind.COMPACT) { + return false; + } + + // Non-PK table's hash fixed bucket mode only contains ADD type, skip conflict detection. + if (!isPkTable && bucketMode.equals(BucketMode.HASH_FIXED)) { + return false; + } + + return true; + } + private void assertNoDelete( Collection<SimpleFileEntry> mergedEntries, Function<Throwable, RuntimeException> exceptionFunction) { try { for (SimpleFileEntry entry : mergedEntries) { - Preconditions.checkState( + checkState( entry.kind() != FileKind.DELETE, "Trying to delete file %s for table %s which is not previously added.", entry.fileName(),
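The enrichment that noConflictsOrFail now performs is easiest to see on a concrete conflict. The sketch below mirrors Scene 1 of testConflictDeletionWithDV further down in this patch; addEntry and addDvIndexEntry are hypothetical stand-ins for the test's createFileEntry and createDvIndexEntry helpers:

    // Base snapshot, after buildBaseEntriesWithDV: data file f1 carries dv1.
    List<SimpleFileEntry> base =
            buildBaseEntriesWithDV(
                    Collections.singletonList(addEntry("f1")),
                    Collections.singletonList(addDvIndexEntry("dv1", "f1")));
    // => [<ADD f1, dv1>]

    // Concurrent delta that planned against an older snapshot: it creates dv2
    // for f1 as a "first" dv, without deleting dv1.
    List<SimpleFileEntry> delta =
            buildDeltaEntriesWithDV(
                    base,
                    Collections.emptyList(),
                    Collections.singletonList(addDvIndexEntry("dv2", "f1")));
    // => [<DELETE f1, null>, <ADD f1, dv2>]

    // Merging base and delta leaves <DELETE f1, null> unmatched, because the
    // base ADD is keyed as <f1, dv1>; assertNoDelete then rejects the commit.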
Give up committing.", - baseCommitUser, - baseEntries, - changes, - e); - LOG.warn("", conflictException.getLeft()); - return conflictException.getRight(); - }; - Collection<SimpleFileEntry> mergedEntries; try { // merge manifest entries and also check if the files we want to delete are still there mergedEntries = FileEntry.mergeEntries(allEntries); } catch (Throwable e) { - throw exceptionFunction.apply(e); + throw conflictException(commitUser, baseEntries, deltaEntries).apply(e); } - assertNoDelete(mergedEntries, exceptionFunction); - - // TODO check for deletion vectors + assertNoDelete(mergedEntries, conflictException(commitUser, baseEntries, deltaEntries)); // fast exit for file store without keys if (keyComparator == null) { @@ -1476,7 +1506,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { + b.identifier().toString(pathFactory), baseCommitUser, baseEntries, - changes, + deltaEntries, null); LOG.warn("", conflictException.getLeft()); @@ -1486,12 +1516,48 @@ public class FileStoreCommitImpl implements FileStoreCommit { } } + private Function<Throwable, RuntimeException> conflictException( + String baseCommitUser, + List<SimpleFileEntry> baseEntries, + List<SimpleFileEntry> deltaEntries) { + return e -> { + Pair<RuntimeException, RuntimeException> conflictException = + createConflictException( + "File deletion conflicts detected! Give up committing.", + baseCommitUser, + baseEntries, + deltaEntries, + e); + LOG.warn("", conflictException.getLeft()); + return conflictException.getRight(); + }; + } + + private boolean checkForDeletionVector(CommitKind commitKind) { + if (!deletionVectorsEnabled) { + return false; + } + + // todo: Add them once contains DELETE type. + // PK table's compact dv index only contains ADD type, skip conflict detection. + if (isPkTable && commitKind == CommitKind.COMPACT) { + return false; + } + + // Non-PK table's hash fixed bucket mode only contains ADD type, skip conflict detection. + if (!isPkTable && bucketMode.equals(BucketMode.HASH_FIXED)) { + return false; + } + + return true; + } + private void assertNoDelete( Collection<SimpleFileEntry> mergedEntries, Function<Throwable, RuntimeException> exceptionFunction) { try { for (SimpleFileEntry entry : mergedEntries) { - Preconditions.checkState( + checkState( entry.kind() != FileKind.DELETE, "Trying to delete file %s for table %s which is not previously added.", entry.fileName(), diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ConflictDeletionUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/ConflictDeletionUtils.java new file mode 100644 index 0000000000..00942ea048 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/utils/ConflictDeletionUtils.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.index.DeletionVectorMeta; +import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.IndexManifestEntry; +import org.apache.paimon.manifest.SimpleFileEntry; +import org.apache.paimon.manifest.SimpleFileEntryWithDV; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.paimon.utils.Preconditions.checkState; + +/** Utils for detecting deletion conflicts. */ +public class ConflictDeletionUtils { + + public static List<SimpleFileEntry> buildBaseEntriesWithDV( + List<SimpleFileEntry> baseEntries, List<IndexManifestEntry> baseIndexEntries) { + if (baseEntries.isEmpty()) { + return Collections.emptyList(); + } + + Map<String, String> fileNameToDVFileName = new HashMap<>(); + for (IndexManifestEntry indexManifestEntry : baseIndexEntries) { + // Should not attach DELETE type dv index for base file. + if (!indexManifestEntry.kind().equals(FileKind.DELETE)) { + IndexFileMeta indexFile = indexManifestEntry.indexFile(); + if (indexFile.dvRanges() != null) { + for (DeletionVectorMeta value : indexFile.dvRanges().values()) { + checkState( + !fileNameToDVFileName.containsKey(value.dataFileName()), + "One file should correspond to only one dv entry."); + fileNameToDVFileName.put(value.dataFileName(), indexFile.fileName()); + } + } + } + } + + // Attach dv name to file entries. + List<SimpleFileEntry> entriesWithDV = new ArrayList<>(baseEntries.size()); + for (SimpleFileEntry fileEntry : baseEntries) { + entriesWithDV.add( + new SimpleFileEntryWithDV( + fileEntry, fileNameToDVFileName.get(fileEntry.fileName()))); + } + return entriesWithDV; + } + + public static List<SimpleFileEntry> buildDeltaEntriesWithDV( + List<SimpleFileEntry> baseEntries, + List<SimpleFileEntry> deltaEntries, + List<IndexManifestEntry> deltaIndexEntries) { + if (deltaEntries.isEmpty() && deltaIndexEntries.isEmpty()) { + return Collections.emptyList(); + } + + List<SimpleFileEntry> entriesWithDV = new ArrayList<>(deltaEntries.size()); + + // One file may correspond to more than one dv entry, for example, deleting the old dv and + // creating a new one. + Map<String, List<IndexManifestEntry>> fileNameToDVEntry = new HashMap<>(); + for (IndexManifestEntry deltaIndexEntry : deltaIndexEntries) { + if (deltaIndexEntry.indexFile().dvRanges() != null) { + for (DeletionVectorMeta meta : deltaIndexEntry.indexFile().dvRanges().values()) { + fileNameToDVEntry.putIfAbsent(meta.dataFileName(), new ArrayList<>()); + fileNameToDVEntry.get(meta.dataFileName()).add(deltaIndexEntry); + } + } + } + + Set<String> fileNotInDeltaEntries = new HashSet<>(fileNameToDVEntry.keySet()); + // 1. Attach dv name to delta file entries. + for (SimpleFileEntry fileEntry : deltaEntries) { + if (fileNameToDVEntry.containsKey(fileEntry.fileName())) { + List<IndexManifestEntry> dvs = fileNameToDVEntry.get(fileEntry.fileName()); + checkState(dvs.size() == 1, "A delta entry can only have one dv file"); + entriesWithDV.add( + new SimpleFileEntryWithDV(fileEntry, dvs.get(0).indexFile().fileName())); + fileNotInDeltaEntries.remove(fileEntry.fileName()); + } else { + entriesWithDV.add(new SimpleFileEntryWithDV(fileEntry, null)); + } + }
+ // 2. For files not in delta entries, build entries with dv from baseEntries. + if (!fileNotInDeltaEntries.isEmpty()) { + Map<String, SimpleFileEntry> fileNameToFileEntry = new HashMap<>(); + for (SimpleFileEntry baseEntry : baseEntries) { + if (baseEntry.kind().equals(FileKind.ADD)) { + fileNameToFileEntry.put(baseEntry.fileName(), baseEntry); + } + } + + for (String fileName : fileNotInDeltaEntries) { + SimpleFileEntryWithDV simpleFileEntry = + (SimpleFileEntryWithDV) fileNameToFileEntry.get(fileName); + checkState( + simpleFileEntry != null, + String.format( + "Trying to create deletion vector on file %s which is not previously added.", + fileName)); + List<IndexManifestEntry> dvEntries = fileNameToDVEntry.get(fileName); + // If dv entry's type is DELETE, add DELETE<f, dv> + // If dv entry's type is ADD, add ADD<f, dv> + for (IndexManifestEntry dvEntry : dvEntries) { + entriesWithDV.add( + new SimpleFileEntryWithDV( + dvEntry.kind().equals(FileKind.ADD) ? simpleFileEntry : simpleFileEntry.toDelete(), + dvEntry.indexFile().fileName())); + } + + // If a file corresponds to only one dv entry and the type is ADD, + // we need to add a DELETE<f, null>. + // This happens when creating a dv for a file that didn't have a dv before. + if (dvEntries.size() == 1 && dvEntries.get(0).kind().equals(FileKind.ADD)) { + entriesWithDV.add(new SimpleFileEntryWithDV(simpleFileEntry.toDelete(), null)); + } + } + } + + return entriesWithDV; + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java index 3ca33a8222..2754022dd7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java @@ -29,6 +29,7 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataIncrement; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.operation.FileStoreCommitImpl; @@ -36,6 +37,7 @@ import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.SchemaUtils; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.stats.SimpleStats; import org.apache.paimon.table.CatalogEnvironment; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageImpl; @@ -43,6 +45,7 @@ import org.apache.paimon.table.source.DeletionFile; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.TraceableFileIO; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -142,6 +145,38 @@ public class TestAppendFileStore extends AppendOnlyFileStore { return factory.create(partition, bucket, indexFiles); } + public CommitMessageImpl writeDataFiles( + BinaryRow partition, int bucket, List<String> dataFileNames) throws IOException { + List<DataFileMeta> fileMetas = new ArrayList<>(); + Path bucketPath = pathFactory().bucketPath(partition, bucket); + for (String dataFileName : dataFileNames) { + Path path = new Path(bucketPath, dataFileName); + fileIO.newOutputStream(path, false).close(); + fileMetas.add( + DataFileMeta.forAppend( + path.getName(), + 10L, + 10L, + SimpleStats.EMPTY_STATS, + 0L, + 0L, + schema.id(), + Collections.emptyList(), + null, + null, +
null, + null, + null, + null)); + } + return new CommitMessageImpl( + partition, + bucket, + options().bucket(), + new DataIncrement(fileMetas, Collections.emptyList(), Collections.emptyList()), + CompactIncrement.emptyIncrement()); + } + public CommitMessageImpl writeDVIndexFiles( BinaryRow partition, int bucket, Map<String, List<Integer>> dataFileToPositions) { BucketedDvMaintainer dvMaintainer = createOrRestoreDVMaintainer(partition, bucket); diff --git a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java index 10c6f45200..018d55e933 100644 --- a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java @@ -23,16 +23,20 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.PrimaryKeyTableTestBase; import org.apache.paimon.compact.CompactDeletionFile; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.fs.Path; import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataIncrement; import org.apache.paimon.manifest.IndexManifestEntry; +import org.apache.paimon.stats.SimpleStats; import org.apache.paimon.table.sink.BatchTableCommit; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.utils.FileIOUtils; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -40,6 +44,8 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -47,13 +53,22 @@ import java.util.Map; import java.util.stream.Collectors; import static java.util.Collections.emptyList; -import static org.apache.paimon.data.BinaryRow.EMPTY_ROW; import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link BucketedDvMaintainer}. 
*/ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { private IndexFileHandler fileHandler; + private final BinaryRow partition = BinaryRow.singleColumn(1); + + @BeforeEach + public void setUp() throws Exception { + // write files + CommitMessageImpl commitMessage = + writeDataFiles(partition, 0, Arrays.asList("f1", "f2", "f3")); + BatchTableCommit commit = table.newBatchWriteBuilder().newCommit(); + commit.commit(Collections.singletonList(commitMessage)); + } @ParameterizedTest @ValueSource(booleans = {true, false}) @@ -61,7 +76,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { initIndexHandler(bitmap64); BucketedDvMaintainer.Factory factory = BucketedDvMaintainer.factory(fileHandler); - BucketedDvMaintainer dvMaintainer = factory.create(EMPTY_ROW, 0, emptyList()); + BucketedDvMaintainer dvMaintainer = factory.create(partition, 0, emptyList()); assertThat(dvMaintainer.bitmap64).isEqualTo(bitmap64); dvMaintainer.notifyNewDeletion("f1", 1); @@ -74,7 +89,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { IndexFileMeta file = dvMaintainer.writeDeletionVectorsIndex().get(); Map<String, DeletionVector> deletionVectors = - fileHandler.readAllDeletionVectors(EMPTY_ROW, 0, Collections.singletonList(file)); + fileHandler.readAllDeletionVectors(partition, 0, Collections.singletonList(file)); assertThat(deletionVectors.get("f1").isDeleted(1)).isTrue(); assertThat(deletionVectors.get("f1").isDeleted(2)).isFalse(); assertThat(deletionVectors.get("f2").isDeleted(1)).isFalse(); @@ -89,7 +104,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { BucketedDvMaintainer.Factory factory = BucketedDvMaintainer.factory(fileHandler); - BucketedDvMaintainer dvMaintainer = factory.create(EMPTY_ROW, 0, new HashMap<>()); + BucketedDvMaintainer dvMaintainer = factory.create(partition, 0, new HashMap<>()); DeletionVector deletionVector1 = createDeletionVector(bitmap64); deletionVector1.delete(1); deletionVector1.delete(3); @@ -100,7 +115,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { IndexFileMeta file = dvMaintainer.writeDeletionVectorsIndex().get(); CommitMessage commitMessage = new CommitMessageImpl( - EMPTY_ROW, + partition, 0, 1, new DataIncrement( @@ -115,8 +130,8 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { Snapshot latestSnapshot = table.snapshotManager().latestSnapshot(); List<IndexFileMeta> indexFiles = - fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX, EMPTY_ROW, 0); - dvMaintainer = factory.create(EMPTY_ROW, 0, indexFiles); + fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX, partition, 0); + dvMaintainer = factory.create(partition, 0, indexFiles); DeletionVector deletionVector2 = dvMaintainer.deletionVectorOf("f1").get(); assertThat(deletionVector2.isDeleted(1)).isTrue(); assertThat(deletionVector2.isDeleted(2)).isFalse(); @@ -127,7 +142,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { file = dvMaintainer.writeDeletionVectorsIndex().get(); commitMessage = new CommitMessageImpl( - EMPTY_ROW, + partition, 0, 1, new DataIncrement( @@ -141,8 +156,8 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { commit.commit(Collections.singletonList(commitMessage)); latestSnapshot = table.snapshotManager().latestSnapshot(); - indexFiles = fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX, EMPTY_ROW, 0); - dvMaintainer = factory.create(EMPTY_ROW, 0, indexFiles); + indexFiles = 
fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX, partition, 0); + dvMaintainer = factory.create(partition, 0, indexFiles); DeletionVector deletionVector3 = dvMaintainer.deletionVectorOf("f1").get(); assertThat(deletionVector3.isDeleted(1)).isTrue(); assertThat(deletionVector3.isDeleted(2)).isTrue(); @@ -154,7 +169,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { initIndexHandler(bitmap64); BucketedDvMaintainer.Factory factory = BucketedDvMaintainer.factory(fileHandler); - BucketedDvMaintainer dvMaintainer = factory.create(EMPTY_ROW, 0, emptyList()); + BucketedDvMaintainer dvMaintainer = factory.create(partition, 0, emptyList()); File indexDir = new File(tempPath.toFile(), "/default.db/T/index"); @@ -195,7 +210,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { // write first kind dv initIndexHandler(bitmap64); BucketedDvMaintainer.Factory factory1 = BucketedDvMaintainer.factory(fileHandler); - BucketedDvMaintainer dvMaintainer1 = factory1.create(EMPTY_ROW, 0, new HashMap<>()); + BucketedDvMaintainer dvMaintainer1 = factory1.create(partition, 0, new HashMap<>()); dvMaintainer1.notifyNewDeletion("f1", 1); dvMaintainer1.notifyNewDeletion("f1", 3); dvMaintainer1.notifyNewDeletion("f2", 1); @@ -205,7 +220,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { IndexFileMeta file = dvMaintainer1.writeDeletionVectorsIndex().get(); CommitMessage commitMessage1 = new CommitMessageImpl( - EMPTY_ROW, + partition, 0, 1, DataIncrement.emptyIncrement(), @@ -223,8 +238,8 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { BucketedDvMaintainer.Factory factory2 = BucketedDvMaintainer.factory(fileHandler); List<IndexFileMeta> indexFiles = fileHandler.scan( - table.latestSnapshot().get(), DELETION_VECTORS_INDEX, EMPTY_ROW, 0); - BucketedDvMaintainer dvMaintainer2 = factory2.create(EMPTY_ROW, 0, indexFiles); + table.latestSnapshot().get(), DELETION_VECTORS_INDEX, partition, 0); + BucketedDvMaintainer dvMaintainer2 = factory2.create(partition, 0, indexFiles); dvMaintainer2.notifyNewDeletion("f1", 10); dvMaintainer2.notifyNewDeletion("f3", 1); dvMaintainer2.notifyNewDeletion("f3", 3); @@ -242,7 +257,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { file = dvMaintainer2.writeDeletionVectorsIndex().get(); CommitMessage commitMessage2 = new CommitMessageImpl( - EMPTY_ROW, + partition, 0, 1, DataIncrement.emptyIncrement(), @@ -258,10 +273,10 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { // test read dv index file which contains two kinds of dv Map<String, DeletionVector> readDvs = fileHandler.readAllDeletionVectors( - EMPTY_ROW, + partition, 0, fileHandler.scan( - table.latestSnapshot().get(), "DELETION_VECTORS", EMPTY_ROW, 0)); + table.latestSnapshot().get(), "DELETION_VECTORS", partition, 0)); assertThat(readDvs.size()).isEqualTo(3); assertThat(dvs.get("f1").getCardinality()).isEqualTo(3); assertThat(dvs.get("f2").getCardinality()).isEqualTo(2); @@ -293,7 +308,39 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { .map(IndexManifestEntry::indexFile) .collect(Collectors.toList()); Map<String, DeletionVector> deletionVectors = - new HashMap<>(handler.readAllDeletionVectors(EMPTY_ROW, 0, indexFiles)); - return factory.create(EMPTY_ROW, 0, deletionVectors); + new HashMap<>(handler.readAllDeletionVectors(partition, 0, indexFiles)); + return factory.create(partition, 0, deletionVectors); + } + + private CommitMessageImpl writeDataFiles( 
+ BinaryRow partition, int bucket, List<String> dataFileNames) throws IOException { + List<DataFileMeta> fileMetas = new ArrayList<>(); + Path bucketPath = table.store().pathFactory().bucketPath(partition, bucket); + for (String dataFileName : dataFileNames) { + Path path = new Path(bucketPath, dataFileName); + table.fileIO().newOutputStream(path, false).close(); + fileMetas.add( + DataFileMeta.forAppend( + path.getName(), + 10L, + 10L, + SimpleStats.EMPTY_STATS, + 0L, + 0L, + table.schema().id(), + Collections.emptyList(), + null, + null, + null, + null, + null, + null)); + } + return new CommitMessageImpl( + partition, + bucket, + null, + new DataIncrement(fileMetas, Collections.emptyList(), Collections.emptyList()), + CompactIncrement.emptyIncrement()); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index 35843e0c2f..505417a412 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -886,23 +886,25 @@ public class FileStoreCommitTest { Map<String, String> options = new HashMap<>(); options.put(CoreOptions.DELETION_VECTOR_BITMAP64.key(), String.valueOf(bitmap64)); TestAppendFileStore store = TestAppendFileStore.createAppendStore(tempDir, options); + BinaryRow partition = gen.getPartition(gen.next()); + + // create files + CommitMessageImpl commitMessage0 = + store.writeDataFiles(partition, 0, Arrays.asList("f1", "f2")); + store.commit(commitMessage0); // commit 1 CommitMessageImpl commitMessage1 = store.writeDVIndexFiles( - BinaryRow.EMPTY_ROW, - 0, - Collections.singletonMap("f1", Arrays.asList(1, 3))); + partition, 0, Collections.singletonMap("f1", Arrays.asList(1, 3))); CommitMessageImpl commitMessage2 = store.writeDVIndexFiles( - BinaryRow.EMPTY_ROW, - 0, - Collections.singletonMap("f2", Arrays.asList(2, 4))); + partition, 0, Collections.singletonMap("f2", Arrays.asList(2, 4))); store.commit(commitMessage1, commitMessage2); // assert 1 - assertThat(store.scanDVIndexFiles(BinaryRow.EMPTY_ROW, 0).size()).isEqualTo(2); - BucketedDvMaintainer maintainer = store.createOrRestoreDVMaintainer(BinaryRow.EMPTY_ROW, 0); + assertThat(store.scanDVIndexFiles(partition, 0).size()).isEqualTo(2); + BucketedDvMaintainer maintainer = store.createOrRestoreDVMaintainer(partition, 0); Map<String, DeletionVector> dvs = maintainer.deletionVectors(); assertThat(dvs.size()).isEqualTo(2); assertThat(dvs.get("f2").isDeleted(2)).isTrue(); @@ -912,16 +914,16 @@ public class FileStoreCommitTest { // commit 2 CommitMessage commitMessage3 = store.writeDVIndexFiles( - BinaryRow.EMPTY_ROW, 0, Collections.singletonMap("f2", Arrays.asList(3))); + partition, 0, Collections.singletonMap("f2", Arrays.asList(3))); List<IndexFileMeta> deleted = new ArrayList<>(commitMessage1.newFilesIncrement().newIndexFiles()); deleted.addAll(commitMessage2.newFilesIncrement().newIndexFiles()); - CommitMessage commitMessage4 = store.removeIndexFiles(BinaryRow.EMPTY_ROW, 0, deleted); + CommitMessage commitMessage4 = store.removeIndexFiles(partition, 0, deleted); store.commit(commitMessage3, commitMessage4); // assert 2 - assertThat(store.scanDVIndexFiles(BinaryRow.EMPTY_ROW, 0).size()).isEqualTo(1); - maintainer = store.createOrRestoreDVMaintainer(BinaryRow.EMPTY_ROW, 0); + assertThat(store.scanDVIndexFiles(partition, 0).size()).isEqualTo(1); + maintainer = 
store.createOrRestoreDVMaintainer(partition, 0); dvs = maintainer.deletionVectors(); assertThat(dvs.size()).isEqualTo(2); assertThat(dvs.get("f1").isDeleted(3)).isTrue(); diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/ConflictDeletionUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/ConflictDeletionUtilsTest.java new file mode 100644 index 0000000000..34e6e59e7c --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/utils/ConflictDeletionUtilsTest.java @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.index.DeletionVectorMeta; +import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.manifest.FileEntry; +import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.IndexManifestEntry; +import org.apache.paimon.manifest.SimpleFileEntry; +import org.apache.paimon.manifest.SimpleFileEntryWithDV; + +import org.junit.jupiter.api.Test; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; + +import static org.apache.paimon.data.BinaryRow.EMPTY_ROW; +import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; +import static org.apache.paimon.manifest.FileKind.ADD; +import static org.apache.paimon.manifest.FileKind.DELETE; +import static org.apache.paimon.utils.ConflictDeletionUtils.buildBaseEntriesWithDV; +import static org.apache.paimon.utils.ConflictDeletionUtils.buildDeltaEntriesWithDV; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link ConflictDeletionUtils}. 
*/ +public class ConflictDeletionUtilsTest { + + @Test + public void testBuildBaseEntriesWithDV() { + { + // Scene 1 + List<SimpleFileEntry> baseEntries = new ArrayList<>(); + baseEntries.add(createFileEntry("f1", ADD)); + baseEntries.add(createFileEntry("f2", ADD)); + + List<IndexManifestEntry> deltaIndexEntries = new ArrayList<>(); + deltaIndexEntries.add(createDvIndexEntry("dv1", ADD, Arrays.asList("f2"))); + + assertThat(buildBaseEntriesWithDV(baseEntries, deltaIndexEntries)) + .containsExactlyInAnyOrder( + createFileEntryWithDV("f1", ADD, null), + createFileEntryWithDV("f2", ADD, "dv1")); + } + + { + // Scene 2: skip delete dv + List<SimpleFileEntry> baseEntries = new ArrayList<>(); + baseEntries.add(createFileEntry("f1", ADD)); + baseEntries.add(createFileEntry("f2", ADD)); + + List<IndexManifestEntry> deltaIndexEntries = new ArrayList<>(); + deltaIndexEntries.add(createDvIndexEntry("dv1", DELETE, Arrays.asList("f2"))); + + assertThat(buildBaseEntriesWithDV(baseEntries, deltaIndexEntries)) + .containsExactlyInAnyOrder( + createFileEntryWithDV("f1", ADD, null), + createFileEntryWithDV("f2", ADD, null)); + } + } + + @Test + public void testBuildDeltaEntriesWithDV() { + { + // Scene 1: update f2's dv + List<SimpleFileEntry> baseEntries = new ArrayList<>(); + baseEntries.add(createFileEntryWithDV("f1", ADD, "dv1")); + baseEntries.add(createFileEntryWithDV("f2", ADD, null)); + + List<SimpleFileEntry> deltaEntries = new ArrayList<>(); + deltaEntries.add(createFileEntry("f2", DELETE)); + deltaEntries.add(createFileEntry("f2_new", ADD)); + + List<IndexManifestEntry> deltaIndexEntries = new ArrayList<>(); + deltaIndexEntries.add(createDvIndexEntry("dv2", ADD, Arrays.asList("f2_new"))); + + assertThat(buildDeltaEntriesWithDV(baseEntries, deltaEntries, deltaIndexEntries)) + .containsExactlyInAnyOrder( + createFileEntryWithDV("f2", DELETE, null), + createFileEntryWithDV("f2_new", ADD, "dv2")); + } + + { + // Scene 2: update f2 and merge f1's dv + List<SimpleFileEntry> baseEntries = new ArrayList<>(); + baseEntries.add(createFileEntryWithDV("f1", ADD, "dv1")); + baseEntries.add(createFileEntryWithDV("f2", ADD, null)); + + List<SimpleFileEntry> deltaEntries = new ArrayList<>(); + deltaEntries.add(createFileEntry("f2", DELETE)); + deltaEntries.add(createFileEntry("f2_new", ADD)); + deltaEntries.add(createFileEntry("f3", ADD)); + + List<IndexManifestEntry> deltaIndexEntries = new ArrayList<>(); + deltaIndexEntries.add(createDvIndexEntry("dv1", DELETE, Arrays.asList("f1"))); + deltaIndexEntries.add(createDvIndexEntry("dv2", ADD, Arrays.asList("f1", "f2_new"))); + + assertThat(buildDeltaEntriesWithDV(baseEntries, deltaEntries, deltaIndexEntries)) + .containsExactlyInAnyOrder( + createFileEntryWithDV("f1", DELETE, "dv1"), + createFileEntryWithDV("f1", ADD, "dv2"), + createFileEntryWithDV("f2", DELETE, null), + createFileEntryWithDV("f2_new", ADD, "dv2"), + createFileEntryWithDV("f3", ADD, null)); + } + + { + // Scene 3: update f2 (with dv) and merge f1's dv + List<SimpleFileEntry> baseEntries = new ArrayList<>(); + baseEntries.add(createFileEntryWithDV("f1", ADD, "dv1")); + baseEntries.add(createFileEntryWithDV("f2", ADD, "dv2")); + + List<SimpleFileEntry> deltaEntries = new ArrayList<>(); + deltaEntries.add(createFileEntry("f2", DELETE)); + deltaEntries.add(createFileEntry("f2_new", ADD)); + deltaEntries.add(createFileEntry("f3", ADD)); + + List<IndexManifestEntry> deltaIndexEntries = new ArrayList<>(); + deltaIndexEntries.add(createDvIndexEntry("dv1", DELETE, Arrays.asList("f1"))); + 
deltaIndexEntries.add(createDvIndexEntry("dv2", DELETE, Arrays.asList("f2"))); + deltaIndexEntries.add(createDvIndexEntry("dv3", ADD, Arrays.asList("f1", "f2_new"))); + + assertThat(buildDeltaEntriesWithDV(baseEntries, deltaEntries, deltaIndexEntries)) + .containsExactlyInAnyOrder( + createFileEntryWithDV("f1", DELETE, "dv1"), + createFileEntryWithDV("f1", ADD, "dv3"), + createFileEntryWithDV("f2", DELETE, "dv2"), + createFileEntryWithDV("f2_new", ADD, "dv3"), + createFileEntryWithDV("f3", ADD, null)); + } + + { + // Scene 4: full compact + List<SimpleFileEntry> baseEntries = new ArrayList<>(); + baseEntries.add(createFileEntryWithDV("f1", ADD, null)); + baseEntries.add(createFileEntryWithDV("f2", ADD, "dv1")); + baseEntries.add(createFileEntryWithDV("f3", ADD, "dv1")); + baseEntries.add(createFileEntryWithDV("f4", ADD, "dv2")); + + List<SimpleFileEntry> deltaEntries = new ArrayList<>(); + deltaEntries.add(createFileEntry("f1", DELETE)); + deltaEntries.add(createFileEntry("f2", DELETE)); + deltaEntries.add(createFileEntry("f3", DELETE)); + deltaEntries.add(createFileEntry("f4", DELETE)); + deltaEntries.add(createFileEntry("f5_compact", ADD)); + + List<IndexManifestEntry> deltaIndexEntries = new ArrayList<>(); + deltaIndexEntries.add(createDvIndexEntry("dv1", DELETE, Arrays.asList("f2", "f3"))); + deltaIndexEntries.add(createDvIndexEntry("dv2", DELETE, Arrays.asList("f4"))); + + assertThat(buildDeltaEntriesWithDV(baseEntries, deltaEntries, deltaIndexEntries)) + .containsExactlyInAnyOrder( + createFileEntryWithDV("f1", DELETE, null), + createFileEntryWithDV("f2", DELETE, "dv1"), + createFileEntryWithDV("f3", DELETE, "dv1"), + createFileEntryWithDV("f4", DELETE, "dv2"), + createFileEntryWithDV("f5_compact", ADD, null)); + } + + { + // Scene 5: merge into with update, delete and insert + List<SimpleFileEntry> baseEntries = new ArrayList<>(); + baseEntries.add(createFileEntryWithDV("f1", ADD, null)); + baseEntries.add(createFileEntryWithDV("f2", ADD, null)); + baseEntries.add(createFileEntryWithDV("f3", ADD, "dv1")); + baseEntries.add(createFileEntryWithDV("f4", ADD, "dv1")); + baseEntries.add(createFileEntryWithDV("f5", ADD, "dv2")); + + List<SimpleFileEntry> deltaEntries = new ArrayList<>(); + deltaEntries.add(createFileEntry("f2", DELETE)); + deltaEntries.add(createFileEntry("f3", DELETE)); + deltaEntries.add(createFileEntry("f3_new", ADD)); + deltaEntries.add(createFileEntry("f7", ADD)); + + List<IndexManifestEntry> deltaIndexEntries = new ArrayList<>(); + deltaIndexEntries.add(createDvIndexEntry("dv1", DELETE, Arrays.asList("f3", "f4"))); + deltaIndexEntries.add(createDvIndexEntry("dv2", DELETE, Arrays.asList("f5"))); + deltaIndexEntries.add(createDvIndexEntry("dv3", ADD, Arrays.asList("f1", "f4", "f5"))); + + assertThat(buildDeltaEntriesWithDV(baseEntries, deltaEntries, deltaIndexEntries)) + .containsExactlyInAnyOrder( + createFileEntryWithDV("f1", DELETE, null), + createFileEntryWithDV("f1", ADD, "dv3"), + createFileEntryWithDV("f2", DELETE, null), + createFileEntryWithDV("f3", DELETE, "dv1"), + createFileEntryWithDV("f3_new", ADD, null), + createFileEntryWithDV("f4", DELETE, "dv1"), + createFileEntryWithDV("f4", ADD, "dv3"), + createFileEntryWithDV("f5", DELETE, "dv2"), + createFileEntryWithDV("f5", ADD, "dv3"), + createFileEntryWithDV("f7", ADD, null)); + } + } + + @Test + public void testConflictDeletionWithDV() { + { + // Scene 1: base -------------> update2 (conflict) + // f1 ^ <f1, +dv2> + // | + // update1 (finished) + // <f1, +dv1> + List<SimpleFileEntry> update1Entries = new 
ArrayList<>(); + update1Entries.add(createFileEntryWithDV("f1", ADD, "dv1")); + + List<SimpleFileEntry> update2DeltaEntries = new ArrayList<>(); + + List<IndexManifestEntry> update2DeltaIndexEntries = new ArrayList<>(); + update2DeltaIndexEntries.add(createDvIndexEntry("dv2", ADD, Arrays.asList("f1"))); + + List<SimpleFileEntry> update2DeltaEntriesWithDV = + buildDeltaEntriesWithDV( + update1Entries, update2DeltaEntries, update2DeltaIndexEntries); + assertThat(update2DeltaEntriesWithDV) + .containsExactlyInAnyOrder( + createFileEntryWithDV("f1", DELETE, null), + createFileEntryWithDV("f1", ADD, "dv2")); + assertConflict(update1Entries, update2DeltaEntriesWithDV); + } + + { + // Scene 2: base -------------> update2 (conflict) + // <f1, dv0> ^ <f1, +dv2> + // | + // update1 (finished) + // <f1, +dv1> + List<SimpleFileEntry> update1Entries = new ArrayList<>(); + update1Entries.add(createFileEntryWithDV("f1", ADD, "dv1")); + + List<SimpleFileEntry> update2DeltaEntries = new ArrayList<>(); + + List<IndexManifestEntry> update2DeltaIndexEntries = new ArrayList<>(); + update2DeltaIndexEntries.add(createDvIndexEntry("dv0", DELETE, Arrays.asList("f1"))); + update2DeltaIndexEntries.add(createDvIndexEntry("dv2", ADD, Arrays.asList("f1"))); + + List<SimpleFileEntry> update2DeltaEntriesWithDV = + buildDeltaEntriesWithDV( + update1Entries, update2DeltaEntries, update2DeltaIndexEntries); + assertThat(update2DeltaEntriesWithDV) + .containsExactlyInAnyOrder( + createFileEntryWithDV("f1", DELETE, "dv0"), + createFileEntryWithDV("f1", ADD, "dv2")); + assertConflict(update1Entries, update2DeltaEntriesWithDV); + } + + { + // Scene 3: base -------------> update2 (conflict) + // <f1, dv0> ^ <-f1, -dv0>, <+f3, null> + // | + // update1 (finished) + // <-f1, -dv0>, <+f2, dv1> + List<SimpleFileEntry> update1Entries = new ArrayList<>(); + update1Entries.add(createFileEntryWithDV("f2", ADD, "dv1")); + + List<SimpleFileEntry> update2DeltaEntries = new ArrayList<>(); + update2DeltaEntries.add(createFileEntry("f1", DELETE)); + update2DeltaEntries.add(createFileEntry("f3", ADD)); + + List<IndexManifestEntry> update2DeltaIndexEntries = new ArrayList<>(); + update2DeltaIndexEntries.add(createDvIndexEntry("dv0", DELETE, Arrays.asList("f1"))); + + List<SimpleFileEntry> update2DeltaEntriesWithDV = + buildDeltaEntriesWithDV( + update1Entries, update2DeltaEntries, update2DeltaIndexEntries); + assertThat(update2DeltaEntriesWithDV) + .containsExactlyInAnyOrder( + createFileEntryWithDV("f1", DELETE, "dv0"), + createFileEntryWithDV("f3", ADD, null)); + assertConflict(update1Entries, update2DeltaEntriesWithDV); + } + } + + private SimpleFileEntry createFileEntry(String fileName, FileKind kind) { + return new SimpleFileEntry( + kind, + EMPTY_ROW, + 0, + 1, + 0, + fileName, + Collections.emptyList(), + null, + EMPTY_ROW, + EMPTY_ROW, + null); + } + + private SimpleFileEntryWithDV createFileEntryWithDV( + String fileName, FileKind kind, @Nullable String dvFileName) { + return new SimpleFileEntryWithDV(createFileEntry(fileName, kind), dvFileName); + } + + private IndexManifestEntry createDvIndexEntry( + String fileName, FileKind kind, List<String> fileNames) { + LinkedHashMap<String, DeletionVectorMeta> dvRanges = new LinkedHashMap<>(); + for (String name : fileNames) { + dvRanges.put(name, new DeletionVectorMeta(name, 1, 1, 1L)); + } + return new IndexManifestEntry( + kind, + EMPTY_ROW, + 0, + new IndexFileMeta( + DELETION_VECTORS_INDEX, fileName, 11, dvRanges.size(), dvRanges, null)); + } + + private void assertConflict( + 
List<SimpleFileEntry> baseEntries, List<SimpleFileEntry> deltaEntries) { + ArrayList<SimpleFileEntry> simpleFileEntryWithDVS = new ArrayList<>(baseEntries); + simpleFileEntryWithDVS.addAll(deltaEntries); + Collection<SimpleFileEntry> merged = FileEntry.mergeEntries(simpleFileEntryWithDVS); + int deleteCount = 0; + for (SimpleFileEntry simpleFileEntryWithDV : merged) { + if (simpleFileEntryWithDV.kind().equals(FileKind.DELETE)) { + deleteCount++; + } + } + assert (deleteCount > 0); + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala index c85315d876..461aca90ef 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala @@ -119,6 +119,7 @@ case class DeleteFromPaimonTableCommand( } private def performNonPrimaryKeyDelete(sparkSession: SparkSession): Seq[CommitMessage] = { + val readSnapshot = table.snapshotManager().latestSnapshot() // Step1: the candidate data splits which are filtered by Paimon Predicate. val candidateDataSplits = findCandidateDataSplits(condition, relation.output) val dataFilePathToMeta = candidateFileMap(candidateDataSplits) @@ -133,7 +134,7 @@ case class DeleteFromPaimonTableCommand( sparkSession) // Step3: update the touched deletion vectors and index files - writer.persistDeletionVectors(deletionVectors) + writer.persistDeletionVectors(deletionVectors, readSnapshot) } else { // Step2: extract out the exactly files, which must have at least one record to be updated. val touchedFilePaths = diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala index 5f12d1110d..1ae894dbae 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala @@ -88,6 +88,8 @@ case class MergeIntoPaimonTable( } private def performMergeForNonPkTable(sparkSession: SparkSession): Seq[CommitMessage] = { + // todo: find a more universal way to make read snapshot consistent. + val readSnapshot = table.snapshotManager().latestSnapshot() val targetDS = createDataset(sparkSession, filteredTargetPlan) val sourceDS = createDataset(sparkSession, sourceTable) @@ -113,7 +115,7 @@ case class MergeIntoPaimonTable( val dvDS = ds.where( s"$ROW_KIND_COL = ${RowKind.DELETE.toByteValue} or $ROW_KIND_COL = ${RowKind.UPDATE_AFTER.toByteValue}") val deletionVectors = collectDeletionVectors(dataFilePathToMeta, dvDS, sparkSession) - val indexCommitMsg = writer.persistDeletionVectors(deletionVectors) + val indexCommitMsg = writer.persistDeletionVectors(deletionVectors, readSnapshot) // Step4: filter rows that should be written as the inserted/updated data. 
val toWriteDS = ds diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala index 246245c052..6d0563b364 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala @@ -18,7 +18,7 @@ package org.apache.paimon.spark.commands -import org.apache.paimon.CoreOptions +import org.apache.paimon.{CoreOptions, Snapshot} import org.apache.paimon.CoreOptions.{PartitionSinkStrategy, WRITE_ONLY} import org.apache.paimon.codegen.CodeGenUtils import org.apache.paimon.crosspartition.{IndexBootstrap, KeyPartOrRow} @@ -304,10 +304,11 @@ case class PaimonSparkWriter(table: FileStoreTable, writeRowTracking: Boolean = * deletion vectors; else, one index file will contain all deletion vector with the same partition * and bucket. */ - def persistDeletionVectors(deletionVectors: Dataset[SparkDeletionVector]): Seq[CommitMessage] = { + def persistDeletionVectors( + deletionVectors: Dataset[SparkDeletionVector], + snapshot: Snapshot): Seq[CommitMessage] = { val sparkSession = deletionVectors.sparkSession import sparkSession.implicits._ - val snapshot = table.snapshotManager().latestSnapshotFromFileSystem() val serializedCommits = deletionVectors .groupByKey(_.partitionAndBucket) .mapGroups { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala index 1babd7b5c3..8839d5c8ac 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala @@ -71,6 +71,7 @@ case class UpdatePaimonTableCommand( /** Update for table without primary keys */ private def performUpdateForNonPkTable(sparkSession: SparkSession): Seq[CommitMessage] = { + val readSnapshot = table.snapshotManager().latestSnapshot() // Step1: the candidate data splits which are filtered by Paimon Predicate. val candidateDataSplits = findCandidateDataSplits(condition, relation.output) val dataFilePathToMeta = candidateFileMap(candidateDataSplits) @@ -100,7 +101,7 @@ case class UpdatePaimonTableCommand( val addCommitMessage = writeOnlyUpdatedData(sparkSession, touchedDataSplits) // Step4: write these deletion vectors. 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index 1babd7b5c3..8839d5c8ac 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -71,6 +71,7 @@ case class UpdatePaimonTableCommand(
 
   /** Update for table without primary keys */
   private def performUpdateForNonPkTable(sparkSession: SparkSession): Seq[CommitMessage] = {
+    val readSnapshot = table.snapshotManager().latestSnapshot()
     // Step1: the candidate data splits which are filtered by Paimon Predicate.
     val candidateDataSplits = findCandidateDataSplits(condition, relation.output)
     val dataFilePathToMeta = candidateFileMap(candidateDataSplits)
@@ -100,7 +101,7 @@ case class UpdatePaimonTableCommand(
         val addCommitMessage = writeOnlyUpdatedData(sparkSession, touchedDataSplits)
 
         // Step4: write these deletion vectors.
-        val indexCommitMsg = writer.persistDeletionVectors(deletionVectors)
+        val indexCommitMsg = writer.persistDeletionVectors(deletionVectors, readSnapshot)
 
         addCommitMessage ++ indexCommitMsg
       } finally {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
index c867aed9f0..bfbadc7624 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
@@ -760,46 +760,49 @@ trait MergeIntoAppendTableTest extends PaimonSparkTestBase with PaimonAppendTabl
   }
 
   test("Paimon MergeInto: concurrent merge and compact") {
-    withTable("s", "t") {
-      sql("CREATE TABLE s (id INT, b INT, c INT)")
-      sql("INSERT INTO s VALUES (1, 1, 1)")
-
-      sql("CREATE TABLE t (id INT, b INT, c INT)")
-      sql("INSERT INTO t VALUES (1, 1, 1)")
-
-      val mergeInto = Future {
-        for (_ <- 1 to 10) {
-          try {
-            sql("""
-                  |MERGE INTO t
-                  |USING s
-                  |ON t.id = s.id
-                  |WHEN MATCHED THEN
-                  |UPDATE SET t.id = s.id, t.b = s.b + t.b, t.c = s.c + t.c
-                  |""".stripMargin)
-          } catch {
-            case a: Throwable =>
-              assert(
-                a.getMessage.contains("Conflicts during commits") || a.getMessage.contains(
-                  "Missing file"))
+    for (dvEnabled <- Seq("true", "false")) {
+      withTable("s", "t") {
+        sql("CREATE TABLE s (id INT, b INT, c INT)")
+        sql("INSERT INTO s VALUES (1, 1, 1)")
+
+        sql(
+          s"CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('deletion-vectors.enabled' = '$dvEnabled')")
+        sql("INSERT INTO t VALUES (1, 1, 1)")
+
+        val mergeInto = Future {
+          for (_ <- 1 to 10) {
+            try {
+              sql("""
+                    |MERGE INTO t
+                    |USING s
+                    |ON t.id = s.id
+                    |WHEN MATCHED THEN
+                    |UPDATE SET t.id = s.id, t.b = s.b + t.b, t.c = s.c + t.c
+                    |""".stripMargin)
+            } catch {
+              case a: Throwable =>
+                assert(
+                  a.getMessage.contains("Conflicts during commits") || a.getMessage.contains(
+                    "Missing file"))
+            }
+            checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1)))
           }
-          checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1)))
         }
-      }
 
-      val compact = Future {
-        for (_ <- 1 to 10) {
-          try {
-            sql("CALL sys.compact(table => 't', order_strategy => 'order', order_by => 'id')")
-          } catch {
-            case a: Throwable => assert(a.getMessage.contains("Conflicts during commits"))
+        val compact = Future {
+          for (_ <- 1 to 10) {
+            try {
+              sql("CALL sys.compact(table => 't', order_strategy => 'order', order_by => 'id')")
+            } catch {
+              case a: Throwable => assert(a.getMessage.contains("Conflicts during commits"))
+            }
+            checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1)))
           }
-          checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1)))
         }
-      }
 
-      Await.result(mergeInto, 60.seconds)
-      Await.result(compact, 60.seconds)
+        Await.result(mergeInto, 60.seconds)
+        Await.result(compact, 60.seconds)
+      }
     }
   }
 }
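
Both futures in the test above follow the same tolerate-conflict-then-verify shape: a concurrent writer may legitimately fail with a commit conflict, but the table invariant must hold after every attempt. A generic Scala sketch of that shape, assuming only conflict-style messages are acceptable failures; the helper name and usage are illustrative:

    object ConflictToleratedSketch {
      // Run an operation that may lose a commit race, then check an invariant
      // that must hold no matter which writer won.
      def attemptWithConflictTolerated(op: () => Unit)(invariant: () => Boolean): Unit = {
        try op()
        catch {
          case t: Throwable =>
            val msg = Option(t.getMessage).getOrElse("")
            // Only commit-conflict style failures are acceptable here.
            assert(
              msg.contains("Conflicts during commits") || msg.contains("Missing file"),
              s"unexpected failure: $msg")
        }
        assert(invariant(), "table invariant violated after concurrent operation")
      }

      // Usage shape mirroring the test (runMerge/countRows are hypothetical):
      // attemptWithConflictTolerated(() => runMerge())(() => countRows() == 1)
    }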
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala
index 097d2c4e14..072324ce3a 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala
@@ -25,6 +25,11 @@ import org.apache.paimon.spark.catalyst.analysis.Update
 import org.apache.spark.sql.Row
 import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
 
+import scala.concurrent.{Await, Future}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.DurationInt
+import scala.util.Random
+
 abstract class UpdateTableTestBase extends PaimonSparkTestBase {
 
   import testImplicits._
@@ -393,4 +398,38 @@ abstract class UpdateTableTestBase extends PaimonSparkTestBase {
       assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.COMPACT))
     }
   }
+
+  test("Paimon update: random concurrent update on dv table") {
+    withTable("t") {
+      val recordCount = 10000
+      val maxConcurrent = Random.nextInt(2) + 1
+
+      sql(s"CREATE TABLE t (a INT, b INT) TBLPROPERTIES ('deletion-vectors.enabled' = 'true')")
+      sql(s"INSERT INTO t SELECT id AS a, 0 AS b FROM range(0, $recordCount)")
+
+      def run(): Future[Unit] = Future {
+        for (_ <- 1 to 20) {
+          try {
+            val i = 20 + Random.nextInt(100)
+            // Pick one of the three concurrent operations at random.
+            Random.nextInt(3) match {
+              case 0 => sql(s"UPDATE t SET b = b + 1 WHERE (a % $i) = ${Random.nextInt(i)}")
+              case 1 =>
+                sql("CALL sys.compact(table => 't', options => 'compaction.min.file-num=1')")
+              case 2 =>
+                sql("CALL sys.compact(table => 't', order_strategy => 'order', order_by => 'a')")
+            }
+          } catch {
+            case a: Throwable => assert(a.getMessage.contains("Conflicts during commits"))
+          }
+          checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(recordCount)))
+        }
+      }
+
+      (1 to maxConcurrent)
+        .map(_ => run())
+        .foreach(Await.result(_, 600.seconds))
+    }
+  }
 }
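
The new randomized test fans out a small random number of identical workers and awaits each one; an exception in any worker resurfaces through Await.result and fails the test. The same shape in isolation, with FanOutSketch, fanOut and the sleeping worker as illustrative stand-ins:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration.DurationInt
    import scala.util.Random

    object FanOutSketch extends App {
      // Fan out 1 or 2 identical workers, then await each; a failed worker's
      // exception propagates out of Await.result and fails the run.
      def fanOut(worker: () => Unit): Unit = {
        val concurrency = Random.nextInt(2) + 1
        (1 to concurrency)
          .map(_ => Future(worker()))
          .foreach(Await.result(_, 600.seconds))
      }

      fanOut(() => Thread.sleep(10)) // stand-in for a batch of UPDATE/compact calls
    }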
