AHeise commented on a change in pull request #11467: [FLINK-16317][operators] 
Provide support for watermarks, key selector and latency marker in 
MultipleInputStreamOperator
URL: https://github.com/apache/flink/pull/11467#discussion_r396984936
 
 

 ##########
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
 ##########
 @@ -355,8 +377,235 @@ public void testInputFairness() throws Exception {
                }
        }
 
-       // This must only be used in one test, otherwise the static fields will 
be changed
-       // by several tests concurrently
+       /**
+        * This test verifies that watermarks and stream statuses are correctly 
forwarded. This also checks whether
+        * watermarks are forwarded only when we have received watermarks from 
all inputs. The
+        * forwarded watermark must be the minimum of the watermarks of all 
active inputs.
+        */
+       @Test
+       public void testWatermarkAndStreamStatusForwarding() throws Exception {
+               try (StreamTaskMailboxTestHarness<String> testHarness =
+                                new 
MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, 
BasicTypeInfo.STRING_TYPE_INFO)
+                                        
.addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
+                                        .addInput(BasicTypeInfo.INT_TYPE_INFO, 
2)
+                                        
.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
+                                        
.setupOutputForSingletonOperatorChain(new 
MapToStringMultipleInputOperatorFactory())
+                                        .build()) {
+                       ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
+
+                       long initialTime = 0L;
+
+                       testHarness.processElement(new Watermark(initialTime), 
0, 0);
+                       testHarness.processElement(new Watermark(initialTime), 
0, 1);
+                       testHarness.processElement(new Watermark(initialTime), 
1, 0);
+                       testHarness.processElement(new Watermark(initialTime), 
1, 1);
+
+                       testHarness.processElement(new Watermark(initialTime), 
2, 0);
+
+                       assertThat(testHarness.getOutput(), 
IsEmptyCollection.empty());
+
+                       testHarness.processElement(new Watermark(initialTime), 
2, 1);
+
+                       // now the watermark should have propagated, Map simply 
forward Watermarks
+                       expectedOutput.add(new Watermark(initialTime));
+                       assertThat(testHarness.getOutput(), 
contains(expectedOutput.toArray()));
+
+                       // contrary to checkpoint barriers these elements are 
not blocked by watermarks
+                       testHarness.processElement(new StreamRecord<>("Hello", 
initialTime), 0, 0);
+                       testHarness.processElement(new StreamRecord<>(42, 
initialTime), 1, 1);
+                       expectedOutput.add(new StreamRecord<>("Hello", 
initialTime));
+                       expectedOutput.add(new StreamRecord<>("42", 
initialTime));
+
+                       assertThat(testHarness.getOutput(), 
contains(expectedOutput.toArray()));
+
+                       testHarness.processElement(new Watermark(initialTime + 
4), 0, 0);
+                       testHarness.processElement(new Watermark(initialTime + 
3), 0, 1);
+                       testHarness.processElement(new Watermark(initialTime + 
3), 1, 0);
+                       testHarness.processElement(new Watermark(initialTime + 
4), 1, 1);
+                       testHarness.processElement(new Watermark(initialTime + 
3), 2, 0);
+                       testHarness.processElement(new Watermark(initialTime + 
2), 2, 1);
+
+                       // check whether we get the minimum of all the 
watermarks, this must also only occur in
+                       // the output after the two StreamRecords
+                       expectedOutput.add(new Watermark(initialTime + 2));
+                       assertThat(testHarness.getOutput(), 
contains(expectedOutput.toArray()));
+
+                       // advance watermark from one of the inputs, now we 
should get a new one since the
+                       // minimum increases
+                       testHarness.processElement(new Watermark(initialTime + 
4), 2, 1);
+                       expectedOutput.add(new Watermark(initialTime + 3));
+                       assertThat(testHarness.getOutput(), 
contains(expectedOutput.toArray()));
+
+                       // advance the other two inputs, now we should get a 
new one since the
+                       // minimum increases again
+                       testHarness.processElement(new Watermark(initialTime + 
4), 0, 1);
+                       testHarness.processElement(new Watermark(initialTime + 
4), 1, 0);
+                       testHarness.processElement(new Watermark(initialTime + 
4), 2, 0);
+                       expectedOutput.add(new Watermark(initialTime + 4));
+                       assertThat(testHarness.getOutput(), 
contains(expectedOutput.toArray()));
+
+                       // test whether idle input channels are acknowledged 
correctly when forwarding watermarks
+                       testHarness.processElement(StreamStatus.IDLE, 0, 1);
+                       testHarness.processElement(StreamStatus.IDLE, 1, 1);
+                       testHarness.processElement(StreamStatus.IDLE, 2, 0);
+                       testHarness.processElement(new Watermark(initialTime + 
6), 0, 0);
+                       testHarness.processElement(new Watermark(initialTime + 
6), 1, 0);
+                       testHarness.processElement(new Watermark(initialTime + 
5), 2, 1); // this watermark should be advanced first
+                       testHarness.processElement(StreamStatus.IDLE, 2, 1); // 
once this is acknowledged,
+
+                       // watermark (initial + 6) should be forwarded
 
 Review comment:
   The code comment contradicts the assertion.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to