JingsongLi commented on code in PR #301:
URL: https://github.com/apache/flink-table-store/pull/301#discussion_r977242850
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java:
##########
@@ -479,32 +526,39 @@ private boolean tryCommitOnce(
return false;
}
- private void noConflictsOrFail(long snapshotId, List<ManifestEntry> changes) {
+ @SafeVarargs
+ private final List<ManifestEntry> readAllEntriesFromChangedPartitions(
+ long snapshotId, List<ManifestEntry>... changes) {
List<BinaryRowData> changedPartitions =
- changes.stream()
+ Arrays.stream(changes)
+ .flatMap(Collection::stream)
.map(ManifestEntry::partition)
.distinct()
.collect(Collectors.toList());
- List<ManifestEntry> allEntries;
try {
- allEntries =
- new ArrayList<>(
- scan.withSnapshot(snapshotId)
- .withPartitionFilter(changedPartitions)
- .plan()
- .files());
+ return scan.withSnapshot(snapshotId)
+ .withPartitionFilter(changedPartitions)
+ .plan()
+ .files();
} catch (Throwable e) {
- throw new RuntimeException("Cannot determine if conflicts exist.", e);
+ throw new RuntimeException("Cannot read manifest entries from changed partitions.", e);
}
+ }
+
+ private void noConflictsOrFail(long snapshotId, List<ManifestEntry> changes) {
+ List<ManifestEntry> allEntries =
+ new ArrayList<>(readAllEntriesFromChangedPartitions(snapshotId, changes));
allEntries.addAll(changes);
+ noConflictsOrFail(allEntries);
+ }
+ private void noConflictsOrFail(List<ManifestEntry> allEntries) {
Collection<ManifestEntry> mergedEntries;
try {
 // merge manifest entries and also check if the files we want to delete are still there
mergedEntries = ManifestEntry.mergeManifestEntries(allEntries);
} catch (Throwable e) {
- throw new RuntimeException(
- "File deletion conflicts detected! Give up committing compact changes.", e);
+ throw new RuntimeException("File deletion conflicts detected! Give up committing.", e);
Review Comment:
I think we can ignore the exception from `mergeManifestEntries` here.
Instead, we can explain that there are concurrent writes and print the change details.
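
One possible reading of this suggestion, as a minimal sketch only: the low-level merge exception is dropped, and the thrown message itself explains that concurrent writers are the likely cause and lists the changes being committed. `Entry`, `Kind`, and `mergeEntries` below are hypothetical, simplified stand-ins for `ManifestEntry` and `ManifestEntry.mergeManifestEntries`; they are not the real flink-table-store classes, and the message wording is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ConflictMessageSketch {

    enum Kind { ADD, DELETE }

    // Hypothetical stand-in for ManifestEntry, reduced to what the sketch needs.
    static final class Entry {
        final Kind kind;
        final String partition;
        final String fileName;

        Entry(Kind kind, String partition, String fileName) {
            this.kind = kind;
            this.partition = partition;
            this.fileName = fileName;
        }

        @Override
        public String toString() {
            return kind + " " + partition + "/" + fileName;
        }
    }

    // Simplified stand-in for the merge step: an ADD registers a file, a DELETE
    // removes it, and a DELETE of a file that is not present is a conflict.
    static List<Entry> mergeEntries(List<Entry> entries) {
        Map<String, Entry> live = new LinkedHashMap<>();
        for (Entry e : entries) {
            String key = e.partition + "/" + e.fileName;
            if (e.kind == Kind.ADD) {
                live.put(key, e);
            } else if (live.remove(key) == null) {
                throw new IllegalStateException("Trying to delete missing file " + key);
            }
        }
        return new ArrayList<>(live.values());
    }

    // The merge exception is intentionally not attached as the cause; the message
    // explains the likely reason and prints the changes instead.
    static void noConflictsOrFail(List<Entry> baseEntries, List<Entry> changes) {
        List<Entry> allEntries = new ArrayList<>(baseEntries);
        allEntries.addAll(changes);
        try {
            mergeEntries(allEntries);
        } catch (IllegalStateException e) {
            throw new RuntimeException(
                    "File deletion conflicts detected, probably caused by concurrent writers. "
                            + "Giving up committing. Conflicting changes: " + changes);
        }
    }

    public static void main(String[] args) {
        List<Entry> base = Arrays.asList(new Entry(Kind.ADD, "p1", "f1"));
        List<Entry> changes = Arrays.asList(new Entry(Kind.DELETE, "p1", "f2"));
        try {
            noConflictsOrFail(base, changes);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Whether to keep the original exception as the cause is a judgment call; keeping it preserves the stack trace, while dropping it keeps the log focused on the change details.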