rkhachatryan commented on a change in pull request #19050:
URL: https://github.com/apache/flink/pull/19050#discussion_r824453024
##########
File path:
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
##########
@@ -204,22 +205,25 @@ public SequenceNumber nextSequenceNumber() {
SequenceNumberRange range = SequenceNumberRange.generic(from,
activeSequenceNumber);
if (range.size() == readyToReturn.size()) {
checkState(toUpload.isEmpty());
- return completedFuture(buildHandle(keyGroupRange, readyToReturn, 0L));
+ future.complete(buildHandle(keyGroupRange, readyToReturn, 0L));
} else {
- CompletableFuture<ChangelogStateHandleStreamImpl> future =
- new CompletableFuture<>();
uploadCompletionListeners.add(
new UploadCompletionListener(keyGroupRange, range,
readyToReturn, future));
if (!toUpload.isEmpty()) {
- uploader.upload(
+ uploadTask =
new UploadTask(
toUpload.values(),
this::handleUploadSuccess,
- this::handleUploadFailure));
+ this::handleUploadFailure);
}
- return future;
}
}
+ // upload outside the synchronized block to prevent deadlock: capacity might be needed,
+ // but upload threads need to acquire lock in completion callbacks
+ if (uploadTask != null) {
+ uploader.upload(uploadTask);
+ }
Review comment:
Thanks for your input, but I have to disagree:
`uploader.upload` is blocking by design: it must provide "hard"
back-pressure in case the "soft" back-pressure (exerted by
"AvailabilityProvider") is not enough. Therefore, unfortunately, this call
cannot be asynchronous.
Perhaps the wording of the comment exposes unnecessary implementation
details about the capacity (I'm not sure, because these classes are closely
related).
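
To illustrate the point, here is a minimal, self-contained sketch of the pattern, not the actual Flink code. `BoundedUploader` and `Writer` are hypothetical stand-ins for the uploader and the changelog writer, and the `Semaphore` is only an assumed way to model the upload capacity. The task is chosen under the lock, but the blocking `upload` call happens after the lock is released, so a completion callback that needs the same lock can always run and free capacity.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class BlockingUploadSketch {

    /** Hypothetical uploader: upload() blocks the caller while capacity is exhausted. */
    static final class BoundedUploader {
        private final Semaphore capacity;
        private final ExecutorService pool = Executors.newFixedThreadPool(2);

        BoundedUploader(int maxInFlight) {
            this.capacity = new Semaphore(maxInFlight);
        }

        void upload(Runnable completionCallback) throws InterruptedException {
            capacity.acquire(); // "hard" back-pressure: the caller waits here
            pool.execute(() -> {
                try {
                    completionCallback.run(); // may need the writer's lock
                } finally {
                    capacity.release(); // capacity is freed only after the callback
                }
            });
        }

        boolean shutdown() throws InterruptedException {
            pool.shutdown();
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    /** Hypothetical writer: prepares the task under its lock, uploads outside of it. */
    static final class Writer {
        private final Object lock = new Object();
        private final BoundedUploader uploader;
        private int completed;

        Writer(BoundedUploader uploader) {
            this.uploader = uploader;
        }

        void persist() throws InterruptedException {
            Runnable uploadTask;
            synchronized (lock) {
                // Only decide what to upload while holding the lock.
                uploadTask = this::handleUploadSuccess;
            }
            // Blocking call made without the lock. If it were made inside the
            // synchronized block, a pending callback could wait for the lock
            // while this thread waits for the capacity that callback would free.
            uploader.upload(uploadTask);
        }

        private void handleUploadSuccess() {
            synchronized (lock) {
                completed++;
            }
        }

        int completed() {
            synchronized (lock) {
                return completed;
            }
        }
    }

    /** Runs three uploads through a capacity-1 uploader and returns the completed count. */
    static int runDemo() {
        try {
            BoundedUploader uploader = new BoundedUploader(1);
            Writer writer = new Writer(uploader);
            for (int i = 0; i < 3; i++) {
                writer.persist();
            }
            if (!uploader.shutdown()) {
                throw new IllegalStateException("uploads did not finish in time");
            }
            return writer.completed();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("completed uploads: " + runDemo());
    }
}
```

With a capacity of 1, the second `persist()` call blocks in `upload` until the first callback has run. That is the intended "hard" back-pressure, and it is only safe because the lock is not held while waiting.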
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]