http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-core/src/test/java/org/apache/samza/control/TestEndOfStreamManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/control/TestEndOfStreamManager.java b/samza-core/src/test/java/org/apache/samza/control/TestEndOfStreamManager.java
new file mode 100644
index 0000000..cc70b6b
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/control/TestEndOfStreamManager.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.control;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.Partition;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.message.EndOfStreamMessage;
+import org.apache.samza.operators.spec.OperatorSpecs;
+import org.apache.samza.operators.spec.OutputOperatorSpec;
+import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit tests for {@link EndOfStreamManager}.
+ *
+ * <p>The manager is driven with end-of-stream envelopes for input and intermediate streams. The tests check:
+ * when a stream is reported as ended; when the manager calls {@code sendEndOfStream} for an intermediate
+ * output; and when it asks the task coordinator to shut down the current task.
+ */
+public class TestEndOfStreamManager {
+  // Mocked cache that reports four partitions for every stream. Presumably consulted by
+  // EndOfStreamManager.sendEndOfStream to decide how many partitions to write to
+  // (testSendEndOfStream expects four distinct partition keys).
+  private StreamMetadataCache metadataCache;
+
+  @Before
+  public void setup() {
+    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>();
+    for (int i = 0; i < 4; i++) {
+      partitionMetadata.put(new Partition(i), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
+    }
+    SystemStreamMetadata metadata = mock(SystemStreamMetadata.class);
+    when(metadata.getSystemStreamPartitionMetadata()).thenReturn(partitionMetadata);
+    metadataCache = mock(StreamMetadataCache.class);
+    when(metadataCache.getSystemStreamMetadata(anyObject(), anyBoolean())).thenReturn(metadata);
+  }
+
+  @Test
+  public void testUpdateFromInputSource() {
+    SystemStreamPartition ssp = new SystemStreamPartition("test-system", "test-stream", new Partition(0));
+    TaskName taskName = new TaskName("Task 0");
+    Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
+    streamToTasks.put(ssp.getSystemStream(), taskName.getTaskName());
+    ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
+    when(listener.getIOGraph()).thenReturn(new IOGraph(Collections.emptyList()));
+    EndOfStreamManager manager =
+        new EndOfStreamManager("Task 0", listener, streamToTasks, Collections.singleton(ssp), null, null);
+
+    // a single end-of-stream envelope on the only partition ends the input stream
+    manager.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssp), mock(TaskCoordinator.class));
+    assertTrue(manager.isEndOfStream(ssp.getSystemStream()));
+  }
+
+  @Test
+  public void testUpdateFromIntermediateStream() {
+    SystemStreamPartition[] ssps = new SystemStreamPartition[3];
+    ssps[0] = new SystemStreamPartition("test-system", "test-stream-1", new Partition(0));
+    ssps[1] = new SystemStreamPartition("test-system", "test-stream-2", new Partition(0));
+    ssps[2] = new SystemStreamPartition("test-system", "test-stream-2", new Partition(1));
+
+    TaskName taskName = new TaskName("Task 0");
+    Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
+    for (SystemStreamPartition ssp : ssps) {
+      streamToTasks.put(ssp.getSystemStream(), taskName.getTaskName());
+    }
+    ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
+    when(listener.getIOGraph()).thenReturn(new IOGraph(Collections.emptyList()));
+    EndOfStreamManager manager =
+        new EndOfStreamManager("Task 0", listener, streamToTasks, new HashSet<>(Arrays.asList(ssps)), null, null);
+
+    // Every message announces envelopeCount upstream tasks. The test expects a partition to end only
+    // after one message from each of those tasks has arrived.
+    int envelopeCount = 4;
+    IncomingMessageEnvelope[] envelopes = new IncomingMessageEnvelope[envelopeCount];
+    for (int i = 0; i < envelopeCount; i++) {
+      envelopes[i] = new IncomingMessageEnvelope(ssps[0], "dummy-offset", "",
+          new EndOfStreamMessage("task " + i, envelopeCount));
+    }
+    TaskCoordinator coordinator = mock(TaskCoordinator.class);
+
+    // all but the last message leave stream 1 open
+    for (int i = 0; i < envelopeCount - 1; i++) {
+      manager.update(envelopes[i], coordinator);
+      assertFalse(manager.isEndOfStream(ssps[0].getSystemStream()));
+    }
+    // the last message ends stream 1, but not stream 2
+    manager.update(envelopes[envelopeCount - 1], coordinator);
+    assertTrue(manager.isEndOfStream(ssps[0].getSystemStream()));
+    assertFalse(manager.isEndOfStream(ssps[1].getSystemStream()));
+
+    // Stream 2 has two partitions assigned to this task. Both partitions must end before the
+    // stream ends.
+    envelopes = new IncomingMessageEnvelope[envelopeCount];
+    for (int i = 0; i < envelopeCount; i++) {
+      envelopes[i] = new IncomingMessageEnvelope(ssps[1], "dummy-offset", "dummy-key",
+          new EndOfStreamMessage("task " + i, envelopeCount));
+    }
+    // ending partition 0 alone does not end stream 2
+    for (int i = 0; i < envelopeCount; i++) {
+      manager.update(envelopes[i], coordinator);
+      assertFalse(manager.isEndOfStream(ssps[1].getSystemStream()));
+    }
+    for (int i = 0; i < envelopeCount; i++) {
+      envelopes[i] = new IncomingMessageEnvelope(ssps[2], "dummy-offset", "dummy-key",
+          new EndOfStreamMessage("task " + i, envelopeCount));
+    }
+    for (int i = 0; i < envelopeCount - 1; i++) {
+      manager.update(envelopes[i], coordinator);
+      assertFalse(manager.isEndOfStream(ssps[1].getSystemStream()));
+    }
+    // the last message for partition 1 ends stream 2
+    manager.update(envelopes[envelopeCount - 1], coordinator);
+    assertTrue(manager.isEndOfStream(ssps[1].getSystemStream()));
+  }
+
+  @Test
+  public void testUpdateFromIntermediateStreamWith2Tasks() {
+    SystemStreamPartition[] ssps0 = new SystemStreamPartition[2];
+    ssps0[0] = new SystemStreamPartition("test-system", "test-stream-1", new Partition(0));
+    ssps0[1] = new SystemStreamPartition("test-system", "test-stream-2", new Partition(0));
+
+    SystemStreamPartition ssp1 = new SystemStreamPartition("test-system", "test-stream-2", new Partition(1));
+
+    TaskName t0 = new TaskName("Task 0");
+    Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
+    for (SystemStreamPartition ssp : ssps0) {
+      streamToTasks.put(ssp.getSystemStream(), t0.getTaskName());
+    }
+
+    TaskName t1 = new TaskName("Task 1");
+    // Key by the SystemStream, like every other entry in this multimap. An SSP key is presumably not
+    // equal to the plain SystemStream key, because SSP equality also covers the partition. With an
+    // SSP key, Task 1 would not be recorded as a consumer of test-stream-2.
+    streamToTasks.put(ssp1.getSystemStream(), t1.getTaskName());
+
+    List<StreamSpec> inputs = new ArrayList<>();
+    inputs.add(new StreamSpec("test-stream-1", "test-stream-1", "test-system"));
+    inputs.add(new StreamSpec("test-stream-2", "test-stream-2", "test-system"));
+    StreamSpec outputSpec = new StreamSpec("int-stream", "int-stream", "test-system");
+    IOGraph ioGraph = TestIOGraph.buildSimpleIOGraph(inputs, outputSpec, true);
+
+    ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
+    when(listener.getIOGraph()).thenReturn(ioGraph);
+
+    // Task 0 consumes partition 0 of both input streams. It is built without a collector, so
+    // sendEndOfStream is stubbed before any update is driven.
+    EndOfStreamManager manager0 = spy(new EndOfStreamManager("Task 0", listener, streamToTasks,
+        new HashSet<>(Arrays.asList(ssps0)), null, null));
+    doNothing().when(manager0).sendEndOfStream(any(), anyInt());
+    manager0.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssps0[0]), mock(TaskCoordinator.class));
+    assertTrue(manager0.isEndOfStream(ssps0[0].getSystemStream()));
+    // stream 2 is still open, so nothing is propagated yet
+    verify(manager0, never()).sendEndOfStream(any(), anyInt());
+    manager0.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssps0[1]), mock(TaskCoordinator.class));
+    assertTrue(manager0.isEndOfStream(ssps0[1].getSystemStream()));
+    verify(manager0).sendEndOfStream(any(), anyInt());
+
+    // Task 1 consumes only partition 1 of stream 2
+    EndOfStreamManager manager1 = spy(new EndOfStreamManager("Task 1", listener, streamToTasks,
+        Collections.singleton(ssp1), null, null));
+    doNothing().when(manager1).sendEndOfStream(any(), anyInt());
+    manager1.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssp1), mock(TaskCoordinator.class));
+    assertTrue(manager1.isEndOfStream(ssp1.getSystemStream()));
+    verify(manager1).sendEndOfStream(any(), anyInt());
+  }
+
+  @Test
+  public void testSendEndOfStream() {
+    StreamSpec ints = new StreamSpec("int-stream", "int-stream", "test-system");
+    StreamSpec input = new StreamSpec("input-stream", "input-stream", "test-system");
+    IOGraph ioGraph = TestIOGraph.buildSimpleIOGraph(Collections.singletonList(input), ints, true);
+
+    int taskCount = 8;
+    Multimap<SystemStream, String> inputToTasks = HashMultimap.create();
+    for (int i = 0; i < taskCount; i++) {
+      inputToTasks.put(input.toSystemStream(), "Task " + i);
+    }
+
+    MessageCollector collector = mock(MessageCollector.class);
+    TaskName taskName = new TaskName("Task 0");
+    ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
+    when(listener.getIOGraph()).thenReturn(ioGraph);
+    EndOfStreamManager manager = new EndOfStreamManager(taskName.getTaskName(),
+        listener,
+        inputToTasks,
+        Collections.emptySet(),
+        metadataCache,
+        collector);
+
+    // Record the partition of every outgoing envelope. Also check that each one carries this task's
+    // end-of-stream message with the requested task count.
+    Set<Integer> partitions = new HashSet<>();
+    doAnswer(invocation -> {
+        OutgoingMessageEnvelope envelope = (OutgoingMessageEnvelope) invocation.getArguments()[0];
+        partitions.add((Integer) envelope.getPartitionKey());
+        EndOfStreamMessage eosMessage = (EndOfStreamMessage) envelope.getMessage();
+        assertEquals(taskName.getTaskName(), eosMessage.getTaskName());
+        assertEquals(taskCount, eosMessage.getTaskCount());
+        return null;
+      }).when(collector).send(any());
+
+    manager.sendEndOfStream(input.toSystemStream(), taskCount);
+    // one message for each of the four partitions reported by the mocked metadata cache
+    assertEquals(4, partitions.size());
+  }
+
+  @Test
+  public void testPropagate() {
+    List<StreamSpec> inputs = new ArrayList<>();
+    inputs.add(new StreamSpec("input-stream-1", "input-stream-1", "test-system"));
+    inputs.add(new StreamSpec("input-stream-2", "input-stream-2", "test-system"));
+    StreamSpec outputSpec = new StreamSpec("int-stream", "int-stream", "test-system");
+
+    SystemStream input1 = new SystemStream("test-system", "input-stream-1");
+    SystemStream input2 = new SystemStream("test-system", "input-stream-2");
+    SystemStream ints = new SystemStream("test-system", "int-stream");
+    SystemStreamPartition[] ssps = new SystemStreamPartition[3];
+    ssps[0] = new SystemStreamPartition(input1, new Partition(0));
+    ssps[1] = new SystemStreamPartition(input2, new Partition(0));
+    ssps[2] = new SystemStreamPartition(ints, new Partition(0));
+
+    Set<SystemStreamPartition> sspSet = new HashSet<>(Arrays.asList(ssps));
+    TaskName taskName = new TaskName("task 0");
+    Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
+    for (SystemStreamPartition ssp : ssps) {
+      streamToTasks.put(ssp.getSystemStream(), taskName.getTaskName());
+    }
+
+    IOGraph ioGraph = TestIOGraph.buildSimpleIOGraph(inputs, outputSpec, true);
+    MessageCollector collector = mock(MessageCollector.class);
+    ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
+    when(listener.getIOGraph()).thenReturn(ioGraph);
+    EndOfStreamManager manager = spy(
+        new EndOfStreamManager("task 0", listener, streamToTasks, sspSet, metadataCache, collector));
+    TaskCoordinator coordinator = mock(TaskCoordinator.class);
+    // Install the stubs before driving any update. A stub added after the call has no effect on it.
+    doNothing().when(manager).sendEndOfStream(any(), anyInt());
+    doNothing().when(coordinator).shutdown(any());
+
+    // input 1 ended: wait for input 2
+    manager.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssps[0]), coordinator);
+    verify(manager, never()).sendEndOfStream(any(), anyInt());
+
+    // input 2 ended: propagate end-of-stream to the intermediate stream
+    manager.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssps[1]), coordinator);
+    ArgumentCaptor<SystemStream> argument = ArgumentCaptor.forClass(SystemStream.class);
+    verify(manager).sendEndOfStream(argument.capture(), anyInt());
+    assertEquals(ints, argument.getValue());
+
+    // intermediate stream ended: shut down the task
+    manager.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssps[2]), coordinator);
+    ArgumentCaptor<TaskCoordinator.RequestScope> arg = ArgumentCaptor.forClass(TaskCoordinator.RequestScope.class);
+    verify(coordinator).shutdown(arg.capture());
+    assertEquals(TaskCoordinator.RequestScope.CURRENT_TASK, arg.getValue());
+  }
+
+  /**
+   * Tests the case where only some of the tasks publish to the intermediate stream. Task 0 consumes
+   * the input and publishes to the intermediate stream. Task 1 consumes only the intermediate stream.
+   */
+  @Test
+  public void testPropogateWith2Tasks() {
+    StreamSpec outputSpec = new StreamSpec("int-stream", "int-stream", "test-system");
+    // NOTE(review): outputStream and partitionByOp are never used below; confirm whether they can be
+    // removed together with their imports.
+    OutputStreamImpl outputStream = new OutputStreamImpl(outputSpec, null, null);
+    OutputOperatorSpec partitionByOp = OperatorSpecs.createPartitionByOperatorSpec(outputStream, 0);
+
+    List<StreamSpec> inputs = new ArrayList<>();
+    inputs.add(new StreamSpec("input-stream-1", "input-stream-1", "test-system"));
+
+    IOGraph ioGraph = TestIOGraph.buildSimpleIOGraph(inputs, outputSpec, true);
+
+    SystemStream input1 = new SystemStream("test-system", "input-stream-1");
+    SystemStream ints = new SystemStream("test-system", "int-stream");
+    SystemStreamPartition ssp1 = new SystemStreamPartition(input1, new Partition(0));
+    SystemStreamPartition ssp2 = new SystemStreamPartition(ints, new Partition(0));
+
+    TaskName t0 = new TaskName("task 0");
+    TaskName t1 = new TaskName("task 1");
+    Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
+    streamToTasks.put(ssp1.getSystemStream(), t0.getTaskName());
+    streamToTasks.put(ssp2.getSystemStream(), t1.getTaskName());
+
+    ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
+    when(listener.getIOGraph()).thenReturn(ioGraph);
+
+    EndOfStreamManager manager0 = spy(new EndOfStreamManager(t0.getTaskName(), listener, streamToTasks,
+        Collections.singleton(ssp1), metadataCache, null));
+    EndOfStreamManager manager1 = spy(new EndOfStreamManager(t1.getTaskName(), listener, streamToTasks,
+        Collections.singleton(ssp2), metadataCache, null));
+
+    TaskCoordinator coordinator0 = mock(TaskCoordinator.class);
+    TaskCoordinator coordinator1 = mock(TaskCoordinator.class);
+    // Both managers are built without a collector. Stub everything before driving any update.
+    doNothing().when(manager0).sendEndOfStream(any(), anyInt());
+    doNothing().when(coordinator0).shutdown(any());
+    doNothing().when(coordinator1).shutdown(any());
+
+    // the input ended on task 0: propagate to the intermediate stream, then shut down task 0
+    manager0.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssp1), coordinator0);
+    // only task 0 consumes the input, so the announced task count is 1
+    ArgumentCaptor<Integer> argument = ArgumentCaptor.forClass(Integer.class);
+    verify(manager0).sendEndOfStream(any(), argument.capture());
+    assertEquals(1, argument.getValue().intValue());
+    ArgumentCaptor<TaskCoordinator.RequestScope> arg = ArgumentCaptor.forClass(TaskCoordinator.RequestScope.class);
+    verify(coordinator0).shutdown(arg.capture());
+    assertEquals(TaskCoordinator.RequestScope.CURRENT_TASK, arg.getValue());
+
+    // The intermediate stream ended on task 1. The graph has no node reading int-stream, so nothing
+    // is propagated; task 1 just shuts down.
+    IncomingMessageEnvelope intEos = new IncomingMessageEnvelope(ssp2, null, null,
+        new EndOfStreamMessage(t0.getTaskName(), 1));
+    manager1.update(intEos, coordinator1);
+    verify(manager1, never()).sendEndOfStream(any(), anyInt());
+    arg = ArgumentCaptor.forClass(TaskCoordinator.RequestScope.class);
+    verify(coordinator1).shutdown(arg.capture());
+    assertEquals(TaskCoordinator.RequestScope.CURRENT_TASK, arg.getValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-core/src/test/java/org/apache/samza/control/TestIOGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/control/TestIOGraph.java b/samza-core/src/test/java/org/apache/samza/control/TestIOGraph.java
new file mode 100644
index 0000000..39c56c3
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/control/TestIOGraph.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.control;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.control.IOGraph.IONode;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.system.StreamSpec;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Tests for {@link IOGraph}: building it from a {@link StreamGraphImpl} and looking up the nodes that
+ * consume a given stream. Also hosts {@link #buildSimpleIOGraph}, a helper used by other tests in this
+ * package.
+ */
+public class TestIOGraph {
+  private StreamSpec input1;
+  private StreamSpec input2;
+  private StreamSpec input3;
+  private StreamSpec output1;
+  private StreamSpec output2;
+  private StreamSpec int1;
+  private StreamSpec int2;
+
+  private StreamGraphImpl streamGraph;
+
+  @Before
+  public void setup() {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(JobConfig.JOB_NAME(), "test-app");
+    configMap.put(JobConfig.JOB_DEFAULT_SYSTEM(), "test-system");
+    Config config = new MapConfig(configMap);
+
+    /*
+     * The application graph under test. The two partitionBy operators are expected to produce the
+     * intermediate streams int1 (from input2) and int2 (from input3).
+     *
+     *                                    input1 -> map -> join -> output1
+     *                                                       |
+     *                      input2 -> partitionBy -> filter -|
+     *                                                       |
+     *           input3 -> filter -> partitionBy -> map -> join -> output2
+     */
+    input1 = new StreamSpec("input1", "input1", "system1");
+    input2 = new StreamSpec("input2", "input2", "system2");
+    input3 = new StreamSpec("input3", "input3", "system2");
+
+    output1 = new StreamSpec("output1", "output1", "system1");
+    output2 = new StreamSpec("output2", "output2", "system2");
+
+    // Intermediate streams. The ids presumably have to match the names StreamGraphImpl generates for
+    // the partitionBy operators, so the operator creation order below must not change.
+    int1 = new StreamSpec("test-app-1-partition_by-3", "test-app-1-partition_by-3", "default-system");
+    int2 = new StreamSpec("test-app-1-partition_by-8", "test-app-1-partition_by-8", "default-system");
+
+    ApplicationRunner runner = mock(ApplicationRunner.class);
+    when(runner.getStreamSpec("input1")).thenReturn(input1);
+    when(runner.getStreamSpec("input2")).thenReturn(input2);
+    when(runner.getStreamSpec("input3")).thenReturn(input3);
+    when(runner.getStreamSpec("output1")).thenReturn(output1);
+    when(runner.getStreamSpec("output2")).thenReturn(output2);
+    when(runner.getStreamSpec("test-app-1-partition_by-3")).thenReturn(int1);
+    when(runner.getStreamSpec("test-app-1-partition_by-8")).thenReturn(int2);
+
+    streamGraph = new StreamGraphImpl(runner, config);
+    BiFunction msgBuilder = mock(BiFunction.class);
+    MessageStream m1 = streamGraph.getInputStream("input1", msgBuilder).map(m -> m);
+    MessageStream m2 = streamGraph.getInputStream("input2", msgBuilder).partitionBy(m -> "haha").filter(m -> true);
+    MessageStream m3 = streamGraph.getInputStream("input3", msgBuilder)
+        .filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
+    Function mockFn = mock(Function.class);
+    OutputStream<Object, Object, Object> om1 = streamGraph.getOutputStream("output1", mockFn, mockFn);
+    OutputStream<Object, Object, Object> om2 = streamGraph.getOutputStream("output2", mockFn, mockFn);
+
+    m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(om1);
+    m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(om2);
+  }
+
+  @Test
+  public void testBuildIOGraph() {
+    IOGraph ioGraph = streamGraph.toIOGraph();
+    assertEquals(4, ioGraph.getNodes().size());
+
+    // Index the nodes by output so that each expected node is checked explicitly. A missing node, or
+    // a node writing to an unexpected stream, now fails instead of being silently skipped.
+    Map<StreamSpec, IONode> nodesByOutput = new HashMap<>();
+    for (IONode node : ioGraph.getNodes()) {
+      nodesByOutput.put(node.getOutput(), node);
+    }
+    assertEquals(4, nodesByOutput.size());
+
+    assertNode(nodesByOutput, output1, false, input1, int1);
+    assertNode(nodesByOutput, output2, false, int1, int2);
+    assertNode(nodesByOutput, int1, true, input2);
+    assertNode(nodesByOutput, int2, true, input3);
+  }
+
+  @Test
+  public void testNodesOfInput() {
+    IOGraph ioGraph = streamGraph.toIOGraph();
+
+    // input1 feeds only the join that writes output1
+    Collection<IONode> nodes = ioGraph.getNodesOfInput(input1.toSystemStream());
+    assertEquals(1, nodes.size());
+    IONode node = nodes.iterator().next();
+    assertEquals(output1, node.getOutput());
+    assertEquals(2, node.getInputs().size());
+    assertFalse(node.isOutputIntermediate());
+
+    // input2 feeds only the partitionBy that writes int1
+    nodes = ioGraph.getNodesOfInput(input2.toSystemStream());
+    assertEquals(1, nodes.size());
+    node = nodes.iterator().next();
+    assertEquals(int1, node.getOutput());
+    assertEquals(1, node.getInputs().size());
+    assertTrue(node.isOutputIntermediate());
+
+    // int1 feeds both joins
+    nodes = ioGraph.getNodesOfInput(int1.toSystemStream());
+    assertEquals(2, nodes.size());
+    nodes.forEach(n -> assertEquals(2, n.getInputs().size()));
+
+    // input3 feeds only the partitionBy that writes int2
+    nodes = ioGraph.getNodesOfInput(input3.toSystemStream());
+    assertEquals(1, nodes.size());
+    node = nodes.iterator().next();
+    assertEquals(int2, node.getOutput());
+    assertEquals(1, node.getInputs().size());
+    assertTrue(node.isOutputIntermediate());
+
+    // int2 feeds only the join that writes output2
+    nodes = ioGraph.getNodesOfInput(int2.toSystemStream());
+    assertEquals(1, nodes.size());
+    node = nodes.iterator().next();
+    assertEquals(output2, node.getOutput());
+    assertEquals(2, node.getInputs().size());
+    assertFalse(node.isOutputIntermediate());
+  }
+
+  /**
+   * Asserts that a node writing to {@code output} exists, has the expected intermediate flag, and reads
+   * exactly {@code expectedInputs}.
+   *
+   * @param nodesByOutput nodes of the graph, indexed by their output stream
+   * @param output the output stream of the node to check
+   * @param isOutputIntermediate the expected value of {@link IONode#isOutputIntermediate()}
+   * @param expectedInputs the expected inputs, listed in ascending stream-id order
+   */
+  private static void assertNode(Map<StreamSpec, IONode> nodesByOutput, StreamSpec output,
+      boolean isOutputIntermediate, StreamSpec... expectedInputs) {
+    IONode node = nodesByOutput.get(output);
+    assertTrue("no IONode writes to " + output.getId(), node != null);
+    assertEquals(isOutputIntermediate, node.isOutputIntermediate());
+    assertEquals(Arrays.asList(expectedInputs), Arrays.asList(sort(node.getInputs())));
+  }
+
+  /** Returns the given specs as an array sorted by stream id, so they can be compared independent of set order. */
+  private static StreamSpec[] sort(Set<StreamSpec> specs) {
+    StreamSpec[] array = specs.toArray(new StreamSpec[specs.size()]);
+    Arrays.sort(array, (s1, s2) -> s1.getId().compareTo(s2.getId()));
+    return array;
+  }
+
+  /**
+   * Builds an {@link IOGraph} with a single node that reads all of {@code inputs} and writes to
+   * {@code output}.
+   *
+   * @param inputs the input streams of the node
+   * @param output the output stream of the node
+   * @param isOutputIntermediate whether {@code output} is an intermediate stream
+   * @return an IOGraph containing just that node
+   */
+  public static IOGraph buildSimpleIOGraph(List<StreamSpec> inputs,
+      StreamSpec output,
+      boolean isOutputIntermediate) {
+    IONode node = new IONode(output, isOutputIntermediate);
+    inputs.forEach(node::addInput);
+    return new IOGraph(Collections.singleton(node));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-core/src/test/java/org/apache/samza/control/TestWatermarkManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/control/TestWatermarkManager.java b/samza-core/src/test/java/org/apache/samza/control/TestWatermarkManager.java
new file mode 100644
index 0000000..8fe7a16
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/control/TestWatermarkManager.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.control;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.Partition;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.message.WatermarkMessage;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.MessageCollector;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit tests for {@link WatermarkManager}: watermark aggregation from input and intermediate streams,
+ * sending watermarks to every partition of an intermediate stream, and propagation through an {@link IOGraph}.
+ */
+public class TestWatermarkManager {
+
+  // Shared mock that reports four partitions (0-3) for any stream; (re)initialized in setup().
+  StreamMetadataCache metadataCache;
+
+  @Before
+  public void setup() {
+    SystemStreamMetadata metadata = mock(SystemStreamMetadata.class);
+    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>();
+    for (int p = 0; p < 4; p++) {
+      partitionMetadata.put(new Partition(p), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
+    }
+    when(metadata.getSystemStreamPartitionMetadata()).thenReturn(partitionMetadata);
+    metadataCache = mock(StreamMetadataCache.class);
+    when(metadataCache.getSystemStreamMetadata(anyObject(), anyBoolean())).thenReturn(metadata);
+  }
+
+  @Test
+  public void testUpdateFromInputSource() {
+    SystemStreamPartition ssp = new SystemStreamPartition("test-system", "test-stream", new Partition(0));
+    TaskName taskName = new TaskName("Task 0");
+    Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
+    streamToTasks.put(ssp.getSystemStream(), taskName.getTaskName());
+    ControlMessageListenerTask listener = mockListener(new IOGraph(Collections.emptyList()));
+    WatermarkManager manager =
+        new WatermarkManager("Task 0", listener, streamToTasks, Collections.singleton(ssp), null, null);
+    long time = System.currentTimeMillis();
+    // a single watermark envelope on the only assigned partition yields a watermark with the same timestamp
+    Watermark watermark = manager.update(WatermarkManager.buildWatermarkEnvelope(time, ssp));
+    assertEquals(time, watermark.getTimestamp());
+  }
+
+  @Test
+  public void testUpdateFromIntermediateStream() {
+    SystemStreamPartition[] ssps = new SystemStreamPartition[3];
+    ssps[0] = new SystemStreamPartition("test-system", "test-stream-1", new Partition(0));
+    ssps[1] = new SystemStreamPartition("test-system", "test-stream-2", new Partition(0));
+    ssps[2] = new SystemStreamPartition("test-system", "test-stream-2", new Partition(1));
+
+    TaskName taskName = new TaskName("Task 0");
+    Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
+    for (SystemStreamPartition ssp : ssps) {
+      streamToTasks.put(ssp.getSystemStream(), taskName.getTaskName());
+    }
+    ControlMessageListenerTask listener = mockListener(new IOGraph(Collections.emptyList()));
+    WatermarkManager manager =
+        new WatermarkManager("Task 0", listener, streamToTasks, new HashSet<>(Arrays.asList(ssps)), null, null);
+
+    // stream 1 has one partition; each envelope claims a task count of four
+    IncomingMessageEnvelope[] envelopes = buildWatermarkEnvelopes(ssps[0], new long[] {300L, 200L, 100L, 400L});
+    for (int i = 0; i < 3; i++) {
+      assertNull(manager.update(envelopes[i]));
+    }
+    // verify the first three messages won't generate a watermark
+    assertEquals(WatermarkManager.TIME_NOT_EXIST, manager.getWatermarkTime(ssps[0]));
+    // the fourth message will generate a watermark carrying the minimum of the four timestamps
+    Watermark watermark = manager.update(envelopes[3]);
+    assertNotNull(watermark);
+    assertEquals(100L, watermark.getTimestamp());
+    assertEquals(WatermarkManager.TIME_NOT_EXIST, manager.getWatermarkTime(ssps[1]));
+    assertEquals(WatermarkManager.TIME_NOT_EXIST, manager.getWatermarkTime(ssps[2]));
+
+    // stream2 has two partitions assigned to this task, so it requires a message from each partition to
+    // calculate watermarks
+    envelopes = buildWatermarkEnvelopes(ssps[1], new long[] {300L, 200L, 100L, 400L});
+    // verify the messages for partition 0 alone won't generate a watermark
+    for (IncomingMessageEnvelope envelope : envelopes) {
+      assertNull(manager.update(envelope));
+    }
+    assertEquals(100L, manager.getWatermarkTime(ssps[1]));
+
+    envelopes = buildWatermarkEnvelopes(ssps[2], new long[] {350L, 150L, 500L, 80L});
+    for (int i = 0; i < 3; i++) {
+      assertNull(manager.update(envelopes[i]));
+    }
+    assertEquals(WatermarkManager.TIME_NOT_EXIST, manager.getWatermarkTime(ssps[2]));
+    // the fourth message will generate the watermark
+    watermark = manager.update(envelopes[3]);
+    assertNotNull(watermark);
+    assertEquals(80L, manager.getWatermarkTime(ssps[2]));
+    assertEquals(80L, watermark.getTimestamp());
+  }
+
+  @Test
+  public void testSendWatermark() {
+    SystemStream ints = new SystemStream("test-system", "int-stream");
+    MessageCollector collector = mock(MessageCollector.class);
+    ControlMessageListenerTask listener = mockListener(new IOGraph(Collections.emptyList()));
+
+    // metadataCache (built in setup()) reports four partitions for every stream
+    WatermarkManager manager = new WatermarkManager("task 0",
+        listener,
+        HashMultimap.create(),
+        Collections.emptySet(),
+        metadataCache,
+        collector);
+
+    long time = System.currentTimeMillis();
+    Set<Integer> partitions = new HashSet<>();
+    doAnswer(invocation -> {
+        OutgoingMessageEnvelope envelope = (OutgoingMessageEnvelope) invocation.getArguments()[0];
+        partitions.add((Integer) envelope.getPartitionKey());
+        WatermarkMessage watermarkMessage = (WatermarkMessage) envelope.getMessage();
+        assertEquals("task 0", watermarkMessage.getTaskName());
+        assertEquals(8, watermarkMessage.getTaskCount());
+        assertEquals(time, watermarkMessage.getTimestamp());
+        return null;
+      }).when(collector).send(any());
+
+    manager.sendWatermark(time, ints, 8);
+    // watermarks must have been sent to all four partitions of the intermediate stream
+    assertEquals(4, partitions.size());
+  }
+
+  @Test
+  public void testPropagate() {
+    StreamSpec outputSpec = new StreamSpec("int-stream", "int-stream", "test-system");
+    List<StreamSpec> inputs = new ArrayList<>();
+    inputs.add(new StreamSpec("input-stream-1", "input-stream-1", "test-system"));
+    inputs.add(new StreamSpec("input-stream-2", "input-stream-2", "test-system"));
+
+    // int-stream is an intermediate stream produced from input-stream-1 and input-stream-2
+    IOGraph ioGraph = TestIOGraph.buildSimpleIOGraph(inputs, outputSpec, true);
+
+    SystemStream input1 = new SystemStream("test-system", "input-stream-1");
+    SystemStream input2 = new SystemStream("test-system", "input-stream-2");
+    SystemStream input3 = new SystemStream("test-system", "input-stream-3");
+    SystemStream ints = new SystemStream("test-system", "int-stream");
+    SystemStreamPartition[] ssps0 = new SystemStreamPartition[3];
+    ssps0[0] = new SystemStreamPartition(input1, new Partition(0));
+    ssps0[1] = new SystemStreamPartition(input2, new Partition(0));
+    ssps0[2] = new SystemStreamPartition(ints, new Partition(0));
+
+    SystemStreamPartition[] ssps1 = new SystemStreamPartition[4];
+    ssps1[0] = new SystemStreamPartition(input1, new Partition(1));
+    ssps1[1] = new SystemStreamPartition(input2, new Partition(1));
+    ssps1[2] = new SystemStreamPartition(input3, new Partition(1));
+    ssps1[3] = new SystemStreamPartition(ints, new Partition(1));
+
+    SystemStreamPartition[] ssps2 = new SystemStreamPartition[2];
+    ssps2[0] = new SystemStreamPartition(input3, new Partition(2));
+    ssps2[1] = new SystemStreamPartition(ints, new Partition(2));
+
+    TaskName t0 = new TaskName("task 0"); // consumes input1, input2 and int-stream
+    TaskName t1 = new TaskName("task 1"); // consumes input1, input2, input3 and int-stream
+    TaskName t2 = new TaskName("task 2"); // consumes input3 and int-stream
+    Multimap<SystemStream, String> inputToTasks = HashMultimap.create();
+    for (SystemStreamPartition ssp : ssps0) {
+      inputToTasks.put(ssp.getSystemStream(), t0.getTaskName());
+    }
+    for (SystemStreamPartition ssp : ssps1) {
+      inputToTasks.put(ssp.getSystemStream(), t1.getTaskName());
+    }
+    for (SystemStreamPartition ssp : ssps2) {
+      inputToTasks.put(ssp.getSystemStream(), t2.getTaskName());
+    }
+
+    ControlMessageListenerTask listener = mockListener(ioGraph);
+    WatermarkManager manager = spy(
+        new WatermarkManager(t0.getTaskName(), listener, inputToTasks, new HashSet<>(Arrays.asList(ssps0)), null, null));
+
+    IncomingMessageEnvelope envelope = WatermarkManager.buildWatermarkEnvelope(System.currentTimeMillis(), ssps0[0]);
+    doNothing().when(manager).sendWatermark(anyLong(), any(), anyInt());
+    Watermark watermark = manager.update(envelope);
+    assertNotNull(watermark);
+    long time = System.currentTimeMillis();
+    Watermark updatedWatermark = watermark.copyWithTimestamp(time);
+    updatedWatermark.propagate(ints);
+    ArgumentCaptor<Long> arg1 = ArgumentCaptor.forClass(Long.class);
+    ArgumentCaptor<SystemStream> arg2 = ArgumentCaptor.forClass(SystemStream.class);
+    ArgumentCaptor<Integer> arg3 = ArgumentCaptor.forClass(Integer.class);
+    verify(manager).sendWatermark(arg1.capture(), arg2.capture(), arg3.capture());
+    assertEquals(time, arg1.getValue().longValue());
+    assertEquals(ints, arg2.getValue());
+    // t0 and t1 are the only tasks consuming int-stream's inputs (input-stream-1/2), hence a task count of 2
+    assertEquals(2, arg3.getValue().intValue());
+  }
+
+  /** Creates a mocked listener task whose {@code getIOGraph()} returns {@code ioGraph}. */
+  private static ControlMessageListenerTask mockListener(IOGraph ioGraph) {
+    ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
+    when(listener.getIOGraph()).thenReturn(ioGraph);
+    return listener;
+  }
+
+  /**
+   * Builds one watermark envelope per entry of {@code times}, all on {@code ssp}. Envelope {@code i} carries
+   * timestamp {@code times[i]}, task name {@code "task " + i} and a task count of {@code times.length}.
+   */
+  private static IncomingMessageEnvelope[] buildWatermarkEnvelopes(SystemStreamPartition ssp, long[] times) {
+    IncomingMessageEnvelope[] envelopes = new IncomingMessageEnvelope[times.length];
+    for (int i = 0; i < times.length; i++) {
+      envelopes[i] = new IncomingMessageEnvelope(ssp, "dummy-offset", "",
+          new WatermarkMessage(times[i], "task " + i, times.length));
+    }
+    return envelopes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index d50d271..3ae8f5b 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.operators.impl;
 
+import java.util.Set;
 import org.apache.samza.config.Config;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.MetricsRegistry;
@@ -210,5 +211,22 @@ public class TestOperatorImpl {
      super(OpCode.INPUT, 1);
     }
   }
+
+  /**
+   * Test accessor for the {@code registeredOperators} field of {@code op}
+   * (presumably the operators registered downstream of it — confirm against OperatorImpl).
+   *
+   * @param op the operator to inspect
+   * @return the set held in {@code op.registeredOperators} itself, not a copy
+   */
+  public static Set<OperatorImpl> getNextOperators(OperatorImpl op) {
+    return op.registeredOperators;
+  }
+
+  /**
+   * Test accessor returning the op code of the operator spec backing {@code op}.
+   *
+   * @param op the operator to inspect
+   * @return {@code op.getOperatorSpec().getOpCode()}
+   */
+  public static OperatorSpec.OpCode getOpCode(OperatorImpl op) {
+    return op.getOperatorSpec().getOpCode();
+  }
+
+  /**
+   * Test accessor returning {@code op}'s input watermark time.
+   *
+   * @param op the operator to inspect
+   * @return {@code op.getInputWatermarkTime()}
+   */
+  public static long getInputWatermark(OperatorImpl op) {
+    return op.getInputWatermarkTime();
+  }
+
+  /**
+   * Test accessor returning {@code op}'s output watermark time.
+   *
+   * @param op the operator to inspect
+   * @return {@code op.getOutputWatermarkTime()}
+   */
+  public static long getOutputWatermark(OperatorImpl op) {
+    return op.getOutputWatermarkTime();
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java 
b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
index 6a8d765..fc1259c 100644
--- 
a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
+++ 
b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
@@ -88,9 +88,7 @@ public class TestStreamProcessor {
     }
 
     @Override
-    SamzaContainer createSamzaContainer(
-        ContainerModel containerModel,
-        int maxChangelogStreamPartitions) {
+    SamzaContainer createSamzaContainer(String processorId, JobModel jobModel) 
{
       if (container == null) {
         RunLoop mockRunLoop = mock(RunLoop.class);
         doAnswer(invocation ->

http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
 
b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index a04bd3b..4be4e73 100644
--- 
a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ 
b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.runtime;
 
+import java.util.Set;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.JobConfig;
@@ -330,7 +331,6 @@ public class TestLocalApplicationRunner {
         return null;
       }).when(sp).start();
 
-
     LocalApplicationRunner spy = spy(runner);
     doReturn(sp).when(spy).createStreamProcessor(anyObject(), anyObject(), 
captor.capture());
 
@@ -343,4 +343,8 @@ public class TestLocalApplicationRunner {
     assertEquals(spy.status(app), ApplicationStatus.UnsuccessfulFinish);
   }
 
+  /**
+   * Test accessor delegating to {@code runner.getProcessors()}.
+   *
+   * @param runner the application runner to inspect
+   * @return whatever {@code runner.getProcessors()} returns
+   */
+  public static Set<StreamProcessor> getProcessors(LocalApplicationRunner 
runner) {
+    return runner.getProcessors();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
 
b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
index 5b76bba..7192525 100644
--- 
a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
+++ 
b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
@@ -26,7 +26,7 @@ import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import org.apache.samza.message.EndOfStreamMessage;
 import org.apache.samza.message.WatermarkMessage;
-import org.apache.samza.message.MessageType;
+import org.apache.samza.message.IntermediateMessageType;
 import org.apache.samza.serializers.IntermediateMessageSerde;
 import org.apache.samza.serializers.Serde;
 import org.junit.Test;
@@ -96,7 +96,7 @@ public class TestIntermediateMessageSerde {
     TestUserMessage userMessage = new TestUserMessage(msg, 0, 
System.currentTimeMillis());
     byte[] bytes = imserde.toBytes(userMessage);
     TestUserMessage de = (TestUserMessage) imserde.fromBytes(bytes);
-    assertEquals(MessageType.of(de), MessageType.USER_MESSAGE);
+    assertEquals(IntermediateMessageType.of(de), 
IntermediateMessageType.USER_MESSAGE);
     assertEquals(de.getMessage(), msg);
     assertEquals(de.getOffset(), 0);
     assertTrue(de.getTimestamp() > 0);
@@ -109,7 +109,7 @@ public class TestIntermediateMessageSerde {
     WatermarkMessage watermark = new 
WatermarkMessage(System.currentTimeMillis(), taskName, 8);
     byte[] bytes = imserde.toBytes(watermark);
     WatermarkMessage de = (WatermarkMessage) imserde.fromBytes(bytes);
-    assertEquals(MessageType.of(de), MessageType.WATERMARK);
+    assertEquals(IntermediateMessageType.of(de), 
IntermediateMessageType.WATERMARK_MESSAGE);
     assertEquals(de.getTaskName(), taskName);
     assertEquals(de.getTaskCount(), 8);
     assertTrue(de.getTimestamp() > 0);
@@ -123,7 +123,7 @@ public class TestIntermediateMessageSerde {
     EndOfStreamMessage eos = new EndOfStreamMessage(taskName, 8);
     byte[] bytes = imserde.toBytes(eos);
     EndOfStreamMessage de = (EndOfStreamMessage) imserde.fromBytes(bytes);
-    assertEquals(MessageType.of(de), MessageType.END_OF_STREAM);
+    assertEquals(IntermediateMessageType.of(de), 
IntermediateMessageType.END_OF_STREAM_MESSAGE);
     assertEquals(de.getTaskName(), taskName);
     assertEquals(de.getTaskCount(), 8);
     assertEquals(de.getVersion(), 1);

http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 
b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index 1afc26a..03931f1 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -39,6 +39,7 @@ import org.apache.samza.container.TaskInstance;
 import org.apache.samza.container.TaskInstanceExceptionHandler;
 import org.apache.samza.container.TaskInstanceMetrics;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.control.EndOfStreamManager;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemConsumer;
@@ -78,15 +79,15 @@ public class TestAsyncRunLoop {
   private final IncomingMessageEnvelope envelope0 = new 
IncomingMessageEnvelope(ssp0, "0", "key0", "value0");
   private final IncomingMessageEnvelope envelope1 = new 
IncomingMessageEnvelope(ssp1, "1", "key1", "value1");
   private final IncomingMessageEnvelope envelope3 = new 
IncomingMessageEnvelope(ssp0, "1", "key0", "value0");
-  private final IncomingMessageEnvelope ssp0EndOfStream = 
IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp0);
-  private final IncomingMessageEnvelope ssp1EndOfStream = 
IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp1);
+  private final IncomingMessageEnvelope ssp0EndOfStream = 
EndOfStreamManager.buildEndOfStreamEnvelope(ssp0);
+  private final IncomingMessageEnvelope ssp1EndOfStream = 
EndOfStreamManager.buildEndOfStreamEnvelope(ssp1);
 
   TaskInstance createTaskInstance(AsyncStreamTask task, TaskName taskName, 
SystemStreamPartition ssp, OffsetManager manager, SystemConsumers consumers) {
     TaskInstanceMetrics taskInstanceMetrics = new TaskInstanceMetrics("task", 
new MetricsRegistryMap());
     scala.collection.immutable.Set<SystemStreamPartition> sspSet = 
JavaConverters.asScalaSetConverter(Collections.singleton(ssp)).asScala().toSet();
     return new TaskInstance(task, taskName, mock(Config.class), 
taskInstanceMetrics,
         null, consumers, mock(TaskInstanceCollector.class), 
mock(SamzaContainerContext.class),
-        manager, null, null, sspSet, new 
TaskInstanceExceptionHandler(taskInstanceMetrics, new 
scala.collection.immutable.HashSet<String>()));
+        manager, null, null, sspSet, new 
TaskInstanceExceptionHandler(taskInstanceMetrics, new 
scala.collection.immutable.HashSet<String>()), null, null);
   }
 
   TaskInstance createTaskInstance(AsyncStreamTask task, TaskName taskName, 
SystemStreamPartition ssp) {
@@ -569,7 +570,7 @@ public class TestAsyncRunLoop {
     SystemStreamPartition ssp2 = new SystemStreamPartition("system1", 
"stream2", p2);
     IncomingMessageEnvelope envelope1 = new IncomingMessageEnvelope(ssp2, "1", 
"key1", "message1");
     IncomingMessageEnvelope envelope2 = new IncomingMessageEnvelope(ssp2, "2", 
"key1", "message1");
-    IncomingMessageEnvelope envelope3 = 
IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp2);
+    IncomingMessageEnvelope envelope3 = 
EndOfStreamManager.buildEndOfStreamEnvelope(ssp2);
 
     Map<SystemStreamPartition, List<IncomingMessageEnvelope>> sspMap = new 
HashMap<>();
     List<IncomingMessageEnvelope> messageList = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java 
b/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
new file mode 100644
index 0000000..45b08d7
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+import org.apache.samza.operators.impl.OperatorImplGraph;
+
+
+/**
+ * Test helper exposing internals of {@link StreamOperatorTask} to other tests.
+ */
+public class TestStreamOperatorTask {
+
+  /**
+   * Test accessor delegating to {@code task.getOperatorImplGraph()}.
+   *
+   * @param task the stream operator task to inspect
+   * @return the task's {@link OperatorImplGraph}
+   */
+  public static OperatorImplGraph getOperatorImplGraph(StreamOperatorTask 
task) {
+    return task.getOperatorImplGraph();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 
b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 40974a6..9025077 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -19,12 +19,21 @@
 
 package org.apache.samza.container
 
+
+import java.util
+import java.util
+import java.util.Collections
 import java.util.concurrent.ConcurrentHashMap
+import com.google.common.collect.Multimap
 import org.apache.samza.SamzaException
 import org.apache.samza.Partition
 import org.apache.samza.checkpoint.OffsetManager
 import org.apache.samza.config.Config
 import org.apache.samza.config.MapConfig
+import org.apache.samza.control.ControlMessageUtils
+import org.apache.samza.job.model.ContainerModel
+import org.apache.samza.job.model.JobModel
+import org.apache.samza.job.model.TaskModel
 import org.apache.samza.metrics.Counter
 import org.apache.samza.metrics.Metric
 import org.apache.samza.metrics.MetricsRegistryMap
@@ -354,6 +363,36 @@ class TestTaskInstance {
     val expected = List(envelope1, envelope2, envelope4)
     assertEquals(expected, result.toList)
   }
+
+  /**
+   * Verifies that TaskInstance.buildInputToTasks maps each input stream to every task (across all
+   * containers of the job model) that consumes one of its partitions.
+   */
+  @Test
+  def testBuildInputToTasks: Unit = {
+    val system: String = "test-system"
+    val stream0: String = "test-stream-0"
+    val stream1: String = "test-stream-1"
+
+    val ssp0: SystemStreamPartition = new SystemStreamPartition(system, stream0, new Partition(0))
+    val ssp1: SystemStreamPartition = new SystemStreamPartition(system, stream0, new Partition(1))
+    val ssp2: SystemStreamPartition = new SystemStreamPartition(system, stream1, new Partition(0))
+
+    // Task 0 (container c0) consumes ssp0 and ssp2; Task 1 (container c1) consumes ssp1.
+    val task0: TaskName = new TaskName("Task 0")
+    val task1: TaskName = new TaskName("Task 1")
+    val ssps: util.Set[SystemStreamPartition] = new util.HashSet[SystemStreamPartition]
+    ssps.add(ssp0)
+    ssps.add(ssp2)
+    val tm0: TaskModel = new TaskModel(task0, ssps, new Partition(0))
+    val cm0: ContainerModel = new ContainerModel("c0", 0, Collections.singletonMap(task0, tm0))
+    val tm1: TaskModel = new TaskModel(task1, Collections.singleton(ssp1), new Partition(1))
+    val cm1: ContainerModel = new ContainerModel("c1", 1, Collections.singletonMap(task1, tm1))
+
+    val cms: util.Map[String, ContainerModel] = new util.HashMap[String, ContainerModel]
+    cms.put(cm0.getProcessorId, cm0)
+    cms.put(cm1.getProcessorId, cm1)
+
+    val jobModel: JobModel = new JobModel(new MapConfig, cms, null)
+    val streamToTasks: Multimap[SystemStream, String] = TaskInstance.buildInputToTasks(jobModel)
+    // stream0 is split across both tasks; stream1 is read only by Task 0
+    assertEquals(2, streamToTasks.get(ssp0.getSystemStream).size)
+    assertEquals(1, streamToTasks.get(ssp2.getSystemStream).size)
+  }
 }
 
 class MockSystemAdmin extends SystemAdmin {

http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
----------------------------------------------------------------------
diff --git 
a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java 
b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
index fb9bb56..de0d1da 100644
--- 
a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
+++ 
b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
@@ -36,6 +36,7 @@ import org.apache.commons.lang.Validate;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
+import org.apache.samza.control.EndOfStreamManager;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.system.IncomingMessageEnvelope;
@@ -236,7 +237,7 @@ public class HdfsSystemConsumer extends BlockingEnvelopeMap 
{
       consumerMetrics.incNumEvents(systemStreamPartition);
       consumerMetrics.incTotalNumEvents();
     }
-    offerMessage(systemStreamPartition, 
IncomingMessageEnvelope.buildEndOfStreamEnvelope(systemStreamPartition));
+    offerMessage(systemStreamPartition, 
EndOfStreamManager.buildEndOfStreamEnvelope(systemStreamPartition));
     reader.close();
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessorUtil.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessorUtil.java
 
b/samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessorUtil.java
new file mode 100644
index 0000000..08e866e
--- /dev/null
+++ 
b/samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessorUtil.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processor;
+
+import org.apache.samza.container.SamzaContainer;
+
+/**
+ * Test helper exposing internals of {@link StreamProcessor} to integration tests.
+ */
+public class TestStreamProcessorUtil {
+  /**
+   * Test accessor delegating to {@code processor.getContainer()}.
+   *
+   * @param processor the stream processor to inspect
+   * @return the {@link SamzaContainer} returned by {@code processor.getContainer()}
+   */
+  public static SamzaContainer getContainer(StreamProcessor processor) {
+    return processor.getContainer();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
 
b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
new file mode 100644
index 0000000..26abb13
--- /dev/null
+++ 
b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.controlmessages;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
+import org.apache.samza.test.controlmessages.TestData.PageView;
+import org.apache.samza.test.controlmessages.TestData.PageViewJsonSerdeFactory;
+import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
+import org.apache.samza.test.util.ArraySystemFactory;
+import org.apache.samza.test.util.Base64Serializer;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * This test uses an array as a bounded input source, and does a partitionBy() 
and sink() after reading the input.
+ * It verifies the pipeline will stop and the number of output messages should 
equal to the input.
+ */
+public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness 
{
+
+
+  private static final String[] PAGEKEYS = {"inbox", "home", "search", "pymk", 
"group", "job"};
+
+  @Test
+  public void testPipeline() throws  Exception {
+    Random random = new Random();
+    int count = 100;
+    PageView[] pageviews = new PageView[count];
+    for (int i = 0; i < count; i++) {
+      String pagekey = PAGEKEYS[random.nextInt(PAGEKEYS.length - 1)];
+      int memberId = random.nextInt(10);
+      pageviews[i] = new PageView(pagekey, memberId);
+    }
+
+    int partitionCount = 4;
+    Map<String, String> configs = new HashMap<>();
+    configs.put("systems.test.samza.factory", 
ArraySystemFactory.class.getName());
+    configs.put("streams.PageView.samza.system", "test");
+    configs.put("streams.PageView.source", 
Base64Serializer.serialize(pageviews));
+    configs.put("streams.PageView.partitionCount", 
String.valueOf(partitionCount));
+
+    configs.put(JobConfig.JOB_NAME(), "test-eos-job");
+    configs.put(JobConfig.PROCESSOR_ID(), "1");
+    configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
PassthroughJobCoordinatorFactory.class.getName());
+    configs.put(TaskConfig.GROUPER_FACTORY(), 
SingleContainerGrouperFactory.class.getName());
+
+    configs.put("systems.kafka.samza.factory", 
"org.apache.samza.system.kafka.KafkaSystemFactory");
+    configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
+    configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect());
+    configs.put("systems.kafka.samza.key.serde", "int");
+    configs.put("systems.kafka.samza.msg.serde", "json");
+    configs.put("systems.kafka.default.stream.replication.factor", "1");
+    configs.put("job.default.system", "kafka");
+
+    configs.put("serializers.registry.int.class", 
"org.apache.samza.serializers.IntegerSerdeFactory");
+    configs.put("serializers.registry.json.class", 
PageViewJsonSerdeFactory.class.getName());
+
+    final LocalApplicationRunner runner = new LocalApplicationRunner(new 
MapConfig(configs));
+    List<PageView> received = new ArrayList<>();
+    final StreamApplication app = (streamGraph, cfg) -> {
+      streamGraph.getInputStream("PageView", (k, v) -> (PageView) v)
+        .partitionBy(PageView::getMemberId)
+        .sink((m, collector, coordinator) -> {
+            received.add(m);
+          });
+    };
+    runner.run(app);
+    runner.waitForFinish();
+
+    assertEquals(received.size(), count * partitionCount);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-test/src/test/java/org/apache/samza/test/controlmessages/TestData.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/controlmessages/TestData.java 
b/samza-test/src/test/java/org/apache/samza/test/controlmessages/TestData.java
new file mode 100644
index 0000000..8541b55
--- /dev/null
+++ 
b/samza-test/src/test/java/org/apache/samza/test/controlmessages/TestData.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.controlmessages;
+
+import java.io.Serializable;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+public class TestData {
+
+  public static class PageView implements Serializable {
+    @JsonProperty("pageKey")
+    final String pageKey;
+    @JsonProperty("memberId")
+    final int memberId;
+
+    @JsonProperty("pageKey")
+    public String getPageKey() {
+      return pageKey;
+    }
+
+    @JsonProperty("memberId")
+    public int getMemberId() {
+      return memberId;
+    }
+
+    @JsonCreator
+    public PageView(@JsonProperty("pageKey") String pageKey, 
@JsonProperty("memberId") int memberId) {
+      this.pageKey = pageKey;
+      this.memberId = memberId;
+    }
+  }
+
+  public static class PageViewJsonSerdeFactory implements 
SerdeFactory<PageView> {
+    @Override
+    public Serde<PageView> getSerde(String name, Config config) {
+      return new PageViewJsonSerde();
+    }
+  }
+
+  public static class PageViewJsonSerde implements Serde<PageView> {
+    ObjectMapper mapper = new ObjectMapper();
+
+    @Override
+    public PageView fromBytes(byte[] bytes) {
+      try {
+        return mapper.readValue(new String(bytes, "UTF-8"), new 
TypeReference<PageView>() { });
+      } catch (Exception e) {
+        throw new SamzaException(e);
+      }
+    }
+
+    @Override
+    public byte[] toBytes(PageView pv) {
+      try {
+        return mapper.writeValueAsString(pv).getBytes("UTF-8");
+      } catch (Exception e) {
+        throw new SamzaException(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
 
b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
new file mode 100644
index 0000000..58da8bd
--- /dev/null
+++ 
b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.controlmessages;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.Partition;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.container.TaskInstance;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
+import org.apache.samza.control.EndOfStreamManager;
+import org.apache.samza.control.WatermarkManager;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.operators.impl.InputOperatorImpl;
+import org.apache.samza.operators.impl.OperatorImpl;
+import org.apache.samza.operators.impl.OperatorImplGraph;
+import org.apache.samza.operators.impl.TestOperatorImpl;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.processor.StreamProcessor;
+import org.apache.samza.processor.TestStreamProcessorUtil;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.runtime.TestLocalApplicationRunner;
+import org.apache.samza.serializers.IntegerSerdeFactory;
+import org.apache.samza.serializers.StringSerdeFactory;
+import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.AsyncStreamTaskAdapter;
+import org.apache.samza.task.StreamOperatorTask;
+import org.apache.samza.task.TestStreamOperatorTask;
+import org.apache.samza.test.controlmessages.TestData.PageView;
+import org.apache.samza.test.controlmessages.TestData.PageViewJsonSerdeFactory;
+import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
+import org.apache.samza.test.util.SimpleSystemAdmin;
+import org.apache.samza.test.util.TestStreamConsumer;
+import org.junit.Test;
+import scala.collection.JavaConverters;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness {
+
+  private static int offset = 1;
+  private static final String TEST_SYSTEM = "test";
+  private static final String TEST_STREAM = "PageView";
+  private static final int PARTITION_COUNT = 2;
+  private static final SystemStreamPartition SSP0 = new 
SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
+  private static final SystemStreamPartition SSP1 = new 
SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
+
+  private final static List<IncomingMessageEnvelope> TEST_DATA = new 
ArrayList<>();
+  static {
+    TEST_DATA.add(createIncomingMessage(new PageView("inbox", 1), SSP0));
+    TEST_DATA.add(createIncomingMessage(new PageView("home", 2), SSP1));
+    TEST_DATA.add(WatermarkManager.buildWatermarkEnvelope(1, SSP0));
+    TEST_DATA.add(WatermarkManager.buildWatermarkEnvelope(2, SSP1));
+    TEST_DATA.add(WatermarkManager.buildWatermarkEnvelope(4, SSP0));
+    TEST_DATA.add(WatermarkManager.buildWatermarkEnvelope(3, SSP1));
+    TEST_DATA.add(createIncomingMessage(new PageView("search", 3), SSP0));
+    TEST_DATA.add(createIncomingMessage(new PageView("pymk", 4), SSP1));
+    TEST_DATA.add(EndOfStreamManager.buildEndOfStreamEnvelope(SSP0));
+    TEST_DATA.add(EndOfStreamManager.buildEndOfStreamEnvelope(SSP1));
+  }
+
+  public final static class TestSystemFactory implements SystemFactory {
+    @Override
+    public SystemConsumer getConsumer(String systemName, Config config, 
MetricsRegistry registry) {
+      return new TestStreamConsumer(TEST_DATA);
+    }
+
+    @Override
+    public SystemProducer getProducer(String systemName, Config config, 
MetricsRegistry registry) {
+      return null;
+    }
+
+    @Override
+    public SystemAdmin getAdmin(String systemName, Config config) {
+      return new SimpleSystemAdmin(config);
+    }
+  }
+
+  private static IncomingMessageEnvelope createIncomingMessage(Object message, 
SystemStreamPartition ssp) {
+    return new IncomingMessageEnvelope(ssp, String.valueOf(offset++), "", 
message);
+  }
+
+  @Test
+  public void testWatermark() throws Exception {
+    Map<String, String> configs = new HashMap<>();
+    configs.put("systems.test.samza.factory", 
TestSystemFactory.class.getName());
+    configs.put("streams.PageView.samza.system", "test");
+    configs.put("streams.PageView.partitionCount", 
String.valueOf(PARTITION_COUNT));
+
+    configs.put(JobConfig.JOB_NAME(), "test-watermark-job");
+    configs.put(JobConfig.PROCESSOR_ID(), "1");
+    configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
PassthroughJobCoordinatorFactory.class.getName());
+    configs.put(TaskConfig.GROUPER_FACTORY(), 
SingleContainerGrouperFactory.class.getName());
+
+    configs.put("systems.kafka.samza.factory", 
"org.apache.samza.system.kafka.KafkaSystemFactory");
+    configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
+    configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect());
+    configs.put("systems.kafka.samza.key.serde", "int");
+    configs.put("systems.kafka.samza.msg.serde", "json");
+    configs.put("systems.kafka.default.stream.replication.factor", "1");
+    configs.put("job.default.system", "kafka");
+
+    configs.put("serializers.registry.int.class", 
IntegerSerdeFactory.class.getName());
+    configs.put("serializers.registry.string.class", 
StringSerdeFactory.class.getName());
+    configs.put("serializers.registry.json.class", 
PageViewJsonSerdeFactory.class.getName());
+
+    final LocalApplicationRunner runner = new LocalApplicationRunner(new 
MapConfig(configs));
+    List<PageView> received = new ArrayList<>();
+    final StreamApplication app = (streamGraph, cfg) -> {
+      streamGraph.getInputStream("PageView", (k, v) -> (PageView) v)
+          .partitionBy(PageView::getMemberId)
+          .sink((m, collector, coordinator) -> {
+              received.add(m);
+            });
+    };
+    runner.run(app);
+    Map<String, StreamOperatorTask> tasks = getTaskOperationGraphs(runner);
+
+    runner.waitForFinish();
+
+    StreamOperatorTask task0 = tasks.get("Partition 0");
+    OperatorImplGraph graph = 
TestStreamOperatorTask.getOperatorImplGraph(task0);
+    OperatorImpl pb = getOperator(graph, OperatorSpec.OpCode.PARTITION_BY);
+    assertEquals(TestOperatorImpl.getInputWatermark(pb), 4);
+    assertEquals(TestOperatorImpl.getOutputWatermark(pb), 4);
+    OperatorImpl sink = getOperator(graph, OperatorSpec.OpCode.SINK);
+    assertEquals(TestOperatorImpl.getInputWatermark(sink), 3);
+    assertEquals(TestOperatorImpl.getOutputWatermark(sink), 3);
+
+    StreamOperatorTask task1 = tasks.get("Partition 1");
+    graph = TestStreamOperatorTask.getOperatorImplGraph(task1);
+    pb = getOperator(graph, OperatorSpec.OpCode.PARTITION_BY);
+    assertEquals(TestOperatorImpl.getInputWatermark(pb), 3);
+    assertEquals(TestOperatorImpl.getOutputWatermark(pb), 3);
+    sink = getOperator(graph, OperatorSpec.OpCode.SINK);
+    assertEquals(TestOperatorImpl.getInputWatermark(sink), 3);
+    assertEquals(TestOperatorImpl.getOutputWatermark(sink), 3);
+  }
+
+  Map<String, StreamOperatorTask> 
getTaskOperationGraphs(LocalApplicationRunner runner) throws Exception {
+    StreamProcessor processor = 
TestLocalApplicationRunner.getProcessors(runner).iterator().next();
+    SamzaContainer container = TestStreamProcessorUtil.getContainer(processor);
+    Map<TaskName, TaskInstance> taskInstances = 
JavaConverters.mapAsJavaMapConverter(container.getTaskInstances()).asJava();
+    Map<String, StreamOperatorTask> tasks = new HashMap<>();
+    for (Map.Entry<TaskName, TaskInstance> entry : taskInstances.entrySet()) {
+      AsyncStreamTaskAdapter adapter = (AsyncStreamTaskAdapter) 
entry.getValue().task();
+      Field field = 
AsyncStreamTaskAdapter.class.getDeclaredField("wrappedTask");
+      field.setAccessible(true);
+      StreamOperatorTask task = (StreamOperatorTask) field.get(adapter);
+      tasks.put(entry.getKey().getTaskName(), task);
+    }
+    return tasks;
+  }
+
+  OperatorImpl getOperator(OperatorImplGraph graph, OperatorSpec.OpCode 
opCode) {
+    for (InputOperatorImpl input : graph.getAllInputOperators()) {
+      Set<OperatorImpl> nextOps = TestOperatorImpl.getNextOperators(input);
+      while (!nextOps.isEmpty()) {
+        OperatorImpl op = nextOps.iterator().next();
+        if (TestOperatorImpl.getOpCode(op) == opCode) {
+          return op;
+        } else {
+          nextOps = TestOperatorImpl.getNextOperators(op);
+        }
+      }
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java 
b/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java
new file mode 100644
index 0000000..9b96216
--- /dev/null
+++ 
b/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.util;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.samza.config.Config;
import org.apache.samza.control.EndOfStreamManager;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStreamPartition;
+
+/**
+ * A simple implementation of array system consumer
+ */
+public class ArraySystemConsumer implements SystemConsumer {
+  boolean done = false;
+  private final Config config;
+
+  public ArraySystemConsumer(Config config) {
+    this.config = config;
+  }
+
+  @Override
+  public void start() {
+  }
+
+  @Override
+  public void stop() {
+  }
+
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, String s) {
+  }
+
+  @Override
+  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> 
poll(Set<SystemStreamPartition> set, long l) throws InterruptedException {
+    if (!done) {
+      Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopeMap = 
new HashMap<>();
+      set.forEach(ssp -> {
+          List<IncomingMessageEnvelope> envelopes = 
Arrays.stream(getArrayObjects(ssp.getSystemStream().getStream(), config))
+              .map(object -> new IncomingMessageEnvelope(ssp, null, null, 
object)).collect(Collectors.toList());
+          envelopes.add(EndOfStreamManager.buildEndOfStreamEnvelope(ssp));
+          envelopeMap.put(ssp, envelopes);
+        });
+      done = true;
+      return envelopeMap;
+    } else {
+      return Collections.emptyMap();
+    }
+
+  }
+
+  private static Object[] getArrayObjects(String stream, Config config) {
+    try {
+      return Base64Serializer.deserialize(config.get("streams." + stream + 
".source"), Object[].class);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemFactory.java 
b/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemFactory.java
new file mode 100644
index 0000000..0632865
--- /dev/null
+++ 
b/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.util;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+
+
+/**
+ * System factory for the stream from an array
+ */
+public class ArraySystemFactory implements SystemFactory {
+
+  @Override
+  public SystemConsumer getConsumer(String systemName, Config config, 
MetricsRegistry metricsRegistry) {
+    return new ArraySystemConsumer(config);
+  }
+
+  @Override
+  public SystemProducer getProducer(String systemName, Config config, 
MetricsRegistry metricsRegistry) {
+    // no producer
+    return null;
+  }
+
+  @Override
+  public SystemAdmin getAdmin(String systemName, Config config) {
+    return new SimpleSystemAdmin(config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-test/src/test/java/org/apache/samza/test/util/Base64Serializer.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/util/Base64Serializer.java 
b/samza-test/src/test/java/org/apache/samza/test/util/Base64Serializer.java
new file mode 100644
index 0000000..1a17a3d
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/util/Base64Serializer.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Base64;
+
+
/**
 * Encodes {@link Serializable} objects as Base64 strings using Java serialization, and decodes
 * them back.
 *
 * <p>Java deserialization of untrusted input is unsafe. Only decode strings produced by this class,
 * for example test fixtures passed through config.
 */
public final class Base64Serializer {
  private Base64Serializer() {}

  /** Like {@link #serialize(Serializable)}, but wraps any {@link IOException} in a RuntimeException. */
  public static String serializeUnchecked(Serializable serializable) {
    try {
      return serialize(serializable);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Java-serializes {@code serializable} and returns the bytes as a Base64 string.
   *
   * @throws IOException if the object graph cannot be serialized
   */
  public static String serialize(Serializable serializable) throws IOException {
    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    // Closing the object stream flushes its buffered data into baos, so the bytes are read only
    // after the try block. The stream is closed even if writeObject fails.
    try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
      oos.writeObject(serializable);
    }
    return Base64.getEncoder().encodeToString(baos.toByteArray());
  }

  /** Like {@link #deserialize(String, Class)}, but wraps any exception in a RuntimeException. */
  public static <T> T deserializeUnchecked(String serialized, Class<T> klass) {
    try {
      return deserialize(serialized, klass);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Decodes a string produced by {@link #serialize(Serializable)}.
   *
   * @param serialized Base64 text of a Java-serialized object
   * @param klass expected type of the decoded object; the result is checked against it
   * @throws ClassCastException if the decoded object is not an instance of {@code klass}
   * @throws IOException if the bytes are not a valid serialized object
   * @throws ClassNotFoundException if the object's class is not on the classpath
   */
  public static <T> T deserialize(String serialized, Class<T> klass) throws IOException, ClassNotFoundException {
    final byte[] bytes = Base64.getDecoder().decode(serialized);
    try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      // Checked cast: a type mismatch fails here rather than at some distant use site.
      return klass.cast(ois.readObject());
    }
  }
}

http://git-wip-us.apache.org/repos/asf/samza/blob/bb3007d6/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java 
b/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java
new file mode 100644
index 0000000..41f01c5
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.util;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * A dummy system admin
+ */
+public class SimpleSystemAdmin implements SystemAdmin {
+  private final Config config;
+
+  public SimpleSystemAdmin(Config config) {
+    this.config = config;
+  }
+
+  @Override
+  public Map<SystemStreamPartition, String> 
getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+    return offsets.entrySet().stream()
+        .collect(Collectors.toMap(Map.Entry::getKey, null));
+  }
+
+  @Override
+  public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> 
streamNames) {
+    return streamNames.stream()
+        .collect(Collectors.toMap(
+            Function.<String>identity(),
+            streamName -> {
+            Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> 
metadataMap = new HashMap<>();
+            int partitionCount = config.getInt("streams." + streamName + 
".partitionCount", 1);
+            for (int i = 0; i < partitionCount; i++) {
+              metadataMap.put(new Partition(i), new 
SystemStreamMetadata.SystemStreamPartitionMetadata(null, null, null));
+            }
+            return new SystemStreamMetadata(streamName, metadataMap);
+          }));
+  }
+
+  @Override
+  public void createChangelogStream(String streamName, int numOfPartitions) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void validateChangelogStream(String streamName, int numOfPartitions) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void createCoordinatorStream(String streamName) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Integer offsetComparator(String offset1, String offset2) {
+    if (offset1 == null) {
+      return offset2 == null ? 0 : -1;
+    } else if (offset2 == null) {
+      return 1;
+    }
+    return offset1.compareTo(offset2);
+  }
+}
+

Reply via email to