StefanRRichter commented on a change in pull request #8826:
[FLINK-12479][operators] Integrate StreamInputProcessor(s) with mailbox
URL: https://github.com/apache/flink/pull/8826#discussion_r298999540
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
##########
@@ -286,64 +334,17 @@ private void checkAndSetAvailable(int inputIndex) {
}
}
- /**
- * @return false if both of the inputs are finished, true otherwise.
- */
- private boolean waitForAvailableInput(InputSelection inputSelection)
- throws ExecutionException, InterruptedException, IOException {
-
- if (inputSelection.isALLMaskOf2()) {
- return waitForAvailableEitherInput();
- } else {
- waitForOneInput(
- (inputSelection.getInputMask() ==
InputSelection.FIRST.getInputMask()) ? input1 : input2);
- return true;
+ private CompletableFuture<?> isAnyInputAvailable() {
+ if (input1.isFinished() && input2.isFinished()) {
+ return AVAILABLE;
}
- }
-
- private boolean waitForAvailableEitherInput()
- throws ExecutionException, InterruptedException {
-
- CompletableFuture<?> future1 = input1.isFinished() ?
UNAVAILABLE : input1.isAvailable();
- CompletableFuture<?> future2 = input2.isFinished() ?
UNAVAILABLE : input2.isAvailable();
-
- if (future1 == UNAVAILABLE && future2 == UNAVAILABLE) {
- return false;
+ if (input1.isFinished()) {
+ return input2.isAvailable();
}
-
- // block to wait for a available input
- CompletableFuture.anyOf(future1, future2).get();
-
- if (future1.isDone()) {
- setAvailableInput(input1.getInputIndex());
+ if (input2.isFinished()) {
+ return input1.isAvailable();
}
- if (future2.isDone()) {
- setAvailableInput(input2.getInputIndex());
- }
-
- return true;
- }
-
- private void waitForOneInput(StreamTaskInput input)
- throws IOException, ExecutionException, InterruptedException {
-
- if (input.isFinished()) {
- throw new IOException("Could not read the finished
input: input" + (input.getInputIndex() + 1) + ".");
- }
-
- input.isAvailable().get();
- setAvailableInput(input.getInputIndex());
- }
-
- private boolean checkFinished() throws Exception {
- if (getInput(lastReadInputIndex).isFinished()) {
- synchronized (lock) {
-
operatorChain.endInput(getInputId(lastReadInputIndex));
- inputSelection = inputSelector.nextSelection();
- }
- }
-
- return input1.isFinished() && input2.isFinished();
+ return CompletableFuture.anyOf(input1.isAvailable(),
input2.isAvailable());
Review comment:
You could strongly consider checking if both returned `AVAILABLE` and then
also return `AVAILABLE`, and use the `anyOf` only if there is really something
to wait for. Some code might only check for the referencial equality and then
the method breaks the contract.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services