Repository: flink Updated Branches: refs/heads/master 02410324b -> 66305135b
http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java new file mode 100644 index 0000000..564901f --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.streamstatus; + +import org.apache.flink.streaming.api.watermark.Watermark; +import org.junit.Test; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link StatusWatermarkValve}. While tests in {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTest} + * and {@link org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest} may also implicitly test {@link StatusWatermarkValve} + * and that valves are correctly used in the tasks' input processors, the unit tests here additionally makes sure that + * the watermarks and stream statuses to forward are generated from the valve at the exact correct times and in a + * deterministic behaviour. The unit tests here also test more complex stream status / watermark input cases. + * + * <p> + * The tests are performed by a series of watermark and stream status inputs to the valve. On every input method call, + * the output is checked to contain only the expected watermark or stream status, and nothing else. This ensures that + * no redundant outputs are generated by the output logic of {@link StatusWatermarkValve}. The behaviours that a series of + * input calls to the valve is trying to test is explained as inline comments within the tests. + */ +public class StatusWatermarkValveTest { + + /** + * Tests that all input channels of a valve start as ACTIVE stream status. + */ + @Test + public void testAllInputChannelsStartAsActive() { + BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler(); + StatusWatermarkValve valve = new StatusWatermarkValve(4, valveOutput); + + // ------------------------------------------------------------------------ + // Ensure that the valve will output an IDLE stream status as soon as + // all input channels become IDLE; this also implicitly ensures that + // all input channels start as ACTIVE. + // ------------------------------------------------------------------------ + + valve.inputStreamStatus(StreamStatus.IDLE, 3); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputStreamStatus(StreamStatus.IDLE, 0); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputStreamStatus(StreamStatus.IDLE, 1); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputStreamStatus(StreamStatus.IDLE, 2); + assertEquals(StreamStatus.IDLE, valveOutput.popLastOutputStreamStatus()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + } + + /** + * Tests that valves work as expected when they handle only 1 input channel. + * Tested behaviours are explained as inline comments. + */ + @Test + public void testOneInputValve() { + BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler(); + StatusWatermarkValve valve = new StatusWatermarkValve(1, valveOutput); + + // start off with an ACTIVE status; since the valve should initially start as ACTIVE, + // no state change is toggled, therefore no stream status should be emitted + valve.inputStreamStatus(StreamStatus.ACTIVE, 0); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // input some monotonously increasing watermarks while ACTIVE; + // the exact same watermarks should be emitted right after the inputs + valve.inputWatermark(new Watermark(0), 0); + assertEquals(new Watermark(0), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(25), 0); + assertEquals(new Watermark(25), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // decreasing watermarks should not result in any output + valve.inputWatermark(new Watermark(18), 0); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(42), 0); + assertEquals(new Watermark(42), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // toggling ACTIVE to IDLE should result in an IDLE stream status output + valve.inputStreamStatus(StreamStatus.IDLE, 0); + assertEquals(StreamStatus.IDLE, valveOutput.popLastOutputStreamStatus()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // watermark inputs should be ignored while all input channels (only 1 in this case) are IDLE + valve.inputWatermark(new Watermark(52), 0); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(60), 0); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // no status change toggle while IDLE should result in stream status outputs + valve.inputStreamStatus(StreamStatus.IDLE, 0); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // toggling IDLE to ACTIVE should result in an ACTIVE stream status output + valve.inputStreamStatus(StreamStatus.ACTIVE, 0); + assertEquals(StreamStatus.ACTIVE, valveOutput.popLastOutputStreamStatus()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // the valve should remember the last watermark input channels received while they were ACTIVE (which was 42); + // decreasing watermarks should therefore still be ignored, even after a status toggle + valve.inputWatermark(new Watermark(40), 0); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // monotonously increasing watermarks after resuming to be ACTIVE should be output normally + valve.inputWatermark(new Watermark(68), 0); + assertEquals(new Watermark(68), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(72), 0); + assertEquals(new Watermark(72), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + } + + /** + * Tests that valves work as expected when they handle multiple input channels (tested with 3). + * Tested behaviours are explained as inline comments. + */ + @Test + public void testMultipleInputValve() { + BufferedValveOutputHandler valveOutput = new BufferedValveOutputHandler(); + StatusWatermarkValve valve = new StatusWatermarkValve(3, valveOutput); + + // ------------------------------------------------------------------------ + // Ensure that watermarks are output only when all + // channels have been input some watermark. + // ------------------------------------------------------------------------ + + valve.inputWatermark(new Watermark(0), 0); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(0), 1); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(0), 2); + assertEquals(new Watermark(0), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // ------------------------------------------------------------------------ + // Ensure that watermarks are output as soon as the overall min + // watermark across all channels have advanced. + // ------------------------------------------------------------------------ + + valve.inputWatermark(new Watermark(12), 0); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(8), 2); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(10), 2); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(15), 1); + // lowest watermark across all channels is now channel 2, with watermark @ 10 + assertEquals(new Watermark(10), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // ------------------------------------------------------------------------ + // Ensure that decreasing watermarks are ignored + // ------------------------------------------------------------------------ + + valve.inputWatermark(new Watermark(6), 0); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // ------------------------------------------------------------------------ + // Ensure that when some input channel becomes idle, that channel will + // no longer be accounted for when advancing the watermark. + // ------------------------------------------------------------------------ + + // marking channel 2 as IDLE shouldn't result in overall status toggle for the valve, + // because there are still other active channels (0 and 1), so there should not be any + // stream status outputs; + // also, now that channel 2 is IDLE, the overall min watermark is 12 (from channel 0), + // so the valve should output that + valve.inputStreamStatus(StreamStatus.IDLE, 2); + assertEquals(new Watermark(12), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // from now on, since channel 2 is IDLE, the valve should use watermarks only from + // channel 0 and 1 to find the min watermark, even if channel 2 has the lowest watermark (10) + valve.inputWatermark(new Watermark(17), 0); + assertEquals(new Watermark(15), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(25), 0); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(20), 1); + assertEquals(new Watermark(20), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // ------------------------------------------------------------------------ + // Ensure that after some channel resumes to be ACTIVE, it needs to + // catch up" with the current overall min watermark before it can be + // accounted for again when finding the min watermark across channels. + // Also tests that before the resumed channel catches up, the overall + // min watermark can still advance with watermarks of other channels. + // ------------------------------------------------------------------------ + + // resuming channel 2 to be ACTIVE shouldn't result in overall status toggle for the valve, + // because the valve wasn't overall IDLE, so there should not be any stream status outputs; + valve.inputStreamStatus(StreamStatus.ACTIVE, 2); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // although watermarks for channel 2 will now be accepted, it still + // hasn't caught up with the overall min watermark (20) + valve.inputWatermark(new Watermark(18), 2); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // since channel 2 hasn't caught up yet, it is still ignored when advancing new min watermarks + valve.inputWatermark(new Watermark(22), 1); + assertEquals(new Watermark(22), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(28), 0); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(33), 1); + assertEquals(new Watermark(28), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // now, channel 2 has caught up with the overall min watermark + valve.inputWatermark(new Watermark(30), 2); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(31), 0); + // this acknowledges that channel 2's watermark is being accounted for again + assertEquals(new Watermark(30), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(34), 2); + assertEquals(new Watermark(31), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // ------------------------------------------------------------------------ + // Ensure that once all channels are IDLE, the valve should also + // determine itself to be IDLE output a IDLE stream status + // ------------------------------------------------------------------------ + + valve.inputStreamStatus(StreamStatus.IDLE, 0); + // this is because once channel 0 becomes IDLE, + // the new min watermark will be 33 (channel 1) + assertEquals(new Watermark(33), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputStreamStatus(StreamStatus.IDLE, 2); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputStreamStatus(StreamStatus.IDLE, 1); + assertEquals(StreamStatus.IDLE, valveOutput.popLastOutputStreamStatus()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // ------------------------------------------------------------------------ + // Ensure that channels gradually become ACTIVE again, the above behaviours + // still hold. Also ensure that as soon as one of the input channels + // become ACTIVE, the valve is ACTIVE again and outputs an ACTIVE stream status. + // ------------------------------------------------------------------------ + + // let channel 0 resume to be ACTIVE + valve.inputStreamStatus(StreamStatus.ACTIVE, 0); + assertEquals(StreamStatus.ACTIVE, valveOutput.popLastOutputStreamStatus()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // channel 0 is the only ACTIVE channel now, and is the only channel + // accounted for when advancing min watermark + valve.inputWatermark(new Watermark(36), 0); + assertEquals(new Watermark(36), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // new also let channel 1 become ACTIVE + valve.inputStreamStatus(StreamStatus.ACTIVE, 1); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // channel 1 is still behind overall min watermark + valve.inputWatermark(new Watermark(35), 1); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // since channel 1 is still behind, channel 0 remains to be the only + // channel used to advance min watermark + valve.inputWatermark(new Watermark(37), 0); + assertEquals(new Watermark(37), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + // now, channel 1 has caught up with the overall min watermark + valve.inputWatermark(new Watermark(38), 1); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + + valve.inputWatermark(new Watermark(40), 0); + // this acknowledges that channel 1's watermark is being accounted for again + assertEquals(new Watermark(38), valveOutput.popLastOutputWatermark()); + assertTrue(valveOutput.hasNoOutputWatermarks()); + assertTrue(valveOutput.hasNoOutputStreamStatuses()); + } + + private class BufferedValveOutputHandler implements StatusWatermarkValve.ValveOutputHandler { + private BlockingQueue<Watermark> outputWatermarks = new LinkedBlockingQueue<>(); + private BlockingQueue<StreamStatus> outputStreamStatuses = new LinkedBlockingQueue<>(); + + @Override + public void handleWatermark(Watermark watermark) { + outputWatermarks.add(watermark); + } + + @Override + public void handleStreamStatus(StreamStatus streamStatus) { + outputStreamStatuses.add(streamStatus); + } + + public Watermark popLastOutputWatermark() { + return outputWatermarks.poll(); + } + + public StreamStatus popLastOutputStreamStatus() { + return outputStreamStatuses.poll(); + } + + public boolean hasNoOutputWatermarks() { + return outputWatermarks.size() == 0; + } + + public boolean hasNoOutputStreamStatuses() { + return outputStreamStatuses.size() == 0; + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusTest.java new file mode 100644 index 0000000..247dc8b --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.streamstatus; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; + +public class StreamStatusTest { + + @Test (expected = IllegalArgumentException.class) + public void testIllegalCreationThrowsException() { + new StreamStatus(32); + } + + @Test + public void testEquals() { + StreamStatus idleStatus = new StreamStatus(StreamStatus.IDLE_STATUS); + StreamStatus activeStatus = new StreamStatus(StreamStatus.ACTIVE_STATUS); + + assertEquals(StreamStatus.IDLE, idleStatus); + assertTrue(idleStatus.isIdle()); + assertFalse(idleStatus.isActive()); + + assertEquals(StreamStatus.ACTIVE, activeStatus); + assertTrue(activeStatus.isActive()); + assertFalse(activeStatus.isIdle()); + } + + @Test + public void testTypeCasting() { + StreamStatus status = StreamStatus.ACTIVE; + + assertTrue(status.isStreamStatus()); + assertFalse(status.isRecord()); + assertFalse(status.isWatermark()); + assertFalse(status.isLatencyMarker()); + + try { + status.asWatermark(); + fail("should throw an exception"); + } catch (Exception e) { + // expected + } + + try { + status.asRecord(); + fail("should throw an exception"); + } catch (Exception e) { + // expected + } + + try { + status.asLatencyMarker(); + fail("should throw an exception"); + } catch (Exception e) { + // expected + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index be93f6a..4b08c83 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FSDataInputStream; @@ -38,15 +39,18 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.runtime.state.TaskStateHandles; +import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamEdge; import org.apache.flink.streaming.api.graph.StreamNode; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; import org.apache.flink.streaming.api.operators.StreamMap; +import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; @@ -59,14 +63,7 @@ import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; @@ -98,6 +95,7 @@ public class OneInputStreamTaskTest extends TestLogger { public void testOpenCloseAndTimestamps() throws Exception { final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>(); final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.setupOutputForSingletonOperatorChain(); StreamConfig streamConfig = testHarness.getStreamConfig(); StreamMap<String, String> mapOperator = new StreamMap<String, String>(new TestOpenCloseMapFunction()); @@ -126,15 +124,20 @@ public class OneInputStreamTaskTest extends TestLogger { } /** - * This test verifies that watermarks are correctly forwarded. This also checks whether + * This test verifies that watermarks and stream statuses are correctly forwarded. This also checks whether * watermarks are forwarded only when we have received watermarks from all inputs. The - * forwarded watermark must be the minimum of the watermarks of all inputs. + * forwarded watermark must be the minimum of the watermarks of all active inputs. */ @Test @SuppressWarnings("unchecked") - public void testWatermarkForwarding() throws Exception { + public void testWatermarkAndStreamStatusForwarding() throws Exception { final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>(); - final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + final OneInputStreamTaskTestHarness<String, String> testHarness = + new OneInputStreamTaskTestHarness<String, String>( + mapTask, 2, 2, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO); + testHarness.setupOutputForSingletonOperatorChain(); StreamConfig streamConfig = testHarness.getStreamConfig(); StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap()); @@ -180,8 +183,7 @@ public class OneInputStreamTaskTest extends TestLogger { expectedOutput.add(new Watermark(initialTime + 2)); TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); - - // advance watermark from one of the inputs, now we should get a now one since the + // advance watermark from one of the inputs, now we should get a new one since the // minimum increases testHarness.processElement(new Watermark(initialTime + 4), 1, 1); testHarness.waitForInputProcessing(); @@ -196,6 +198,31 @@ public class OneInputStreamTaskTest extends TestLogger { expectedOutput.add(new Watermark(initialTime + 4)); TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + // test whether idle input channels are acknowledged correctly when forwarding watermarks + testHarness.processElement(StreamStatus.IDLE, 0, 1); + testHarness.processElement(StreamStatus.IDLE, 1, 0); + testHarness.processElement(new Watermark(initialTime + 6), 0, 0); + testHarness.processElement(new Watermark(initialTime + 5), 1, 1); // this watermark should be advanced first + testHarness.processElement(StreamStatus.IDLE, 1, 1); // once this is acknowledged, + // watermark (initial + 6) should be forwarded + testHarness.waitForInputProcessing(); + expectedOutput.add(new Watermark(initialTime + 5)); + expectedOutput.add(new Watermark(initialTime + 6)); + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + // make all input channels idle and check that the operator's idle status is forwarded + testHarness.processElement(StreamStatus.IDLE, 0, 0); + testHarness.waitForInputProcessing(); + expectedOutput.add(StreamStatus.IDLE); + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + // make some input channels active again and check that the operator's active status is forwarded only once + testHarness.processElement(StreamStatus.ACTIVE, 1, 0); + testHarness.processElement(StreamStatus.ACTIVE, 0, 1); + testHarness.waitForInputProcessing(); + expectedOutput.add(StreamStatus.ACTIVE); + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + testHarness.endInput(); testHarness.waitForTaskCompletion(); @@ -205,12 +232,170 @@ public class OneInputStreamTaskTest extends TestLogger { } /** + * This test verifies that watermarks are not forwarded when the task is idle. + * It also verifies that when task is idle, watermarks generated in the middle of chains are also blocked and + * never forwarded. + * + * The tested chain will be: (HEAD: normal operator) --> (watermark generating operator) --> (normal operator). + * The operators will throw an exception and fail the test if either of them were forwarded watermarks when + * the task is idle. + */ + @Test + public void testWatermarksNotForwardedWithinChainWhenIdle() throws Exception { + final OneInputStreamTask<String, String> testTask = new OneInputStreamTask<>(); + final OneInputStreamTaskTestHarness<String, String> testHarness = + new OneInputStreamTaskTestHarness<String, String>( + testTask, 1, 1, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO); + + // ------------------ setup the chain ------------------ + + TriggerableFailOnWatermarkTestOperator headOperator = new TriggerableFailOnWatermarkTestOperator(); + StreamConfig headOperatorConfig = testHarness.getStreamConfig(); + + WatermarkGeneratingTestOperator watermarkOperator = new WatermarkGeneratingTestOperator(); + StreamConfig watermarkOperatorConfig = new StreamConfig(new Configuration()); + + TriggerableFailOnWatermarkTestOperator tailOperator = new TriggerableFailOnWatermarkTestOperator(); + StreamConfig tailOperatorConfig = new StreamConfig(new Configuration()); + + headOperatorConfig.setStreamOperator(headOperator); + headOperatorConfig.setChainStart(); + headOperatorConfig.setChainIndex(0); + headOperatorConfig.setChainedOutputs(Collections.singletonList(new StreamEdge( + new StreamNode(null, 0, null, null, null, null, null), + new StreamNode(null, 1, null, null, null, null, null), + 0, + Collections.<String>emptyList(), + null + ))); + + watermarkOperatorConfig.setStreamOperator(watermarkOperator); + watermarkOperatorConfig.setTypeSerializerIn1(StringSerializer.INSTANCE); + watermarkOperatorConfig.setChainIndex(1); + watermarkOperatorConfig.setChainedOutputs(Collections.singletonList(new StreamEdge( + new StreamNode(null, 1, null, null, null, null, null), + new StreamNode(null, 2, null, null, null, null, null), + 0, + Collections.<String>emptyList(), + null + ))); + + List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>(); + outEdgesInOrder.add(new StreamEdge( + new StreamNode(null, 2, null, null, null, null, null), + new StreamNode(null, 3, null, null, null, null, null), + 0, + Collections.<String>emptyList(), + new BroadcastPartitioner<Object>())); + + tailOperatorConfig.setStreamOperator(tailOperator); + tailOperatorConfig.setTypeSerializerIn1(StringSerializer.INSTANCE); + tailOperatorConfig.setBufferTimeout(0); + tailOperatorConfig.setChainIndex(2); + tailOperatorConfig.setChainEnd(); + tailOperatorConfig.setOutputSelectors(Collections.<OutputSelector<?>>emptyList()); + tailOperatorConfig.setNumberOfOutputs(1); + tailOperatorConfig.setOutEdgesInOrder(outEdgesInOrder); + tailOperatorConfig.setNonChainedOutputs(outEdgesInOrder); + tailOperatorConfig.setTypeSerializerOut(StringSerializer.INSTANCE); + + Map<Integer, StreamConfig> chainedConfigs = new HashMap<>(2); + chainedConfigs.put(1, watermarkOperatorConfig); + chainedConfigs.put(2, tailOperatorConfig); + headOperatorConfig.setTransitiveChainedTaskConfigs(chainedConfigs); + headOperatorConfig.setOutEdgesInOrder(outEdgesInOrder); + + // ----------------------------------------------------- + + // --------------------- begin test --------------------- + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>(); + + testHarness.invoke(); + testHarness.waitForTaskRunning(); + + // the task starts as active, so all generated watermarks should be forwarded + testHarness.processElement(new StreamRecord<>(TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER)); + + testHarness.processElement(new StreamRecord<>("10"), 0, 0); + + // this watermark will be forwarded since the task is currently active, + // but should not be in the final output because it should be blocked by the watermark generator in the chain + testHarness.processElement(new Watermark(15)); + + testHarness.processElement(new StreamRecord<>("20"), 0, 0); + testHarness.processElement(new StreamRecord<>("30"), 0, 0); + + testHarness.waitForInputProcessing(); + + expectedOutput.add(new StreamRecord<>(TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER)); + expectedOutput.add(new StreamRecord<>("10")); + expectedOutput.add(new Watermark(10)); + expectedOutput.add(new StreamRecord<>("20")); + expectedOutput.add(new Watermark(20)); + expectedOutput.add(new StreamRecord<>("30")); + expectedOutput.add(new Watermark(30)); + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + // now, toggle the task to be idle, and let the watermark generator produce some watermarks + testHarness.processElement(StreamStatus.IDLE); + + // after this, the operators will throw an exception if they are forwarded watermarks anywhere in the chain + testHarness.processElement(new StreamRecord<>(TriggerableFailOnWatermarkTestOperator.NO_FORWARDED_WATERMARKS_MARKER)); + + // NOTE: normally, tasks will not have records to process while idle; + // we're doing this here only to mimic watermark generating in operators + testHarness.processElement(new StreamRecord<>("40"), 0, 0); + testHarness.processElement(new StreamRecord<>("50"), 0, 0); + testHarness.processElement(new StreamRecord<>("60"), 0, 0); + testHarness.processElement(new Watermark(65)); // the test will fail if any of the operators were forwarded this + testHarness.waitForInputProcessing(); + + // the 40 - 60 watermarks should not be forwarded, only the stream status toggle element and records + expectedOutput.add(StreamStatus.IDLE); + expectedOutput.add(new StreamRecord<>(TriggerableFailOnWatermarkTestOperator.NO_FORWARDED_WATERMARKS_MARKER)); + expectedOutput.add(new StreamRecord<>("40")); + expectedOutput.add(new StreamRecord<>("50")); + expectedOutput.add(new StreamRecord<>("60")); + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + // re-toggle the task to be active and see if new watermarks are correctly forwarded again + testHarness.processElement(StreamStatus.ACTIVE); + testHarness.processElement(new StreamRecord<>(TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER)); + + testHarness.processElement(new StreamRecord<>("70"), 0, 0); + testHarness.processElement(new StreamRecord<>("80"), 0, 0); + testHarness.processElement(new StreamRecord<>("90"), 0, 0); + testHarness.waitForInputProcessing(); + + expectedOutput.add(StreamStatus.ACTIVE); + expectedOutput.add(new StreamRecord<>(TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER)); + expectedOutput.add(new StreamRecord<>("70")); + expectedOutput.add(new Watermark(70)); + expectedOutput.add(new StreamRecord<>("80")); + expectedOutput.add(new Watermark(80)); + expectedOutput.add(new StreamRecord<>("90")); + expectedOutput.add(new Watermark(90)); + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.endInput(); + + testHarness.waitForTaskCompletion(); + + List<String> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput()); + assertEquals(12, resultElements.size()); + } + + /** * This test verifies that checkpoint barriers are correctly forwarded. */ @Test public void testCheckpointBarriers() throws Exception { final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>(); final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.setupOutputForSingletonOperatorChain(); StreamConfig streamConfig = testHarness.getStreamConfig(); StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap()); @@ -269,6 +454,7 @@ public class OneInputStreamTaskTest extends TestLogger { public void testOvertakingCheckpointBarriers() throws Exception { final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>(); final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.setupOutputForSingletonOperatorChain(); StreamConfig streamConfig = testHarness.getStreamConfig(); StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap()); @@ -339,6 +525,8 @@ public class OneInputStreamTaskTest extends TestLogger { final Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow(); final OneInputStreamTask<String, String> streamTask = new OneInputStreamTask<String, String>(); final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(streamTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.setupOutputForSingletonOperatorChain(); + IdentityKeySelector<String> keySelector = new IdentityKeySelector<>(); testHarness.configureForKeyedStream(keySelector, BasicTypeInfo.STRING_TYPE_INFO); @@ -658,5 +846,80 @@ public class OneInputStreamTaskTest extends TestLogger { return value; } } + + /** + * A {@link TriggerableFailOnWatermarkTestOperator} that generates watermarks. + */ + private static class WatermarkGeneratingTestOperator extends TriggerableFailOnWatermarkTestOperator { + + private static final long serialVersionUID = -5064871833244157221L; + + private long lastWatermark; + + @Override + protected void handleElement(StreamRecord<String> element) { + long timestamp = Long.valueOf(element.getValue()); + if (timestamp > lastWatermark) { + output.emitWatermark(new Watermark(timestamp)); + lastWatermark = timestamp; + } + } + + @Override + protected void handleWatermark(Watermark mark) { + if (mark.equals(Watermark.MAX_WATERMARK)) { + output.emitWatermark(mark); + lastWatermark = Long.MAX_VALUE; + } + } + } + + /** + * An operator that can be triggered whether or not to expect watermarks forwarded to it, toggled + * by letting it process special trigger marker records. + * + * If it receives a watermark when it's not expecting one, it'll throw an exception and fail. + */ + private static class TriggerableFailOnWatermarkTestOperator + extends AbstractStreamOperator<String> + implements OneInputStreamOperator<String, String> { + + private static final long serialVersionUID = 2048954179291813243L; + + public final static String EXPECT_FORWARDED_WATERMARKS_MARKER = "EXPECT_WATERMARKS"; + public final static String NO_FORWARDED_WATERMARKS_MARKER = "NO_WATERMARKS"; + + protected boolean expectForwardedWatermarks; + + @Override + public void processElement(StreamRecord<String> element) throws Exception { + output.collect(element); + + if (element.getValue().equals(EXPECT_FORWARDED_WATERMARKS_MARKER)) { + this.expectForwardedWatermarks = true; + } else if (element.getValue().equals(NO_FORWARDED_WATERMARKS_MARKER)) { + this.expectForwardedWatermarks = false; + } else { + handleElement(element); + } + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + if (!expectForwardedWatermarks) { + throw new Exception("Received a " + mark + ", but this operator should not be forwarded watermarks."); + } else { + handleWatermark(mark); + } + } + + protected void handleElement(StreamRecord<String> element) { + // do nothing + } + + protected void handleWatermark(Watermark mark) { + output.emitWatermark(mark); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java index dd1fe58..0773699 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java @@ -57,6 +57,7 @@ public class SourceStreamTaskTest { public void testOpenClose() throws Exception { final SourceStreamTask<String, SourceFunction<String>, StreamSource<String, SourceFunction<String>>> sourceTask = new SourceStreamTask<>(); final StreamTaskTestHarness<String> testHarness = new StreamTaskTestHarness<String>(sourceTask, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.setupOutputForSingletonOperatorChain(); StreamConfig streamConfig = testHarness.getStreamConfig(); StreamSource<String, ?> sourceOperator = new StreamSource<>(new OpenCloseTestSource()); @@ -99,6 +100,7 @@ public class SourceStreamTaskTest { final SourceStreamTask<Tuple2<Long, Integer>, SourceFunction<Tuple2<Long, Integer>>, StreamSource<Tuple2<Long, Integer>, SourceFunction<Tuple2<Long, Integer>>>> sourceTask = new SourceStreamTask<>(); final StreamTaskTestHarness<Tuple2<Long, Integer>> testHarness = new StreamTaskTestHarness<Tuple2<Long, Integer>>(sourceTask, typeInfo); + testHarness.setupOutputForSingletonOperatorChain(); StreamConfig streamConfig = testHarness.getStreamConfig(); StreamSource<Tuple2<Long, Integer>, ?> sourceOperator = new StreamSource<>(new MockSource(NUM_ELEMENTS, SOURCE_CHECKPOINT_DELAY, SOURCE_READ_DELAY)); http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java index ebe5285..c2d4aaa 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java @@ -50,6 +50,7 @@ public class StreamTaskCancellationBarrierTest { public void testEmitCancellationBarrierWhenNotReady() throws Exception { StreamTask<String, ?> task = new InitBlockingTask(); StreamTaskTestHarness<String> testHarness = new StreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.setupOutputForSingletonOperatorChain(); // start the test - this cannot succeed across the 'init()' method testHarness.invoke(); @@ -80,6 +81,7 @@ public class StreamTaskCancellationBarrierTest { task, 1, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.setupOutputForSingletonOperatorChain(); StreamConfig streamConfig = testHarness.getStreamConfig(); StreamMap<String, String> mapOperator = new StreamMap<>(new IdentityMap()); @@ -124,6 +126,7 @@ public class StreamTaskCancellationBarrierTest { TwoInputStreamTaskTestHarness<String, String, String> testHarness = new TwoInputStreamTaskTestHarness<>( task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.setupOutputForSingletonOperatorChain(); StreamConfig streamConfig = testHarness.getStreamConfig(); CoStreamMap<String, String, String> op = new CoStreamMap<>(new UnionCoMap()); http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index 8dc6afa..e58bc5a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -106,9 +106,6 @@ public class StreamTaskTestHarness<OUT> { this.executionConfig = new ExecutionConfig(); streamConfig = new StreamConfig(taskConfig); - streamConfig.setChainStart(); - streamConfig.setBufferTimeout(0); - streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime); outputSerializer = outputType.createSerializer(executionConfig); outputStreamRecordSerializer = new StreamElementSerializer<OUT>(outputSerializer); @@ -129,11 +126,25 @@ public class StreamTaskTestHarness<OUT> { @SuppressWarnings("unchecked") private void initializeOutput() { outputList = new ConcurrentLinkedQueue<Object>(); - mockEnv.addOutput(outputList, outputStreamRecordSerializer); + } + /** + * Users of the test harness can call this utility method to setup the stream config + * if there will only be a single operator to be tested. The method will setup the + * outgoing network connection for the operator. + * + * For more advanced test cases such as testing chains of multiple operators with the harness, + * please manually configure the stream config. + */ + public void setupOutputForSingletonOperatorChain() { + streamConfig.setChainStart(); + streamConfig.setBufferTimeout(0); + streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime); streamConfig.setOutputSelectors(Collections.<OutputSelector<?>>emptyList()); streamConfig.setNumberOfOutputs(1); + streamConfig.setTypeSerializerOut(outputSerializer); + streamConfig.setVertexID(0); StreamOperator<OUT> dummyOperator = new AbstractStreamOperator<OUT>() { private static final long serialVersionUID = 1L; @@ -142,13 +153,10 @@ public class StreamTaskTestHarness<OUT> { List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>(); StreamNode sourceVertexDummy = new StreamNode(null, 0, "group", dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class); StreamNode targetVertexDummy = new StreamNode(null, 1, "group", dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class); - outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, targetVertexDummy, 0, new LinkedList<String>(), new BroadcastPartitioner<Object>())); + streamConfig.setOutEdgesInOrder(outEdgesInOrder); streamConfig.setNonChainedOutputs(outEdgesInOrder); - streamConfig.setTypeSerializerOut(outputSerializer); - streamConfig.setVertexID(0); - } public StreamMockEnvironment createEnvironment() { @@ -330,9 +338,6 @@ public class StreamTaskTestHarness<OUT> { allEmpty = false; } } - try { - Thread.sleep(10); - } catch (InterruptedException ignored) {} if (allEmpty) { break; http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java index 3cd9c9a..c0a1638 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java @@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.co.CoStreamMap; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.apache.flink.streaming.util.TestHarnessUtil; import org.junit.Assert; @@ -58,6 +59,7 @@ public class TwoInputStreamTaskTest { public void testOpenCloseAndTimestamps() throws Exception { final TwoInputStreamTask<String, Integer, String> coMapTask = new TwoInputStreamTask<String, Integer, String>(); final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<String, Integer, String>(coMapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.setupOutputForSingletonOperatorChain(); StreamConfig streamConfig = testHarness.getStreamConfig(); CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new TestOpenCloseMapFunction()); @@ -89,15 +91,21 @@ public class TwoInputStreamTaskTest { } /** - * This test verifies that watermarks are correctly forwarded. This also checks whether + * This test verifies that watermarks and stream statuses are correctly forwarded. This also checks whether * watermarks are forwarded only when we have received watermarks from all inputs. The - * forwarded watermark must be the minimum of the watermarks of all inputs. + * forwarded watermark must be the minimum of the watermarks of all active inputs. */ @Test @SuppressWarnings("unchecked") - public void testWatermarkForwarding() throws Exception { + public void testWatermarkAndStreamStatusForwarding() throws Exception { final TwoInputStreamTask<String, Integer, String> coMapTask = new TwoInputStreamTask<String, Integer, String>(); - final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<String, Integer, String>(coMapTask, 2, 2, new int[] {1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = + new TwoInputStreamTaskTestHarness<String, Integer, String>( + coMapTask, 2, 2, new int[] {1, 2}, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO); + testHarness.setupOutputForSingletonOperatorChain(); StreamConfig streamConfig = testHarness.getStreamConfig(); CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap()); @@ -147,7 +155,7 @@ public class TwoInputStreamTaskTest { TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); - // advance watermark from one of the inputs, now we should get a now one since the + // advance watermark from one of the inputs, now we should get a new one since the // minimum increases testHarness.processElement(new Watermark(initialTime + 4), 1, 1); testHarness.waitForInputProcessing(); @@ -162,6 +170,33 @@ public class TwoInputStreamTaskTest { expectedOutput.add(new Watermark(initialTime + 4)); TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + // test whether idle input channels are acknowledged correctly when forwarding watermarks + testHarness.processElement(StreamStatus.IDLE, 0, 1); + testHarness.processElement(StreamStatus.IDLE, 1, 0); + testHarness.processElement(new Watermark(initialTime + 6), 0, 0); + testHarness.processElement(new Watermark(initialTime + 5), 1, 1); // this watermark should be advanced first + testHarness.processElement(StreamStatus.IDLE, 1, 1); // once this is acknowledged, + // watermark (initial + 6) should be forwarded + testHarness.waitForInputProcessing(); + expectedOutput.add(new Watermark(initialTime + 5)); + // We don't expect to see Watermark(6) here because the idle status of one + // input doesn't propagate to the other input. That is, if input 1 is at WM 6 and input + // two was at WM 5 before going to IDLE then the output watermark will not jump to WM 6. + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + // make all input channels idle and check that the operator's idle status is forwarded + testHarness.processElement(StreamStatus.IDLE, 0, 0); + testHarness.waitForInputProcessing(); + expectedOutput.add(StreamStatus.IDLE); + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + // make some input channels active again and check that the operator's active status is forwarded only once + testHarness.processElement(StreamStatus.ACTIVE, 1, 0); + testHarness.processElement(StreamStatus.ACTIVE, 0, 1); + testHarness.waitForInputProcessing(); + expectedOutput.add(StreamStatus.ACTIVE); + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + testHarness.endInput(); testHarness.waitForTaskCompletion(); @@ -178,6 +213,7 @@ public class TwoInputStreamTaskTest { public void testCheckpointBarriers() throws Exception { final TwoInputStreamTask<String, Integer, String> coMapTask = new TwoInputStreamTask<String, Integer, String>(); final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<String, Integer, String>(coMapTask, 2, 2, new int[] {1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.setupOutputForSingletonOperatorChain(); StreamConfig streamConfig = testHarness.getStreamConfig(); CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap()); @@ -258,6 +294,7 @@ public class TwoInputStreamTaskTest { public void testOvertakingCheckpointBarriers() throws Exception { final TwoInputStreamTask<String, Integer, String> coMapTask = new TwoInputStreamTask<String, Integer, String>(); final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<String, Integer, String>(coMapTask, 2, 2, new int[] {1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.setupOutputForSingletonOperatorChain(); StreamConfig streamConfig = testHarness.getStreamConfig(); CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap());
