This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new 24d28dd960 [hotfix] Minor refactor for FileStoreCommitImpl 24d28dd960 is described below commit 24d28dd9609c578ad484df628395c94eb57e50e3 Author: JingsongLi <jingsongl...@gmail.com> AuthorDate: Fri Sep 19 15:43:41 2025 +0800 [hotfix] Minor refactor for FileStoreCommitImpl --- .../paimon/operation/FileStoreCommitImpl.java | 40 +++++++++++++--------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 300cc9c926..466df201a3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -20,6 +20,7 @@ package org.apache.paimon.operation; import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; +import org.apache.paimon.Snapshot.CommitKind; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.catalog.SnapshotCommit; import org.apache.paimon.data.BinaryRow; @@ -320,9 +321,12 @@ public class FileStoreCommitImpl implements FileStoreCommit { // This optimization is mainly used to decrease the number of times we read from // files. latestSnapshot = snapshotManager.latestSnapshot(); - boolean hasDelete = hasDelete(appendSimpleEntries, appendIndexFiles); - Snapshot.CommitKind commitKind = - hasDelete ? 
Snapshot.CommitKind.OVERWRITE : Snapshot.CommitKind.APPEND; + CommitKind commitKind = CommitKind.APPEND; + ConflictCheck conflictCheck = noConflictCheck(); + if (containsFileDeletionOrDeletionVectors(appendSimpleEntries, appendIndexFiles)) { + commitKind = CommitKind.OVERWRITE; + conflictCheck = mustConflictCheck(); + } if (latestSnapshot != null && checkAppendFiles) { // it is possible that some partitions only have compact changes, @@ -348,7 +352,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { committable.logOffsets(), committable.properties(), commitKind, - hasDelete ? mustConflictCheck() : noConflictCheck(), + conflictCheck, null); generatedSnapshot += 1; } @@ -369,7 +373,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { latestSnapshot.commitUser(), baseEntries, SimpleFileEntry.from(compactTableFiles), - Snapshot.CommitKind.COMPACT); + CommitKind.COMPACT); // assume this compact commit follows just after the append commit created above safeLatestSnapshotId += 1; } @@ -383,7 +387,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { committable.watermark(), committable.logOffsets(), committable.properties(), - Snapshot.CommitKind.COMPACT, + CommitKind.COMPACT, hasConflictChecked(safeLatestSnapshotId), null); generatedSnapshot += 1; @@ -428,7 +432,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { commitMetrics.reportCommit(commitStats); } - private boolean hasDelete( + private boolean containsFileDeletionOrDeletionVectors( List<SimpleFileEntry> appendSimpleEntries, List<IndexManifestEntry> appendIndexFiles) { for (SimpleFileEntry appendSimpleEntry : appendSimpleEntries) { if (appendSimpleEntry.kind().equals(FileKind.DELETE)) { @@ -554,7 +558,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { committable.watermark(), committable.logOffsets(), committable.properties(), - Snapshot.CommitKind.COMPACT, + CommitKind.COMPACT, mustConflictCheck(), null); generatedSnapshot += 1; @@ -664,7 
+668,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { null, Collections.emptyMap(), Collections.emptyMap(), - Snapshot.CommitKind.ANALYZE, + CommitKind.ANALYZE, noConflictCheck(), statsFileName); } @@ -807,7 +811,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { @Nullable Long watermark, Map<Integer, Long> logOffsets, Map<String, String> properties, - Snapshot.CommitKind commitKind, + CommitKind commitKind, ConflictCheck conflictCheck, @Nullable String statsFileName) { int retryCount = 0; @@ -910,7 +914,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { watermark, logOffsets, properties, - Snapshot.CommitKind.OVERWRITE, + CommitKind.OVERWRITE, mustConflictCheck(), null); } @@ -925,7 +929,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { @Nullable Long watermark, Map<Integer, Long> logOffsets, Map<String, String> properties, - Snapshot.CommitKind commitKind, + CommitKind commitKind, @Nullable Snapshot latestSnapshot, ConflictCheck conflictCheck, @Nullable String newStatsFileName) { @@ -964,8 +968,8 @@ public class FileStoreCommitImpl implements FileStoreCommit { if (strictModeLastSafeSnapshot != null && strictModeLastSafeSnapshot >= 0) { for (long id = strictModeLastSafeSnapshot + 1; id < newSnapshotId; id++) { Snapshot snapshot = snapshotManager.snapshot(id); - if ((snapshot.commitKind() == Snapshot.CommitKind.COMPACT - || snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE) + if ((snapshot.commitKind() == CommitKind.COMPACT + || snapshot.commitKind() == CommitKind.OVERWRITE) && !snapshot.commitUser().equals(commitUser)) { throw new RuntimeException( String.format( @@ -1296,7 +1300,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { latestSnapshot.indexManifest(), commitUser, Long.MAX_VALUE, - Snapshot.CommitKind.COMPACT, + CommitKind.COMPACT, System.currentTimeMillis(), latestSnapshot.logOffsets(), latestSnapshot.totalRecordCount(), @@ -1375,11 +1379,11 @@ public class 
FileStoreCommitImpl implements FileStoreCommit { String baseCommitUser, List<SimpleFileEntry> baseEntries, List<SimpleFileEntry> changes, - Snapshot.CommitKind commitKind) { + CommitKind commitKind) { List<SimpleFileEntry> allEntries = new ArrayList<>(baseEntries); allEntries.addAll(changes); - if (commitKind != Snapshot.CommitKind.OVERWRITE) { + if (commitKind != CommitKind.OVERWRITE) { // total buckets within the same partition should remain the same Map<BinaryRow, Integer> totalBuckets = new HashMap<>(); for (SimpleFileEntry entry : allEntries) { @@ -1438,6 +1442,8 @@ public class FileStoreCommitImpl implements FileStoreCommit { assertNoDelete(mergedEntries, exceptionFunction); + // TODO check for deletion vectors + // fast exit for file store without keys if (keyComparator == null) { return;