mxm commented on code in PR #20215:
URL: https://github.com/apache/flink/pull/20215#discussion_r947885185
##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -231,16 +236,7 @@ private void setCurrentReader(int index) {
reader.start();
currentSourceIndex = index;
currentReader = reader;
- currentReader
- .isAvailable()
- .whenComplete(
- (result, ex) -> {
- if (ex == null) {
- availabilityFuture.complete(result);
- } else {
- availabilityFuture.completeExceptionally(ex);
- }
- });
Review Comment:
Thank you for elaborating. I was new to the code; apologies for not getting
the switch case right. I had assumed that the underlying reader would complete
its availability future on close when we switch readers. In that case
synchronization would not have been an issue, but that assumption does not hold.
I had another look and came up with this, which should do the trick:
https://github.com/apache/flink/compare/master...mxm:flink:FLINK-27479?expand=1
It is conceptually close to your code, but it avoids the helper class, which
in my eyes makes it much easier to understand.
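
To make the switch problem concrete, here is a minimal sketch of one way to wake up waiters on a reader switch without a helper class. It is not the code from the linked branch. `SketchReader`, `SwitchingAvailabilitySketch`, and `switchSignal` are hypothetical names, and the sketch assumes that all calls happen on a single thread (as with a task mailbox), so it does no locking.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for a source reader; only availability is modeled. */
interface SketchReader {
    CompletableFuture<Void> isAvailable();
}

/** Hypothetical sketch, not the actual HybridSourceReader. Single-threaded use assumed. */
final class SwitchingAvailabilitySketch {
    // Null until the first reader has been set.
    private SketchReader currentReader;
    // Completed on every switch so that callers waiting on an old future re-poll.
    private CompletableFuture<Void> switchSignal = new CompletableFuture<>();

    CompletableFuture<?> isAvailable() {
        if (currentReader == null) {
            return switchSignal;
        }
        // Done as soon as the current reader is available OR a switch happens,
        // even if the old reader never completes its own future on close.
        return CompletableFuture.anyOf(currentReader.isAvailable(), switchSignal);
    }

    void setCurrentReader(SketchReader reader) {
        currentReader = reader;
        CompletableFuture<Void> previousSignal = switchSignal;
        switchSignal = new CompletableFuture<>();
        // Release anyone still waiting on a future handed out before the switch.
        previousSignal.complete(null);
    }
}
```

A caller that obtained a future before the switch sees it complete at the switch and then calls `isAvailable()` again, which now reflects the new reader.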
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/MultipleFuturesAvailabilityHelper.java:
##########
@@ -57,13 +57,17 @@ public CompletableFuture<?> getAvailableFuture() {
return availableFuture;
}
+ public int getSize() {
+ return futuresToCombine.length;
+ }
+
public void resetToUnAvailable() {
if (availableFuture.isDone()) {
availableFuture = new CompletableFuture<>();
}
}
- private void notifyCompletion() {
+ public void notifyCompletion() {
Review Comment:
Wouldn't changing the modifier here to `public` break the class contract,
which is to only ever complete the `availableFuture` returned via
`getAvailableFuture` when the combined futures inserted via `anyOf` complete?
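
The contract I mean can be illustrated with a simplified model of such a helper. This is a sketch and not the actual `MultipleFuturesAvailabilityHelper`; the class name `AnyOfAvailabilitySketch` is hypothetical and error handling is omitted. The point is that `notifyCompletion` stays private, so the only way to complete the returned future is through a future registered via `anyOf`.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified model of an "any of N futures" availability helper. */
final class AnyOfAvailabilitySketch {
    private final CompletableFuture<?>[] futuresToCombine;
    private CompletableFuture<?> availableFuture = new CompletableFuture<>();

    AnyOfAvailabilitySketch(int size) {
        futuresToCombine = new CompletableFuture<?>[size];
    }

    CompletableFuture<?> getAvailableFuture() {
        return availableFuture;
    }

    void resetToUnAvailable() {
        if (availableFuture.isDone()) {
            availableFuture = new CompletableFuture<>();
        }
    }

    /** Registers a future for slot idx; its completion completes the combined future. */
    void anyOf(int idx, CompletableFuture<?> future) {
        if (futuresToCombine[idx] == null || futuresToCombine[idx].isDone()) {
            futuresToCombine[idx] = future;
            future.thenRun(this::notifyCompletion);
        }
    }

    // Private on purpose: outside code cannot complete availableFuture directly.
    private void notifyCompletion() {
        availableFuture.complete(null);
    }
}
```

With `notifyCompletion` public, any caller could complete `availableFuture` while none of the registered futures has completed, which is exactly what the contract rules out.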