This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 24d28dd960 [hotfix] Minor refactor for FileStoreCommitImpl
24d28dd960 is described below

commit 24d28dd9609c578ad484df628395c94eb57e50e3
Author: JingsongLi <jingsongl...@gmail.com>
AuthorDate: Fri Sep 19 15:43:41 2025 +0800

    [hotfix] Minor refactor for FileStoreCommitImpl
---
 .../paimon/operation/FileStoreCommitImpl.java      | 40 +++++++++++++---------
 1 file changed, 23 insertions(+), 17 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 300cc9c926..466df201a3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -20,6 +20,7 @@ package org.apache.paimon.operation;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.Snapshot.CommitKind;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.catalog.SnapshotCommit;
 import org.apache.paimon.data.BinaryRow;
@@ -320,9 +321,12 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                 // This optimization is mainly used to decrease the number of 
times we read from
                 // files.
                 latestSnapshot = snapshotManager.latestSnapshot();
-                boolean hasDelete = hasDelete(appendSimpleEntries, 
appendIndexFiles);
-                Snapshot.CommitKind commitKind =
-                        hasDelete ? Snapshot.CommitKind.OVERWRITE : 
Snapshot.CommitKind.APPEND;
+                CommitKind commitKind = CommitKind.APPEND;
+                ConflictCheck conflictCheck = noConflictCheck();
+                if (containsFileDeletionOrDeletionVectors(appendSimpleEntries, 
appendIndexFiles)) {
+                    commitKind = CommitKind.OVERWRITE;
+                    conflictCheck = mustConflictCheck();
+                }
 
                 if (latestSnapshot != null && checkAppendFiles) {
                     // it is possible that some partitions only have compact 
changes,
@@ -348,7 +352,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                                 committable.logOffsets(),
                                 committable.properties(),
                                 commitKind,
-                                hasDelete ? mustConflictCheck() : 
noConflictCheck(),
+                                conflictCheck,
                                 null);
                 generatedSnapshot += 1;
             }
@@ -369,7 +373,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                             latestSnapshot.commitUser(),
                             baseEntries,
                             SimpleFileEntry.from(compactTableFiles),
-                            Snapshot.CommitKind.COMPACT);
+                            CommitKind.COMPACT);
                     // assume this compact commit follows just after the 
append commit created above
                     safeLatestSnapshotId += 1;
                 }
@@ -383,7 +387,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                                 committable.watermark(),
                                 committable.logOffsets(),
                                 committable.properties(),
-                                Snapshot.CommitKind.COMPACT,
+                                CommitKind.COMPACT,
                                 hasConflictChecked(safeLatestSnapshotId),
                                 null);
                 generatedSnapshot += 1;
@@ -428,7 +432,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
         commitMetrics.reportCommit(commitStats);
     }
 
-    private boolean hasDelete(
+    private boolean containsFileDeletionOrDeletionVectors(
             List<SimpleFileEntry> appendSimpleEntries, 
List<IndexManifestEntry> appendIndexFiles) {
         for (SimpleFileEntry appendSimpleEntry : appendSimpleEntries) {
             if (appendSimpleEntry.kind().equals(FileKind.DELETE)) {
@@ -554,7 +558,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                                 committable.watermark(),
                                 committable.logOffsets(),
                                 committable.properties(),
-                                Snapshot.CommitKind.COMPACT,
+                                CommitKind.COMPACT,
                                 mustConflictCheck(),
                                 null);
                 generatedSnapshot += 1;
@@ -664,7 +668,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                 null,
                 Collections.emptyMap(),
                 Collections.emptyMap(),
-                Snapshot.CommitKind.ANALYZE,
+                CommitKind.ANALYZE,
                 noConflictCheck(),
                 statsFileName);
     }
@@ -807,7 +811,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             @Nullable Long watermark,
             Map<Integer, Long> logOffsets,
             Map<String, String> properties,
-            Snapshot.CommitKind commitKind,
+            CommitKind commitKind,
             ConflictCheck conflictCheck,
             @Nullable String statsFileName) {
         int retryCount = 0;
@@ -910,7 +914,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                 watermark,
                 logOffsets,
                 properties,
-                Snapshot.CommitKind.OVERWRITE,
+                CommitKind.OVERWRITE,
                 mustConflictCheck(),
                 null);
     }
@@ -925,7 +929,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             @Nullable Long watermark,
             Map<Integer, Long> logOffsets,
             Map<String, String> properties,
-            Snapshot.CommitKind commitKind,
+            CommitKind commitKind,
             @Nullable Snapshot latestSnapshot,
             ConflictCheck conflictCheck,
             @Nullable String newStatsFileName) {
@@ -964,8 +968,8 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
         if (strictModeLastSafeSnapshot != null && strictModeLastSafeSnapshot 
>= 0) {
             for (long id = strictModeLastSafeSnapshot + 1; id < newSnapshotId; 
id++) {
                 Snapshot snapshot = snapshotManager.snapshot(id);
-                if ((snapshot.commitKind() == Snapshot.CommitKind.COMPACT
-                                || snapshot.commitKind() == 
Snapshot.CommitKind.OVERWRITE)
+                if ((snapshot.commitKind() == CommitKind.COMPACT
+                                || snapshot.commitKind() == 
CommitKind.OVERWRITE)
                         && !snapshot.commitUser().equals(commitUser)) {
                     throw new RuntimeException(
                             String.format(
@@ -1296,7 +1300,7 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                         latestSnapshot.indexManifest(),
                         commitUser,
                         Long.MAX_VALUE,
-                        Snapshot.CommitKind.COMPACT,
+                        CommitKind.COMPACT,
                         System.currentTimeMillis(),
                         latestSnapshot.logOffsets(),
                         latestSnapshot.totalRecordCount(),
@@ -1375,11 +1379,11 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
             String baseCommitUser,
             List<SimpleFileEntry> baseEntries,
             List<SimpleFileEntry> changes,
-            Snapshot.CommitKind commitKind) {
+            CommitKind commitKind) {
         List<SimpleFileEntry> allEntries = new ArrayList<>(baseEntries);
         allEntries.addAll(changes);
 
-        if (commitKind != Snapshot.CommitKind.OVERWRITE) {
+        if (commitKind != CommitKind.OVERWRITE) {
             // total buckets within the same partition should remain the same
             Map<BinaryRow, Integer> totalBuckets = new HashMap<>();
             for (SimpleFileEntry entry : allEntries) {
@@ -1438,6 +1442,8 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
 
         assertNoDelete(mergedEntries, exceptionFunction);
 
+        // TODO check for deletion vectors
+
         // fast exit for file store without keys
         if (keyComparator == null) {
             return;

Reply via email to