BsoBird commented on code in PR #3805:
URL: https://github.com/apache/paimon/pull/3805#discussion_r1689184744


##########
paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java:
##########
@@ -1274,4 +1308,101 @@ static ConflictCheck noConflictCheck() {
     public static ConflictCheck mustConflictCheck() {
         return latestSnapshot -> true;
     }
+
+    boolean doCommit(
+            long newSnapshotId,
+            Path newSnapshotPath,
+            Snapshot newSnapshot,
+            boolean useSerializableIsolation,
+            boolean useLock) {
+        try {
+            boolean committed = tryAtomicCommitNewSnapshot(newSnapshotPath, newSnapshot);
+            if (committed) {
+                if (useSerializableIsolation && !useLock) {
+                    // If useSerializableIsolation is turned on and we do not use the lock service,
+                    // we remove all hints, as they may provide incorrect information.
+                    snapshotManager.removeSnapshotEarliestHint();
+                    snapshotManager.removeSnapshotLatestHint();
+                    snapshotManager.removeChangeLogEarliestHint();
+                    snapshotManager.removeChangeLogLatestHint();
+                } else {
+                    snapshotManager.commitLatestHint(newSnapshotId);
+                }
+            }
+            return committed;
+        } catch (Exception e) {
+            throw new CommitStateUnknownException(e.getMessage(), e);
+        }
+    }
+
+    boolean tryAtomicCommitNewSnapshot(Path newSnapshotPath, Snapshot newSnapshot)
+            throws IOException {
+        return fileIO.tryToWriteAtomic(newSnapshotPath, newSnapshot.toJson());
+    }
+
+    void checkBeforeCommit(long newSnapshotId) throws IOException {
+        long latestSnapshotIdBeforeCommit =
+                Optional.ofNullable(snapshotManager.latestSnapshotIdWithOutHint())
+                        .orElse(Snapshot.FIRST_SNAPSHOT_ID - 1);
+        boolean currentCommitIsLatest =
+                Objects.equals(newSnapshotId, latestSnapshotIdBeforeCommit + 1);
+        if (!currentCommitIsLatest) {
+            snapshotManager.removeSnapshotLatestHint();
+            snapshotManager.removeSnapshotEarliestHint();
+            throw new CommitFailedException(
+                    String.format(
+                            "Can't submit snapshotId [%s], because it is expected that snapshotId [%s] should be submitted now.",
+                            newSnapshotId, latestSnapshotIdBeforeCommit + 1));
+        }
+    }
+
+    void checkAfterCommit(long newSnapshotId) throws IOException {
+        String maxSaveSnapshotsNumStr = tableOptions.get(SNAPSHOT_NUM_RETAINED_MAX.key());
+        int maxSaveSnapshotNum = SNAPSHOT_NUM_RETAINED_MAX.defaultValue();
+        if (!StringUtils.isBlank(maxSaveSnapshotsNumStr)) {
+            maxSaveSnapshotNum = Integer.parseInt(maxSaveSnapshotsNumStr);
+        }
+        long latestSnapshotIdAfterCommit =
+                Optional.ofNullable(snapshotManager.latestSnapshotIdWithOutHint())
+                        .orElse(Snapshot.FIRST_SNAPSHOT_ID);
+        long waterMark =
+                latestSnapshotIdAfterCommit > maxSaveSnapshotNum
+                        ? latestSnapshotIdAfterCommit - maxSaveSnapshotNum
+                        : -1;
+        fastFailIfDirtyCommit(newSnapshotId, waterMark, latestSnapshotIdAfterCommit);

Review Comment:
   Here we need to add logic to check whether the snapshot belongs to a branch or
   tag. PR 3787 provides this functionality; I'm not sure whether this should be
   merged with that PR.
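
   To illustrate the concern, here is a rough, standalone sketch. It is not code from this PR or from PR 3787. The `waterMark` arithmetic mirrors `checkAfterCommit` above. Everything else is an assumption made for illustration: the class name `WatermarkSketch`, the `isDirtyCommit` helper, the rule that an ID at or below the watermark counts as dirty (the body of `fastFailIfDirtyCommit` is not shown in this hunk), and the `referencedByTagOrBranch` set standing in for whatever lookup PR 3787 provides.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class WatermarkSketch {

    // Same arithmetic as checkAfterCommit: -1 means nothing can have expired yet.
    static long waterMark(long latestSnapshotId, int maxRetained) {
        return latestSnapshotId > maxRetained ? latestSnapshotId - maxRetained : -1;
    }

    // ASSUMPTION: a snapshot at or below the watermark is "dirty" unless a tag
    // or branch still references it. The real rule lives in fastFailIfDirtyCommit,
    // whose body is not part of this hunk.
    static boolean isDirtyCommit(
            long snapshotId, long waterMark, Set<Long> referencedByTagOrBranch) {
        if (referencedByTagOrBranch.contains(snapshotId)) {
            return false;
        }
        return snapshotId <= waterMark;
    }

    public static void main(String[] args) {
        // Hypothetical IDs kept alive by a tag or branch.
        Set<Long> referenced = new HashSet<>(Arrays.asList(15L));
        long mark = waterMark(120, 100);
        System.out.println("waterMark = " + mark);
        System.out.println("id 10 dirty: " + isDirtyCommit(10, mark, referenced));
        System.out.println("id 15 dirty: " + isDirtyCommit(15, mark, referenced));
    }
}
```

   Without the tag/branch lookup, a snapshot that is still referenced but sits below the watermark would be rejected as dirty under this assumed rule, which is why the check probably needs that information.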



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

