StefanRRichter commented on a change in pull request #8826:
[FLINK-12479][operators] Integrate StreamInputProcessor(s) with mailbox
URL: https://github.com/apache/flink/pull/8826#discussion_r299003689
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
##########
@@ -286,64 +334,17 @@ private void checkAndSetAvailable(int inputIndex) {
}
}
- /**
- * @return false if both of the inputs are finished, true otherwise.
- */
- private boolean waitForAvailableInput(InputSelection inputSelection)
- throws ExecutionException, InterruptedException, IOException {
-
- if (inputSelection.isALLMaskOf2()) {
- return waitForAvailableEitherInput();
- } else {
- waitForOneInput(
- (inputSelection.getInputMask() ==
InputSelection.FIRST.getInputMask()) ? input1 : input2);
- return true;
+ private CompletableFuture<?> isAnyInputAvailable() {
+ if (input1.isFinished() && input2.isFinished()) {
+ return AVAILABLE;
}
- }
-
- private boolean waitForAvailableEitherInput()
- throws ExecutionException, InterruptedException {
-
- CompletableFuture<?> future1 = input1.isFinished() ?
UNAVAILABLE : input1.isAvailable();
- CompletableFuture<?> future2 = input2.isFinished() ?
UNAVAILABLE : input2.isAvailable();
-
- if (future1 == UNAVAILABLE && future2 == UNAVAILABLE) {
- return false;
+ if (input1.isFinished()) {
+ return input2.isAvailable();
}
-
- // block to wait for a available input
- CompletableFuture.anyOf(future1, future2).get();
-
- if (future1.isDone()) {
- setAvailableInput(input1.getInputIndex());
+ if (input2.isFinished()) {
+ return input1.isAvailable();
}
- if (future2.isDone()) {
- setAvailableInput(input2.getInputIndex());
- }
-
- return true;
- }
-
- private void waitForOneInput(StreamTaskInput input)
- throws IOException, ExecutionException, InterruptedException {
-
- if (input.isFinished()) {
- throw new IOException("Could not read the finished
input: input" + (input.getInputIndex() + 1) + ".");
- }
-
- input.isAvailable().get();
- setAvailableInput(input.getInputIndex());
- }
-
- private boolean checkFinished() throws Exception {
- if (getInput(lastReadInputIndex).isFinished()) {
- synchronized (lock) {
-
operatorChain.endInput(getInputId(lastReadInputIndex));
- inputSelection = inputSelector.nextSelection();
- }
- }
-
- return input1.isFinished() && input2.isFinished();
+ return CompletableFuture.anyOf(input1.isAvailable(),
input2.isAvailable());
Review comment:
I would suggest this instead:
<code>
private CompletableFuture<?> isAnyInputAvailable() {
if (input1.isFinished()) {
return input2.isFinished() ? AVAILABLE :
input2.isAvailable();
}
if (input2.isFinished()) {
return input1.isAvailable();
}
CompletableFuture<?> input1Available = input1.isAvailable();
CompletableFuture<?> input2Available = input2.isAvailable();
return (input1Available == AVAILABLE || input2Available ==
AVAILABLE) ?
AVAILABLE : CompletableFuture.anyOf(input1Available,
input2Available);
}
</code>
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services