Repository: flink
Updated Branches:
  refs/heads/master 02410324b -> 66305135b


http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
new file mode 100644
index 0000000..564901f
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StatusWatermarkValveTest.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamstatus;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link StatusWatermarkValve}. While tests in {@link 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTest}
+ * and {@link org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest} 
may also implicitly test {@link StatusWatermarkValve}
+ * and that valves are correctly used in the tasks' input processors, the unit 
tests here additionally make sure that
+ * the watermarks and stream statuses to forward are generated from the valve 
at the exact correct times and in a
+ * deterministic manner. The unit tests here also test more complex stream 
status / watermark input cases.
+ *
+ * <p>
+ * The tests are performed by a series of watermark and stream status inputs 
to the valve. On every input method call,
+ * the output is checked to contain only the expected watermark or stream 
status, and nothing else. This ensures that
+ * no redundant outputs are generated by the output logic of {@link 
StatusWatermarkValve}. The behaviours that a series of
+ * input calls to the valve is trying to test are explained as inline comments 
within the tests.
+ */
+public class StatusWatermarkValveTest {
+
+       /**
+        * Tests that all input channels of a valve start as ACTIVE stream 
status.
+        */
+       @Test
+       public void testAllInputChannelsStartAsActive() {
+               BufferedValveOutputHandler valveOutput = new 
BufferedValveOutputHandler();
+               StatusWatermarkValve valve = new StatusWatermarkValve(4, 
valveOutput);
+
+               // 
------------------------------------------------------------------------
+               //  Ensure that the valve will output an IDLE stream status as 
soon as
+               //  all input channels become IDLE; this also implicitly 
ensures that
+               //  all input channels start as ACTIVE.
+               // 
------------------------------------------------------------------------
+
+               valve.inputStreamStatus(StreamStatus.IDLE, 3);
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               valve.inputStreamStatus(StreamStatus.IDLE, 0);
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               valve.inputStreamStatus(StreamStatus.IDLE, 1);
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               valve.inputStreamStatus(StreamStatus.IDLE, 2);
+               assertEquals(StreamStatus.IDLE, 
valveOutput.popLastOutputStreamStatus());
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+       }
+
+       /**
+        * Tests that valves work as expected when they handle only 1 input 
channel.
+        * Tested behaviours are explained as inline comments.
+        */
+       @Test
+       public void testOneInputValve() {
+               BufferedValveOutputHandler valveOutput = new 
BufferedValveOutputHandler();
+               StatusWatermarkValve valve = new StatusWatermarkValve(1, 
valveOutput);
+
+               // start off with an ACTIVE status; since the valve should 
initially start as ACTIVE,
+               // no state change is toggled, therefore no stream status 
should be emitted
+               valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               // input some monotonically increasing watermarks while ACTIVE;
+               // the exact same watermarks should be emitted right after the 
inputs
+               valve.inputWatermark(new Watermark(0), 0);
+               assertEquals(new Watermark(0), 
valveOutput.popLastOutputWatermark());
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               valve.inputWatermark(new Watermark(25), 0);
+               assertEquals(new Watermark(25), 
valveOutput.popLastOutputWatermark());
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               // decreasing watermarks should not result in any output
+               valve.inputWatermark(new Watermark(18), 0);
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               valve.inputWatermark(new Watermark(42), 0);
+               assertEquals(new Watermark(42), 
valveOutput.popLastOutputWatermark());
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               // toggling ACTIVE to IDLE should result in an IDLE stream 
status output
+               valve.inputStreamStatus(StreamStatus.IDLE, 0);
+               assertEquals(StreamStatus.IDLE, 
valveOutput.popLastOutputStreamStatus());
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               // watermark inputs should be ignored while all input channels 
(only 1 in this case) are IDLE
+               valve.inputWatermark(new Watermark(52), 0);
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               valve.inputWatermark(new Watermark(60), 0);
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               // no status change toggle while IDLE should not result in any stream 
status outputs
+               valve.inputStreamStatus(StreamStatus.IDLE, 0);
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               // toggling IDLE to ACTIVE should result in an ACTIVE stream 
status output
+               valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
+               assertEquals(StreamStatus.ACTIVE, 
valveOutput.popLastOutputStreamStatus());
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               // the valve should remember the last watermark input channels 
received while they were ACTIVE (which was 42);
+               // decreasing watermarks should therefore still be ignored, 
even after a status toggle
+               valve.inputWatermark(new Watermark(40), 0);
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               // monotonically increasing watermarks after resuming to be 
ACTIVE should be output normally
+               valve.inputWatermark(new Watermark(68), 0);
+               assertEquals(new Watermark(68), 
valveOutput.popLastOutputWatermark());
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               valve.inputWatermark(new Watermark(72), 0);
+               assertEquals(new Watermark(72), 
valveOutput.popLastOutputWatermark());
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+       }
+
+       /**
+        * Tests that valves work as expected when they handle multiple input 
channels (tested with 3).
+        * Tested behaviours are explained as inline comments.
+        */
+       @Test
+       public void testMultipleInputValve() {
+               BufferedValveOutputHandler valveOutput = new 
BufferedValveOutputHandler();
+               StatusWatermarkValve valve = new StatusWatermarkValve(3, 
valveOutput);
+
+               // 
------------------------------------------------------------------------
+               //  Ensure that watermarks are output only when all
+               //  channels have been input some watermark.
+               // 
------------------------------------------------------------------------
+
+               valve.inputWatermark(new Watermark(0), 0);
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               valve.inputWatermark(new Watermark(0), 1);
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               valve.inputWatermark(new Watermark(0), 2);
+               assertEquals(new Watermark(0), 
valveOutput.popLastOutputWatermark());
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               // 
------------------------------------------------------------------------
+               //  Ensure that watermarks are output as soon as the overall min
+               //  watermark across all channels has advanced.
+               // 
------------------------------------------------------------------------
+
+               valve.inputWatermark(new Watermark(12), 0);
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               valve.inputWatermark(new Watermark(8), 2);
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               valve.inputWatermark(new Watermark(10), 2);
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               valve.inputWatermark(new Watermark(15), 1);
+               // lowest watermark across all channels is now channel 2, with 
watermark @ 10
+               assertEquals(new Watermark(10), 
valveOutput.popLastOutputWatermark());
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               // 
------------------------------------------------------------------------
+               //  Ensure that decreasing watermarks are ignored
+               // 
------------------------------------------------------------------------
+
+               valve.inputWatermark(new Watermark(6), 0);
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               // 
------------------------------------------------------------------------
+               //  Ensure that when some input channel becomes idle, that 
channel will
+               //  no longer be accounted for when advancing the watermark.
+               // 
------------------------------------------------------------------------
+
+               // marking channel 2 as IDLE shouldn't result in overall status 
toggle for the valve,
+               // because there are still other active channels (0 and 1), so 
there should not be any
+               // stream status outputs;
+               // also, now that channel 2 is IDLE, the overall min watermark 
is 12 (from channel 0),
+               // so the valve should output that
+               valve.inputStreamStatus(StreamStatus.IDLE, 2);
+               assertEquals(new Watermark(12), 
valveOutput.popLastOutputWatermark());
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               // from now on, since channel 2 is IDLE, the valve should use 
watermarks only from
+               // channel 0 and 1 to find the min watermark, even if channel 2 
has the lowest watermark (10)
+               valve.inputWatermark(new Watermark(17), 0);
+               assertEquals(new Watermark(15), 
valveOutput.popLastOutputWatermark());
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               valve.inputWatermark(new Watermark(25), 0);
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               valve.inputWatermark(new Watermark(20), 1);
+               assertEquals(new Watermark(20), 
valveOutput.popLastOutputWatermark());
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               // 
------------------------------------------------------------------------
+               //  Ensure that after some channel resumes to be ACTIVE, it 
needs to
+               //  "catch up" with the current overall min watermark before it 
can be
+               //  accounted for again when finding the min watermark across 
channels.
+               //  Also tests that before the resumed channel catches up, the 
overall
+               //  min watermark can still advance with watermarks of other 
channels.
+               // 
------------------------------------------------------------------------
+
+               // resuming channel 2 to be ACTIVE shouldn't result in overall 
status toggle for the valve,
+               // because the valve wasn't overall IDLE, so there should not 
be any stream status outputs;
+               valve.inputStreamStatus(StreamStatus.ACTIVE, 2);
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               // although watermarks for channel 2 will now be accepted, it 
still
+               // hasn't caught up with the overall min watermark (20)
+               valve.inputWatermark(new Watermark(18), 2);
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               // since channel 2 hasn't caught up yet, it is still ignored 
when advancing new min watermarks
+               valve.inputWatermark(new Watermark(22), 1);
+               assertEquals(new Watermark(22), 
valveOutput.popLastOutputWatermark());
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               valve.inputWatermark(new Watermark(28), 0);
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               valve.inputWatermark(new Watermark(33), 1);
+               assertEquals(new Watermark(28), 
valveOutput.popLastOutputWatermark());
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               // now, channel 2 has caught up with the overall min watermark
+               valve.inputWatermark(new Watermark(30), 2);
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               valve.inputWatermark(new Watermark(31), 0);
+               // this acknowledges that channel 2's watermark is being 
accounted for again
+               assertEquals(new Watermark(30), 
valveOutput.popLastOutputWatermark());
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               valve.inputWatermark(new Watermark(34), 2);
+               assertEquals(new Watermark(31), 
valveOutput.popLastOutputWatermark());
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               // 
------------------------------------------------------------------------
+               //  Ensure that once all channels are IDLE, the valve should 
also
+               //  determine itself to be IDLE and output an IDLE stream status
+               // 
------------------------------------------------------------------------
+
+               valve.inputStreamStatus(StreamStatus.IDLE, 0);
+               // this is because once channel 0 becomes IDLE,
+               // the new min watermark will be 33 (channel 1)
+               assertEquals(new Watermark(33), 
valveOutput.popLastOutputWatermark());
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               valve.inputStreamStatus(StreamStatus.IDLE, 2);
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               valve.inputStreamStatus(StreamStatus.IDLE, 1);
+               assertEquals(StreamStatus.IDLE, 
valveOutput.popLastOutputStreamStatus());
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               // 
------------------------------------------------------------------------
+               //  Ensure that as channels gradually become ACTIVE again, the 
above behaviours
+               //  still hold. Also ensure that as soon as one of the input 
channels
+               //  becomes ACTIVE, the valve is ACTIVE again and outputs an 
ACTIVE stream status.
+               // 
------------------------------------------------------------------------
+
+               // let channel 0 resume to be ACTIVE
+               valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
+               assertEquals(StreamStatus.ACTIVE, 
valveOutput.popLastOutputStreamStatus());
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               // channel 0 is the only ACTIVE channel now, and is the only 
channel
+               // accounted for when advancing min watermark
+               valve.inputWatermark(new Watermark(36), 0);
+               assertEquals(new Watermark(36), 
valveOutput.popLastOutputWatermark());
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               // now also let channel 1 become ACTIVE
+               valve.inputStreamStatus(StreamStatus.ACTIVE, 1);
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               // channel 1 is still behind overall min watermark
+               valve.inputWatermark(new Watermark(35), 1);
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               // since channel 1 is still behind, channel 0 remains to be the 
only
+               // channel used to advance min watermark
+               valve.inputWatermark(new Watermark(37), 0);
+               assertEquals(new Watermark(37), 
valveOutput.popLastOutputWatermark());
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               // now, channel 1 has caught up with the overall min watermark
+               valve.inputWatermark(new Watermark(38), 1);
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+
+               valve.inputWatermark(new Watermark(40), 0);
+               // this acknowledges that channel 1's watermark is being 
accounted for again
+               assertEquals(new Watermark(38), 
valveOutput.popLastOutputWatermark());
+               assertTrue(valveOutput.hasNoOutputWatermarks());
+               assertTrue(valveOutput.hasNoOutputStreamStatuses());
+       }
+
+       private class BufferedValveOutputHandler implements 
StatusWatermarkValve.ValveOutputHandler {
+               private BlockingQueue<Watermark> outputWatermarks = new 
LinkedBlockingQueue<>();
+               private BlockingQueue<StreamStatus> outputStreamStatuses = new 
LinkedBlockingQueue<>();
+
+               @Override
+               public void handleWatermark(Watermark watermark) {
+                       outputWatermarks.add(watermark);
+               }
+
+               @Override
+               public void handleStreamStatus(StreamStatus streamStatus) {
+                       outputStreamStatuses.add(streamStatus);
+               }
+
+               public Watermark popLastOutputWatermark() {
+                       return outputWatermarks.poll();
+               }
+
+               public StreamStatus popLastOutputStreamStatus() {
+                       return outputStreamStatuses.poll();
+               }
+
+               public boolean hasNoOutputWatermarks() {
+                       return outputWatermarks.size() == 0;
+               }
+
+               public boolean hasNoOutputStreamStatuses() {
+                       return outputStreamStatuses.size() == 0;
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusTest.java
new file mode 100644
index 0000000..247dc8b
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamstatus;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+public class StreamStatusTest {
+
+       @Test (expected = IllegalArgumentException.class)
+       public void testIllegalCreationThrowsException() {
+               new StreamStatus(32);
+       }
+
+       @Test
+       public void testEquals() {
+               StreamStatus idleStatus = new 
StreamStatus(StreamStatus.IDLE_STATUS);
+               StreamStatus activeStatus = new 
StreamStatus(StreamStatus.ACTIVE_STATUS);
+
+               assertEquals(StreamStatus.IDLE, idleStatus);
+               assertTrue(idleStatus.isIdle());
+               assertFalse(idleStatus.isActive());
+
+               assertEquals(StreamStatus.ACTIVE, activeStatus);
+               assertTrue(activeStatus.isActive());
+               assertFalse(activeStatus.isIdle());
+       }
+
+       @Test
+       public void testTypeCasting() {
+               StreamStatus status = StreamStatus.ACTIVE;
+
+               assertTrue(status.isStreamStatus());
+               assertFalse(status.isRecord());
+               assertFalse(status.isWatermark());
+               assertFalse(status.isLatencyMarker());
+
+               try {
+                       status.asWatermark();
+                       fail("should throw an exception");
+               } catch (Exception e) {
+                       // expected
+               }
+
+               try {
+                       status.asRecord();
+                       fail("should throw an exception");
+               } catch (Exception e) {
+                       // expected
+               }
+
+               try {
+                       status.asLatencyMarker();
+                       fail("should throw an exception");
+               } catch (Exception e) {
+                       // expected
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index be93f6a..4b08c83 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -38,15 +39,18 @@ import 
org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
@@ -59,14 +63,7 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -98,6 +95,7 @@ public class OneInputStreamTaskTest extends TestLogger {
        public void testOpenCloseAndTimestamps() throws Exception {
                final OneInputStreamTask<String, String> mapTask = new 
OneInputStreamTask<String, String>();
                final OneInputStreamTaskTestHarness<String, String> testHarness 
= new OneInputStreamTaskTestHarness<String, String>(mapTask, 
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+               testHarness.setupOutputForSingletonOperatorChain();
 
                StreamConfig streamConfig = testHarness.getStreamConfig();
                StreamMap<String, String> mapOperator = new StreamMap<String, 
String>(new TestOpenCloseMapFunction());
@@ -126,15 +124,20 @@ public class OneInputStreamTaskTest extends TestLogger {
        }
 
        /**
-        * This test verifies that watermarks are correctly forwarded. This 
also checks whether
+        * This test verifies that watermarks and stream statuses are correctly 
forwarded. This also checks whether
         * watermarks are forwarded only when we have received watermarks from 
all inputs. The
-        * forwarded watermark must be the minimum of the watermarks of all 
inputs.
+        * forwarded watermark must be the minimum of the watermarks of all 
active inputs.
         */
        @Test
        @SuppressWarnings("unchecked")
-       public void testWatermarkForwarding() throws Exception {
+       public void testWatermarkAndStreamStatusForwarding() throws Exception {
                final OneInputStreamTask<String, String> mapTask = new 
OneInputStreamTask<String, String>();
-               final OneInputStreamTaskTestHarness<String, String> testHarness 
= new OneInputStreamTaskTestHarness<String, String>(mapTask, 2, 2, 
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+               final OneInputStreamTaskTestHarness<String, String> testHarness 
=
+                       new OneInputStreamTaskTestHarness<String, String>(
+                               mapTask, 2, 2,
+                               BasicTypeInfo.STRING_TYPE_INFO,
+                               BasicTypeInfo.STRING_TYPE_INFO);
+               testHarness.setupOutputForSingletonOperatorChain();
 
                StreamConfig streamConfig = testHarness.getStreamConfig();
                StreamMap<String, String> mapOperator = new StreamMap<String, 
String>(new IdentityMap());
@@ -180,8 +183,7 @@ public class OneInputStreamTaskTest extends TestLogger {
                expectedOutput.add(new Watermark(initialTime + 2));
                TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
 
-
-               // advance watermark from one of the inputs, now we should get 
a now one since the
+               // advance watermark from one of the inputs, now we should get 
a new one since the
                // minimum increases
                testHarness.processElement(new Watermark(initialTime + 4), 1, 
1);
                testHarness.waitForInputProcessing();
@@ -196,6 +198,31 @@ public class OneInputStreamTaskTest extends TestLogger {
                expectedOutput.add(new Watermark(initialTime + 4));
                TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
 
+               // test whether idle input channels are acknowledged correctly 
when forwarding watermarks
+               testHarness.processElement(StreamStatus.IDLE, 0, 1);
+               testHarness.processElement(StreamStatus.IDLE, 1, 0);
+               testHarness.processElement(new Watermark(initialTime + 6), 0, 
0);
+               testHarness.processElement(new Watermark(initialTime + 5), 1, 
1); // this watermark should be advanced first
+               testHarness.processElement(StreamStatus.IDLE, 1, 1); // once 
this is acknowledged,
+                                                                    // 
watermark (initial + 6) should be forwarded
+               testHarness.waitForInputProcessing();
+               expectedOutput.add(new Watermark(initialTime + 5));
+               expectedOutput.add(new Watermark(initialTime + 6));
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
+               // make all input channels idle and check that the operator's 
idle status is forwarded
+               testHarness.processElement(StreamStatus.IDLE, 0, 0);
+               testHarness.waitForInputProcessing();
+               expectedOutput.add(StreamStatus.IDLE);
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
+               // make some input channels active again and check that the 
operator's active status is forwarded only once
+               testHarness.processElement(StreamStatus.ACTIVE, 1, 0);
+               testHarness.processElement(StreamStatus.ACTIVE, 0, 1);
+               testHarness.waitForInputProcessing();
+               expectedOutput.add(StreamStatus.ACTIVE);
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
                testHarness.endInput();
 
                testHarness.waitForTaskCompletion();
@@ -205,12 +232,170 @@ public class OneInputStreamTaskTest extends TestLogger {
        }
 
        /**
+        * This test verifies that watermarks are not forwarded when the task 
is idle.
+        * It also verifies that when the task is idle, watermarks generated in the 
middle of chains are also blocked and
+        * never forwarded.
+        *
+        * The tested chain will be: (HEAD: normal operator) --> (watermark 
generating operator) --> (normal operator).
+        * The operators will throw an exception and fail the test if any of 
them is forwarded a watermark while
+        * the task is idle.
+        */
+       @Test
+       public void testWatermarksNotForwardedWithinChainWhenIdle() throws 
Exception {
+               final OneInputStreamTask<String, String> testTask = new 
OneInputStreamTask<>();
+               final OneInputStreamTaskTestHarness<String, String> testHarness 
=
+                       new OneInputStreamTaskTestHarness<String, String>(
+                               testTask, 1, 1,
+                               BasicTypeInfo.STRING_TYPE_INFO,
+                               BasicTypeInfo.STRING_TYPE_INFO);
+
+               // ------------------ setup the chain ------------------
+
+               TriggerableFailOnWatermarkTestOperator headOperator = new 
TriggerableFailOnWatermarkTestOperator();
+               StreamConfig headOperatorConfig = testHarness.getStreamConfig();
+
+               WatermarkGeneratingTestOperator watermarkOperator = new 
WatermarkGeneratingTestOperator();
+               StreamConfig watermarkOperatorConfig = new StreamConfig(new 
Configuration());
+
+               TriggerableFailOnWatermarkTestOperator tailOperator = new 
TriggerableFailOnWatermarkTestOperator();
+               StreamConfig tailOperatorConfig = new StreamConfig(new 
Configuration());
+
+               headOperatorConfig.setStreamOperator(headOperator);
+               headOperatorConfig.setChainStart();
+               headOperatorConfig.setChainIndex(0);
+               
headOperatorConfig.setChainedOutputs(Collections.singletonList(new StreamEdge(
+                       new StreamNode(null, 0, null, null, null, null, null),
+                       new StreamNode(null, 1, null, null, null, null, null),
+                       0,
+                       Collections.<String>emptyList(),
+                       null
+               )));
+
+               watermarkOperatorConfig.setStreamOperator(watermarkOperator);
+               
watermarkOperatorConfig.setTypeSerializerIn1(StringSerializer.INSTANCE);
+               watermarkOperatorConfig.setChainIndex(1);
+               
watermarkOperatorConfig.setChainedOutputs(Collections.singletonList(new 
StreamEdge(
+                       new StreamNode(null, 1, null, null, null, null, null),
+                       new StreamNode(null, 2, null, null, null, null, null),
+                       0,
+                       Collections.<String>emptyList(),
+                       null
+               )));
+
+               List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
+               outEdgesInOrder.add(new StreamEdge(
+                       new StreamNode(null, 2, null, null, null, null, null),
+                       new StreamNode(null, 3, null, null, null, null, null),
+                       0,
+                       Collections.<String>emptyList(),
+                       new BroadcastPartitioner<Object>()));
+
+               tailOperatorConfig.setStreamOperator(tailOperator);
+               
tailOperatorConfig.setTypeSerializerIn1(StringSerializer.INSTANCE);
+               tailOperatorConfig.setBufferTimeout(0);
+               tailOperatorConfig.setChainIndex(2);
+               tailOperatorConfig.setChainEnd();
+               
tailOperatorConfig.setOutputSelectors(Collections.<OutputSelector<?>>emptyList());
+               tailOperatorConfig.setNumberOfOutputs(1);
+               tailOperatorConfig.setOutEdgesInOrder(outEdgesInOrder);
+               tailOperatorConfig.setNonChainedOutputs(outEdgesInOrder);
+               
tailOperatorConfig.setTypeSerializerOut(StringSerializer.INSTANCE);
+
+               Map<Integer, StreamConfig> chainedConfigs = new HashMap<>(2);
+               chainedConfigs.put(1, watermarkOperatorConfig);
+               chainedConfigs.put(2, tailOperatorConfig);
+               
headOperatorConfig.setTransitiveChainedTaskConfigs(chainedConfigs);
+               headOperatorConfig.setOutEdgesInOrder(outEdgesInOrder);
+
+               // -----------------------------------------------------
+
+               // --------------------- begin test ---------------------
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<Object>();
+
+               testHarness.invoke();
+               testHarness.waitForTaskRunning();
+
+               // the task starts as active, so all generated watermarks 
should be forwarded
+               testHarness.processElement(new 
StreamRecord<>(TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER));
+
+               testHarness.processElement(new StreamRecord<>("10"), 0, 0);
+
+               // this watermark will be forwarded since the task is currently 
active,
+               // but should not be in the final output because it should be 
blocked by the watermark generator in the chain
+               testHarness.processElement(new Watermark(15));
+
+               testHarness.processElement(new StreamRecord<>("20"), 0, 0);
+               testHarness.processElement(new StreamRecord<>("30"), 0, 0);
+
+               testHarness.waitForInputProcessing();
+
+               expectedOutput.add(new 
StreamRecord<>(TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER));
+               expectedOutput.add(new StreamRecord<>("10"));
+               expectedOutput.add(new Watermark(10));
+               expectedOutput.add(new StreamRecord<>("20"));
+               expectedOutput.add(new Watermark(20));
+               expectedOutput.add(new StreamRecord<>("30"));
+               expectedOutput.add(new Watermark(30));
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
+               // now, toggle the task to be idle, and let the watermark 
generator produce some watermarks
+               testHarness.processElement(StreamStatus.IDLE);
+
+               // after this, the operators will throw an exception if they 
are forwarded watermarks anywhere in the chain
+               testHarness.processElement(new 
StreamRecord<>(TriggerableFailOnWatermarkTestOperator.NO_FORWARDED_WATERMARKS_MARKER));
+
+               // NOTE: normally, tasks will not have records to process while 
idle;
+               // we're doing this here only to mimic watermark generating in 
operators
+               testHarness.processElement(new StreamRecord<>("40"), 0, 0);
+               testHarness.processElement(new StreamRecord<>("50"), 0, 0);
+               testHarness.processElement(new StreamRecord<>("60"), 0, 0);
+               testHarness.processElement(new Watermark(65)); // the test will 
fail if any of the operators were forwarded this
+               testHarness.waitForInputProcessing();
+
+               // the 40 - 60 watermarks should not be forwarded, only the 
stream status toggle element and records
+               expectedOutput.add(StreamStatus.IDLE);
+               expectedOutput.add(new 
StreamRecord<>(TriggerableFailOnWatermarkTestOperator.NO_FORWARDED_WATERMARKS_MARKER));
+               expectedOutput.add(new StreamRecord<>("40"));
+               expectedOutput.add(new StreamRecord<>("50"));
+               expectedOutput.add(new StreamRecord<>("60"));
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
+               // re-toggle the task to be active and see if new watermarks 
are correctly forwarded again
+               testHarness.processElement(StreamStatus.ACTIVE);
+               testHarness.processElement(new 
StreamRecord<>(TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER));
+
+               testHarness.processElement(new StreamRecord<>("70"), 0, 0);
+               testHarness.processElement(new StreamRecord<>("80"), 0, 0);
+               testHarness.processElement(new StreamRecord<>("90"), 0, 0);
+               testHarness.waitForInputProcessing();
+
+               expectedOutput.add(StreamStatus.ACTIVE);
+               expectedOutput.add(new 
StreamRecord<>(TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER));
+               expectedOutput.add(new StreamRecord<>("70"));
+               expectedOutput.add(new Watermark(70));
+               expectedOutput.add(new StreamRecord<>("80"));
+               expectedOutput.add(new Watermark(80));
+               expectedOutput.add(new StreamRecord<>("90"));
+               expectedOutput.add(new Watermark(90));
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
+               testHarness.endInput();
+
+               testHarness.waitForTaskCompletion();
+
+               List<String> resultElements = 
TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
+               assertEquals(12, resultElements.size());
+       }
+
+       /**
         * This test verifies that checkpoint barriers are correctly forwarded.
         */
        @Test
        public void testCheckpointBarriers() throws Exception {
                final OneInputStreamTask<String, String> mapTask = new 
OneInputStreamTask<String, String>();
                final OneInputStreamTaskTestHarness<String, String> testHarness 
= new OneInputStreamTaskTestHarness<String, String>(mapTask, 2, 2, 
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+               testHarness.setupOutputForSingletonOperatorChain();
 
                StreamConfig streamConfig = testHarness.getStreamConfig();
                StreamMap<String, String> mapOperator = new StreamMap<String, 
String>(new IdentityMap());
@@ -269,6 +454,7 @@ public class OneInputStreamTaskTest extends TestLogger {
        public void testOvertakingCheckpointBarriers() throws Exception {
                final OneInputStreamTask<String, String> mapTask = new 
OneInputStreamTask<String, String>();
                final OneInputStreamTaskTestHarness<String, String> testHarness 
= new OneInputStreamTaskTestHarness<String, String>(mapTask, 2, 2, 
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+               testHarness.setupOutputForSingletonOperatorChain();
 
                StreamConfig streamConfig = testHarness.getStreamConfig();
                StreamMap<String, String> mapOperator = new StreamMap<String, 
String>(new IdentityMap());
@@ -339,6 +525,8 @@ public class OneInputStreamTaskTest extends TestLogger {
                final Deadline deadline = new FiniteDuration(2, 
TimeUnit.MINUTES).fromNow();
                final OneInputStreamTask<String, String> streamTask = new 
OneInputStreamTask<String, String>();
                final OneInputStreamTaskTestHarness<String, String> testHarness 
= new OneInputStreamTaskTestHarness<String, String>(streamTask, 
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+               testHarness.setupOutputForSingletonOperatorChain();
+
                IdentityKeySelector<String> keySelector = new 
IdentityKeySelector<>();
                testHarness.configureForKeyedStream(keySelector, 
BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -658,5 +846,80 @@ public class OneInputStreamTaskTest extends TestLogger {
                        return value;
                }
        }
+
+       /**
+        * A {@link TriggerableFailOnWatermarkTestOperator} that generates 
watermarks.
+        */
+       private static class WatermarkGeneratingTestOperator extends 
TriggerableFailOnWatermarkTestOperator {
+
+               private static final long serialVersionUID = 
-5064871833244157221L;
+
+               private long lastWatermark;
+
+               @Override
+               protected void handleElement(StreamRecord<String> element) {
+                       long timestamp = Long.valueOf(element.getValue());
+                       if (timestamp > lastWatermark) {
+                               output.emitWatermark(new Watermark(timestamp));
+                               lastWatermark = timestamp;
+                       }
+               }
+
+               @Override
+               protected void handleWatermark(Watermark mark) {
+                       if (mark.equals(Watermark.MAX_WATERMARK)) {
+                               output.emitWatermark(mark);
+                               lastWatermark = Long.MAX_VALUE;
+                       }
+               }
+       }
+
+       /**
+        * An operator that can be toggled between expecting and not expecting 
watermarks to be forwarded to it,
+        * by letting it process special trigger marker records.
+        *
+        * If it receives a watermark when it's not expecting one, it'll throw 
an exception and fail.
+        */
+       private static class TriggerableFailOnWatermarkTestOperator
+                       extends AbstractStreamOperator<String>
+                       implements OneInputStreamOperator<String, String> {
+
+               private static final long serialVersionUID = 
2048954179291813243L;
+
+               public final static String EXPECT_FORWARDED_WATERMARKS_MARKER = 
"EXPECT_WATERMARKS";
+               public final static String NO_FORWARDED_WATERMARKS_MARKER = 
"NO_WATERMARKS";
+
+               protected boolean expectForwardedWatermarks;
+
+               @Override
+               public void processElement(StreamRecord<String> element) throws 
Exception {
+                       output.collect(element);
+
+                       if 
(element.getValue().equals(EXPECT_FORWARDED_WATERMARKS_MARKER)) {
+                               this.expectForwardedWatermarks = true;
+                       } else if 
(element.getValue().equals(NO_FORWARDED_WATERMARKS_MARKER)) {
+                               this.expectForwardedWatermarks = false;
+                       } else {
+                               handleElement(element);
+                       }
+               }
+
+               @Override
+               public void processWatermark(Watermark mark) throws Exception {
+                       if (!expectForwardedWatermarks) {
+                               throw new Exception("Received a " + mark + ", 
but this operator should not be forwarded watermarks.");
+                       } else {
+                               handleWatermark(mark);
+                       }
+               }
+
+               protected void handleElement(StreamRecord<String> element) {
+                       // do nothing
+               }
+
+               protected void handleWatermark(Watermark mark) {
+                       output.emitWatermark(mark);
+               }
+       }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index dd1fe58..0773699 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -57,6 +57,7 @@ public class SourceStreamTaskTest {
        public void testOpenClose() throws Exception {
                final SourceStreamTask<String, SourceFunction<String>, 
StreamSource<String, SourceFunction<String>>> sourceTask = new 
SourceStreamTask<>();
                final StreamTaskTestHarness<String> testHarness = new 
StreamTaskTestHarness<String>(sourceTask, BasicTypeInfo.STRING_TYPE_INFO);
+               testHarness.setupOutputForSingletonOperatorChain();
 
                StreamConfig streamConfig = testHarness.getStreamConfig();
                StreamSource<String, ?> sourceOperator = new StreamSource<>(new 
OpenCloseTestSource());
@@ -99,6 +100,7 @@ public class SourceStreamTaskTest {
                        final SourceStreamTask<Tuple2<Long, Integer>, 
SourceFunction<Tuple2<Long, Integer>>,
                                StreamSource<Tuple2<Long, Integer>, 
SourceFunction<Tuple2<Long, Integer>>>> sourceTask = new SourceStreamTask<>();
                        final StreamTaskTestHarness<Tuple2<Long, Integer>> 
testHarness = new StreamTaskTestHarness<Tuple2<Long, Integer>>(sourceTask, 
typeInfo);
+                       testHarness.setupOutputForSingletonOperatorChain();
 
                        StreamConfig streamConfig = 
testHarness.getStreamConfig();
                        StreamSource<Tuple2<Long, Integer>, ?> sourceOperator = 
new StreamSource<>(new MockSource(NUM_ELEMENTS, SOURCE_CHECKPOINT_DELAY, 
SOURCE_READ_DELAY));

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
index ebe5285..c2d4aaa 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -50,6 +50,7 @@ public class StreamTaskCancellationBarrierTest {
        public void testEmitCancellationBarrierWhenNotReady() throws Exception {
                StreamTask<String, ?> task = new InitBlockingTask();
                StreamTaskTestHarness<String> testHarness = new 
StreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO);
+               testHarness.setupOutputForSingletonOperatorChain();
 
                // start the test - this cannot succeed across the 'init()' 
method
                testHarness.invoke();
@@ -80,6 +81,7 @@ public class StreamTaskCancellationBarrierTest {
                                task,
                                1, 2,
                                BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO);
+               testHarness.setupOutputForSingletonOperatorChain();
 
                StreamConfig streamConfig = testHarness.getStreamConfig();
                StreamMap<String, String> mapOperator = new StreamMap<>(new 
IdentityMap());
@@ -124,6 +126,7 @@ public class StreamTaskCancellationBarrierTest {
                TwoInputStreamTaskTestHarness<String, String, String> 
testHarness = new TwoInputStreamTaskTestHarness<>(
                                task,
                                BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+               testHarness.setupOutputForSingletonOperatorChain();
 
                StreamConfig streamConfig = testHarness.getStreamConfig();
                CoStreamMap<String, String, String> op = new CoStreamMap<>(new 
UnionCoMap());

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 8dc6afa..e58bc5a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -106,9 +106,6 @@ public class StreamTaskTestHarness<OUT> {
                this.executionConfig = new ExecutionConfig();
 
                streamConfig = new StreamConfig(taskConfig);
-               streamConfig.setChainStart();
-               streamConfig.setBufferTimeout(0);
-               
streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
 
                outputSerializer = outputType.createSerializer(executionConfig);
                outputStreamRecordSerializer = new 
StreamElementSerializer<OUT>(outputSerializer);
@@ -129,11 +126,25 @@ public class StreamTaskTestHarness<OUT> {
        @SuppressWarnings("unchecked")
        private void initializeOutput() {
                outputList = new ConcurrentLinkedQueue<Object>();
-
                mockEnv.addOutput(outputList, outputStreamRecordSerializer);
+       }
 
+       /**
+        * Users of the test harness can call this utility method to set up the 
stream config
+        * if there will only be a single operator to be tested. The method 
will set up the
+        * outgoing network connection for the operator.
+        *
+        * For more advanced test cases such as testing chains of multiple 
operators with the harness,
+        * please manually configure the stream config.
+        */
+       public void setupOutputForSingletonOperatorChain() {
+               streamConfig.setChainStart();
+               streamConfig.setBufferTimeout(0);
+               
streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
                
streamConfig.setOutputSelectors(Collections.<OutputSelector<?>>emptyList());
                streamConfig.setNumberOfOutputs(1);
+               streamConfig.setTypeSerializerOut(outputSerializer);
+               streamConfig.setVertexID(0);
 
                StreamOperator<OUT> dummyOperator = new 
AbstractStreamOperator<OUT>() {
                        private static final long serialVersionUID = 1L;
@@ -142,13 +153,10 @@ public class StreamTaskTestHarness<OUT> {
                List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
                StreamNode sourceVertexDummy = new StreamNode(null, 0, "group", 
dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), 
SourceStreamTask.class);
                StreamNode targetVertexDummy = new StreamNode(null, 1, "group", 
dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), 
SourceStreamTask.class);
-
                outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, 
targetVertexDummy, 0, new LinkedList<String>(), new 
BroadcastPartitioner<Object>()));
+
                streamConfig.setOutEdgesInOrder(outEdgesInOrder);
                streamConfig.setNonChainedOutputs(outEdgesInOrder);
-               streamConfig.setTypeSerializerOut(outputSerializer);
-               streamConfig.setVertexID(0);
-
        }
 
        public StreamMockEnvironment createEnvironment() {
@@ -330,9 +338,6 @@ public class StreamTaskTestHarness<OUT> {
                                        allEmpty = false;
                                }
                        }
-                       try {
-                               Thread.sleep(10);
-                       } catch (InterruptedException ignored) {}
 
                        if (allEmpty) {
                                break;

http://git-wip-us.apache.org/repos/asf/flink/blob/66305135/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 3cd9c9a..c0a1638 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.co.CoStreamMap;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 
 import org.junit.Assert;
@@ -58,6 +59,7 @@ public class TwoInputStreamTaskTest {
        public void testOpenCloseAndTimestamps() throws Exception {
                final TwoInputStreamTask<String, Integer, String> coMapTask = 
new TwoInputStreamTask<String, Integer, String>();
                final TwoInputStreamTaskTestHarness<String, Integer, String> 
testHarness = new TwoInputStreamTaskTestHarness<String, Integer, 
String>(coMapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO);
+               testHarness.setupOutputForSingletonOperatorChain();
 
                StreamConfig streamConfig = testHarness.getStreamConfig();
                CoStreamMap<String, Integer, String> coMapOperator = new 
CoStreamMap<String, Integer, String>(new TestOpenCloseMapFunction());
@@ -89,15 +91,21 @@ public class TwoInputStreamTaskTest {
        }
 
        /**
-        * This test verifies that watermarks are correctly forwarded. This 
also checks whether
+        * This test verifies that watermarks and stream statuses are correctly 
forwarded. This also checks whether
         * watermarks are forwarded only when we have received watermarks from 
all inputs. The
-        * forwarded watermark must be the minimum of the watermarks of all 
inputs.
+        * forwarded watermark must be the minimum of the watermarks of all 
active inputs.
         */
        @Test
        @SuppressWarnings("unchecked")
-       public void testWatermarkForwarding() throws Exception {
+       public void testWatermarkAndStreamStatusForwarding() throws Exception {
                final TwoInputStreamTask<String, Integer, String> coMapTask = 
new TwoInputStreamTask<String, Integer, String>();
-               final TwoInputStreamTaskTestHarness<String, Integer, String> 
testHarness = new TwoInputStreamTaskTestHarness<String, Integer, 
String>(coMapTask, 2, 2, new int[] {1, 2}, BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+               final TwoInputStreamTaskTestHarness<String, Integer, String> 
testHarness =
+                       new TwoInputStreamTaskTestHarness<String, Integer, 
String>(
+                               coMapTask, 2, 2, new int[] {1, 2},
+                               BasicTypeInfo.STRING_TYPE_INFO,
+                               BasicTypeInfo.INT_TYPE_INFO,
+                               BasicTypeInfo.STRING_TYPE_INFO);
+               testHarness.setupOutputForSingletonOperatorChain();
 
                StreamConfig streamConfig = testHarness.getStreamConfig();
                CoStreamMap<String, Integer, String> coMapOperator = new 
CoStreamMap<String, Integer, String>(new IdentityMap());
@@ -147,7 +155,7 @@ public class TwoInputStreamTaskTest {
                TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
 
 
-               // advance watermark from one of the inputs, now we should get 
a now one since the
+               // advance watermark from one of the inputs, now we should get 
a new one since the
                // minimum increases
                testHarness.processElement(new Watermark(initialTime + 4), 1, 
1);
                testHarness.waitForInputProcessing();
@@ -162,6 +170,33 @@ public class TwoInputStreamTaskTest {
                expectedOutput.add(new Watermark(initialTime + 4));
                TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
 
+               // test whether idle input channels are acknowledged correctly 
when forwarding watermarks
+               testHarness.processElement(StreamStatus.IDLE, 0, 1);
+               testHarness.processElement(StreamStatus.IDLE, 1, 0);
+               testHarness.processElement(new Watermark(initialTime + 6), 0, 
0);
+               testHarness.processElement(new Watermark(initialTime + 5), 1, 
1); // this watermark should be advanced first
+               testHarness.processElement(StreamStatus.IDLE, 1, 1); // input 
2 is now idle on all channels;
+                                                                    // 
the output watermark stays at (initial + 5)
+               testHarness.waitForInputProcessing();
+               expectedOutput.add(new Watermark(initialTime + 5));
+               // We don't expect to see Watermark(6) here because the idle 
status of one
+               // input doesn't propagate to the other input. That is, if 
input 1 is at WM 6 and input
+               // 2 was at WM 5 before going to IDLE, then the output 
watermark will not jump to WM 6.
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
+               // make all input channels idle and check that the operator's 
idle status is forwarded
+               testHarness.processElement(StreamStatus.IDLE, 0, 0);
+               testHarness.waitForInputProcessing();
+               expectedOutput.add(StreamStatus.IDLE);
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
+               // make some input channels active again and check that the 
operator's active status is forwarded only once
+               testHarness.processElement(StreamStatus.ACTIVE, 1, 0);
+               testHarness.processElement(StreamStatus.ACTIVE, 0, 1);
+               testHarness.waitForInputProcessing();
+               expectedOutput.add(StreamStatus.ACTIVE);
+               TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+
                testHarness.endInput();
 
                testHarness.waitForTaskCompletion();
@@ -178,6 +213,7 @@ public class TwoInputStreamTaskTest {
        public void testCheckpointBarriers() throws Exception {
                final TwoInputStreamTask<String, Integer, String> coMapTask = 
new TwoInputStreamTask<String, Integer, String>();
                final TwoInputStreamTaskTestHarness<String, Integer, String> 
testHarness = new TwoInputStreamTaskTestHarness<String, Integer, 
String>(coMapTask, 2, 2, new int[] {1, 2}, BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+               testHarness.setupOutputForSingletonOperatorChain();
 
                StreamConfig streamConfig = testHarness.getStreamConfig();
                CoStreamMap<String, Integer, String> coMapOperator = new 
CoStreamMap<String, Integer, String>(new IdentityMap());
@@ -258,6 +294,7 @@ public class TwoInputStreamTaskTest {
        public void testOvertakingCheckpointBarriers() throws Exception {
                final TwoInputStreamTask<String, Integer, String> coMapTask = 
new TwoInputStreamTask<String, Integer, String>();
                final TwoInputStreamTaskTestHarness<String, Integer, String> 
testHarness = new TwoInputStreamTaskTestHarness<String, Integer, 
String>(coMapTask, 2, 2, new int[] {1, 2}, BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+               testHarness.setupOutputForSingletonOperatorChain();
 
                StreamConfig streamConfig = testHarness.getStreamConfig();
                CoStreamMap<String, Integer, String> coMapOperator = new 
CoStreamMap<String, Integer, String>(new IdentityMap());

Reply via email to