This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6f702adb04 [core] Add conflict detection for data evolution row id (#7124)
6f702adb04 is described below
commit 6f702adb04c271f9a5d20024de87a49efdef308b
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jan 28 14:49:36 2026 +0800
[core] Add conflict detection for data evolution row id (#7124)
---
.../main/java/org/apache/paimon/utils/Range.java | 4 +
.../java/org/apache/paimon/AbstractFileStore.java | 26 ++--
.../apache/paimon/manifest/ExpireFileEntry.java | 12 +-
.../java/org/apache/paimon/manifest/FileEntry.java | 6 +
.../apache/paimon/manifest/PojoManifestEntry.java | 10 ++
.../apache/paimon/manifest/SimpleFileEntry.java | 46 +++++-
.../paimon/manifest/SimpleFileEntryWithDV.java | 4 +-
.../apache/paimon/operation/FileStoreCommit.java | 4 +
.../paimon/operation/FileStoreCommitImpl.java | 12 +-
.../paimon/operation/commit/CommitScanner.java | 9 ++
.../paimon/operation/commit/ConflictDetection.java | 167 +++++++++++++++++++--
.../operation/commit/ManifestEntryChanges.java | 5 +-
.../paimon/table/sink/BatchWriteBuilderImpl.java | 9 +-
.../apache/paimon/table/sink/InnerTableCommit.java | 2 +
.../apache/paimon/table/sink/TableCommitImpl.java | 6 +
.../operation/commit/ConflictDetectionTest.java | 2 +
.../MergeIntoPaimonDataEvolutionTable.scala | 16 +-
.../paimon/spark/commands/PaimonSparkWriter.scala | 4 +
.../paimon/spark/sql/RowTrackingTestBase.scala | 129 ++++++++++++++++
19 files changed, 426 insertions(+), 47 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
b/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
index 1cd80bc56b..461a68fbab 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
@@ -47,6 +47,10 @@ public class Range implements Serializable {
return new Range(from + offset, to + offset);
}
+ public boolean hasIntersection(Range range) {
+ return from <= range.to && to >= range.from;
+ }
+
public boolean isBefore(Range other) {
return to < other.from;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index edf7e16294..70108a1920 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -275,16 +275,20 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
if (snapshotCommit == null) {
snapshotCommit = new RenamingSnapshotCommit(snapshotManager,
Lock.empty());
}
- ConflictDetection conflictDetection =
- new ConflictDetection(
- tableName,
- commitUser,
- partitionType,
- pathFactory(),
- newKeyComparator(),
- bucketMode(),
- options.deletionVectorsEnabled(),
- newIndexFileHandler());
+ ConflictDetection.Factory conflictDetectFactory =
+ scanner ->
+ new ConflictDetection(
+ tableName,
+ commitUser,
+ partitionType,
+ pathFactory(),
+ newKeyComparator(),
+ bucketMode(),
+ options.deletionVectorsEnabled(),
+ options.dataEvolutionEnabled(),
+ newIndexFileHandler(),
+ snapshotManager,
+ scanner);
StrictModeChecker strictModeChecker =
StrictModeChecker.create(
snapshotManager,
@@ -327,7 +331,7 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
options.commitMaxRetryWait(),
options.rowTrackingEnabled(),
options.commitDiscardDuplicateFiles(),
- conflictDetection,
+ conflictDetectFactory,
strictModeChecker,
rollback);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java
index 48ab6d5d5c..024eac2b20 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java
@@ -43,7 +43,9 @@ public class ExpireFileEntry extends SimpleFileEntry {
BinaryRow minKey,
BinaryRow maxKey,
@Nullable FileSource fileSource,
- @Nullable String externalPath) {
+ @Nullable String externalPath,
+ long rowCount,
+ @Nullable Long firstRowId) {
super(
kind,
partition,
@@ -55,7 +57,9 @@ public class ExpireFileEntry extends SimpleFileEntry {
embeddedIndex,
minKey,
maxKey,
- externalPath);
+ externalPath,
+ rowCount,
+ firstRowId);
this.fileSource = fileSource;
}
@@ -76,7 +80,9 @@ public class ExpireFileEntry extends SimpleFileEntry {
entry.minKey(),
entry.maxKey(),
entry.file().fileSource().orElse(null),
- entry.externalPath());
+ entry.externalPath(),
+ entry.rowCount(),
+ entry.firstRowId());
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
index 9d649d6f39..3a80088255 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
@@ -67,11 +67,17 @@ public interface FileEntry {
List<String> extraFiles();
+ long rowCount();
+
+ @Nullable
+ Long firstRowId();
+
/**
* The same {@link Identifier} indicates that the {@link ManifestEntry}
refers to the same data
* file.
*/
class Identifier {
+
public final BinaryRow partition;
public final int bucket;
public final int level;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/PojoManifestEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/PojoManifestEntry.java
index 7329a5b6c7..5514bd4cd9 100644
---
a/paimon-core/src/main/java/org/apache/paimon/manifest/PojoManifestEntry.java
+++
b/paimon-core/src/main/java/org/apache/paimon/manifest/PojoManifestEntry.java
@@ -91,6 +91,16 @@ public class PojoManifestEntry implements ManifestEntry {
return file.extraFiles();
}
+ @Override
+ public long rowCount() {
+ return file.rowCount();
+ }
+
+ @Override
+ public @Nullable Long firstRowId() {
+ return file.firstRowId();
+ }
+
@Override
public int totalBuckets() {
return totalBuckets;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
index e0da3c8d53..c1f71c8bda 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
@@ -26,6 +26,8 @@ import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
/** A simple {@link FileEntry} only contains identifier and min max key. */
public class SimpleFileEntry implements FileEntry {
@@ -40,6 +42,8 @@ public class SimpleFileEntry implements FileEntry {
private final BinaryRow minKey;
private final BinaryRow maxKey;
@Nullable private final String externalPath;
+ private final long rowCount;
+ @Nullable private final Long firstRowId;
public SimpleFileEntry(
FileKind kind,
@@ -52,7 +56,9 @@ public class SimpleFileEntry implements FileEntry {
@Nullable byte[] embeddedIndex,
BinaryRow minKey,
BinaryRow maxKey,
- @Nullable String externalPath) {
+ @Nullable String externalPath,
+ long rowCount,
+ @Nullable Long firstRowId) {
this.kind = kind;
this.partition = partition;
this.bucket = bucket;
@@ -64,6 +70,8 @@ public class SimpleFileEntry implements FileEntry {
this.minKey = minKey;
this.maxKey = maxKey;
this.externalPath = externalPath;
+ this.rowCount = rowCount;
+ this.firstRowId = firstRowId;
}
public static SimpleFileEntry from(ManifestEntry entry) {
@@ -78,7 +86,9 @@ public class SimpleFileEntry implements FileEntry {
entry.file().embeddedIndex(),
entry.minKey(),
entry.maxKey(),
- entry.externalPath());
+ entry.externalPath(),
+ entry.file().rowCount(),
+ entry.firstRowId());
}
public SimpleFileEntry toDelete() {
@@ -93,7 +103,9 @@ public class SimpleFileEntry implements FileEntry {
embeddedIndex,
minKey,
maxKey,
- externalPath);
+ externalPath,
+ rowCount,
+ firstRowId);
}
public static List<SimpleFileEntry> from(List<ManifestEntry> entries) {
@@ -162,6 +174,22 @@ public class SimpleFileEntry implements FileEntry {
return extraFiles;
}
+ @Override
+ public long rowCount() {
+ return rowCount;
+ }
+
+ @Override
+ public @Nullable Long firstRowId() {
+ return firstRowId;
+ }
+
+ public long nonNullFirstRowId() {
+ Long firstRowId = firstRowId();
+ checkArgument(firstRowId != null, "First row id of '%s' should not be
null.", fileName());
+ return firstRowId;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -180,7 +208,9 @@ public class SimpleFileEntry implements FileEntry {
&& Objects.equals(extraFiles, that.extraFiles)
&& Objects.equals(minKey, that.minKey)
&& Objects.equals(maxKey, that.maxKey)
- && Objects.equals(externalPath, that.externalPath);
+ && Objects.equals(externalPath, that.externalPath)
+ && rowCount == that.rowCount
+ && Objects.equals(firstRowId, that.firstRowId);
}
@Override
@@ -195,7 +225,9 @@ public class SimpleFileEntry implements FileEntry {
extraFiles,
minKey,
maxKey,
- externalPath);
+ externalPath,
+ rowCount,
+ firstRowId);
}
@Override
@@ -221,6 +253,10 @@ public class SimpleFileEntry implements FileEntry {
+ maxKey
+ ", externalPath="
+ externalPath
+ + ", rowCount="
+ + rowCount
+ + ", firstRowId="
+ + firstRowId
+ '}';
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntryWithDV.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntryWithDV.java
index 75d73f345f..6b4c479157 100644
---
a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntryWithDV.java
+++
b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntryWithDV.java
@@ -39,7 +39,9 @@ public class SimpleFileEntryWithDV extends SimpleFileEntry {
entry.embeddedIndex(),
entry.minKey(),
entry.maxKey(),
- entry.externalPath());
+ entry.externalPath(),
+ entry.rowCount(),
+ entry.firstRowId());
this.dvFileName = dvFileName;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index ecd4975858..31fb3c52ca 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -26,6 +26,8 @@ import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.utils.FileStorePathFactory;
+import javax.annotation.Nullable;
+
import java.util.List;
import java.util.Map;
@@ -38,6 +40,8 @@ public interface FileStoreCommit extends AutoCloseable {
FileStoreCommit appendCommitCheckConflict(boolean
appendCommitCheckConflict);
+ FileStoreCommit rowIdCheckConflict(@Nullable Long rowIdCheckFromSnapshot);
+
/** Find out which committables need to be retried when recovering from
the failure. */
List<ManifestCommittable> filterCommitted(List<ManifestCommittable>
committables);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 6494b61855..41ba1412c4 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -197,7 +197,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
long commitMaxRetryWait,
boolean rowTrackingEnabled,
boolean discardDuplicateFiles,
- ConflictDetection conflictDetection,
+ ConflictDetection.Factory conflictDetectFactory,
@Nullable StrictModeChecker strictModeChecker,
@Nullable CommitRollback rollback) {
this.snapshotCommit = snapshotCommit;
@@ -239,7 +239,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
this.rowTrackingEnabled = rowTrackingEnabled;
this.discardDuplicateFiles = discardDuplicateFiles;
this.strictModeChecker = strictModeChecker;
- this.conflictDetection = conflictDetection;
+ this.conflictDetection = conflictDetectFactory.create(scanner);
this.commitCleaner = new CommitCleaner(manifestList, manifestFile,
indexManifestFile);
}
@@ -261,6 +261,12 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
return this;
}
+ @Override
+ public FileStoreCommit rowIdCheckConflict(@Nullable Long
rowIdCheckFromSnapshot) {
+
this.conflictDetection.setRowIdCheckFromSnapshot(rowIdCheckFromSnapshot);
+ return this;
+ }
+
@Override
public List<ManifestCommittable> filterCommitted(List<ManifestCommittable>
committables) {
// nothing to filter, fast exit
@@ -320,7 +326,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
}
boolean allowRollback = false;
- if (containsFileDeletionOrDeletionVectors(
+ if (conflictDetection.shouldBeOverwriteCommit(
appendSimpleEntries, changes.appendIndexFiles)) {
commitKind = CommitKind.OVERWRITE;
checkAppendFiles = true;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java
index da8b5c7563..9afe4500a4 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java
@@ -68,6 +68,15 @@ public class CommitScanner {
return entries;
}
+ public List<ManifestEntry> readIncrementalEntries(
+ Snapshot snapshot, List<BinaryRow> changedPartitions) {
+ return scan.withSnapshot(snapshot)
+ .withKind(ScanMode.DELTA)
+ .withPartitionFilter(changedPartitions)
+ .plan()
+ .files();
+ }
+
public List<SimpleFileEntry> readAllEntriesFromChangedPartitions(
Snapshot snapshot, List<BinaryRow> changedPartitions) {
try {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
index 990b47f0f6..15f6359417 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
@@ -25,9 +25,11 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.index.DeletionVectorMeta;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileEntry;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.SimpleFileEntry;
import org.apache.paimon.manifest.SimpleFileEntryWithDV;
import org.apache.paimon.operation.PartitionExpire;
@@ -35,6 +37,9 @@ import org.apache.paimon.table.BucketMode;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RangeHelper;
+import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +61,9 @@ import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
+import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
+import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
+import static
org.apache.paimon.operation.commit.ManifestEntryChanges.changedPartitions;
import static
org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString;
import static org.apache.paimon.utils.Preconditions.checkState;
@@ -71,9 +79,13 @@ public class ConflictDetection {
private final @Nullable Comparator<InternalRow> keyComparator;
private final BucketMode bucketMode;
private final boolean deletionVectorsEnabled;
+ private final boolean dataEvolutionEnabled;
private final IndexFileHandler indexFileHandler;
+ private final SnapshotManager snapshotManager;
+ private final CommitScanner commitScanner;
private @Nullable PartitionExpire partitionExpire;
+ private @Nullable Long rowIdCheckFromSnapshot = null;
public ConflictDetection(
String tableName,
@@ -83,7 +95,10 @@ public class ConflictDetection {
@Nullable Comparator<InternalRow> keyComparator,
BucketMode bucketMode,
boolean deletionVectorsEnabled,
- IndexFileHandler indexFileHandler) {
+ boolean dataEvolutionEnabled,
+ IndexFileHandler indexFileHandler,
+ SnapshotManager snapshotManager,
+ CommitScanner commitScanner) {
this.tableName = tableName;
this.commitUser = commitUser;
this.partitionType = partitionType;
@@ -91,7 +106,14 @@ public class ConflictDetection {
this.keyComparator = keyComparator;
this.bucketMode = bucketMode;
this.deletionVectorsEnabled = deletionVectorsEnabled;
+ this.dataEvolutionEnabled = dataEvolutionEnabled;
this.indexFileHandler = indexFileHandler;
+ this.snapshotManager = snapshotManager;
+ this.commitScanner = commitScanner;
+ }
+
+ public void setRowIdCheckFromSnapshot(@Nullable Long
rowIdCheckFromSnapshot) {
+ this.rowIdCheckFromSnapshot = rowIdCheckFromSnapshot;
}
@Nullable
@@ -103,14 +125,29 @@ public class ConflictDetection {
this.partitionExpire = partitionExpire;
}
+ public <T extends FileEntry> boolean shouldBeOverwriteCommit(
+ List<T> appendFileEntries, List<IndexManifestEntry>
appendIndexFiles) {
+ for (T appendFileEntry : appendFileEntries) {
+ if (appendFileEntry.kind().equals(FileKind.DELETE)) {
+ return true;
+ }
+ }
+ for (IndexManifestEntry appendIndexFile : appendIndexFiles) {
+ if
(appendIndexFile.indexFile().indexType().equals(DELETION_VECTORS_INDEX)) {
+ return true;
+ }
+ }
+ return rowIdCheckFromSnapshot != null;
+ }
+
public Optional<RuntimeException> checkConflicts(
- Snapshot snapshot,
+ Snapshot latestSnapshot,
List<SimpleFileEntry> baseEntries,
List<SimpleFileEntry> deltaEntries,
List<IndexManifestEntry> deltaIndexEntries,
CommitKind commitKind) {
- String baseCommitUser = snapshot.commitUser();
- if (checkForDeletionVector()) {
+ String baseCommitUser = latestSnapshot.commitUser();
+ if (deletionVectorsEnabled &&
bucketMode.equals(BucketMode.BUCKET_UNAWARE)) {
// Enrich dvName in fileEntry to checker for base ADD dv and delta
DELETE dv.
// For example:
// If the base file is <ADD baseFile1, ADD dv1>,
@@ -121,9 +158,10 @@ public class ConflictDetection {
baseEntries =
buildBaseEntriesWithDV(
baseEntries,
- snapshot.indexManifest() == null
+ latestSnapshot.indexManifest() == null
? Collections.emptyList()
- :
indexFileHandler.readManifest(snapshot.indexManifest()));
+ : indexFileHandler.readManifest(
+
latestSnapshot.indexManifest()));
deltaEntries =
buildDeltaEntriesWithDV(baseEntries, deltaEntries,
deltaIndexEntries);
} catch (Throwable e) {
@@ -165,7 +203,17 @@ public class ConflictDetection {
if (exception.isPresent()) {
return exception;
}
- return checkKeyRange(baseEntries, deltaEntries, mergedEntries,
baseCommitUser);
+ exception = checkKeyRange(baseEntries, deltaEntries, mergedEntries,
baseCommitUser);
+ if (exception.isPresent()) {
+ return exception;
+ }
+
+ exception = checkRowIdRangeConflicts(commitKind, mergedEntries);
+ if (exception.isPresent()) {
+ return exception;
+ }
+
+ return checkForRowIdFromSnapshot(latestSnapshot, deltaEntries,
deltaIndexEntries);
}
private Optional<RuntimeException> checkBucketKeepSame(
@@ -279,10 +327,6 @@ public class ConflictDetection {
};
}
- private boolean checkForDeletionVector() {
- return deletionVectorsEnabled &&
bucketMode.equals(BucketMode.BUCKET_UNAWARE);
- }
-
private Optional<RuntimeException> checkDeleteInEntries(
Collection<SimpleFileEntry> mergedEntries,
Function<Throwable, RuntimeException> exceptionFunction) {
@@ -295,7 +339,7 @@ public class ConflictDetection {
tableName);
}
} catch (Throwable e) {
- Optional<RuntimeException> exception =
assertConflictForPartitionExpire(mergedEntries);
+ Optional<RuntimeException> exception =
checkConflictForPartitionExpire(mergedEntries);
if (exception.isPresent()) {
return exception;
}
@@ -304,7 +348,7 @@ public class ConflictDetection {
return Optional.empty();
}
- private Optional<RuntimeException> assertConflictForPartitionExpire(
+ private Optional<RuntimeException> checkConflictForPartitionExpire(
Collection<SimpleFileEntry> mergedEntries) {
if (partitionExpire != null && partitionExpire.isValueExpiration()) {
Set<BinaryRow> deletedPartitions = new HashSet<>();
@@ -332,6 +376,98 @@ public class ConflictDetection {
return Optional.empty();
}
+ private Optional<RuntimeException> checkRowIdRangeConflicts(
+ CommitKind commitKind, Collection<SimpleFileEntry> mergedEntries) {
+ if (!dataEvolutionEnabled) {
+ return Optional.empty();
+ }
+ if (rowIdCheckFromSnapshot == null && commitKind !=
CommitKind.COMPACT) {
+ return Optional.empty();
+ }
+
+ List<SimpleFileEntry> entries =
+ mergedEntries.stream()
+ .filter(file -> file.firstRowId() != null)
+ .collect(Collectors.toList());
+
+ RangeHelper<SimpleFileEntry> rangeHelper =
+ new RangeHelper<>(
+ SimpleFileEntry::nonNullFirstRowId,
+ f -> f.nonNullFirstRowId() + f.rowCount() - 1);
+ List<List<SimpleFileEntry>> merged =
rangeHelper.mergeOverlappingRanges(entries);
+ for (List<SimpleFileEntry> group : merged) {
+ List<SimpleFileEntry> dataFiles = new ArrayList<>();
+ for (SimpleFileEntry f : group) {
+ if (!isBlobFile(f.fileName())) {
+ dataFiles.add(f);
+ }
+ }
+ if (!rangeHelper.areAllRangesSame(dataFiles)) {
+ return Optional.of(
+ new RuntimeException(
+ "For Data Evolution table, multiple 'MERGE
INTO' and 'COMPACT' operations "
+ + "have encountered conflicts, data
files: "
+ + dataFiles));
+ }
+ }
+ return Optional.empty();
+ }
+
+ private Optional<RuntimeException> checkForRowIdFromSnapshot(
+ Snapshot latestSnapshot,
+ List<SimpleFileEntry> deltaEntries,
+ List<IndexManifestEntry> deltaIndexEntries) {
+ if (!dataEvolutionEnabled) {
+ return Optional.empty();
+ }
+ if (rowIdCheckFromSnapshot == null) {
+ return Optional.empty();
+ }
+
+ List<BinaryRow> changedPartitions = changedPartitions(deltaEntries,
deltaIndexEntries);
+ // collect history row id ranges
+ List<Range> historyIdRanges = new ArrayList<>();
+ for (SimpleFileEntry entry : deltaEntries) {
+ Long firstRowId = entry.firstRowId();
+ long rowCount = entry.rowCount();
+ if (firstRowId != null) {
+ historyIdRanges.add(new Range(firstRowId, firstRowId +
rowCount - 1));
+ }
+ }
+
+ // check history row id ranges
+ Long checkNextRowId =
snapshotManager.snapshot(rowIdCheckFromSnapshot).nextRowId();
+ checkState(
+ checkNextRowId != null,
+ "Next row id cannot be null for snapshot %s.",
+ rowIdCheckFromSnapshot);
+ for (long i = rowIdCheckFromSnapshot + 1; i <= latestSnapshot.id();
i++) {
+ Snapshot snapshot = snapshotManager.snapshot(i);
+ if (snapshot.commitKind() == CommitKind.COMPACT) {
+ continue;
+ }
+ List<ManifestEntry> changes =
+ commitScanner.readIncrementalEntries(snapshot,
changedPartitions);
+ for (ManifestEntry entry : changes) {
+ DataFileMeta file = entry.file();
+ long firstRowId = file.nonNullFirstRowId();
+ if (firstRowId < checkNextRowId) {
+ Range fileRange = new Range(firstRowId, firstRowId +
file.rowCount() - 1);
+ for (Range range : historyIdRanges) {
+ if (range.hasIntersection(fileRange)) {
+ return Optional.of(
+ new RuntimeException(
+ "For Data Evolution table,
multiple 'MERGE INTO' operations have encountered conflicts,"
+ + " updating the same
file, which can render some updates ineffective."));
+ }
+ }
+ }
+ }
+ }
+
+ return Optional.empty();
+ }
+
static List<SimpleFileEntry> buildBaseEntriesWithDV(
List<SimpleFileEntry> baseEntries, List<IndexManifestEntry>
baseIndexEntries) {
if (baseEntries.isEmpty()) {
@@ -560,4 +696,9 @@ public class ConflictDetection {
return Objects.hash(partition, bucket, level);
}
}
+
+ /** Factory to create {@link ConflictDetection}. */
+ public interface Factory {
+ ConflictDetection create(CommitScanner scanner);
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java
index dcea67e4df..f32f6c9ef5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java
@@ -20,6 +20,7 @@ package org.apache.paimon.operation.commit;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileEntry;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestEntry;
@@ -163,9 +164,9 @@ public class ManifestEntryChanges {
}
public static List<BinaryRow> changedPartitions(
- List<ManifestEntry> dataFileChanges, List<IndexManifestEntry>
indexFileChanges) {
+ List<? extends FileEntry> dataFileChanges,
List<IndexManifestEntry> indexFileChanges) {
Set<BinaryRow> changedPartitions = new HashSet<>();
- for (ManifestEntry file : dataFileChanges) {
+ for (FileEntry file : dataFileChanges) {
changedPartitions.add(file.partition());
}
for (IndexManifestEntry file : indexFileChanges) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
index 66c67e8965..c06bd9de42 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
@@ -41,6 +41,7 @@ public class BatchWriteBuilderImpl implements
BatchWriteBuilder {
private Map<String, String> staticPartition;
private boolean appendCommitCheckConflict = false;
+ private @Nullable Long rowIdCheckFromSnapshot = null;
public BatchWriteBuilderImpl(InnerTable table) {
this.table = table;
@@ -85,7 +86,8 @@ public class BatchWriteBuilderImpl implements
BatchWriteBuilder {
InnerTableCommit commit =
table.newCommit(commitUser)
.withOverwrite(staticPartition)
- .appendCommitCheckConflict(appendCommitCheckConflict);
+ .appendCommitCheckConflict(appendCommitCheckConflict)
+ .rowIdCheckConflict(rowIdCheckFromSnapshot);
commit.ignoreEmptyCommit(
Options.fromMap(table.options())
.getOptional(CoreOptions.SNAPSHOT_IGNORE_EMPTY_COMMIT)
@@ -101,4 +103,9 @@ public class BatchWriteBuilderImpl implements
BatchWriteBuilder {
this.appendCommitCheckConflict = appendCommitCheckConflict;
return this;
}
+
+ public BatchWriteBuilderImpl rowIdCheckConflict(@Nullable Long
rowIdCheckFromSnapshot) {
+ this.rowIdCheckFromSnapshot = rowIdCheckFromSnapshot;
+ return this;
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
index 0a8fdd6742..a73771f218 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
@@ -48,6 +48,8 @@ public interface InnerTableCommit extends StreamTableCommit,
BatchTableCommit {
InnerTableCommit appendCommitCheckConflict(boolean
appendCommitCheckConflict);
+ InnerTableCommit rowIdCheckConflict(@Nullable Long rowIdCheckFromSnapshot);
+
@Override
InnerTableCommit withMetricRegistry(MetricRegistry registry);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 8bd65b05c3..0311c9bbe4 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -164,6 +164,12 @@ public class TableCommitImpl implements InnerTableCommit {
return this;
}
+ @Override
+ public TableCommitImpl rowIdCheckConflict(@Nullable Long
rowIdCheckFromSnapshot) {
+ commit.rowIdCheckConflict(rowIdCheckFromSnapshot);
+ return this;
+ }
+
@Override
public InnerTableCommit withMetricRegistry(MetricRegistry registry) {
commit.withMetrics(new CommitMetrics(registry, tableName));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
index e75e4d0358..522cb73e72 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
@@ -303,6 +303,8 @@ class ConflictDetectionTest {
null,
EMPTY_ROW,
EMPTY_ROW,
+ null,
+ 0L,
null);
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
index 82c046e621..f8c2b6c24f 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
@@ -136,16 +136,13 @@ case class MergeIntoPaimonDataEvolutionTable(
lazy val tableSchema: StructType = v2Table.schema
override def run(sparkSession: SparkSession): Seq[Row] = {
- // Avoid that more than one source rows match the same target row.
- val commitMessages = invokeMergeInto(sparkSession)
- writer.commit(commitMessages)
+ invokeMergeInto(sparkSession)
Seq.empty[Row]
}
- private def invokeMergeInto(sparkSession: SparkSession): Seq[CommitMessage]
= {
- val tableSplits: Seq[DataSplit] = table
- .newSnapshotReader()
- .read()
+ private def invokeMergeInto(sparkSession: SparkSession): Unit = {
+ val plan = table.newSnapshotReader().read()
+ val tableSplits: Seq[DataSplit] = plan
.splits()
.asScala
.map(_.asInstanceOf[DataSplit])
@@ -197,7 +194,10 @@ case class MergeIntoPaimonDataEvolutionTable(
insertActionInvoke(sparkSession, touchedFileTargetRelation)
else Nil
- updateCommit ++ insertCommit
+ if (plan.snapshotId() != null) {
+ writer.rowIdCheckConflict(plan.snapshotId())
+ }
+ writer.commit(updateCommit ++ insertCommit)
}
private def targetRelatedSplits(
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 721c09e2d5..d81d70f55b 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -403,6 +403,10 @@ case class PaimonSparkWriter(
.map(deserializeCommitMessage(serializer, _))
}
+ def rowIdCheckConflict(rowIdCheckFromSnapshot: Long): Unit = {
+
writeBuilder.asInstanceOf[BatchWriteBuilderImpl].rowIdCheckConflict(rowIdCheckFromSnapshot)
+ }
+
def commit(commitMessages: Seq[CommitMessage]): Unit = {
val finalWriteBuilder = if (postponeBatchWriteFixedBucket) {
writeBuilder
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index 4079cf5eed..0ea9d21bcf 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -87,6 +87,135 @@ abstract class RowTrackingTestBase extends
PaimonSparkTestBase {
}
}
+ test("Data Evolution: concurrent merge and merge") {
+ withTable("s", "t") {
+ sql(s"""
+ CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES (
+ 'row-tracking.enabled' = 'true',
+ 'data-evolution.enabled' = 'true')
+ """)
+ sql("INSERT INTO t VALUES (1, 0, 0)")
+ Seq((1, 1, 1)).toDF("id", "b", "c").createOrReplaceTempView("s")
+
+ def doMerge(): Unit = {
+ var success = false
+ while (!success) {
+ try {
+ sql(s"""
+ |MERGE INTO t
+ |USING s
+ |ON t.id = s.id
+ |WHEN MATCHED THEN
+ |UPDATE SET t.id = s.id, t.b = s.b + t.b, t.c = s.c + t.c
+ |""".stripMargin).collect()
+ success = true
+ } catch {
+ case e: Exception =>
+ if (
+ !e.getMessage.contains(
+ "multiple 'MERGE INTO' operations have encountered
conflicts")
+ ) {
+ throw e
+ }
+ }
+ }
+ }
+
+ val mergeInto1 = Future {
+ for (_ <- 1 to 10) {
+ doMerge()
+ }
+ }
+
+ val mergeInto2 = Future {
+ for (_ <- 1 to 10) {
+ doMerge()
+ }
+ }
+
+ Await.result(mergeInto1, 60.seconds)
+ Await.result(mergeInto2, 60.seconds)
+
+ checkAnswer(sql("SELECT * FROM t"), Seq(Row(1, 20, 20)))
+ }
+ }
+
+ test("Data Evolution: concurrent merge and small files compact") {
+ withTable("s", "t") {
+ sql(s"""
+ CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES (
+ 'row-tracking.enabled' = 'true',
+ 'compaction.min.file-num' = '2',
+ 'data-evolution.enabled' = 'true')
+ """)
+ sql("INSERT INTO t VALUES (1, 0, 0)")
+ Seq((1, 1, 1)).toDF("id", "b", "c").createOrReplaceTempView("s")
+
+ def doWithRetry(doAction: () => Unit): Unit = {
+ var success = false
+ while (!success) {
+ try {
+ doAction.apply()
+ success = true
+ } catch {
+ case e: Exception =>
+ if (!e.getMessage.contains("multiple 'MERGE INTO' and 'COMPACT'
operations")) {
+ throw e
+ }
+ }
+ }
+ }
+
+ val mergeInto = Future {
+ for (i <- 1 to 10) {
+ doWithRetry(() => sql(s"""
+ |MERGE INTO t
+ |USING s
+ |ON t.id = s.id
+ |WHEN MATCHED THEN
+ |UPDATE SET t.id = s.id, t.b = s.b + t.b,
t.c = s.c + t.c
+ |""".stripMargin).collect())
+ if (i > 1) {
+ sql(s"INSERT INTO t VALUES ($i, $i, $i)")
+ }
+ }
+ }
+
+ val t = loadTable("t")
+
+ def canBeCompacted: Boolean = {
+ val split =
t.newSnapshotReader().read().splits().get(0).asInstanceOf[DataSplit]
+ split.dataFiles().size() > 1
+ }
+
+ val compact = Future {
+ for (_ <- 1 to 10) {
+ while (!canBeCompacted) {
+ Thread.sleep(1)
+ }
+ doWithRetry(() => sql("CALL sys.compact(table => 't')"))
+ }
+ }
+
+ Await.result(mergeInto, 60.seconds)
+ Await.result(compact, 60.seconds)
+
+ checkAnswer(
+ sql("SELECT * FROM t"),
+ Seq(
+ Row(1, 10, 10),
+ Row(2, 2, 2),
+ Row(3, 3, 3),
+ Row(4, 4, 4),
+ Row(5, 5, 5),
+ Row(6, 6, 6),
+ Row(7, 7, 7),
+ Row(8, 8, 8),
+ Row(9, 9, 9),
+ Row(10, 10, 10)))
+ }
+ }
+
test("Row Tracking: read row Tracking") {
withTable("t") {
sql("CREATE TABLE t (id INT, data STRING) TBLPROPERTIES
('row-tracking.enabled' = 'true')")