AHeise commented on a change in pull request #11467: [FLINK-16317][operators] 
Provide support for watermarks, key selector and latency marker in 
MultipleInputStreamOperator
URL: https://github.com/apache/flink/pull/11467#discussion_r396985318
 
 

 ##########
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
 ##########
 @@ -355,8 +377,235 @@ public void testInputFairness() throws Exception {
                }
        }
 
-       // This must only be used in one test, otherwise the static fields will 
be changed
-       // by several tests concurrently
+       /**
+        * This test verifies that watermarks and stream statuses are correctly 
forwarded. This also checks whether
+        * watermarks are forwarded only when we have received watermarks from 
all inputs. The
+        * forwarded watermark must be the minimum of the watermarks of all 
active inputs.
+        */
+       @Test
+       public void testWatermarkAndStreamStatusForwarding() throws Exception {
+               try (StreamTaskMailboxTestHarness<String> testHarness =
+                                new 
MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, 
BasicTypeInfo.STRING_TYPE_INFO)
+                                        
.addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
+                                        .addInput(BasicTypeInfo.INT_TYPE_INFO, 
2)
+                                        
.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
+                                        
.setupOutputForSingletonOperatorChain(new 
MapToStringMultipleInputOperatorFactory())
+                                        .build()) {
+                       ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
+
+                       long initialTime = 0L;
+
+                       testHarness.processElement(new Watermark(initialTime), 
0, 0);
+                       testHarness.processElement(new Watermark(initialTime), 
0, 1);
+                       testHarness.processElement(new Watermark(initialTime), 
1, 0);
+                       testHarness.processElement(new Watermark(initialTime), 
1, 1);
+
+                       testHarness.processElement(new Watermark(initialTime), 
2, 0);
+
+                       assertThat(testHarness.getOutput(), 
IsEmptyCollection.empty());
+
+                       testHarness.processElement(new Watermark(initialTime), 
2, 1);
+
+                       // now the watermark should have propagated, Map simply 
forward Watermarks
+                       expectedOutput.add(new Watermark(initialTime));
+                       assertThat(testHarness.getOutput(), 
contains(expectedOutput.toArray()));
+
+                       // contrary to checkpoint barriers these elements are 
not blocked by watermarks
+                       testHarness.processElement(new StreamRecord<>("Hello", 
initialTime), 0, 0);
+                       testHarness.processElement(new StreamRecord<>(42, 
initialTime), 1, 1);
+                       expectedOutput.add(new StreamRecord<>("Hello", 
initialTime));
+                       expectedOutput.add(new StreamRecord<>("42", 
initialTime));
+
+                       assertThat(testHarness.getOutput(), 
contains(expectedOutput.toArray()));
+
+                       testHarness.processElement(new Watermark(initialTime + 
4), 0, 0);
+                       testHarness.processElement(new Watermark(initialTime + 
3), 0, 1);
+                       testHarness.processElement(new Watermark(initialTime + 
3), 1, 0);
+                       testHarness.processElement(new Watermark(initialTime + 
4), 1, 1);
+                       testHarness.processElement(new Watermark(initialTime + 
3), 2, 0);
+                       testHarness.processElement(new Watermark(initialTime + 
2), 2, 1);
+
+                       // check whether we get the minimum of all the 
watermarks, this must also only occur in
+                       // the output after the two StreamRecords
+                       expectedOutput.add(new Watermark(initialTime + 2));
+                       assertThat(testHarness.getOutput(), 
contains(expectedOutput.toArray()));
+
+                       // advance watermark from one of the inputs, now we 
should get a new one since the
+                       // minimum increases
+                       testHarness.processElement(new Watermark(initialTime + 
4), 2, 1);
+                       expectedOutput.add(new Watermark(initialTime + 3));
+                       assertThat(testHarness.getOutput(), 
contains(expectedOutput.toArray()));
+
+                       // advance the other two inputs, now we should get a 
new one since the
+                       // minimum increases again
+                       testHarness.processElement(new Watermark(initialTime + 
4), 0, 1);
+                       testHarness.processElement(new Watermark(initialTime + 
4), 1, 0);
+                       testHarness.processElement(new Watermark(initialTime + 
4), 2, 0);
+                       expectedOutput.add(new Watermark(initialTime + 4));
+                       assertThat(testHarness.getOutput(), 
contains(expectedOutput.toArray()));
+
+                       // test whether idle input channels are acknowledged 
correctly when forwarding watermarks
+                       testHarness.processElement(StreamStatus.IDLE, 0, 1);
 
 Review comment:
   The idle-input checks that start here should be moved into a separate test.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to