JingsongLi commented on code in PR #301:
URL: https://github.com/apache/flink-table-store/pull/301#discussion_r977245473
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java:
##########
@@ -182,26 +187,61 @@ public void commit(ManifestCommittable committable, Map<String, String> properti
LOG.debug("Ready to commit\n" + committable.toString());
}
+ Long safeLatestSnapshotId = null;
+ List<ManifestEntry> entriesToCheck = new ArrayList<>();
+
List<ManifestEntry> appendChanges =
collectChanges(committable.newFiles(), FileKind.ADD);
+ List<ManifestEntry> compactChanges = new ArrayList<>();
+ compactChanges.addAll(collectChanges(committable.compactBefore(), FileKind.DELETE));
+ compactChanges.addAll(collectChanges(committable.compactAfter(), FileKind.ADD));
+
if (createEmptyCommit || !appendChanges.isEmpty()) {
+ // Optimization for common path.
+ // Step 1:
+ // Read manifest entries from changed partitions here and check for conflicts.
+ // If there are no other jobs committing at the same time,
+ // we can skip conflict checking in tryCommit method.
+ // This optimization is mainly used to decrease the number of times we read from files.
+ Long latestSnapshotId = snapshotManager.latestSnapshotId();
+ if (latestSnapshotId != null) {
+ // it is possible that some partitions only have compact changes,
+ // so we need to contain all changes
+ entriesToCheck.addAll(
+ readAllEntriesFromChangedPartitions(
+ latestSnapshotId, appendChanges, compactChanges));
+ entriesToCheck.addAll(appendChanges);
+ noConflictsOrFail(entriesToCheck);
Review Comment:
It would be better for the exception thrown here to explain when a conflict can occur and what the user should do about it.
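
To illustrate the kind of message meant here, a rough sketch follows. Everything in it is hypothetical: the class `ConflictMessageSketch`, the helpers `conflictMessage` and `failOnConflict`, their parameters, and the wording of the cause and remedy are assumptions for illustration, not the actual code or message in this PR.

```java
import java.util.List;

// Hypothetical sketch, not the actual FileStoreCommitImpl code.
final class ConflictMessageSketch {

    // Builds a message that states what conflicted, when this can happen,
    // and what the user can do. The cause/remedy wording is illustrative only.
    static String conflictMessage(String partition, int bucket, List<String> conflictingFiles) {
        return String.format(
                "Conflict detected while committing partition %s, bucket %d. "
                        + "Conflicting files: %s. "
                        + "Possible cause: another job committed changes to the same "
                        + "partition at the same time. "
                        + "Suggested action: make sure only one job modifies these files, "
                        + "then retry the commit.",
                partition, bucket, conflictingFiles);
    }

    // Throws with the descriptive message when any conflicting file is found.
    static void failOnConflict(String partition, int bucket, List<String> conflictingFiles) {
        if (!conflictingFiles.isEmpty()) {
            throw new IllegalStateException(
                    conflictMessage(partition, bucket, conflictingFiles));
        }
    }

    public static void main(String[] args) {
        // Example values are made up for the demonstration.
        System.out.println(conflictMessage("dt=20220922", 0, List.of("data-1.orc")));
    }
}
```

The point is only that the message names the affected partition and files, states a likely cause, and suggests a next step, so the user does not have to read the source to understand the failure.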