This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ec86cb99b [core] Clean up constructor for FileStoreCommitImpl (#7213)
7ec86cb99b is described below

commit 7ec86cb99b46b3c8dbbd17aa73c75c6548e5a481
Author: tsreaper <[email protected]>
AuthorDate: Thu Feb 5 17:24:36 2026 +0800

    [core] Clean up constructor for FileStoreCommitImpl (#7213)
---
 .../java/org/apache/paimon/AbstractFileStore.java  |  24 +----
 .../paimon/operation/FileStoreCommitImpl.java      | 112 +++++++++------------
 .../paimon/operation/commit/StrictModeChecker.java |  16 ---
 3 files changed, 47 insertions(+), 105 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 8b30113f02..eea68990c7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -48,7 +48,6 @@ import org.apache.paimon.operation.SnapshotDeletion;
 import org.apache.paimon.operation.TagDeletion;
 import org.apache.paimon.operation.commit.CommitRollback;
 import org.apache.paimon.operation.commit.ConflictDetection;
-import org.apache.paimon.operation.commit.StrictModeChecker;
 import org.apache.paimon.partition.PartitionExpireStrategy;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
@@ -291,12 +290,6 @@ abstract class AbstractFileStore<T> implements 
FileStore<T> {
                                 newIndexFileHandler(),
                                 snapshotManager,
                                 scanner);
-        StrictModeChecker strictModeChecker =
-                StrictModeChecker.create(
-                        snapshotManager,
-                        commitUser,
-                        this::newScan,
-                        
options.commitStrictModeLastSafeSnapshot().orElse(null));
         CommitRollback rollback = null;
         TableRollback tableRollback = 
catalogEnvironment.catalogTableRollback();
         if (tableRollback != null) {
@@ -310,32 +303,17 @@ abstract class AbstractFileStore<T> implements 
FileStore<T> {
                 commitUser,
                 partitionType,
                 options,
-                options.partitionDefaultName(),
                 pathFactory(),
                 snapshotManager,
                 manifestFileFactory(),
                 manifestListFactory(),
                 indexManifestFileFactory(),
-                newScan(),
-                options.bucket(),
-                options.manifestTargetSize(),
-                options.manifestFullCompactionThresholdSize(),
-                options.manifestMergeMinCount(),
-                partitionType.getFieldCount() > 0 && 
options.dynamicPartitionOverwrite(),
-                options.branch(),
+                this::newScan,
                 newStatsFileHandler(),
                 bucketMode(),
-                options.scanManifestParallelism(),
                 createCommitPreCallbacks(table),
                 createCommitCallbacks(commitUser, table),
-                options.commitMaxRetries(),
-                options.commitTimeout(),
-                options.commitMinRetryWait(),
-                options.commitMaxRetryWait(),
-                options.rowTrackingEnabled(),
-                options.commitDiscardDuplicateFiles(),
                 conflictDetectFactory,
-                strictModeChecker,
                 rollback);
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 983f541446..7cc0266b43 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -54,7 +54,6 @@ import org.apache.paimon.operation.commit.StrictModeChecker;
 import org.apache.paimon.operation.commit.SuccessCommitResult;
 import org.apache.paimon.operation.metrics.CommitMetrics;
 import org.apache.paimon.operation.metrics.CommitStats;
-import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.partition.PartitionStatistics;
 import org.apache.paimon.predicate.Predicate;
@@ -91,6 +90,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static java.util.Collections.emptyList;
@@ -135,7 +135,6 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
     private final String commitUser;
     private final RowType partitionType;
     private final CoreOptions options;
-    private final String partitionDefaultName;
     private final FileStorePathFactory pathFactory;
     private final SnapshotManager snapshotManager;
     private final ManifestFile manifestFile;
@@ -143,23 +142,12 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
     private final IndexManifestFile indexManifestFile;
     @Nullable private final CommitRollback rollback;
     private final CommitScanner scanner;
-    private final int numBucket;
-    private final MemorySize manifestTargetSize;
-    private final MemorySize manifestFullCompactionSize;
-    private final int manifestMergeMinCount;
-    private final boolean dynamicPartitionOverwrite;
-    private final String branchName;
-    @Nullable private final Integer manifestReadParallelism;
     private final List<CommitPreCallback> commitPreCallbacks;
     private final List<CommitCallback> commitCallbacks;
     private final StatsFileHandler statsFileHandler;
     private final BucketMode bucketMode;
-    private final long commitTimeout;
     private final RetryWaiter retryWaiter;
-    private final int commitMaxRetries;
     private final InternalRowPartitionComputer partitionComputer;
-    private final boolean rowTrackingEnabled;
-    private final boolean discardDuplicateFiles;
     @Nullable private final StrictModeChecker strictModeChecker;
     private final ConflictDetection conflictDetection;
     private final CommitCleaner commitCleaner;
@@ -176,32 +164,17 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
             String commitUser,
             RowType partitionType,
             CoreOptions options,
-            String partitionDefaultName,
             FileStorePathFactory pathFactory,
             SnapshotManager snapshotManager,
             ManifestFile.Factory manifestFileFactory,
             ManifestList.Factory manifestListFactory,
             IndexManifestFile.Factory indexManifestFileFactory,
-            FileStoreScan scan,
-            int numBucket,
-            MemorySize manifestTargetSize,
-            MemorySize manifestFullCompactionSize,
-            int manifestMergeMinCount,
-            boolean dynamicPartitionOverwrite,
-            String branchName,
+            Supplier<FileStoreScan> scanSupplier,
             StatsFileHandler statsFileHandler,
             BucketMode bucketMode,
-            @Nullable Integer manifestReadParallelism,
             List<CommitPreCallback> commitPreCallbacks,
             List<CommitCallback> commitCallbacks,
-            int commitMaxRetries,
-            long commitTimeout,
-            long commitMinRetryWait,
-            long commitMaxRetryWait,
-            boolean rowTrackingEnabled,
-            boolean discardDuplicateFiles,
             ConflictDetection.Factory conflictDetectFactory,
-            @Nullable StrictModeChecker strictModeChecker,
             @Nullable CommitRollback rollback) {
         this.snapshotCommit = snapshotCommit;
         this.fileIO = fileIO;
@@ -210,26 +183,17 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         this.commitUser = commitUser;
         this.partitionType = partitionType;
         this.options = options;
-        this.partitionDefaultName = partitionDefaultName;
         this.pathFactory = pathFactory;
         this.snapshotManager = snapshotManager;
         this.manifestFile = manifestFileFactory.create();
         this.manifestList = manifestListFactory.create();
         this.indexManifestFile = indexManifestFileFactory.create();
         this.rollback = rollback;
-        this.scanner = new CommitScanner(scan, indexManifestFile, options);
-        this.numBucket = numBucket;
-        this.manifestTargetSize = manifestTargetSize;
-        this.manifestFullCompactionSize = manifestFullCompactionSize;
-        this.manifestMergeMinCount = manifestMergeMinCount;
-        this.dynamicPartitionOverwrite = dynamicPartitionOverwrite;
-        this.branchName = branchName;
-        this.manifestReadParallelism = manifestReadParallelism;
+        this.scanner = new CommitScanner(scanSupplier.get(), 
indexManifestFile, options);
         this.commitPreCallbacks = commitPreCallbacks;
         this.commitCallbacks = commitCallbacks;
-        this.commitMaxRetries = commitMaxRetries;
-        this.commitTimeout = commitTimeout;
-        this.retryWaiter = new RetryWaiter(commitMinRetryWait, 
commitMaxRetryWait);
+        this.retryWaiter =
+                new RetryWaiter(options.commitMinRetryWait(), 
options.commitMaxRetryWait());
         this.partitionComputer =
                 new InternalRowPartitionComputer(
                         options.partitionDefaultName(),
@@ -240,9 +204,16 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         this.commitMetrics = null;
         this.statsFileHandler = statsFileHandler;
         this.bucketMode = bucketMode;
-        this.rowTrackingEnabled = rowTrackingEnabled;
-        this.discardDuplicateFiles = discardDuplicateFiles;
-        this.strictModeChecker = strictModeChecker;
+        this.strictModeChecker =
+                options.commitStrictModeLastSafeSnapshot()
+                        .map(
+                                id ->
+                                        new StrictModeChecker(
+                                                snapshotManager,
+                                                commitUser,
+                                                scanSupplier.get(),
+                                                id))
+                        .orElse(null);
         this.conflictDetection = conflictDetectFactory.create(scanner);
         this.commitCleaner = new CommitCleaner(manifestList, manifestFile, 
indexManifestFile);
     }
@@ -468,7 +439,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             boolean skipOverwrite = false;
             // partition filter is built from static or dynamic partition 
according to properties
             PartitionPredicate partitionFilter = null;
-            if (dynamicPartitionOverwrite) {
+            if (partitionType.getFieldCount() > 0 && 
options.dynamicPartitionOverwrite()) {
                 if (changes.appendTableFiles.isEmpty()) {
                     // in dynamic mode, if there is no changes to commit, no 
data will be deleted
                     skipOverwrite = true;
@@ -482,7 +453,8 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             } else {
                 // partition may be partial partition fields, so here must use 
predicate way.
                 Predicate partitionPredicate =
-                        createPartitionPredicate(partition, partitionType, 
partitionDefaultName);
+                        createPartitionPredicate(
+                                partition, partitionType, 
options.partitionDefaultName());
                 partitionFilter =
                         PartitionPredicate.fromPredicate(partitionType, 
partitionPredicate);
                 // sanity check, all changes must be done within the given 
partition
@@ -613,16 +585,19 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         PartitionPredicate partitionFilter;
         if (fullMode) {
             List<BinaryRow> binaryPartitions =
-                    createBinaryPartitions(partitions, partitionType, 
partitionDefaultName);
+                    createBinaryPartitions(
+                            partitions, partitionType, 
options.partitionDefaultName());
             partitionFilter = PartitionPredicate.fromMultiple(partitionType, 
binaryPartitions);
         } else {
-            // partitions may be partial partition fields, so here must to use 
predicate way.
+            // partitions may be partial partition fields, so here must use 
predicate way.
             Predicate predicate =
                     partitions.stream()
                             .map(
                                     partition ->
                                             createPartitionPredicate(
-                                                    partition, partitionType, 
partitionDefaultName))
+                                                    partition,
+                                                    partitionType,
+                                                    
options.partitionDefaultName()))
                             .reduce(PredicateBuilder::or)
                             .orElseThrow(
                                     () -> new RuntimeException("Failed to get 
partition filter."));
@@ -688,7 +663,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
     }
 
     private ManifestEntryChanges collectChanges(List<CommitMessage> 
commitMessages) {
-        ManifestEntryChanges changes = new ManifestEntryChanges(numBucket);
+        ManifestEntryChanges changes = new 
ManifestEntryChanges(options.bucket());
         commitMessages.forEach(changes::collect);
         LOG.info("Finished collecting changes, including: {}", changes);
         return changes;
@@ -730,12 +705,12 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
 
             retryResult = (RetryCommitResult) result;
 
-            if (System.currentTimeMillis() - startMillis > commitTimeout
-                    || retryCount >= commitMaxRetries) {
+            if (System.currentTimeMillis() - startMillis > 
options.commitTimeout()
+                    || retryCount >= options.commitMaxRetries()) {
                 String message =
                         String.format(
                                 "Commit failed after %s millis with %s 
retries, there maybe exist commit conflicts between multiple jobs.",
-                                commitTimeout, retryCount);
+                                options.commitTimeout(), retryCount);
                 throw new RuntimeException(message, retryResult.exception);
             }
 
@@ -761,7 +736,11 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         return tryCommit(
                 latestSnapshot ->
                         scanner.readOverwriteChanges(
-                                numBucket, changes, indexFiles, 
latestSnapshot, partitionFilter),
+                                options.bucket(),
+                                changes,
+                                indexFiles,
+                                latestSnapshot,
+                                partitionFilter),
                 identifier,
                 watermark,
                 properties,
@@ -836,7 +815,8 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
         }
 
         List<SimpleFileEntry> baseDataFiles = new ArrayList<>();
-        boolean discardDuplicate = discardDuplicateFiles && commitKind == 
CommitKind.APPEND;
+        boolean discardDuplicate =
+                options.commitDiscardDuplicateFiles() && commitKind == 
CommitKind.APPEND;
         if (latestSnapshot != null && (discardDuplicate || detectConflicts)) {
             // latestSnapshotId is different from the snapshot id we've 
checked for conflicts,
             // so we have to check again
@@ -920,14 +900,14 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                     ManifestFileMerger.merge(
                             mergeBeforeManifests,
                             manifestFile,
-                            manifestTargetSize.getBytes(),
-                            manifestMergeMinCount,
-                            manifestFullCompactionSize.getBytes(),
+                            options.manifestTargetSize().getBytes(),
+                            options.manifestMergeMinCount(),
+                            
options.manifestFullCompactionThresholdSize().getBytes(),
                             partitionType,
-                            manifestReadParallelism);
+                            options.scanManifestParallelism());
             baseManifestList = manifestList.write(mergeAfterManifests);
 
-            if (rowTrackingEnabled) {
+            if (options.rowTrackingEnabled()) {
                 RowTrackingAssigned assigned =
                         assignRowTracking(newSnapshotId, firstRowIdStart, 
deltaFiles);
                 nextRowIdStart = assigned.nextRowIdStart;
@@ -1097,12 +1077,12 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                 break;
             }
 
-            if (System.currentTimeMillis() - startMillis > commitTimeout
-                    || retryCount >= commitMaxRetries) {
+            if (System.currentTimeMillis() - startMillis > 
options.commitTimeout()
+                    || retryCount >= options.commitMaxRetries()) {
                 throw new RuntimeException(
                         String.format(
                                 "Commit failed after %s millis with %s 
retries, there maybe exist commit conflicts between multiple jobs.",
-                                commitTimeout, retryCount));
+                                options.commitTimeout(), retryCount));
             }
 
             retryWaiter.retryWait(retryCount);
@@ -1126,11 +1106,11 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                 ManifestFileMerger.merge(
                         mergeBeforeManifests,
                         manifestFile,
-                        manifestTargetSize.getBytes(),
+                        options.manifestTargetSize().getBytes(),
                         1,
                         1,
                         partitionType,
-                        manifestReadParallelism);
+                        options.scanManifestParallelism());
 
         if (new HashSet<>(mergeBeforeManifests).equals(new 
HashSet<>(mergeAfterManifests))) {
             // no need to commit this snapshot, because no compact were 
happened
@@ -1173,7 +1153,7 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
             for (PartitionEntry entry : deltaStatistics) {
                 statistics.add(entry.toPartitionStatistics(partitionComputer));
             }
-            return snapshotCommit.commit(newSnapshot, branchName, statistics);
+            return snapshotCommit.commit(newSnapshot, options.branch(), 
statistics);
         } catch (Throwable e) {
             // exception when performing the atomic rename,
             // we cannot clean up because we can't determine the success
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java
index e2d76c2327..b0084b70be 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java
@@ -26,10 +26,7 @@ import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.utils.SnapshotManager;
 
-import javax.annotation.Nullable;
-
 import java.util.Iterator;
-import java.util.function.Supplier;
 
 /** A checker to check strict mode based on last safe snapshot. */
 public class StrictModeChecker {
@@ -51,19 +48,6 @@ public class StrictModeChecker {
         this.strictModeLastSafeSnapshot = strictModeLastSafeSnapshot;
     }
 
-    @Nullable
-    public static StrictModeChecker create(
-            SnapshotManager snapshotManager,
-            String commitUser,
-            Supplier<FileStoreScan> scanSupplier,
-            @Nullable Long strictModeLastSafeSnapshot) {
-        if (strictModeLastSafeSnapshot == null) {
-            return null;
-        }
-        return new StrictModeChecker(
-                snapshotManager, commitUser, scanSupplier.get(), 
strictModeLastSafeSnapshot);
-    }
-
     public void check(long newSnapshotId, CommitKind newCommitKind) {
         for (long id = strictModeLastSafeSnapshot + 1; id < newSnapshotId; 
id++) {
             Snapshot snapshot = snapshotManager.snapshot(id);

Reply via email to