dawidwys commented on a change in pull request #13529:
URL: https://github.com/apache/flink/pull/13529#discussion_r501526829
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
##########
@@ -23,16 +23,12 @@
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
Review comment:
I had another look at the issue and now that I understand a bit better
the assumptions on `getAvailability` I think the current code is ok % maybe we
should change `StreamTwoInputProcessor#L206` to:
```
if (status == InputStatus.MORE_AVAILABLE || (status !=
InputStatus.END_OF_INPUT && (input.isApproximatelyAvailable() ||
input.isAvailable()))) {
```
My problem was that I did not use the `AvailabilityProvider.AVAILABLE` at
all. My code was something like:
```
class MyInput {
CompletableFuture<?> availabilityMarker = ...
void executeSomeAsyncOperation() {
availabilityMarker = new CompletableFuture();
executeAsync(this::asyncOperation);
}
void asyncOperation() {
....
availabilityMarker.complete(null);
}
CompletableFuture<?> getAvailability() {
return availabilityProvider;
}
}
```
This won't work because the default implementation of
`isApproximatelyAvailable()` is never true. I would have to change:
```
void asyncOperation() {
....
CompletableFuture<?> tmp = availabilityMarker;
availabilityMarker = AvailabilityProvider.AVAILABLE;
tmp.complete(null);
}
```
This is an implicit, not obvious contract that I was unaware of. The change
I suggested for `StreamTwoInputProcessor#L206` would probably make it work for
my old version, but I think it is not strictly necessary. On the other hand
some additional documentation or a walkthrough example could be helpful here,
especially as we are exposing this feature in user facing APIs.
I will remove this commit. Sorry for the confusion.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]