BsoBird commented on code in PR #3805: URL: https://github.com/apache/paimon/pull/3805#discussion_r1689184744
########## paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java: ########## @@ -1274,4 +1308,101 @@ static ConflictCheck noConflictCheck() { public static ConflictCheck mustConflictCheck() { return latestSnapshot -> true; } + + boolean doCommit( + long newSnapshotId, + Path newSnapshotPath, + Snapshot newSnapshot, + boolean useSerializableIsolation, + boolean useLock) { + try { + boolean committed = tryAtomicCommitNewSnapshot(newSnapshotPath, newSnapshot); + if (committed) { + if (useSerializableIsolation && !useLock) { + // If the useSerializableIsolation is turned on,And we not use lock service, + // we will remove all HINTs,as it may provide incorrect information. + snapshotManager.removeSnapshotEarliestHint(); + snapshotManager.removeSnapshotLatestHint(); + snapshotManager.removeChangeLogEarliestHint(); + snapshotManager.removeChangeLogLatestHint(); + } else { + snapshotManager.commitLatestHint(newSnapshotId); + } + } + return committed; + } catch (Exception e) { + throw new CommitStateUnknownException(e.getMessage(), e); + } + } + + boolean tryAtomicCommitNewSnapshot(Path newSnapshotPath, Snapshot newSnapshot) + throws IOException { + return fileIO.tryToWriteAtomic(newSnapshotPath, newSnapshot.toJson()); + } + + void checkBeforeCommit(long newSnapshotId) throws IOException { + long latestSnapshotIdBeforeCommit = + Optional.ofNullable(snapshotManager.latestSnapshotIdWithOutHint()) + .orElse(Snapshot.FIRST_SNAPSHOT_ID - 1); + boolean currentCommitIsLatest = + Objects.equals(newSnapshotId, latestSnapshotIdBeforeCommit + 1); + if (!currentCommitIsLatest) { + snapshotManager.removeSnapshotLatestHint(); + snapshotManager.removeSnapshotEarliestHint(); + throw new CommitFailedException( + String.format( + "Can't submit snapshotId [%s], because it is expected that snapshotId [%s] should be submitted now.", + newSnapshotId, latestSnapshotIdBeforeCommit + 1)); + } + } + + void checkAfterCommit(long newSnapshotId) throws IOException { + String 
maxSaveSnapshotsNumStr = tableOptions.get(SNAPSHOT_NUM_RETAINED_MAX.key()); + int maxSaveSnapshotNum = SNAPSHOT_NUM_RETAINED_MAX.defaultValue(); + if (!StringUtils.isBlank(maxSaveSnapshotsNumStr)) { + maxSaveSnapshotNum = Integer.parseInt(maxSaveSnapshotsNumStr); + } + long latestSnapshotIdAfterCommit = + Optional.ofNullable(snapshotManager.latestSnapshotIdWithOutHint()) + .orElse(Snapshot.FIRST_SNAPSHOT_ID); + long waterMark = + latestSnapshotIdAfterCommit > maxSaveSnapshotNum + ? latestSnapshotIdAfterCommit - maxSaveSnapshotNum + : -1; + fastFailIfDirtyCommit(newSnapshotId, waterMark, latestSnapshotIdAfterCommit); Review Comment: Here we need to add logic that checks whether the snapshot belongs to a branch or a tag. PR 3787 provides this functionality, but I'm not sure whether this PR needs to be merged with it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@paimon.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org