tsreaper commented on code in PR #224:
URL: https://github.com/apache/flink-table-store/pull/224#discussion_r923186715
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java:
##########
@@ -464,50 +479,88 @@ private boolean tryCommitOnce(
}
private void noConflictsOrFail(long snapshotId, List<ManifestEntry>
changes) {
- Set<ManifestEntry.Identifier> removedFiles =
- changes.stream()
- .filter(e -> e.kind().equals(FileKind.DELETE))
- .map(ManifestEntry::identifier)
- .collect(Collectors.toSet());
- if (removedFiles.isEmpty()) {
- // early exit for append only changes
- return;
- }
-
List<BinaryRowData> changedPartitions =
changes.stream()
.map(ManifestEntry::partition)
.distinct()
.collect(Collectors.toList());
+ List<ManifestEntry> currentEntries;
try {
- for (ManifestEntry entry :
+ currentEntries =
scan.withSnapshot(snapshotId)
.withPartitionFilter(changedPartitions)
.plan()
- .files()) {
- removedFiles.remove(entry.identifier());
- }
+ .files();
} catch (Throwable e) {
throw new RuntimeException("Cannot determine if conflicts exist.",
e);
}
+ noConflictsForDeletedFilesOrFail(currentEntries, changes);
+ noConflictsForLsmOrFail(currentEntries, changes);
+ }
+
+ private void noConflictsForDeletedFilesOrFail(
Review Comment:
This method will throw a more specific exception message. If we just use
`mergeManifestEntries` the exception will say "manifest files are corrupted"
which is actually not the case.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]