This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7ec86cb99b [core] Clean up constructor for FileStoreCommitImpl (#7213)
7ec86cb99b is described below
commit 7ec86cb99b46b3c8dbbd17aa73c75c6548e5a481
Author: tsreaper <[email protected]>
AuthorDate: Thu Feb 5 17:24:36 2026 +0800
[core] Clean up constructor for FileStoreCommitImpl (#7213)
---
.../java/org/apache/paimon/AbstractFileStore.java | 24 +----
.../paimon/operation/FileStoreCommitImpl.java | 112 +++++++++------------
.../paimon/operation/commit/StrictModeChecker.java | 16 ---
3 files changed, 47 insertions(+), 105 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 8b30113f02..eea68990c7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -48,7 +48,6 @@ import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.operation.commit.CommitRollback;
import org.apache.paimon.operation.commit.ConflictDetection;
-import org.apache.paimon.operation.commit.StrictModeChecker;
import org.apache.paimon.partition.PartitionExpireStrategy;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -291,12 +290,6 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
newIndexFileHandler(),
snapshotManager,
scanner);
- StrictModeChecker strictModeChecker =
- StrictModeChecker.create(
- snapshotManager,
- commitUser,
- this::newScan,
-
options.commitStrictModeLastSafeSnapshot().orElse(null));
CommitRollback rollback = null;
TableRollback tableRollback =
catalogEnvironment.catalogTableRollback();
if (tableRollback != null) {
@@ -310,32 +303,17 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
commitUser,
partitionType,
options,
- options.partitionDefaultName(),
pathFactory(),
snapshotManager,
manifestFileFactory(),
manifestListFactory(),
indexManifestFileFactory(),
- newScan(),
- options.bucket(),
- options.manifestTargetSize(),
- options.manifestFullCompactionThresholdSize(),
- options.manifestMergeMinCount(),
- partitionType.getFieldCount() > 0 &&
options.dynamicPartitionOverwrite(),
- options.branch(),
+ this::newScan,
newStatsFileHandler(),
bucketMode(),
- options.scanManifestParallelism(),
createCommitPreCallbacks(table),
createCommitCallbacks(commitUser, table),
- options.commitMaxRetries(),
- options.commitTimeout(),
- options.commitMinRetryWait(),
- options.commitMaxRetryWait(),
- options.rowTrackingEnabled(),
- options.commitDiscardDuplicateFiles(),
conflictDetectFactory,
- strictModeChecker,
rollback);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 983f541446..7cc0266b43 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -54,7 +54,6 @@ import org.apache.paimon.operation.commit.StrictModeChecker;
import org.apache.paimon.operation.commit.SuccessCommitResult;
import org.apache.paimon.operation.metrics.CommitMetrics;
import org.apache.paimon.operation.metrics.CommitStats;
-import org.apache.paimon.options.MemorySize;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.partition.PartitionStatistics;
import org.apache.paimon.predicate.Predicate;
@@ -91,6 +90,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
@@ -135,7 +135,6 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
private final String commitUser;
private final RowType partitionType;
private final CoreOptions options;
- private final String partitionDefaultName;
private final FileStorePathFactory pathFactory;
private final SnapshotManager snapshotManager;
private final ManifestFile manifestFile;
@@ -143,23 +142,12 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
private final IndexManifestFile indexManifestFile;
@Nullable private final CommitRollback rollback;
private final CommitScanner scanner;
- private final int numBucket;
- private final MemorySize manifestTargetSize;
- private final MemorySize manifestFullCompactionSize;
- private final int manifestMergeMinCount;
- private final boolean dynamicPartitionOverwrite;
- private final String branchName;
- @Nullable private final Integer manifestReadParallelism;
private final List<CommitPreCallback> commitPreCallbacks;
private final List<CommitCallback> commitCallbacks;
private final StatsFileHandler statsFileHandler;
private final BucketMode bucketMode;
- private final long commitTimeout;
private final RetryWaiter retryWaiter;
- private final int commitMaxRetries;
private final InternalRowPartitionComputer partitionComputer;
- private final boolean rowTrackingEnabled;
- private final boolean discardDuplicateFiles;
@Nullable private final StrictModeChecker strictModeChecker;
private final ConflictDetection conflictDetection;
private final CommitCleaner commitCleaner;
@@ -176,32 +164,17 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
String commitUser,
RowType partitionType,
CoreOptions options,
- String partitionDefaultName,
FileStorePathFactory pathFactory,
SnapshotManager snapshotManager,
ManifestFile.Factory manifestFileFactory,
ManifestList.Factory manifestListFactory,
IndexManifestFile.Factory indexManifestFileFactory,
- FileStoreScan scan,
- int numBucket,
- MemorySize manifestTargetSize,
- MemorySize manifestFullCompactionSize,
- int manifestMergeMinCount,
- boolean dynamicPartitionOverwrite,
- String branchName,
+ Supplier<FileStoreScan> scanSupplier,
StatsFileHandler statsFileHandler,
BucketMode bucketMode,
- @Nullable Integer manifestReadParallelism,
List<CommitPreCallback> commitPreCallbacks,
List<CommitCallback> commitCallbacks,
- int commitMaxRetries,
- long commitTimeout,
- long commitMinRetryWait,
- long commitMaxRetryWait,
- boolean rowTrackingEnabled,
- boolean discardDuplicateFiles,
ConflictDetection.Factory conflictDetectFactory,
- @Nullable StrictModeChecker strictModeChecker,
@Nullable CommitRollback rollback) {
this.snapshotCommit = snapshotCommit;
this.fileIO = fileIO;
@@ -210,26 +183,17 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
this.commitUser = commitUser;
this.partitionType = partitionType;
this.options = options;
- this.partitionDefaultName = partitionDefaultName;
this.pathFactory = pathFactory;
this.snapshotManager = snapshotManager;
this.manifestFile = manifestFileFactory.create();
this.manifestList = manifestListFactory.create();
this.indexManifestFile = indexManifestFileFactory.create();
this.rollback = rollback;
- this.scanner = new CommitScanner(scan, indexManifestFile, options);
- this.numBucket = numBucket;
- this.manifestTargetSize = manifestTargetSize;
- this.manifestFullCompactionSize = manifestFullCompactionSize;
- this.manifestMergeMinCount = manifestMergeMinCount;
- this.dynamicPartitionOverwrite = dynamicPartitionOverwrite;
- this.branchName = branchName;
- this.manifestReadParallelism = manifestReadParallelism;
+ this.scanner = new CommitScanner(scanSupplier.get(),
indexManifestFile, options);
this.commitPreCallbacks = commitPreCallbacks;
this.commitCallbacks = commitCallbacks;
- this.commitMaxRetries = commitMaxRetries;
- this.commitTimeout = commitTimeout;
- this.retryWaiter = new RetryWaiter(commitMinRetryWait,
commitMaxRetryWait);
+ this.retryWaiter =
+ new RetryWaiter(options.commitMinRetryWait(),
options.commitMaxRetryWait());
this.partitionComputer =
new InternalRowPartitionComputer(
options.partitionDefaultName(),
@@ -240,9 +204,16 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
this.commitMetrics = null;
this.statsFileHandler = statsFileHandler;
this.bucketMode = bucketMode;
- this.rowTrackingEnabled = rowTrackingEnabled;
- this.discardDuplicateFiles = discardDuplicateFiles;
- this.strictModeChecker = strictModeChecker;
+ this.strictModeChecker =
+ options.commitStrictModeLastSafeSnapshot()
+ .map(
+ id ->
+ new StrictModeChecker(
+ snapshotManager,
+ commitUser,
+ scanSupplier.get(),
+ id))
+ .orElse(null);
this.conflictDetection = conflictDetectFactory.create(scanner);
this.commitCleaner = new CommitCleaner(manifestList, manifestFile,
indexManifestFile);
}
@@ -468,7 +439,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
boolean skipOverwrite = false;
// partition filter is built from static or dynamic partition
according to properties
PartitionPredicate partitionFilter = null;
- if (dynamicPartitionOverwrite) {
+ if (partitionType.getFieldCount() > 0 &&
options.dynamicPartitionOverwrite()) {
if (changes.appendTableFiles.isEmpty()) {
// in dynamic mode, if there is no changes to commit, no
data will be deleted
skipOverwrite = true;
@@ -482,7 +453,8 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
} else {
// partition may be partial partition fields, so here must use
predicate way.
Predicate partitionPredicate =
- createPartitionPredicate(partition, partitionType,
partitionDefaultName);
+ createPartitionPredicate(
+ partition, partitionType,
options.partitionDefaultName());
partitionFilter =
PartitionPredicate.fromPredicate(partitionType,
partitionPredicate);
// sanity check, all changes must be done within the given
partition
@@ -613,16 +585,19 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
PartitionPredicate partitionFilter;
if (fullMode) {
List<BinaryRow> binaryPartitions =
- createBinaryPartitions(partitions, partitionType,
partitionDefaultName);
+ createBinaryPartitions(
+ partitions, partitionType,
options.partitionDefaultName());
partitionFilter = PartitionPredicate.fromMultiple(partitionType,
binaryPartitions);
} else {
- // partitions may be partial partition fields, so here must to use
predicate way.
+ // partitions may be partial partition fields, so here must use
predicate way.
Predicate predicate =
partitions.stream()
.map(
partition ->
createPartitionPredicate(
- partition, partitionType,
partitionDefaultName))
+ partition,
+ partitionType,
+
options.partitionDefaultName()))
.reduce(PredicateBuilder::or)
.orElseThrow(
() -> new RuntimeException("Failed to get
partition filter."));
@@ -688,7 +663,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
}
private ManifestEntryChanges collectChanges(List<CommitMessage>
commitMessages) {
- ManifestEntryChanges changes = new ManifestEntryChanges(numBucket);
+ ManifestEntryChanges changes = new
ManifestEntryChanges(options.bucket());
commitMessages.forEach(changes::collect);
LOG.info("Finished collecting changes, including: {}", changes);
return changes;
@@ -730,12 +705,12 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
retryResult = (RetryCommitResult) result;
- if (System.currentTimeMillis() - startMillis > commitTimeout
- || retryCount >= commitMaxRetries) {
+ if (System.currentTimeMillis() - startMillis >
options.commitTimeout()
+ || retryCount >= options.commitMaxRetries()) {
String message =
String.format(
"Commit failed after %s millis with %s
retries, there maybe exist commit conflicts between multiple jobs.",
- commitTimeout, retryCount);
+ options.commitTimeout(), retryCount);
throw new RuntimeException(message, retryResult.exception);
}
@@ -761,7 +736,11 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
return tryCommit(
latestSnapshot ->
scanner.readOverwriteChanges(
- numBucket, changes, indexFiles,
latestSnapshot, partitionFilter),
+ options.bucket(),
+ changes,
+ indexFiles,
+ latestSnapshot,
+ partitionFilter),
identifier,
watermark,
properties,
@@ -836,7 +815,8 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
}
List<SimpleFileEntry> baseDataFiles = new ArrayList<>();
- boolean discardDuplicate = discardDuplicateFiles && commitKind ==
CommitKind.APPEND;
+ boolean discardDuplicate =
+ options.commitDiscardDuplicateFiles() && commitKind ==
CommitKind.APPEND;
if (latestSnapshot != null && (discardDuplicate || detectConflicts)) {
// latestSnapshotId is different from the snapshot id we've
checked for conflicts,
// so we have to check again
@@ -920,14 +900,14 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
ManifestFileMerger.merge(
mergeBeforeManifests,
manifestFile,
- manifestTargetSize.getBytes(),
- manifestMergeMinCount,
- manifestFullCompactionSize.getBytes(),
+ options.manifestTargetSize().getBytes(),
+ options.manifestMergeMinCount(),
+
options.manifestFullCompactionThresholdSize().getBytes(),
partitionType,
- manifestReadParallelism);
+ options.scanManifestParallelism());
baseManifestList = manifestList.write(mergeAfterManifests);
- if (rowTrackingEnabled) {
+ if (options.rowTrackingEnabled()) {
RowTrackingAssigned assigned =
assignRowTracking(newSnapshotId, firstRowIdStart,
deltaFiles);
nextRowIdStart = assigned.nextRowIdStart;
@@ -1097,12 +1077,12 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
break;
}
- if (System.currentTimeMillis() - startMillis > commitTimeout
- || retryCount >= commitMaxRetries) {
+ if (System.currentTimeMillis() - startMillis >
options.commitTimeout()
+ || retryCount >= options.commitMaxRetries()) {
throw new RuntimeException(
String.format(
"Commit failed after %s millis with %s
retries, there maybe exist commit conflicts between multiple jobs.",
- commitTimeout, retryCount));
+ options.commitTimeout(), retryCount));
}
retryWaiter.retryWait(retryCount);
@@ -1126,11 +1106,11 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
ManifestFileMerger.merge(
mergeBeforeManifests,
manifestFile,
- manifestTargetSize.getBytes(),
+ options.manifestTargetSize().getBytes(),
1,
1,
partitionType,
- manifestReadParallelism);
+ options.scanManifestParallelism());
if (new HashSet<>(mergeBeforeManifests).equals(new
HashSet<>(mergeAfterManifests))) {
// no need to commit this snapshot, because no compact were
happened
@@ -1173,7 +1153,7 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
for (PartitionEntry entry : deltaStatistics) {
statistics.add(entry.toPartitionStatistics(partitionComputer));
}
- return snapshotCommit.commit(newSnapshot, branchName, statistics);
+ return snapshotCommit.commit(newSnapshot, options.branch(),
statistics);
} catch (Throwable e) {
// exception when performing the atomic rename,
// we cannot clean up because we can't determine the success
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java
index e2d76c2327..b0084b70be 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java
@@ -26,10 +26,7 @@ import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.utils.SnapshotManager;
-import javax.annotation.Nullable;
-
import java.util.Iterator;
-import java.util.function.Supplier;
/** A checker to check strict mode based on last safe snapshot. */
public class StrictModeChecker {
@@ -51,19 +48,6 @@ public class StrictModeChecker {
this.strictModeLastSafeSnapshot = strictModeLastSafeSnapshot;
}
- @Nullable
- public static StrictModeChecker create(
- SnapshotManager snapshotManager,
- String commitUser,
- Supplier<FileStoreScan> scanSupplier,
- @Nullable Long strictModeLastSafeSnapshot) {
- if (strictModeLastSafeSnapshot == null) {
- return null;
- }
- return new StrictModeChecker(
- snapshotManager, commitUser, scanSupplier.get(),
strictModeLastSafeSnapshot);
- }
-
public void check(long newSnapshotId, CommitKind newCommitKind) {
for (long id = strictModeLastSafeSnapshot + 1; id < newSnapshotId;
id++) {
Snapshot snapshot = snapshotManager.snapshot(id);