This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 671762a14b [core] Extract ConflictDetection for FileStoreCommitImpl
671762a14b is described below
commit 671762a14b65e36f36cb8415cee7194f44098d63
Author: JingsongLi <[email protected]>
AuthorDate: Fri Oct 10 14:30:42 2025 +0800
[core] Extract ConflictDetection for FileStoreCommitImpl
---
.../java/org/apache/paimon/AbstractFileStore.java | 15 +-
.../paimon/operation/FileStoreCommitImpl.java | 372 +-------------
.../paimon/operation/commit/ConflictDetection.java | 547 +++++++++++++++++++++
.../apache/paimon/utils/ConflictDeletionUtils.java | 149 ------
.../apache/paimon/operation/FileDeletionTest.java | 2 +-
.../paimon/operation/FileStoreCommitTest.java | 2 +-
.../commit/ConflictDetectionTest.java} | 9 +-
7 files changed, 577 insertions(+), 519 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 31918b84cf..2e209fba45 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -41,6 +41,7 @@ import org.apache.paimon.operation.ManifestsReader;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.operation.TagDeletion;
+import org.apache.paimon.operation.commit.ConflictDetection;
import org.apache.paimon.partition.PartitionExpireStrategy;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -267,6 +268,16 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
if (snapshotCommit == null) {
snapshotCommit = new RenamingSnapshotCommit(snapshotManager,
Lock.empty());
}
+ ConflictDetection conflictDetection =
+ new ConflictDetection(
+ tableName,
+ commitUser,
+ partitionType,
+ pathFactory(),
+ newKeyComparator(),
+ bucketMode(),
+ options.deletionVectorsEnabled(),
+ newIndexFileHandler());
return new FileStoreCommitImpl(
snapshotCommit,
fileIO,
@@ -287,7 +298,6 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
options.manifestFullCompactionThresholdSize(),
options.manifestMergeMinCount(),
partitionType.getFieldCount() > 0 &&
options.dynamicPartitionOverwrite(),
- newKeyComparator(),
options.branch(),
newStatsFileHandler(),
bucketMode(),
@@ -299,8 +309,7 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
options.commitMaxRetryWait(),
options.commitStrictModeLastSafeSnapshot().orElse(null),
options.rowTrackingEnabled(),
- options.deletionVectorsEnabled(),
- newIndexFileHandler());
+ conflictDetection);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 97e67ec5cc..365317a07d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -24,9 +24,7 @@ import org.apache.paimon.Snapshot.CommitKind;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.SnapshotCommit;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.manifest.FileEntry;
@@ -41,6 +39,8 @@ import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.manifest.SimpleFileEntry;
+import org.apache.paimon.operation.commit.ConflictDetection;
+import org.apache.paimon.operation.commit.ConflictDetection.ConflictCheck;
import org.apache.paimon.operation.metrics.CommitMetrics;
import org.apache.paimon.operation.metrics.CommitStats;
import org.apache.paimon.options.MemorySize;
@@ -70,9 +70,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -82,7 +80,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
@@ -91,13 +88,12 @@ import static
org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
import static org.apache.paimon.manifest.ManifestEntry.recordCount;
import static org.apache.paimon.manifest.ManifestEntry.recordCountAdd;
import static org.apache.paimon.manifest.ManifestEntry.recordCountDelete;
+import static
org.apache.paimon.operation.commit.ConflictDetection.hasConflictChecked;
+import static
org.apache.paimon.operation.commit.ConflictDetection.mustConflictCheck;
+import static
org.apache.paimon.operation.commit.ConflictDetection.noConflictCheck;
import static
org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
import static
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
-import static
org.apache.paimon.utils.ConflictDeletionUtils.buildBaseEntriesWithDV;
-import static
org.apache.paimon.utils.ConflictDeletionUtils.buildDeltaEntriesWithDV;
-import static
org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString;
import static org.apache.paimon.utils.Preconditions.checkArgument;
-import static org.apache.paimon.utils.Preconditions.checkState;
/**
* Default implementation of {@link FileStoreCommit}.
@@ -141,7 +137,6 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
private final MemorySize manifestFullCompactionSize;
private final int manifestMergeMinCount;
private final boolean dynamicPartitionOverwrite;
- @Nullable private final Comparator<InternalRow> keyComparator;
private final String branchName;
@Nullable private final Integer manifestReadParallelism;
private final List<CommitCallback> commitCallbacks;
@@ -154,12 +149,10 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
@Nullable private Long strictModeLastSafeSnapshot;
private final InternalRowPartitionComputer partitionComputer;
private final boolean rowTrackingEnabled;
- private final boolean deletionVectorsEnabled;
- private final IndexFileHandler indexFileHandler;
+ private final ConflictDetection conflictDetection;
private boolean ignoreEmptyCommit;
private CommitMetrics commitMetrics;
- @Nullable private PartitionExpire partitionExpire;
public FileStoreCommitImpl(
SnapshotCommit snapshotCommit,
@@ -181,7 +174,6 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
MemorySize manifestFullCompactionSize,
int manifestMergeMinCount,
boolean dynamicPartitionOverwrite,
- @Nullable Comparator<InternalRow> keyComparator,
String branchName,
StatsFileHandler statsFileHandler,
BucketMode bucketMode,
@@ -193,8 +185,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
long commitMaxRetryWait,
@Nullable Long strictModeLastSafeSnapshot,
boolean rowTrackingEnabled,
- boolean deletionVectorsEnabled,
- IndexFileHandler indexFileHandler) {
+ ConflictDetection conflictDetection) {
this.snapshotCommit = snapshotCommit;
this.fileIO = fileIO;
this.schemaManager = schemaManager;
@@ -217,7 +208,6 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
this.manifestFullCompactionSize = manifestFullCompactionSize;
this.manifestMergeMinCount = manifestMergeMinCount;
this.dynamicPartitionOverwrite = dynamicPartitionOverwrite;
- this.keyComparator = keyComparator;
this.branchName = branchName;
this.manifestReadParallelism = manifestReadParallelism;
this.commitCallbacks = commitCallbacks;
@@ -238,8 +228,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
this.statsFileHandler = statsFileHandler;
this.bucketMode = bucketMode;
this.rowTrackingEnabled = rowTrackingEnabled;
- this.deletionVectorsEnabled = deletionVectorsEnabled;
- this.indexFileHandler = indexFileHandler;
+ this.conflictDetection = conflictDetection;
}
@Override
@@ -250,7 +239,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
@Override
public FileStoreCommit withPartitionExpire(PartitionExpire
partitionExpire) {
- this.partitionExpire = partitionExpire;
+ this.conflictDetection.withPartitionExpire(partitionExpire);
return this;
}
@@ -347,7 +336,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
appendTableFiles,
compactTableFiles,
appendIndexFiles)));
- noConflictsOrFail(
+ conflictDetection.checkNoConflictsOrFail(
latestSnapshot,
baseEntries,
appendSimpleEntries,
@@ -383,7 +372,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
// files.
if (safeLatestSnapshotId != null) {
baseEntries.addAll(appendSimpleEntries);
- noConflictsOrFail(
+ conflictDetection.checkNoConflictsOrFail(
latestSnapshot,
baseEntries,
SimpleFileEntry.from(compactTableFiles),
@@ -1032,7 +1021,7 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
baseDataFiles =
readAllEntriesFromChangedPartitions(latestSnapshot,
changedPartitions);
}
- noConflictsOrFail(
+ conflictDetection.checkNoConflictsOrFail(
latestSnapshot,
baseDataFiles,
SimpleFileEntry.from(deltaFiles),
@@ -1409,296 +1398,6 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
}
}
- private void noConflictsOrFail(
- Snapshot snapshot,
- List<SimpleFileEntry> baseEntries,
- List<SimpleFileEntry> deltaEntries,
- List<IndexManifestEntry> deltaIndexEntries,
- CommitKind commitKind) {
- String baseCommitUser = snapshot.commitUser();
- if (checkForDeletionVector()) {
- // Enrich dvName in fileEntry to checker for base ADD dv and delta
DELETE dv.
- // For example:
- // If the base file is <ADD baseFile1, ADD dv1>,
- // then the delta file must be <DELETE deltaFile1, DELETE dv1>;
and vice versa,
- // If the delta file is <DELETE deltaFile2, DELETE dv2>,
- // then the base file must be <ADD baseFile2, ADD dv2>.
- try {
- baseEntries =
- buildBaseEntriesWithDV(
- baseEntries,
- snapshot.indexManifest() == null
- ? Collections.emptyList()
- :
indexFileHandler.readManifest(snapshot.indexManifest()));
- deltaEntries =
- buildDeltaEntriesWithDV(baseEntries, deltaEntries,
deltaIndexEntries);
- } catch (Throwable e) {
- throw conflictException(commitUser, baseEntries,
deltaEntries).apply(e);
- }
- }
-
- List<SimpleFileEntry> allEntries = new ArrayList<>(baseEntries);
- allEntries.addAll(deltaEntries);
-
- checkBucketKeepSame(baseEntries, deltaEntries, commitKind, allEntries,
baseCommitUser);
-
- Function<Throwable, RuntimeException> conflictException =
- conflictException(baseCommitUser, baseEntries, deltaEntries);
- Collection<SimpleFileEntry> mergedEntries;
- try {
- // merge manifest entries and also check if the files we want to
delete are still there
- mergedEntries = FileEntry.mergeEntries(allEntries);
- } catch (Throwable e) {
- throw conflictException.apply(e);
- }
-
- checkNoDeleteInMergedEntries(mergedEntries, conflictException);
- checkKeyRangeNoConflicts(baseEntries, deltaEntries, mergedEntries,
baseCommitUser);
- }
-
- private void checkBucketKeepSame(
- List<SimpleFileEntry> baseEntries,
- List<SimpleFileEntry> deltaEntries,
- CommitKind commitKind,
- List<SimpleFileEntry> allEntries,
- String baseCommitUser) {
- if (commitKind == CommitKind.OVERWRITE) {
- return;
- }
-
- // total buckets within the same partition should remain the same
- Map<BinaryRow, Integer> totalBuckets = new HashMap<>();
- for (SimpleFileEntry entry : allEntries) {
- if (entry.totalBuckets() <= 0) {
- continue;
- }
-
- if (!totalBuckets.containsKey(entry.partition())) {
- totalBuckets.put(entry.partition(), entry.totalBuckets());
- continue;
- }
-
- int old = totalBuckets.get(entry.partition());
- if (old == entry.totalBuckets()) {
- continue;
- }
-
- Pair<RuntimeException, RuntimeException> conflictException =
- createConflictException(
- "Total buckets of partition "
- + entry.partition()
- + " changed from "
- + old
- + " to "
- + entry.totalBuckets()
- + " without overwrite. Give up
committing.",
- baseCommitUser,
- baseEntries,
- deltaEntries,
- null);
- LOG.warn("", conflictException.getLeft());
- throw conflictException.getRight();
- }
- }
-
- private void checkKeyRangeNoConflicts(
- List<SimpleFileEntry> baseEntries,
- List<SimpleFileEntry> deltaEntries,
- Collection<SimpleFileEntry> mergedEntries,
- String baseCommitUser) {
- // fast exit for file store without keys
- if (keyComparator == null) {
- return;
- }
-
- // group entries by partitions, buckets and levels
- Map<LevelIdentifier, List<SimpleFileEntry>> levels = new HashMap<>();
- for (SimpleFileEntry entry : mergedEntries) {
- int level = entry.level();
- if (level >= 1) {
- levels.computeIfAbsent(
- new LevelIdentifier(entry.partition(),
entry.bucket(), level),
- lv -> new ArrayList<>())
- .add(entry);
- }
- }
-
- // check for all LSM level >= 1, key ranges of files do not intersect
- for (List<SimpleFileEntry> entries : levels.values()) {
- entries.sort((a, b) -> keyComparator.compare(a.minKey(),
b.minKey()));
- for (int i = 0; i + 1 < entries.size(); i++) {
- SimpleFileEntry a = entries.get(i);
- SimpleFileEntry b = entries.get(i + 1);
- if (keyComparator.compare(a.maxKey(), b.minKey()) >= 0) {
- Pair<RuntimeException, RuntimeException> conflictException
=
- createConflictException(
- "LSM conflicts detected! Give up
committing. Conflict files are:\n"
- +
a.identifier().toString(pathFactory)
- + "\n"
- +
b.identifier().toString(pathFactory),
- baseCommitUser,
- baseEntries,
- deltaEntries,
- null);
-
- LOG.warn("", conflictException.getLeft());
- throw conflictException.getRight();
- }
- }
- }
- }
-
- private Function<Throwable, RuntimeException> conflictException(
- String baseCommitUser,
- List<SimpleFileEntry> baseEntries,
- List<SimpleFileEntry> deltaEntries) {
- return e -> {
- Pair<RuntimeException, RuntimeException> conflictException =
- createConflictException(
- "File deletion conflicts detected! Give up
committing.",
- baseCommitUser,
- baseEntries,
- deltaEntries,
- e);
- LOG.warn("", conflictException.getLeft());
- return conflictException.getRight();
- };
- }
-
- private boolean checkForDeletionVector() {
- return deletionVectorsEnabled &&
bucketMode.equals(BucketMode.BUCKET_UNAWARE);
- }
-
- private void checkNoDeleteInMergedEntries(
- Collection<SimpleFileEntry> mergedEntries,
- Function<Throwable, RuntimeException> exceptionFunction) {
- try {
- for (SimpleFileEntry entry : mergedEntries) {
- checkState(
- entry.kind() != FileKind.DELETE,
- "Trying to delete file %s for table %s which is not
previously added.",
- entry.fileName(),
- tableName);
- }
- } catch (Throwable e) {
- assertConflictForPartitionExpire(mergedEntries);
- throw exceptionFunction.apply(e);
- }
- }
-
- private void assertConflictForPartitionExpire(Collection<SimpleFileEntry>
mergedEntries) {
- if (partitionExpire != null && partitionExpire.isValueExpiration()) {
- Set<BinaryRow> deletedPartitions = new HashSet<>();
- for (SimpleFileEntry entry : mergedEntries) {
- if (entry.kind() == FileKind.DELETE) {
- deletedPartitions.add(entry.partition());
- }
- }
- if (partitionExpire.isValueAllExpired(deletedPartitions)) {
- List<String> expiredPartitions =
- deletedPartitions.stream()
- .map(
- partition ->
- partToSimpleString(
- partitionType,
partition, "-", 200))
- .collect(Collectors.toList());
- throw new RuntimeException(
- "You are writing data to expired partitions, and you
can filter this data to avoid job failover."
- + " Otherwise, continuous expired records will
cause the job to failover restart continuously."
- + " Expired partitions are: "
- + expiredPartitions);
- }
- }
- }
-
- /**
- * Construct detailed conflict exception. The returned exception is formed
of (full exception,
- * simplified exception), The simplified exception is generated when the
entry length is larger
- * than the max limit.
- */
- private Pair<RuntimeException, RuntimeException> createConflictException(
- String message,
- String baseCommitUser,
- List<SimpleFileEntry> baseEntries,
- List<SimpleFileEntry> changes,
- Throwable cause) {
- String possibleCauses =
- String.join(
- "\n",
- "Don't panic!",
- "Conflicts during commits are normal and this failure
is intended to resolve the conflicts.",
- "Conflicts are mainly caused by the following
scenarios:",
- "1. Multiple jobs are writing into the same partition
at the same time, "
- + "or you use STATEMENT SET to execute
multiple INSERT statements into the same Paimon table.",
- " You'll probably see different base commit user and
current commit user below.",
- " You can use "
- +
"https://paimon.apache.org/docs/master/maintenance/dedicated-compaction#dedicated-compaction-job"
- + " to support multiple writing.",
- "2. You're recovering from an old savepoint, or you're
creating multiple jobs from a savepoint.",
- " The job will fail continuously in this scenario to
protect metadata from corruption.",
- " You can either recover from the latest savepoint, "
- + "or you can revert the table to the snapshot
corresponding to the old savepoint.");
- String commitUserString =
- "Base commit user is: "
- + baseCommitUser
- + "; Current commit user is: "
- + commitUser;
- String baseEntriesString =
- "Base entries are:\n"
- + baseEntries.stream()
- .map(Object::toString)
- .collect(Collectors.joining("\n"));
- String changesString =
- "Changes are:\n"
- +
changes.stream().map(Object::toString).collect(Collectors.joining("\n"));
-
- RuntimeException fullException =
- new RuntimeException(
- message
- + "\n\n"
- + possibleCauses
- + "\n\n"
- + commitUserString
- + "\n\n"
- + baseEntriesString
- + "\n\n"
- + changesString,
- cause);
-
- RuntimeException simplifiedException;
- int maxEntry = 50;
- if (baseEntries.size() > maxEntry || changes.size() > maxEntry) {
- baseEntriesString =
- "Base entries are:\n"
- + baseEntries.subList(0,
Math.min(baseEntries.size(), maxEntry))
- .stream()
- .map(Object::toString)
- .collect(Collectors.joining("\n"));
- changesString =
- "Changes are:\n"
- + changes.subList(0, Math.min(changes.size(),
maxEntry)).stream()
- .map(Object::toString)
- .collect(Collectors.joining("\n"));
- simplifiedException =
- new RuntimeException(
- message
- + "\n\n"
- + possibleCauses
- + "\n\n"
- + commitUserString
- + "\n\n"
- + baseEntriesString
- + "\n\n"
- + changesString
- + "\n\n"
- + "The entry list above are not fully
displayed, please refer to taskmanager.log for more information.",
- cause);
- return Pair.of(fullException, simplifiedException);
- } else {
- return Pair.of(fullException, fullException);
- }
- }
-
private void cleanUpNoReuseTmpManifests(
Pair<String, Long> baseManifestList,
List<ManifestFileMeta> mergeBeforeManifests,
@@ -1766,53 +1465,6 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
IOUtils.closeQuietly(snapshotCommit);
}
- private static class LevelIdentifier {
-
- private final BinaryRow partition;
- private final int bucket;
- private final int level;
-
- private LevelIdentifier(BinaryRow partition, int bucket, int level) {
- this.partition = partition;
- this.bucket = bucket;
- this.level = level;
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof LevelIdentifier)) {
- return false;
- }
- LevelIdentifier that = (LevelIdentifier) o;
- return Objects.equals(partition, that.partition)
- && bucket == that.bucket
- && level == that.level;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(partition, bucket, level);
- }
- }
-
- /** Should do conflict check. */
- interface ConflictCheck {
- boolean shouldCheck(long latestSnapshot);
- }
-
- static ConflictCheck hasConflictChecked(@Nullable Long
checkedLatestSnapshotId) {
- return latestSnapshot -> !Objects.equals(latestSnapshot,
checkedLatestSnapshotId);
- }
-
- static ConflictCheck noConflictCheck() {
- return latestSnapshot -> false;
- }
-
- @VisibleForTesting
- static ConflictCheck mustConflictCheck() {
- return latestSnapshot -> true;
- }
-
private interface CommitResult {
boolean isSuccess();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
new file mode 100644
index 0000000000..9739beae88
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.commit;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.Snapshot.CommitKind;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.index.DeletionVectorMeta;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.FileEntry;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.SimpleFileEntry;
+import org.apache.paimon.manifest.SimpleFileEntryWithDV;
+import org.apache.paimon.operation.PartitionExpire;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static
org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString;
+import static org.apache.paimon.utils.Preconditions.checkState;
+
+/** Util class for detecting conflicts between base and delta files. */
+public class ConflictDetection {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ConflictDetection.class);
+
+ private final String tableName;
+ private final String commitUser;
+ private final RowType partitionType;
+ private final FileStorePathFactory pathFactory;
+ private final @Nullable Comparator<InternalRow> keyComparator;
+ private final BucketMode bucketMode;
+ private final boolean deletionVectorsEnabled;
+ private final IndexFileHandler indexFileHandler;
+
+ private @Nullable PartitionExpire partitionExpire;
+
+ public ConflictDetection(
+ String tableName,
+ String commitUser,
+ RowType partitionType,
+ FileStorePathFactory pathFactory,
+ @Nullable Comparator<InternalRow> keyComparator,
+ BucketMode bucketMode,
+ boolean deletionVectorsEnabled,
+ IndexFileHandler indexFileHandler) {
+ this.tableName = tableName;
+ this.commitUser = commitUser;
+ this.partitionType = partitionType;
+ this.pathFactory = pathFactory;
+ this.keyComparator = keyComparator;
+ this.bucketMode = bucketMode;
+ this.deletionVectorsEnabled = deletionVectorsEnabled;
+ this.indexFileHandler = indexFileHandler;
+ }
+
+ public void withPartitionExpire(PartitionExpire partitionExpire) {
+ this.partitionExpire = partitionExpire;
+ }
+
+ public void checkNoConflictsOrFail(
+ Snapshot snapshot,
+ List<SimpleFileEntry> baseEntries,
+ List<SimpleFileEntry> deltaEntries,
+ List<IndexManifestEntry> deltaIndexEntries,
+ CommitKind commitKind) {
+ String baseCommitUser = snapshot.commitUser();
+ if (checkForDeletionVector()) {
+ // Enrich dvName in fileEntry to checker for base ADD dv and delta
DELETE dv.
+ // For example:
+ // If the base file is <ADD baseFile1, ADD dv1>,
+ // then the delta file must be <DELETE deltaFile1, DELETE dv1>;
and vice versa,
+ // If the delta file is <DELETE deltaFile2, DELETE dv2>,
+ // then the base file must be <ADD baseFile2, ADD dv2>.
+ try {
+ baseEntries =
+ buildBaseEntriesWithDV(
+ baseEntries,
+ snapshot.indexManifest() == null
+ ? Collections.emptyList()
+ :
indexFileHandler.readManifest(snapshot.indexManifest()));
+ deltaEntries =
+ buildDeltaEntriesWithDV(baseEntries, deltaEntries,
deltaIndexEntries);
+ } catch (Throwable e) {
+ throw conflictException(commitUser, baseEntries,
deltaEntries).apply(e);
+ }
+ }
+
+ List<SimpleFileEntry> allEntries = new ArrayList<>(baseEntries);
+ allEntries.addAll(deltaEntries);
+
+ checkBucketKeepSame(baseEntries, deltaEntries, commitKind, allEntries,
baseCommitUser);
+
+ Function<Throwable, RuntimeException> conflictException =
+ conflictException(baseCommitUser, baseEntries, deltaEntries);
+ Collection<SimpleFileEntry> mergedEntries;
+ try {
+ // merge manifest entries and also check if the files we want to
delete are still there
+ mergedEntries = FileEntry.mergeEntries(allEntries);
+ } catch (Throwable e) {
+ throw conflictException.apply(e);
+ }
+
+ checkNoDeleteInMergedEntries(mergedEntries, conflictException);
+ checkKeyRangeNoConflicts(baseEntries, deltaEntries, mergedEntries,
baseCommitUser);
+ }
+
+ private void checkBucketKeepSame(
+ List<SimpleFileEntry> baseEntries,
+ List<SimpleFileEntry> deltaEntries,
+ CommitKind commitKind,
+ List<SimpleFileEntry> allEntries,
+ String baseCommitUser) {
+ if (commitKind == CommitKind.OVERWRITE) {
+ return;
+ }
+
+ // total buckets within the same partition should remain the same
+ Map<BinaryRow, Integer> totalBuckets = new HashMap<>();
+ for (SimpleFileEntry entry : allEntries) {
+ if (entry.totalBuckets() <= 0) {
+ continue;
+ }
+
+ if (!totalBuckets.containsKey(entry.partition())) {
+ totalBuckets.put(entry.partition(), entry.totalBuckets());
+ continue;
+ }
+
+ int old = totalBuckets.get(entry.partition());
+ if (old == entry.totalBuckets()) {
+ continue;
+ }
+
+ Pair<RuntimeException, RuntimeException> conflictException =
+ createConflictException(
+ "Total buckets of partition "
+ + entry.partition()
+ + " changed from "
+ + old
+ + " to "
+ + entry.totalBuckets()
+ + " without overwrite. Give up
committing.",
+ baseCommitUser,
+ baseEntries,
+ deltaEntries,
+ null);
+ LOG.warn("", conflictException.getLeft());
+ throw conflictException.getRight();
+ }
+ }
+
+ private void checkKeyRangeNoConflicts(
+ List<SimpleFileEntry> baseEntries,
+ List<SimpleFileEntry> deltaEntries,
+ Collection<SimpleFileEntry> mergedEntries,
+ String baseCommitUser) {
+ // fast exit for file store without keys
+ if (keyComparator == null) {
+ return;
+ }
+
+ // group entries by partitions, buckets and levels
+ Map<LevelIdentifier, List<SimpleFileEntry>> levels = new HashMap<>();
+ for (SimpleFileEntry entry : mergedEntries) {
+ int level = entry.level();
+ if (level >= 1) {
+ levels.computeIfAbsent(
+ new LevelIdentifier(entry.partition(),
entry.bucket(), level),
+ lv -> new ArrayList<>())
+ .add(entry);
+ }
+ }
+
+ // check for all LSM level >= 1, key ranges of files do not intersect
+ for (List<SimpleFileEntry> entries : levels.values()) {
+ entries.sort((a, b) -> keyComparator.compare(a.minKey(),
b.minKey()));
+ for (int i = 0; i + 1 < entries.size(); i++) {
+ SimpleFileEntry a = entries.get(i);
+ SimpleFileEntry b = entries.get(i + 1);
+ if (keyComparator.compare(a.maxKey(), b.minKey()) >= 0) {
+ Pair<RuntimeException, RuntimeException> conflictException
=
+ createConflictException(
+ "LSM conflicts detected! Give up
committing. Conflict files are:\n"
+ +
a.identifier().toString(pathFactory)
+ + "\n"
+ +
b.identifier().toString(pathFactory),
+ baseCommitUser,
+ baseEntries,
+ deltaEntries,
+ null);
+
+ LOG.warn("", conflictException.getLeft());
+ throw conflictException.getRight();
+ }
+ }
+ }
+ }
+
+ private Function<Throwable, RuntimeException> conflictException(
+ String baseCommitUser,
+ List<SimpleFileEntry> baseEntries,
+ List<SimpleFileEntry> deltaEntries) {
+ return e -> {
+ Pair<RuntimeException, RuntimeException> conflictException =
+ createConflictException(
+ "File deletion conflicts detected! Give up
committing.",
+ baseCommitUser,
+ baseEntries,
+ deltaEntries,
+ e);
+ LOG.warn("", conflictException.getLeft());
+ return conflictException.getRight();
+ };
+ }
+
+ private boolean checkForDeletionVector() {
+ return deletionVectorsEnabled &&
bucketMode.equals(BucketMode.BUCKET_UNAWARE);
+ }
+
+ private void checkNoDeleteInMergedEntries(
+ Collection<SimpleFileEntry> mergedEntries,
+ Function<Throwable, RuntimeException> exceptionFunction) {
+ try {
+ for (SimpleFileEntry entry : mergedEntries) {
+ checkState(
+ entry.kind() != FileKind.DELETE,
+ "Trying to delete file %s for table %s which is not
previously added.",
+ entry.fileName(),
+ tableName);
+ }
+ } catch (Throwable e) {
+ assertConflictForPartitionExpire(mergedEntries);
+ throw exceptionFunction.apply(e);
+ }
+ }
+
+ private void assertConflictForPartitionExpire(Collection<SimpleFileEntry>
mergedEntries) {
+ if (partitionExpire != null && partitionExpire.isValueExpiration()) {
+ Set<BinaryRow> deletedPartitions = new HashSet<>();
+ for (SimpleFileEntry entry : mergedEntries) {
+ if (entry.kind() == FileKind.DELETE) {
+ deletedPartitions.add(entry.partition());
+ }
+ }
+ if (partitionExpire.isValueAllExpired(deletedPartitions)) {
+ List<String> expiredPartitions =
+ deletedPartitions.stream()
+ .map(
+ partition ->
+ partToSimpleString(
+ partitionType,
partition, "-", 200))
+ .collect(Collectors.toList());
+ throw new RuntimeException(
+ "You are writing data to expired partitions, and you
can filter this data to avoid job failover."
+ + " Otherwise, continuous expired records will
cause the job to failover restart continuously."
+ + " Expired partitions are: "
+ + expiredPartitions);
+ }
+ }
+ }
+
+ static List<SimpleFileEntry> buildBaseEntriesWithDV(
+ List<SimpleFileEntry> baseEntries, List<IndexManifestEntry>
baseIndexEntries) {
+ if (baseEntries.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ Map<String, String> fileNameToDVFileName = new HashMap<>();
+ for (IndexManifestEntry indexManifestEntry : baseIndexEntries) {
+ // Should not attach DELETE type dv index for base file.
+ if (!indexManifestEntry.kind().equals(FileKind.DELETE)) {
+ IndexFileMeta indexFile = indexManifestEntry.indexFile();
+ LinkedHashMap<String, DeletionVectorMeta> dvRanges =
indexFile.dvRanges();
+ if (dvRanges != null) {
+ for (DeletionVectorMeta value : dvRanges.values()) {
+ checkState(
+
!fileNameToDVFileName.containsKey(value.dataFileName()),
+ "One file should correspond to only one dv
entry.");
+ fileNameToDVFileName.put(value.dataFileName(),
indexFile.fileName());
+ }
+ }
+ }
+ }
+
+ // Attach dv name to file entries.
+ List<SimpleFileEntry> entriesWithDV = new
ArrayList<>(baseEntries.size());
+ for (SimpleFileEntry fileEntry : baseEntries) {
+ entriesWithDV.add(
+ new SimpleFileEntryWithDV(
+ fileEntry,
fileNameToDVFileName.get(fileEntry.fileName())));
+ }
+ return entriesWithDV;
+ }
+
+ static List<SimpleFileEntry> buildDeltaEntriesWithDV(
+ List<SimpleFileEntry> baseEntries,
+ List<SimpleFileEntry> deltaEntries,
+ List<IndexManifestEntry> deltaIndexEntries) {
+ if (deltaEntries.isEmpty() && deltaIndexEntries.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ List<SimpleFileEntry> entriesWithDV = new
ArrayList<>(deltaEntries.size());
+
+        // One file may correspond to more than one dv entry, for example,
deleting the old dv and
+        // creating a new one.
+ Map<String, List<IndexManifestEntry>> fileNameToDVEntry = new
HashMap<>();
+ for (IndexManifestEntry deltaIndexEntry : deltaIndexEntries) {
+ LinkedHashMap<String, DeletionVectorMeta> dvRanges =
+ deltaIndexEntry.indexFile().dvRanges();
+ if (dvRanges != null) {
+ for (DeletionVectorMeta meta : dvRanges.values()) {
+ fileNameToDVEntry.putIfAbsent(meta.dataFileName(), new
ArrayList<>());
+
fileNameToDVEntry.get(meta.dataFileName()).add(deltaIndexEntry);
+ }
+ }
+ }
+
+ Set<String> fileNotInDeltaEntries = new
HashSet<>(fileNameToDVEntry.keySet());
+ // 1. Attach dv name to delta file entries.
+ for (SimpleFileEntry fileEntry : deltaEntries) {
+ if (fileNameToDVEntry.containsKey(fileEntry.fileName())) {
+ List<IndexManifestEntry> dvs =
fileNameToDVEntry.get(fileEntry.fileName());
+ checkState(dvs.size() == 1, "Delta entry only can have one dv
file");
+ entriesWithDV.add(
+ new SimpleFileEntryWithDV(fileEntry,
dvs.get(0).indexFile().fileName()));
+ fileNotInDeltaEntries.remove(fileEntry.fileName());
+ } else {
+ entriesWithDV.add(new SimpleFileEntryWithDV(fileEntry, null));
+ }
+ }
+
+ // 2. For file not in delta entries, build entry with dv with
baseEntries.
+ if (!fileNotInDeltaEntries.isEmpty()) {
+ Map<String, SimpleFileEntry> fileNameToFileEntry = new HashMap<>();
+ for (SimpleFileEntry baseEntry : baseEntries) {
+ if (baseEntry.kind().equals(FileKind.ADD)) {
+ fileNameToFileEntry.put(baseEntry.fileName(), baseEntry);
+ }
+ }
+
+ for (String fileName : fileNotInDeltaEntries) {
+ SimpleFileEntryWithDV simpleFileEntry =
+ (SimpleFileEntryWithDV)
fileNameToFileEntry.get(fileName);
+ checkState(
+ simpleFileEntry != null,
+ String.format(
+ "Trying to create deletion vector on file %s
which is not previously added.",
+ fileName));
+ List<IndexManifestEntry> dvEntries =
fileNameToDVEntry.get(fileName);
+                // If dv entry's type is DELETE, add DELETE<f, dv>
+                // If dv entry's type is ADD, add ADD<f, dv>
+ for (IndexManifestEntry dvEntry : dvEntries) {
+ entriesWithDV.add(
+ new SimpleFileEntryWithDV(
+ dvEntry.kind().equals(FileKind.ADD)
+ ? simpleFileEntry
+ : simpleFileEntry.toDelete(),
+ dvEntry.indexFile().fileName()));
+ }
+
+                // If one file corresponds to only one dv entry and the type is
ADD,
+                // we need to add a DELETE<f, null>.
+                // This happens when creating a dv for a file that doesn't have a
dv before.
+ if (dvEntries.size() == 1 &&
dvEntries.get(0).kind().equals(FileKind.ADD)) {
+ entriesWithDV.add(new
SimpleFileEntryWithDV(simpleFileEntry.toDelete(), null));
+ }
+ }
+ }
+
+ return entriesWithDV;
+ }
+
+ /**
+ * Construct detailed conflict exception. The returned exception is formed
of (full exception,
+ * simplified exception), The simplified exception is generated when the
entry length is larger
+ * than the max limit.
+ */
+ private Pair<RuntimeException, RuntimeException> createConflictException(
+ String message,
+ String baseCommitUser,
+ List<SimpleFileEntry> baseEntries,
+ List<SimpleFileEntry> changes,
+ Throwable cause) {
+ String possibleCauses =
+ String.join(
+ "\n",
+ "Don't panic!",
+ "Conflicts during commits are normal and this failure
is intended to resolve the conflicts.",
+ "Conflicts are mainly caused by the following
scenarios:",
+ "1. Multiple jobs are writing into the same partition
at the same time, "
+ + "or you use STATEMENT SET to execute
multiple INSERT statements into the same Paimon table.",
+ " You'll probably see different base commit user and
current commit user below.",
+ " You can use "
+ +
"https://paimon.apache.org/docs/master/maintenance/dedicated-compaction#dedicated-compaction-job"
+ + " to support multiple writing.",
+ "2. You're recovering from an old savepoint, or you're
creating multiple jobs from a savepoint.",
+ " The job will fail continuously in this scenario to
protect metadata from corruption.",
+ " You can either recover from the latest savepoint, "
+ + "or you can revert the table to the snapshot
corresponding to the old savepoint.");
+ String commitUserString =
+ "Base commit user is: "
+ + baseCommitUser
+ + "; Current commit user is: "
+ + commitUser;
+ String baseEntriesString =
+ "Base entries are:\n"
+ + baseEntries.stream()
+ .map(Object::toString)
+ .collect(Collectors.joining("\n"));
+ String changesString =
+ "Changes are:\n"
+ +
changes.stream().map(Object::toString).collect(Collectors.joining("\n"));
+
+ RuntimeException fullException =
+ new RuntimeException(
+ message
+ + "\n\n"
+ + possibleCauses
+ + "\n\n"
+ + commitUserString
+ + "\n\n"
+ + baseEntriesString
+ + "\n\n"
+ + changesString,
+ cause);
+
+ RuntimeException simplifiedException;
+ int maxEntry = 50;
+ if (baseEntries.size() > maxEntry || changes.size() > maxEntry) {
+ baseEntriesString =
+ "Base entries are:\n"
+ + baseEntries.subList(0,
Math.min(baseEntries.size(), maxEntry))
+ .stream()
+ .map(Object::toString)
+ .collect(Collectors.joining("\n"));
+ changesString =
+ "Changes are:\n"
+ + changes.subList(0, Math.min(changes.size(),
maxEntry)).stream()
+ .map(Object::toString)
+ .collect(Collectors.joining("\n"));
+ simplifiedException =
+ new RuntimeException(
+ message
+ + "\n\n"
+ + possibleCauses
+ + "\n\n"
+ + commitUserString
+ + "\n\n"
+ + baseEntriesString
+ + "\n\n"
+ + changesString
+ + "\n\n"
+ + "The entry list above are not fully
displayed, please refer to taskmanager.log for more information.",
+ cause);
+ return Pair.of(fullException, simplifiedException);
+ } else {
+ return Pair.of(fullException, fullException);
+ }
+ }
+
+ private static class LevelIdentifier {
+
+ private final BinaryRow partition;
+ private final int bucket;
+ private final int level;
+
+ private LevelIdentifier(BinaryRow partition, int bucket, int level) {
+ this.partition = partition;
+ this.bucket = bucket;
+ this.level = level;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof LevelIdentifier)) {
+ return false;
+ }
+ LevelIdentifier that = (LevelIdentifier) o;
+ return Objects.equals(partition, that.partition)
+ && bucket == that.bucket
+ && level == that.level;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(partition, bucket, level);
+ }
+ }
+
+ /** Should do conflict check. */
+ public interface ConflictCheck {
+ boolean shouldCheck(long latestSnapshot);
+ }
+
+ public static ConflictCheck hasConflictChecked(@Nullable Long
checkedLatestSnapshotId) {
+ return latestSnapshot -> !Objects.equals(latestSnapshot,
checkedLatestSnapshotId);
+ }
+
+ public static ConflictCheck noConflictCheck() {
+ return latestSnapshot -> false;
+ }
+
+ public static ConflictCheck mustConflictCheck() {
+ return latestSnapshot -> true;
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/ConflictDeletionUtils.java
b/paimon-core/src/main/java/org/apache/paimon/utils/ConflictDeletionUtils.java
deleted file mode 100644
index 00942ea048..0000000000
---
a/paimon-core/src/main/java/org/apache/paimon/utils/ConflictDeletionUtils.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.utils;
-
-import org.apache.paimon.index.DeletionVectorMeta;
-import org.apache.paimon.index.IndexFileMeta;
-import org.apache.paimon.manifest.FileKind;
-import org.apache.paimon.manifest.IndexManifestEntry;
-import org.apache.paimon.manifest.SimpleFileEntry;
-import org.apache.paimon.manifest.SimpleFileEntryWithDV;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.apache.paimon.utils.Preconditions.checkState;
-
-/** Utils for conflict deletion. */
-public class ConflictDeletionUtils {
-
- public static List<SimpleFileEntry> buildBaseEntriesWithDV(
- List<SimpleFileEntry> baseEntries, List<IndexManifestEntry>
baseIndexEntries) {
- if (baseEntries.isEmpty()) {
- return Collections.emptyList();
- }
-
- Map<String, String> fileNameToDVFileName = new HashMap<>();
- for (IndexManifestEntry indexManifestEntry : baseIndexEntries) {
- // Should not attach DELETE type dv index for base file.
- if (!indexManifestEntry.kind().equals(FileKind.DELETE)) {
- IndexFileMeta indexFile = indexManifestEntry.indexFile();
- if (indexFile.dvRanges() != null) {
- for (DeletionVectorMeta value :
indexFile.dvRanges().values()) {
- checkState(
-
!fileNameToDVFileName.containsKey(value.dataFileName()),
- "One file should correspond to only one dv
entry.");
- fileNameToDVFileName.put(value.dataFileName(),
indexFile.fileName());
- }
- }
- }
- }
-
- // Attach dv name to file entries.
- List<SimpleFileEntry> entriesWithDV = new
ArrayList<>(baseEntries.size());
- for (SimpleFileEntry fileEntry : baseEntries) {
- entriesWithDV.add(
- new SimpleFileEntryWithDV(
- fileEntry,
fileNameToDVFileName.get(fileEntry.fileName())));
- }
- return entriesWithDV;
- }
-
- public static List<SimpleFileEntry> buildDeltaEntriesWithDV(
- List<SimpleFileEntry> baseEntries,
- List<SimpleFileEntry> deltaEntries,
- List<IndexManifestEntry> deltaIndexEntries) {
- if (deltaEntries.isEmpty() && deltaIndexEntries.isEmpty()) {
- return Collections.emptyList();
- }
-
- List<SimpleFileEntry> entriesWithDV = new
ArrayList<>(deltaEntries.size());
-
- // One file may correspond to more than one dv entries, for example,
delete the old dv, and
- // create a new one.
- Map<String, List<IndexManifestEntry>> fileNameToDVEntry = new
HashMap<>();
- for (IndexManifestEntry deltaIndexEntry : deltaIndexEntries) {
- if (deltaIndexEntry.indexFile().dvRanges() != null) {
- for (DeletionVectorMeta meta :
deltaIndexEntry.indexFile().dvRanges().values()) {
- fileNameToDVEntry.putIfAbsent(meta.dataFileName(), new
ArrayList<>());
-
fileNameToDVEntry.get(meta.dataFileName()).add(deltaIndexEntry);
- }
- }
- }
-
- Set<String> fileNotInDeltaEntries = new
HashSet<>(fileNameToDVEntry.keySet());
- // 1. Attach dv name to delta file entries.
- for (SimpleFileEntry fileEntry : deltaEntries) {
- if (fileNameToDVEntry.containsKey(fileEntry.fileName())) {
- List<IndexManifestEntry> dvs =
fileNameToDVEntry.get(fileEntry.fileName());
- checkState(dvs.size() == 1, "Delta entry only can have one dv
file");
- entriesWithDV.add(
- new SimpleFileEntryWithDV(fileEntry,
dvs.get(0).indexFile().fileName()));
- fileNotInDeltaEntries.remove(fileEntry.fileName());
- } else {
- entriesWithDV.add(new SimpleFileEntryWithDV(fileEntry, null));
- }
- }
-
- // 2. For file not in delta entries, build entry with dv with
baseEntries.
- if (!fileNotInDeltaEntries.isEmpty()) {
- Map<String, SimpleFileEntry> fileNameToFileEntry = new HashMap<>();
- for (SimpleFileEntry baseEntry : baseEntries) {
- if (baseEntry.kind().equals(FileKind.ADD)) {
- fileNameToFileEntry.put(baseEntry.fileName(), baseEntry);
- }
- }
-
- for (String fileName : fileNotInDeltaEntries) {
- SimpleFileEntryWithDV simpleFileEntry =
- (SimpleFileEntryWithDV)
fileNameToFileEntry.get(fileName);
- checkState(
- simpleFileEntry != null,
- String.format(
- "Trying to create deletion vector on file %s
which is not previously added.",
- fileName));
- List<IndexManifestEntry> dvEntries =
fileNameToDVEntry.get(fileName);
- // If dv entry's type id DELETE, add DELETE<f, dv>
- // If dv entry's type id ADD, add ADD<f, dv>
- for (IndexManifestEntry dvEntry : dvEntries) {
- entriesWithDV.add(
- new SimpleFileEntryWithDV(
- dvEntry.kind().equals(FileKind.ADD)
- ? simpleFileEntry
- : simpleFileEntry.toDelete(),
- dvEntry.indexFile().fileName()));
- }
-
- // If one file correspond to only one dv entry and the type is
ADD,
- // we need to add a DELETE<f, null>.
- // This happens when create a dv for a file that doesn't have
dv before.
- if (dvEntries.size() == 1 &&
dvEntries.get(0).kind().equals(FileKind.ADD)) {
- entriesWithDV.add(new
SimpleFileEntryWithDV(simpleFileEntry.toDelete(), null));
- }
- }
- }
-
- return entriesWithDV;
- }
-}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index 9d17593c11..984424610d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -72,12 +72,12 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.SNAPSHOT_CLEAN_EMPTY_DIRECTORIES;
-import static
org.apache.paimon.operation.FileStoreCommitImpl.mustConflictCheck;
import static
org.apache.paimon.operation.FileStoreTestUtils.assertNFilesExists;
import static org.apache.paimon.operation.FileStoreTestUtils.assertPathExists;
import static
org.apache.paimon.operation.FileStoreTestUtils.assertPathNotExists;
import static org.apache.paimon.operation.FileStoreTestUtils.commitData;
import static org.apache.paimon.operation.FileStoreTestUtils.partitionedData;
+import static
org.apache.paimon.operation.commit.ConflictDetection.mustConflictCheck;
import static org.assertj.core.api.Assertions.assertThat;
/**
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index c4bcc74f56..2c1f6c50ea 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -84,7 +84,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
-import static
org.apache.paimon.operation.FileStoreCommitImpl.mustConflictCheck;
+import static
org.apache.paimon.operation.commit.ConflictDetection.mustConflictCheck;
import static
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.stats.SimpleStats.EMPTY_STATS;
import static
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/ConflictDeletionUtilsTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
similarity index 98%
rename from
paimon-core/src/test/java/org/apache/paimon/utils/ConflictDeletionUtilsTest.java
rename to
paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
index 34e6e59e7c..e75e4d0358 100644
---
a/paimon-core/src/test/java/org/apache/paimon/utils/ConflictDeletionUtilsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.utils;
+package org.apache.paimon.operation.commit;
import org.apache.paimon.index.DeletionVectorMeta;
import org.apache.paimon.index.IndexFileMeta;
@@ -41,12 +41,11 @@ import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.manifest.FileKind.ADD;
import static org.apache.paimon.manifest.FileKind.DELETE;
-import static
org.apache.paimon.utils.ConflictDeletionUtils.buildBaseEntriesWithDV;
-import static
org.apache.paimon.utils.ConflictDeletionUtils.buildDeltaEntriesWithDV;
+import static
org.apache.paimon.operation.commit.ConflictDetection.buildBaseEntriesWithDV;
+import static
org.apache.paimon.operation.commit.ConflictDetection.buildDeltaEntriesWithDV;
import static org.assertj.core.api.Assertions.assertThat;
-/** Tests for {@link ConflictDeletionUtils}. */
-public class ConflictDeletionUtilsTest {
+class ConflictDetectionTest {
@Test
public void testBuildBaseEntriesWithDV() {