pnowojski commented on a change in pull request #13466:
URL: https://github.com/apache/flink/pull/13466#discussion_r494911970
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
##########
@@ -443,38 +479,66 @@ public void testWatermark() throws Exception {
public void testWatermarkAndStreamStatusForwarding() throws Exception {
try (StreamTaskMailboxTestHarness<String> testHarness =
new
StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new,
BasicTypeInfo.STRING_TYPE_INFO)
+ .modifyExecutionConfig(config ->
config.enableObjectReuse())
.addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
- .addInput(BasicTypeInfo.INT_TYPE_INFO,
2)
+ .addSourceInput(
+ new SourceOperatorFactory<>(
+ new
MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 2, true, true),
+
WatermarkStrategy.forGenerator(ctx -> new RecordToWatermarkGenerator())))
.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
.setupOutputForSingletonOperatorChain(new
MapToStringMultipleInputOperatorFactory(3))
.build()) {
ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
- long initialTime = 0L;
+ int initialTime = 0;
// test whether idle input channels are acknowledged
correctly when forwarding watermarks
testHarness.processElement(StreamStatus.IDLE, 0, 1);
- testHarness.processElement(StreamStatus.IDLE, 1, 1);
- testHarness.processElement(StreamStatus.IDLE, 2, 0);
testHarness.processElement(new Watermark(initialTime +
6), 0, 0);
- testHarness.processElement(new Watermark(initialTime +
6), 1, 0);
- testHarness.processElement(new Watermark(initialTime +
5), 2, 1); // this watermark should be advanced first
- testHarness.processElement(StreamStatus.IDLE, 2, 1); //
once this is acknowledged,
+ testHarness.processElement(new Watermark(initialTime +
5), 1, 1); // this watermark should be advanced first
+ testHarness.processElement(StreamStatus.IDLE, 1, 0); //
once this is acknowledged,
- expectedOutput.add(new Watermark(initialTime + 5));
// We don't expect to see Watermark(6) here because the
idle status of one
// input doesn't propagate to the other input. That is,
if input 1 is at WM 6 and input
// two was at WM 5 before going to IDLE then the output
watermark will not jump to WM 6.
+
+ // OPS, there is a known bug:
https://issues.apache.org/jira/browse/FLINK-18934
+ // that prevents this check from succeeding
(AbstractStreamOperator and AbstractStreamOperatorV2
+ // are ignoring StreamStatus), so those checks needs to
be commented out ...
+
+ //expectedOutput.add(new Watermark(initialTime + 5));
+ //assertThat(testHarness.getOutput(),
contains(expectedOutput.toArray()));
+
+ // and in as a temporary replacement we need this code
block:
+ {
+ // we wake up the source and emit watermark
+ addSourceRecords(testHarness, 1, initialTime +
5);
+ while (testHarness.processSingleStep()) {
+ }
Review comment:
Ok, I went a step further and I have dropped the old
`processIfAvailable` and `processWhileAvailable` and replaced them with
`processSingleStep`. Previously I was afraid that some tests might be relaying
on the previous behaviour, but apparently that's not the case:
https://github.com/apache/flink/pull/13466/commits/ceb02daced0ebde702daad7b68f220a5c231cced
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]