pnowojski commented on a change in pull request #13466:
URL: https://github.com/apache/flink/pull/13466#discussion_r494867816



##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
##########
@@ -726,15 +803,188 @@ public void processElement(StreamRecord<T> element) 
throws Exception {
         * Factory for {@link MapToStringMultipleInputOperator}.
         */
        protected static class MapToStringMultipleInputOperatorFactory extends 
AbstractStreamOperatorFactory<String> {
+               private final int numberOfInputs;
+
+               public MapToStringMultipleInputOperatorFactory(int 
numberOfInputs) {
+                       this.numberOfInputs = numberOfInputs;
+               }
+
                @Override
                public <T extends StreamOperator<String>> T 
createStreamOperator(StreamOperatorParameters<String> parameters) {
-                       return (T) new 
MapToStringMultipleInputOperator(parameters);
+                       return (T) new 
MapToStringMultipleInputOperator(parameters, numberOfInputs);
                }
 
                @Override
                public Class<? extends StreamOperator<String>> 
getStreamOperatorClass(ClassLoader classLoader) {
                        return MapToStringMultipleInputOperator.class;
                }
        }
+
+       static StreamTaskMailboxTestHarness<String> buildTestHarness() throws 
Exception {
+               return buildTestHarness(false);
+       }
+
+       static StreamTaskMailboxTestHarness<String> buildTestHarness(boolean 
unaligned) throws Exception {
+               return new 
StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, 
BasicTypeInfo.STRING_TYPE_INFO)
+                       .modifyExecutionConfig(config -> 
config.enableObjectReuse())
+                       .modifyStreamConfig(config -> 
config.setUnalignedCheckpointsEnabled(unaligned))
+                       .addInput(BasicTypeInfo.STRING_TYPE_INFO)
+                       .addSourceInput(
+                               new SourceOperatorFactory<>(
+                                       new MockSource(Boundedness.BOUNDED, 1),
+                                       WatermarkStrategy.noWatermarks()))
+                       .addInput(BasicTypeInfo.DOUBLE_TYPE_INFO)
+                       .setupOutputForSingletonOperatorChain(new 
MapToStringMultipleInputOperatorFactory(3))
+                       .build();
+       }
+
+       static void addSourceRecords(
+                       StreamTaskMailboxTestHarness<String> testHarness,
+                       int sourceId,
+                       int... records) throws Exception {
+               OperatorID sourceOperatorID = getSourceOperatorID(testHarness, 
sourceId);
+
+               // Prepare the source split and assign it to the source reader.
+               MockSourceSplit split = new MockSourceSplit(0, 0, 
records.length);
+               for (int record : records) {
+                       split.addRecord(record);
+               }
+
+               // Assign the split to the source reader.
+               AddSplitEvent<MockSourceSplit> addSplitEvent =
+                       new AddSplitEvent<>(Collections.singletonList(split), 
new MockSourceSplitSerializer());
+
+               testHarness.getStreamTask().dispatchOperatorEvent(
+                       sourceOperatorID,
+                       new SerializedValue<>(addSplitEvent));
+       }
+
+       private static OperatorID 
getSourceOperatorID(StreamTaskMailboxTestHarness<String> testHarness, int 
sourceId) {
+               StreamConfig.InputConfig[] inputs = 
testHarness.getStreamTask().getConfiguration().getInputs(testHarness.getClass().getClassLoader());
+               StreamConfig.SourceInputConfig input = 
(StreamConfig.SourceInputConfig) inputs[sourceId];
+               return 
testHarness.getStreamTask().operatorChain.getSourceTaskInput(input).getOperatorID();
+       }
+
+       private void finishAddingRecords(StreamTaskMailboxTestHarness<String> 
testHarness, int sourceId) throws Exception {
+               testHarness.getStreamTask().dispatchOperatorEvent(
+                       getSourceOperatorID(testHarness, sourceId),
+                       new SerializedValue<>(new SourceEventWrapper(new 
MockNoMoreSplitsEvent())));
+       }
+
+       static class LifeCycleTrackingMapToStringMultipleInputOperator
+                       extends MapToStringMultipleInputOperator implements 
BoundedMultiInput {
+               public static final String OPEN = "MultipleInputOperator#open";
+               public static final String CLOSE = 
"MultipleInputOperator#close";
+               public static final String END_INPUT = 
"MultipleInputOperator#endInput";
+
+               private static final long serialVersionUID = 1L;
+
+               public 
LifeCycleTrackingMapToStringMultipleInputOperator(StreamOperatorParameters<String>
 parameters) {
+                       super(parameters, 3);
+               }
+
+               @Override
+               public void open() throws Exception {
+                       LIFE_CYCLE_EVENTS.add(OPEN);
+                       super.open();
+               }
+
+               @Override
+               public void close() throws Exception {
+                       LIFE_CYCLE_EVENTS.add(CLOSE);
+                       super.close();
+               }
+
+               @Override
+               public void endInput(int inputId) {
+                       LIFE_CYCLE_EVENTS.add(END_INPUT);
+               }
+       }
+
+       static class LifeCycleTrackingMapToStringMultipleInputOperatorFactory 
extends AbstractStreamOperatorFactory<String> {
+               @Override
+               public <T extends StreamOperator<String>> T 
createStreamOperator(StreamOperatorParameters<String> parameters) {
+                       return (T) new 
LifeCycleTrackingMapToStringMultipleInputOperator(parameters);
+               }
+
+               @Override
+               public Class<? extends StreamOperator<String>> 
getStreamOperatorClass(ClassLoader classLoader) {
+                       return 
LifeCycleTrackingMapToStringMultipleInputOperator.class;
+               }
+       }
+
+       static class LifeCycleTrackingMockSource extends MockSource {
+               public LifeCycleTrackingMockSource(Boundedness boundedness, int 
numSplits) {
+                       super(boundedness, numSplits);
+               }
+
+               @Override
+               public SourceReader<Integer, MockSourceSplit> 
createReader(SourceReaderContext readerContext) {
+                       LifeCycleTrackingMockSourceReader sourceReader = new 
LifeCycleTrackingMockSourceReader();
+                       createdReaders.add(sourceReader);
+                       return sourceReader;
+               }
+       }
+
+       static class LifeCycleTrackingMockSourceReader extends MockSourceReader 
{
+               public static final String START = "SourceReader#start";
+               public static final String CLOSE = "SourceReader#close";
+
+               @Override
+               public void start() {
+                       LIFE_CYCLE_EVENTS.add(START);
+                       super.start();
+               }
+
+               @Override
+               public InputStatus pollNext(ReaderOutput<Integer> sourceOutput) 
throws Exception {
+                       return super.pollNext(sourceOutput);

Review comment:
       Probably some left over of a previous version. Removed.

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
##########
@@ -443,38 +479,66 @@ public void testWatermark() throws Exception {
        public void testWatermarkAndStreamStatusForwarding() throws Exception {
                try (StreamTaskMailboxTestHarness<String> testHarness =
                                new 
StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, 
BasicTypeInfo.STRING_TYPE_INFO)
+                                       .modifyExecutionConfig(config -> 
config.enableObjectReuse())
                                        
.addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
-                                       .addInput(BasicTypeInfo.INT_TYPE_INFO, 
2)
+                                       .addSourceInput(
+                                               new SourceOperatorFactory<>(
+                                                       new 
MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 2, true, true),
+                                                       
WatermarkStrategy.forGenerator(ctx -> new RecordToWatermarkGenerator())))
                                        
.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
                                        
.setupOutputForSingletonOperatorChain(new 
MapToStringMultipleInputOperatorFactory(3))
                                        .build()) {
                        ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
 
-                       long initialTime = 0L;
+                       int initialTime = 0;
 
                        // test whether idle input channels are acknowledged 
correctly when forwarding watermarks
                        testHarness.processElement(StreamStatus.IDLE, 0, 1);
-                       testHarness.processElement(StreamStatus.IDLE, 1, 1);
-                       testHarness.processElement(StreamStatus.IDLE, 2, 0);
                        testHarness.processElement(new Watermark(initialTime + 
6), 0, 0);
-                       testHarness.processElement(new Watermark(initialTime + 
6), 1, 0);
-                       testHarness.processElement(new Watermark(initialTime + 
5), 2, 1); // this watermark should be advanced first
-                       testHarness.processElement(StreamStatus.IDLE, 2, 1); // 
once this is acknowledged,
+                       testHarness.processElement(new Watermark(initialTime + 
5), 1, 1); // this watermark should be advanced first
+                       testHarness.processElement(StreamStatus.IDLE, 1, 0); // 
once this is acknowledged,
 
-                       expectedOutput.add(new Watermark(initialTime + 5));
                        // We don't expect to see Watermark(6) here because the 
idle status of one
                        // input doesn't propagate to the other input. That is, 
if input 1 is at WM 6 and input
                        // two was at WM 5 before going to IDLE then the output 
watermark will not jump to WM 6.
+
+                       // OPS, there is a known bug: 
https://issues.apache.org/jira/browse/FLINK-18934
+                       // that prevents this check from succeeding 
(AbstractStreamOperator and AbstractStreamOperatorV2
+                       // are ignoring StreamStatus), so those checks needs to 
be commented out ...
+
+                       //expectedOutput.add(new Watermark(initialTime + 5));
+                       //assertThat(testHarness.getOutput(), 
contains(expectedOutput.toArray()));
+
+                       // and in as a temporary replacement we need this code 
block:
+                       {
+                               // we wake up the source and emit watermark
+                               addSourceRecords(testHarness, 1, initialTime + 
5);
+                               while (testHarness.processSingleStep()) {
+                               }

Review comment:
       Ok, I went a step further and I have dropped the old 
`processIfAvailable` and `processWhileAvailable` and replaced them with 
`processSingleStep`. Previously I was afraid that some tests might be relaying 
on the previous behaviour, but apparently that's not the case.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to