pnowojski commented on a change in pull request #13351:
URL: https://github.com/apache/flink/pull/13351#discussion_r499521881
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest.java
##########
@@ -197,22 +198,21 @@ public void testInputStarvation() throws Exception {
testHarness.processElement(new StreamRecord<>("3"), 1);
testHarness.processElement(new StreamRecord<>("4"), 1);
- testHarness.processSingleStep();
expectedOutput.add(new StreamRecord<>("[2]: 1"));
- testHarness.processSingleStep();
expectedOutput.add(new StreamRecord<>("[2]: 2"));
- assertThat(testHarness.getOutput(),
contains(expectedOutput.toArray()));
+ testHarness.processAll();
+ assertEquals(expectedOutput, new
ArrayList<>(testHarness.getOutput()).subList(0, expectedOutput.size()));
Review comment:
This test is now not doing what it was intended.
Now you are processing all elements from the input gate `1` before
`testHarness.processElement(new StreamRecord<>("1"), 2);` (L207/206) is being
enqueued to input gate `2`.
I would guess that
```
// to avoid starvation, if the input selection is ALL and
availableInputsMask is not ALL,
// always try to check and set the availability of another input
if (inputSelectionHandler.shouldSetAvailableForAnotherInput()) {
fullCheckAndSetAvailable();
}
```
check from `StreamMultipleInputProcessor#selectNextReadingInputIndex` is
currently not tested.
The intention behind this test is:
1. to have a long (just as well could be infinite) backlog of records to
process on one of the inputs
2. introduce the availability change on the second input, and make sure it's
checked/respected (instead of hot looping on the first input)
3. also throw in a third not selected input just to spice things a little bit
Why did you have to change this test?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]