http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/control/TestWatermarkManager.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/control/TestWatermarkManager.java 
b/samza-core/src/test/java/org/apache/samza/control/TestWatermarkManager.java
deleted file mode 100644
index 8fe7a16..0000000
--- 
a/samza-core/src/test/java/org/apache/samza/control/TestWatermarkManager.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.control;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.samza.Partition;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.message.WatermarkMessage;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamMetadata;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.MessageCollector;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-
-public class TestWatermarkManager {
-
-  StreamMetadataCache metadataCache;
-
-  @Before
-  public void setup() {
-    SystemStreamMetadata metadata = mock(SystemStreamMetadata.class);
-    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> 
partitionMetadata = new HashMap<>();
-    partitionMetadata.put(new Partition(0), 
mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    partitionMetadata.put(new Partition(1), 
mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    partitionMetadata.put(new Partition(2), 
mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    partitionMetadata.put(new Partition(3), 
mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    
when(metadata.getSystemStreamPartitionMetadata()).thenReturn(partitionMetadata);
-    metadataCache = mock(StreamMetadataCache.class);
-    when(metadataCache.getSystemStreamMetadata(anyObject(), 
anyBoolean())).thenReturn(metadata);
-  }
-
-  @Test
-  public void testUpdateFromInputSource() {
-    SystemStreamPartition ssp = new SystemStreamPartition("test-system", 
"test-stream", new Partition(0));
-    TaskName taskName = new TaskName("Task 0");
-    Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
-    streamToTasks.put(ssp.getSystemStream(), taskName.getTaskName());
-    ControlMessageListenerTask listener = 
mock(ControlMessageListenerTask.class);
-    when(listener.getIOGraph()).thenReturn(new 
IOGraph(Collections.emptyList()));
-    WatermarkManager manager = new WatermarkManager("Task 0", listener, 
streamToTasks, Collections.singleton(ssp), null, null);
-    long time = System.currentTimeMillis();
-    Watermark watermark = 
manager.update(WatermarkManager.buildWatermarkEnvelope(time, ssp));
-    assertEquals(watermark.getTimestamp(), time);
-  }
-
-  @Test
-  public void testUpdateFromIntermediateStream() {
-    SystemStreamPartition[] ssps = new SystemStreamPartition[3];
-    ssps[0] = new SystemStreamPartition("test-system", "test-stream-1", new 
Partition(0));
-    ssps[1] = new SystemStreamPartition("test-system", "test-stream-2", new 
Partition(0));
-    ssps[2] = new SystemStreamPartition("test-system", "test-stream-2", new 
Partition(1));
-
-    TaskName taskName = new TaskName("Task 0");
-    Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
-    for (SystemStreamPartition ssp : ssps) {
-      streamToTasks.put(ssp.getSystemStream(), taskName.getTaskName());
-    }
-    ControlMessageListenerTask listener = 
mock(ControlMessageListenerTask.class);
-    when(listener.getIOGraph()).thenReturn(new 
IOGraph(Collections.emptyList()));
-
-    WatermarkManager manager = new WatermarkManager("Task 0", listener, 
streamToTasks, new HashSet<>(Arrays.asList(ssps)), null, null);
-    int envelopeCount = 4;
-    IncomingMessageEnvelope[] envelopes = new 
IncomingMessageEnvelope[envelopeCount];
-
-    long[] time = {300L, 200L, 100L, 400L};
-    for (int i = 0; i < envelopeCount; i++) {
-      envelopes[i] = new IncomingMessageEnvelope(ssps[0], "dummy-offset", "", 
new WatermarkMessage(time[i], "task " + i, envelopeCount));
-    }
-    for (int i = 0; i < 3; i++) {
-      assertNull(manager.update(envelopes[i]));
-    }
-    // verify the first three messages won't result in end-of-stream
-    assertEquals(manager.getWatermarkTime(ssps[0]), 
WatermarkManager.TIME_NOT_EXIST);
-    // the fourth message will generate a watermark
-    Watermark watermark = manager.update(envelopes[3]);
-    assertNotNull(watermark);
-    assertEquals(watermark.getTimestamp(), 100);
-    assertEquals(manager.getWatermarkTime(ssps[1]), 
WatermarkManager.TIME_NOT_EXIST);
-    assertEquals(manager.getWatermarkTime(ssps[2]), 
WatermarkManager.TIME_NOT_EXIST);
-
-
-    // stream2 has two partitions assigned to this task, so it requires a 
message from each partition to calculate watermarks
-    long[] time1 = {300L, 200L, 100L, 400L};
-    envelopes = new IncomingMessageEnvelope[envelopeCount];
-    for (int i = 0; i < envelopeCount; i++) {
-      envelopes[i] = new IncomingMessageEnvelope(ssps[1], "dummy-offset", "", 
new WatermarkMessage(time1[i], "task " + i, envelopeCount));
-    }
-    // verify the messages for the partition 0 won't generate watermark
-    for (int i = 0; i < 4; i++) {
-      assertNull(manager.update(envelopes[i]));
-    }
-    assertEquals(manager.getWatermarkTime(ssps[1]), 100L);
-
-    long[] time2 = {350L, 150L, 500L, 80L};
-    for (int i = 0; i < envelopeCount; i++) {
-      envelopes[i] = new IncomingMessageEnvelope(ssps[2], "dummy-offset", "", 
new WatermarkMessage(time2[i], "task " + i, envelopeCount));
-    }
-    for (int i = 0; i < 3; i++) {
-      assertNull(manager.update(envelopes[i]));
-    }
-    assertEquals(manager.getWatermarkTime(ssps[2]), 
WatermarkManager.TIME_NOT_EXIST);
-    // the fourth message will generate the watermark
-    watermark = manager.update(envelopes[3]);
-    assertNotNull(watermark);
-    assertEquals(manager.getWatermarkTime(ssps[2]), 80L);
-    assertEquals(watermark.getTimestamp(), 80L);
-  }
-
-  @Test
-  public void testSendWatermark() {
-    SystemStream ints = new SystemStream("test-system", "int-stream");
-    SystemStreamMetadata metadata = mock(SystemStreamMetadata.class);
-    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> 
partitionMetadata = new HashMap<>();
-    partitionMetadata.put(new Partition(0), 
mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    partitionMetadata.put(new Partition(1), 
mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    partitionMetadata.put(new Partition(2), 
mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    partitionMetadata.put(new Partition(3), 
mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    
when(metadata.getSystemStreamPartitionMetadata()).thenReturn(partitionMetadata);
-    StreamMetadataCache metadataCache = mock(StreamMetadataCache.class);
-    when(metadataCache.getSystemStreamMetadata(anyObject(), 
anyBoolean())).thenReturn(metadata);
-
-    MessageCollector collector = mock(MessageCollector.class);
-    ControlMessageListenerTask listener = 
mock(ControlMessageListenerTask.class);
-    when(listener.getIOGraph()).thenReturn(new 
IOGraph(Collections.emptyList()));
-
-    WatermarkManager manager = new WatermarkManager("task 0",
-        listener,
-        HashMultimap.create(),
-        Collections.EMPTY_SET,
-        metadataCache,
-        collector);
-
-    long time = System.currentTimeMillis();
-    Set<Integer> partitions = new HashSet<>();
-    doAnswer(invocation -> {
-        OutgoingMessageEnvelope envelope = (OutgoingMessageEnvelope) 
invocation.getArguments()[0];
-        partitions.add((Integer) envelope.getPartitionKey());
-        WatermarkMessage watermarkMessage = (WatermarkMessage) 
envelope.getMessage();
-        assertEquals(watermarkMessage.getTaskName(), "task 0");
-        assertEquals(watermarkMessage.getTaskCount(), 8);
-        assertEquals(watermarkMessage.getTimestamp(), time);
-        return null;
-      }).when(collector).send(any());
-
-    manager.sendWatermark(time, ints, 8);
-    assertEquals(partitions.size(), 4);
-  }
-
-  @Test
-  public void testPropagate() {
-    StreamSpec outputSpec = new StreamSpec("int-stream", "int-stream", 
"test-system");
-    List<StreamSpec> inputs = new ArrayList<>();
-    inputs.add(new StreamSpec("input-stream-1", "input-stream-1", 
"test-system"));
-    inputs.add(new StreamSpec("input-stream-2", "input-stream-2", 
"test-system"));
-
-    IOGraph ioGraph = TestIOGraph.buildSimpleIOGraph(inputs, outputSpec, true);
-
-    SystemStream input1 = new SystemStream("test-system", "input-stream-1");
-    SystemStream input2 = new SystemStream("test-system", "input-stream-2");
-    SystemStream input3 = new SystemStream("test-system", "input-stream-3");
-    SystemStream ints = new SystemStream("test-system", "int-stream");
-    SystemStreamPartition[] ssps0 = new SystemStreamPartition[3];
-    ssps0[0] = new SystemStreamPartition(input1, new Partition(0));
-    ssps0[1] = new SystemStreamPartition(input2, new Partition(0));
-    ssps0[2] = new SystemStreamPartition(ints, new Partition(0));
-
-    SystemStreamPartition[] ssps1 = new SystemStreamPartition[4];
-    ssps1[0] = new SystemStreamPartition(input1, new Partition(1));
-    ssps1[1] = new SystemStreamPartition(input2, new Partition(1));
-    ssps1[2] = new SystemStreamPartition(input3, new Partition(1));
-    ssps1[3] = new SystemStreamPartition(ints, new Partition(1));
-
-    SystemStreamPartition[] ssps2 = new SystemStreamPartition[2];
-    ssps2[0] = new SystemStreamPartition(input3, new Partition(2));
-    ssps2[1] = new SystemStreamPartition(ints, new Partition(2));
-
-
-    TaskName t0 = new TaskName("task 0"); //consume input1 and input2
-    TaskName t1 = new TaskName("task 1"); //consume input 1 and input2 and 
input 3
-    TaskName t2 = new TaskName("task 2"); //consume input2 and input 3
-    Multimap<SystemStream, String> inputToTasks = HashMultimap.create();
-    for (SystemStreamPartition ssp : ssps0) {
-      inputToTasks.put(ssp.getSystemStream(), t0.getTaskName());
-    }
-    for (SystemStreamPartition ssp : ssps1) {
-      inputToTasks.put(ssp.getSystemStream(), t1.getTaskName());
-    }
-    for (SystemStreamPartition ssp : ssps2) {
-      inputToTasks.put(ssp.getSystemStream(), t2.getTaskName());
-    }
-
-    ControlMessageListenerTask listener = 
mock(ControlMessageListenerTask.class);
-    when(listener.getIOGraph()).thenReturn(ioGraph);
-    WatermarkManager manager = spy(
-        new WatermarkManager(t0.getTaskName(), listener, inputToTasks, new 
HashSet<>(Arrays.asList(ssps0)), null, null));
-
-    IncomingMessageEnvelope envelope = 
WatermarkManager.buildWatermarkEnvelope(System.currentTimeMillis(), ssps0[0]);
-    doNothing().when(manager).sendWatermark(anyLong(), any(), anyInt());
-    Watermark watermark = manager.update(envelope);
-    assertNotNull(watermark);
-    long time = System.currentTimeMillis();
-    Watermark updatedWatermark = watermark.copyWithTimestamp(time);
-    updatedWatermark.propagate(ints);
-    ArgumentCaptor<Long> arg1 = ArgumentCaptor.forClass(Long.class);
-    ArgumentCaptor<SystemStream> arg2 = 
ArgumentCaptor.forClass(SystemStream.class);
-    ArgumentCaptor<Integer> arg3 = ArgumentCaptor.forClass(Integer.class);
-    verify(manager).sendWatermark(arg1.capture(), arg2.capture(), 
arg3.capture());
-    assertEquals(arg1.getValue().longValue(), time);
-    assertEquals(arg2.getValue(), ints);
-    assertEquals(arg3.getValue().intValue(), 2);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java 
b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index c51b1ea..af0b786 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -32,6 +32,7 @@ import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamOperatorTask;
+import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.testUtils.TestClock;
@@ -253,7 +254,7 @@ public class TestJoinOperator {
     when(runner.getStreamSpec("instream")).thenReturn(new 
StreamSpec("instream", "instream", "insystem"));
     when(runner.getStreamSpec("instream2")).thenReturn(new 
StreamSpec("instream2", "instream2", "insystem2"));
 
-    TaskContext taskContext = mock(TaskContext.class);
+    TaskContextImpl taskContext = mock(TaskContextImpl.class);
     when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
         .of(new SystemStreamPartition("insystem", "instream", new 
Partition(0)),
             new SystemStreamPartition("insystem2", "instream2", new 
Partition(0))));

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java 
b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
index ca8a151..1edc5f6 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
@@ -26,6 +26,7 @@ import junit.framework.Assert;
 import org.apache.samza.Partition;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.operators.triggers.FiringType;
 import org.apache.samza.operators.triggers.Trigger;
@@ -41,7 +42,6 @@ import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamOperatorTask;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.testUtils.TestClock;
 import org.junit.Before;
@@ -60,13 +60,13 @@ public class TestWindowOperator {
   private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class);
   private final List<Integer> integers = ImmutableList.of(1, 2, 1, 2, 1, 2, 1, 
2, 3);
   private Config config;
-  private TaskContext taskContext;
+  private TaskContextImpl taskContext;
   private ApplicationRunner runner;
 
   @Before
   public void setup() throws Exception {
     config = mock(Config.class);
-    taskContext = mock(TaskContext.class);
+    taskContext = mock(TaskContextImpl.class);
     runner = mock(ApplicationRunner.class);
     when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
         .of(new SystemStreamPartition("kafka", "integers", new Partition(0))));

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/operators/impl/TestControlMessageSender.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestControlMessageSender.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestControlMessageSender.java
new file mode 100644
index 0000000..d17d751
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestControlMessageSender.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.Partition;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.WatermarkMessage;
+import org.apache.samza.task.MessageCollector;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestControlMessageSender { // checks that ControlMessageSender.send() emits through the collector for the mocked 4-partition stream
+
+  @Test
+  public void testSend() {
+    SystemStreamMetadata metadata = mock(SystemStreamMetadata.class);
+    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>();
+    partitionMetadata.put(new Partition(0), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
+    partitionMetadata.put(new Partition(1), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
+    partitionMetadata.put(new Partition(2), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
+    partitionMetadata.put(new Partition(3), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
+    when(metadata.getSystemStreamPartitionMetadata()).thenReturn(partitionMetadata); // mocked stream metadata reports 4 partitions
+    StreamMetadataCache metadataCache = mock(StreamMetadataCache.class);
+    when(metadataCache.getSystemStreamMetadata(anyObject(), anyBoolean())).thenReturn(metadata); // same metadata for any stream lookup
+
+    SystemStream systemStream = new SystemStream("test-system", "test-stream");
+    Set<Integer> partitions = new HashSet<>(); // distinct partition keys observed by the collector stub
+    MessageCollector collector = mock(MessageCollector.class);
+    doAnswer(invocation -> {
+        OutgoingMessageEnvelope envelope = (OutgoingMessageEnvelope) invocation.getArguments()[0];
+        partitions.add((Integer) envelope.getPartitionKey());
+        assertEquals(systemStream, envelope.getSystemStream()); // JUnit argument order: expected first, actual second
+        return null;
+      }).when(collector).send(any());
+
+    ControlMessageSender sender = new ControlMessageSender(metadataCache);
+    WatermarkMessage watermark = new WatermarkMessage(System.currentTimeMillis(), "task 0");
+    sender.send(watermark, systemStream, collector);
+    assertEquals(4, partitions.size()); // expects 4 distinct partition keys, matching the 4 mocked partitions
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/operators/impl/TestEndOfStreamStates.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestEndOfStreamStates.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestEndOfStreamStates.java
new file mode 100644
index 0000000..887991f
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestEndOfStreamStates.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.Partition;
+import org.apache.samza.system.EndOfStreamMessage;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+public class TestEndOfStreamStates { // unit test for EndOfStreamStates.update / isEndOfStream / allEndOfStream
+
+  @Test
+  public void testUpdate() { // drives one "input" partition and two "intermediate" partitions to end-of-stream
+    SystemStream input = new SystemStream("system", "input");
+    SystemStream intermediate = new SystemStream("system", "intermediate");
+
+    Set<SystemStreamPartition> ssps = new HashSet<>(); // partitions handed to the EndOfStreamStates constructor below
+    SystemStreamPartition inputPartition0 = new SystemStreamPartition(input, 
new Partition(0));
+    SystemStreamPartition intPartition0 = new 
SystemStreamPartition(intermediate, new Partition(0));
+    SystemStreamPartition intPartition1 = new 
SystemStreamPartition(intermediate, new Partition(1));
+    ssps.add(inputPartition0);
+    ssps.add(intPartition0);
+    ssps.add(intPartition1);
+
+    Map<SystemStream, Integer> producerCounts = new HashMap<>();
+    producerCounts.put(intermediate, 2); // only the intermediate stream gets a producer count; "input" has no entry
+
+    EndOfStreamStates endOfStreamStates = new EndOfStreamStates(ssps, 
producerCounts);
+    assertFalse(endOfStreamStates.isEndOfStream(input)); // nothing is ended before any update() call
+    assertFalse(endOfStreamStates.isEndOfStream(intermediate));
+    assertFalse(endOfStreamStates.allEndOfStream());
+
+    IncomingMessageEnvelope envelope = 
IncomingMessageEnvelope.buildEndOfStreamEnvelope(inputPartition0);
+    endOfStreamStates.update((EndOfStreamMessage) envelope.getMessage(), 
envelope.getSystemStreamPartition());
+    assertTrue(endOfStreamStates.isEndOfStream(input)); // one end-of-stream envelope on its only partition ends "input"
+    assertFalse(endOfStreamStates.isEndOfStream(intermediate));
+    assertFalse(endOfStreamStates.allEndOfStream());
+
+    EndOfStreamMessage eos = new EndOfStreamMessage("task 0");
+    endOfStreamStates.update(eos, intPartition0);
+    endOfStreamStates.update(eos, intPartition1);
+    assertFalse(endOfStreamStates.isEndOfStream(intermediate)); // "task 0" alone is not enough - presumably because producerCounts expects 2 producers; confirm in EndOfStreamStates
+    assertFalse(endOfStreamStates.allEndOfStream());
+
+    eos = new EndOfStreamMessage("task 1");
+    endOfStreamStates.update(eos, intPartition0);
+    endOfStreamStates.update(eos, intPartition1);
+    assertTrue(endOfStreamStates.isEndOfStream(intermediate)); // both "task 0" and "task 1" have now reported on both partitions
+    assertTrue(endOfStreamStates.allEndOfStream());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index 3ae8f5b..4a78da8 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -18,21 +18,22 @@
  */
 package org.apache.samza.operators.impl;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Set;
 import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.metrics.ReadableMetricsRegistry;
 import org.apache.samza.metrics.Timer;
+import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;
 
-import java.util.Collection;
-import java.util.Collections;
-
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
@@ -47,7 +48,7 @@ public class TestOperatorImpl {
   @Test(expected = IllegalStateException.class)
   public void testMultipleInitShouldThrow() {
     OperatorImpl<Object, Object> opImpl = new TestOpImpl(mock(Object.class));
-    TaskContext mockTaskContext = mock(TaskContext.class);
+    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
     when(mockTaskContext.getMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
     opImpl.init(mock(Config.class), mockTaskContext);
     opImpl.init(mock(Config.class), mockTaskContext);
@@ -61,7 +62,7 @@ public class TestOperatorImpl {
 
   @Test
   public void testOnMessagePropagatesResults() {
-    TaskContext mockTaskContext = mock(TaskContext.class);
+    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
     when(mockTaskContext.getMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
 
     Object mockTestOpImplOutput = mock(Object.class);
@@ -93,8 +94,8 @@ public class TestOperatorImpl {
 
   @Test
   public void testOnMessageUpdatesMetrics() {
-    TaskContext mockTaskContext = mock(TaskContext.class);
-    MetricsRegistry mockMetricsRegistry = mock(MetricsRegistry.class);
+    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
+    ReadableMetricsRegistry mockMetricsRegistry = 
mock(ReadableMetricsRegistry.class);
     when(mockTaskContext.getMetricsRegistry()).thenReturn(mockMetricsRegistry);
     Counter mockCounter = mock(Counter.class);
     Timer mockTimer = mock(Timer.class);
@@ -117,7 +118,7 @@ public class TestOperatorImpl {
 
   @Test
   public void testOnTimerPropagatesResultsAndTimer() {
-    TaskContext mockTaskContext = mock(TaskContext.class);
+    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
     when(mockTaskContext.getMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
 
     Object mockTestOpImplOutput = mock(Object.class);
@@ -153,8 +154,8 @@ public class TestOperatorImpl {
 
   @Test
   public void testOnTimerUpdatesMetrics() {
-    TaskContext mockTaskContext = mock(TaskContext.class);
-    MetricsRegistry mockMetricsRegistry = mock(MetricsRegistry.class);
+    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
+    ReadableMetricsRegistry mockMetricsRegistry = 
mock(ReadableMetricsRegistry.class);
     when(mockTaskContext.getMetricsRegistry()).thenReturn(mockMetricsRegistry);
     Counter mockMessageCounter = mock(Counter.class);
     Timer mockTimer = mock(Timer.class);
@@ -210,6 +211,11 @@ public class TestOperatorImpl {
     TestOpSpec() {
      super(OpCode.INPUT, 1);
     }
+
+    @Override
+    public WatermarkFunction getWatermarkFn() {
+      return null; // this test-only spec supplies no watermark function
+    }
   }
 
   public static Set<OperatorImpl> getNextOperators(OperatorImpl op) {
@@ -221,11 +227,11 @@ public class TestOperatorImpl {
   }
 
   public static long getInputWatermark(OperatorImpl op) {
-    return op.getInputWatermarkTime();
+    return op.getInputWatermark(); // static helper that returns the given operator's input watermark
   }
 
   public static long getOutputWatermark(OperatorImpl op) {
-    return op.getOutputWatermarkTime();
+    return op.getOutputWatermark(); // static helper that returns the given operator's output watermark
   }
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index 4505eef..9fab1b7 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -19,8 +19,23 @@
 
 package org.apache.samza.operators.impl;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.TaskContextImpl;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
@@ -32,6 +47,7 @@ import org.apache.samza.operators.spec.OperatorSpec.OpCode;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
@@ -62,7 +78,7 @@ public class TestOperatorImplGraph {
   public void testEmptyChain() {
     StreamGraphImpl streamGraph = new 
StreamGraphImpl(mock(ApplicationRunner.class), mock(Config.class));
     OperatorImplGraph opGraph =
-        new OperatorImplGraph(streamGraph, mock(Config.class), 
mock(TaskContext.class), mock(Clock.class));
+        new OperatorImplGraph(streamGraph, mock(Config.class), 
mock(TaskContextImpl.class), mock(Clock.class));
     assertEquals(0, opGraph.getAllInputOperators().size());
   }
 
@@ -82,8 +98,9 @@ public class TestOperatorImplGraph {
         .map(mock(MapFunction.class))
         .sendTo(outputStream);
 
-    TaskContext mockTaskContext = mock(TaskContext.class);
+    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
     when(mockTaskContext.getMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
+    when(mockTaskContext.getTaskName()).thenReturn(new TaskName("task 0"));
     OperatorImplGraph opImplGraph =
         new OperatorImplGraph(streamGraph, mock(Config.class), 
mockTaskContext, mock(Clock.class));
 
@@ -113,17 +130,17 @@ public class TestOperatorImplGraph {
     inputStream.filter(mock(FilterFunction.class));
     inputStream.map(mock(MapFunction.class));
 
-    TaskContext mockTaskContext = mock(TaskContext.class);
+    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
     when(mockTaskContext.getMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
     OperatorImplGraph opImplGraph =
         new OperatorImplGraph(streamGraph, mock(Config.class), 
mockTaskContext, mock(Clock.class));
 
     InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new 
SystemStream("input-system", "input-stream"));
     assertEquals(2, inputOpImpl.registeredOperators.size());
-    assertTrue(inputOpImpl.registeredOperators.stream().anyMatch(opImpl ->
-        ((OperatorImpl) opImpl).getOperatorSpec().getOpCode() == 
OpCode.FILTER));
-    assertTrue(inputOpImpl.registeredOperators.stream().anyMatch(opImpl ->
-        ((OperatorImpl) opImpl).getOperatorSpec().getOpCode() == OpCode.MAP));
+    assertTrue(inputOpImpl.registeredOperators.stream()
+        .anyMatch(opImpl -> ((OperatorImpl) 
opImpl).getOperatorSpec().getOpCode() == OpCode.FILTER));
+    assertTrue(inputOpImpl.registeredOperators.stream()
+        .anyMatch(opImpl -> ((OperatorImpl) 
opImpl).getOperatorSpec().getOpCode() == OpCode.MAP));
   }
 
   @Test
@@ -139,13 +156,13 @@ public class TestOperatorImplGraph {
     MapFunction mockMapFunction = mock(MapFunction.class);
     mergedStream.map(mockMapFunction);
 
-    TaskContext mockTaskContext = mock(TaskContext.class);
+    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
     when(mockTaskContext.getMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
     OperatorImplGraph opImplGraph =
         new OperatorImplGraph(streamGraph, mock(Config.class), 
mockTaskContext, mock(Clock.class));
 
     // verify that the DAG after merge is only traversed & initialized once
-    verify(mockMapFunction, times(1)).init(any(Config.class), 
any(TaskContext.class));
+    verify(mockMapFunction, times(1)).init(any(Config.class), 
any(TaskContextImpl.class));
   }
 
   @Test
@@ -160,13 +177,13 @@ public class TestOperatorImplGraph {
     MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", 
(k, v) -> v);
     inputStream1.join(inputStream2, mockJoinFunction, Duration.ofHours(1));
 
-    TaskContext mockTaskContext = mock(TaskContext.class);
+    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
     when(mockTaskContext.getMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
     OperatorImplGraph opImplGraph =
         new OperatorImplGraph(streamGraph, mock(Config.class), 
mockTaskContext, mock(Clock.class));
 
     // verify that join function is initialized once.
-    verify(mockJoinFunction, times(1)).init(any(Config.class), 
any(TaskContext.class));
+    verify(mockJoinFunction, times(1)).init(any(Config.class), 
any(TaskContextImpl.class));
 
     InputOperatorImpl inputOpImpl1 = opImplGraph.getInputOperator(new 
SystemStream("input-system", "input-stream1"));
     InputOperatorImpl inputOpImpl2 = opImplGraph.getInputOperator(new 
SystemStream("input-system", "input-stream2"));
@@ -201,7 +218,7 @@ public class TestOperatorImplGraph {
     when(mockRunner.getStreamSpec("input1")).thenReturn(new 
StreamSpec("input1", "input-stream1", "input-system"));
     when(mockRunner.getStreamSpec("input2")).thenReturn(new 
StreamSpec("input2", "input-stream2", "input-system"));
     Config mockConfig = mock(Config.class);
-    TaskContext mockContext = mock(TaskContext.class);
+    TaskContextImpl mockContext = mock(TaskContextImpl.class);
     when(mockContext.getMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
     StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mockConfig);
 
@@ -255,4 +272,144 @@ public class TestOperatorImplGraph {
       }
     };
   }
+
+  /**
+   * Checks the stream-to-consumer-task mapping computed by
+   * {@code OperatorImplGraph.getStreamToConsumerTasks} for a two-container job model:
+   * the two partitions of test-stream-0 are owned by different tasks, so that stream is
+   * expected to map to two tasks, while test-stream-1 (one partition) maps to one.
+   */
+  @Test
+  public void testGetStreamToConsumerTasks() {
+    String system = "test-system";
+    String stream0 = "test-stream-0";
+    String stream1 = "test-stream-1";
+
+    SystemStreamPartition ssp0 = new SystemStreamPartition(system, stream0, new Partition(0));
+    SystemStreamPartition ssp1 = new SystemStreamPartition(system, stream0, new Partition(1));
+    SystemStreamPartition ssp2 = new SystemStreamPartition(system, stream1, new Partition(0));
+
+    // "Task 0" consumes ssp0 and ssp2; "Task 1" consumes ssp1.
+    TaskName task0 = new TaskName("Task 0");
+    TaskName task1 = new TaskName("Task 1");
+    Set<SystemStreamPartition> ssps = new HashSet<>();
+    ssps.add(ssp0);
+    ssps.add(ssp2);
+    TaskModel tm0 = new TaskModel(task0, ssps, new Partition(0));
+    ContainerModel cm0 = new ContainerModel("c0", 0, Collections.singletonMap(task0, tm0));
+    TaskModel tm1 = new TaskModel(task1, Collections.singleton(ssp1), new Partition(1));
+    ContainerModel cm1 = new ContainerModel("c1", 1, Collections.singletonMap(task1, tm1));
+
+    Map<String, ContainerModel> cms = new HashMap<>();
+    cms.put(cm0.getProcessorId(), cm0);
+    cms.put(cm1.getProcessorId(), cm1);
+
+    JobModel jobModel = new JobModel(new MapConfig(), cms, null);
+    Multimap<SystemStream, String> streamToTasks = OperatorImplGraph.getStreamToConsumerTasks(jobModel);
+    // Expected value first, so a failure message reports expected/actual the right way round.
+    assertEquals(2, streamToTasks.get(ssp0.getSystemStream()).size());
+    assertEquals(1, streamToTasks.get(ssp2.getSystemStream()).size());
+  }
+
+  /**
+   * Checks that {@code OperatorImplGraph.getIntermediateToInputStreamsMap} maps each
+   * intermediate (partitionBy) stream of the graph built below to the input streams
+   * upstream of it.
+   */
+  @Test
+  public void testGetOutputToInputStreams() {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(JobConfig.JOB_NAME(), "test-app");
+    configMap.put(JobConfig.JOB_DEFAULT_SYSTEM(), "test-system");
+    Config config = new MapConfig(configMap);
+
+    /*
+     * The graph looks like the following. The partitionBy annotated with (10) writes to the
+     * intermediate stream "test-app-1-partition_by-10" (int1 below); the unannotated partitionBy
+     * writes to "test-app-1-partition_by-6" (int2 below).
+     *
+     *                                    input1 -> map -> join -> partitionBy (10) -> output1
+     *                                                       |
+     *                                     input2 -> filter -|
+     *                                                       |
+     *           input3 -> filter -> partitionBy -> map -> join -> output2
+     *
+     */
+    StreamSpec input1 = new StreamSpec("input1", "input1", "system1");
+    StreamSpec input2 = new StreamSpec("input2", "input2", "system2");
+    StreamSpec input3 = new StreamSpec("input3", "input3", "system2");
+
+    StreamSpec output1 = new StreamSpec("output1", "output1", "system1");
+    StreamSpec output2 = new StreamSpec("output2", "output2", "system2");
+
+    ApplicationRunner runner = mock(ApplicationRunner.class);
+    when(runner.getStreamSpec("input1")).thenReturn(input1);
+    when(runner.getStreamSpec("input2")).thenReturn(input2);
+    when(runner.getStreamSpec("input3")).thenReturn(input3);
+    when(runner.getStreamSpec("output1")).thenReturn(output1);
+    when(runner.getStreamSpec("output2")).thenReturn(output2);
+
+    // intermediate streams used in tests
+    StreamSpec int1 = new StreamSpec("test-app-1-partition_by-10", "test-app-1-partition_by-10", "default-system");
+    StreamSpec int2 = new StreamSpec("test-app-1-partition_by-6", "test-app-1-partition_by-6", "default-system");
+    when(runner.getStreamSpec("test-app-1-partition_by-10"))
+        .thenReturn(int1);
+    when(runner.getStreamSpec("test-app-1-partition_by-6"))
+        .thenReturn(int2);
+
+    StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
+    BiFunction msgBuilder = mock(BiFunction.class);
+    MessageStream m1 = streamGraph.getInputStream("input1", msgBuilder).map(m -> m);
+    MessageStream m2 = streamGraph.getInputStream("input2", msgBuilder).filter(m -> true);
+    MessageStream m3 = streamGraph.getInputStream("input3", msgBuilder).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
+    Function mockFn = mock(Function.class);
+    OutputStream<Object, Object, Object> om1 = streamGraph.getOutputStream("output1", mockFn, mockFn);
+    OutputStream<Object, Object, Object> om2 = streamGraph.getOutputStream("output2", mockFn, mockFn);
+
+    m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).partitionBy(m -> "haha").sendTo(om1);
+    m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(om2);
+
+    Multimap<SystemStream, SystemStream> outputToInput = OperatorImplGraph.getIntermediateToInputStreamsMap(streamGraph);
+    // int1 sits downstream of the input1/input2 join, so both inputs feed it.
+    Collection<SystemStream> inputs = outputToInput.get(int1.toSystemStream());
+    assertEquals(2, inputs.size());
+    assertTrue(inputs.contains(input1.toSystemStream()));
+    assertTrue(inputs.contains(input2.toSystemStream()));
+
+    // int2 is written before the second join, so only input3 feeds it.
+    inputs = outputToInput.get(int2.toSystemStream());
+    assertEquals(1, inputs.size());
+    assertEquals(input3.toSystemStream(), inputs.iterator().next());
+  }
+
+  /**
+   * Checks {@code OperatorImplGraph.getProducerTaskCountForIntermediateStreams}: for each
+   * intermediate stream the expected count is the number of distinct tasks consuming any of
+   * the input streams that feed it.
+   */
+  @Test
+  public void testGetProducerTaskCountForIntermediateStreams() {
+    /*
+     * the task assignment looks like the following:
+     *
+     * input1 -----> task0, task1 -----> int1
+     *                                    ^
+     * input2 ------> task1, task2--------|
+     *                                    v
+     * input3 ------> task1 -----------> int2
+     *
+     */
+
+    SystemStream input1 = new SystemStream("system1", "intput1");
+    SystemStream input2 = new SystemStream("system2", "intput2");
+    SystemStream input3 = new SystemStream("system2", "intput3");
+
+    SystemStream int1 = new SystemStream("system1", "int1");
+    SystemStream int2 = new SystemStream("system1", "int2");
+
+
+    String task0 = "Task 0";
+    String task1 = "Task 1";
+    String task2 = "Task 2";
+
+    Multimap<SystemStream, String> streamToConsumerTasks = HashMultimap.create();
+    streamToConsumerTasks.put(input1, task0);
+    streamToConsumerTasks.put(input1, task1);
+    streamToConsumerTasks.put(input2, task1);
+    streamToConsumerTasks.put(input2, task2);
+    streamToConsumerTasks.put(input3, task1);
+    streamToConsumerTasks.put(int1, task0);
+    streamToConsumerTasks.put(int1, task1);
+    streamToConsumerTasks.put(int2, task0);
+
+    Multimap<SystemStream, SystemStream> intermediateToInputStreams = HashMultimap.create();
+    intermediateToInputStreams.put(int1, input1);
+    intermediateToInputStreams.put(int1, input2);
+
+    intermediateToInputStreams.put(int2, input2);
+    intermediateToInputStreams.put(int2, input3);
+
+    Map<SystemStream, Integer> counts = OperatorImplGraph.getProducerTaskCountForIntermediateStreams(
+        streamToConsumerTasks, intermediateToInputStreams);
+    // int1 <- input1 {task0, task1} + input2 {task1, task2} = 3 distinct producer tasks;
+    // int2 <- input2 {task1, task2} + input3 {task1} = 2 distinct producer tasks.
+    // assertEquals (rather than assertTrue(a == b)) reports the actual count on failure;
+    // intValue() avoids the ambiguous assertEquals(int, Integer) overload.
+    assertEquals(3, counts.get(int1).intValue());
+    assertEquals(2, counts.get(int2).intValue());
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java
new file mode 100644
index 0000000..a726069
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.Partition;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.WatermarkMessage;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+import static 
org.apache.samza.operators.impl.WatermarkStates.WATERMARK_NOT_EXIST;
+
+/**
+ * Unit test for {@link WatermarkStates}.
+ */
+public class TestWatermarkStates {
+
+  /**
+   * Feeds watermarks for one input partition and two intermediate partitions (with two
+   * producer tasks registered for the intermediate stream) and checks the reported values.
+   * The assertions expect that an intermediate partition reports {@code WATERMARK_NOT_EXIST}
+   * until both producer tasks have sent a watermark, that it then reports the smaller of the
+   * two, and that the stream-level watermark is the smaller of its partitions' watermarks.
+   */
+  @Test
+  public void testUpdate() {
+    SystemStream input = new SystemStream("system", "input");
+    SystemStream intermediate = new SystemStream("system", "intermediate");
+
+    Set<SystemStreamPartition> ssps = new HashSet<>();
+    SystemStreamPartition inputPartition0 = new SystemStreamPartition(input, new Partition(0));
+    SystemStreamPartition intPartition0 = new SystemStreamPartition(intermediate, new Partition(0));
+    SystemStreamPartition intPartition1 = new SystemStreamPartition(intermediate, new Partition(1));
+    ssps.add(inputPartition0);
+    ssps.add(intPartition0);
+    ssps.add(intPartition1);
+
+    // the intermediate stream is written by two producer tasks ("task 0" and "task 1" below)
+    Map<SystemStream, Integer> producerCounts = new HashMap<>();
+    producerCounts.put(intermediate, 2);
+
+    // advance watermark on input to 5
+    WatermarkStates watermarkStates = new WatermarkStates(ssps, producerCounts);
+    IncomingMessageEnvelope envelope = IncomingMessageEnvelope.buildWatermarkEnvelope(inputPartition0, 5L);
+    watermarkStates.update((WatermarkMessage) envelope.getMessage(),
+        envelope.getSystemStreamPartition());
+    assertEquals(5L, watermarkStates.getWatermark(input));
+    assertEquals(WATERMARK_NOT_EXIST, watermarkStates.getWatermark(intermediate));
+
+    // watermark from task 0 on int p0 to 6
+    WatermarkMessage watermarkMessage = new WatermarkMessage(6L, "task 0");
+    watermarkStates.update(watermarkMessage, intPartition0);
+    assertEquals(WATERMARK_NOT_EXIST, watermarkStates.getWatermarkPerSSP(intPartition0));
+    assertEquals(WATERMARK_NOT_EXIST, watermarkStates.getWatermark(intermediate));
+
+    // watermark from task 1 on int p0 to 3
+    watermarkMessage = new WatermarkMessage(3L, "task 1");
+    watermarkStates.update(watermarkMessage, intPartition0);
+    assertEquals(3L, watermarkStates.getWatermarkPerSSP(intPartition0));
+    assertEquals(WATERMARK_NOT_EXIST, watermarkStates.getWatermark(intermediate));
+
+    // watermark from task 0 on int p1 to 10
+    watermarkMessage = new WatermarkMessage(10L, "task 0");
+    watermarkStates.update(watermarkMessage, intPartition1);
+    assertEquals(WATERMARK_NOT_EXIST, watermarkStates.getWatermarkPerSSP(intPartition1));
+    assertEquals(WATERMARK_NOT_EXIST, watermarkStates.getWatermark(intermediate));
+
+    // watermark from task 1 on int p1 to 4
+    watermarkMessage = new WatermarkMessage(4L, "task 1");
+    watermarkStates.update(watermarkMessage, intPartition1);
+    assertEquals(4L, watermarkStates.getWatermarkPerSSP(intPartition1));
+    // verify we got a watermark 3 (min) for int stream
+    assertEquals(3L, watermarkStates.getWatermark(intermediate));
+
+    // advance watermark from task 1 on int p0 to 8
+    watermarkMessage = new WatermarkMessage(8L, "task 1");
+    watermarkStates.update(watermarkMessage, intPartition0);
+    assertEquals(6L, watermarkStates.getWatermarkPerSSP(intPartition0));
+    // verify we got a watermark 4 (min) for int stream
+    assertEquals(4L, watermarkStates.getWatermark(intermediate));
+
+    // advance watermark from task 1 on int p1 to 7
+    watermarkMessage = new WatermarkMessage(7L, "task 1");
+    watermarkStates.update(watermarkMessage, intPartition1);
+    assertEquals(7L, watermarkStates.getWatermarkPerSSP(intPartition1));
+    // verify we got a watermark 6 (min) for int stream
+    assertEquals(6L, watermarkStates.getWatermark(intermediate));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
 
b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
index 7192525..7a3faca 100644
--- 
a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
+++ 
b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
@@ -24,11 +24,11 @@ import java.io.ByteArrayOutputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
-import org.apache.samza.message.EndOfStreamMessage;
-import org.apache.samza.message.WatermarkMessage;
-import org.apache.samza.message.IntermediateMessageType;
 import org.apache.samza.serializers.IntermediateMessageSerde;
 import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.EndOfStreamMessage;
+import org.apache.samza.system.MessageType;
+import org.apache.samza.system.WatermarkMessage;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -96,7 +96,7 @@ public class TestIntermediateMessageSerde {
     TestUserMessage userMessage = new TestUserMessage(msg, 0, 
System.currentTimeMillis());
     byte[] bytes = imserde.toBytes(userMessage);
     TestUserMessage de = (TestUserMessage) imserde.fromBytes(bytes);
-    assertEquals(IntermediateMessageType.of(de), 
IntermediateMessageType.USER_MESSAGE);
+    assertEquals(MessageType.of(de), MessageType.USER_MESSAGE);
     assertEquals(de.getMessage(), msg);
     assertEquals(de.getOffset(), 0);
     assertTrue(de.getTimestamp() > 0);
@@ -106,12 +106,11 @@ public class TestIntermediateMessageSerde {
   public void testWatermarkMessageSerde() {
     IntermediateMessageSerde imserde = new IntermediateMessageSerde(new 
ObjectSerde());
     String taskName = "task-1";
-    WatermarkMessage watermark = new 
WatermarkMessage(System.currentTimeMillis(), taskName, 8);
+    WatermarkMessage watermark = new 
WatermarkMessage(System.currentTimeMillis(), taskName);
     byte[] bytes = imserde.toBytes(watermark);
     WatermarkMessage de = (WatermarkMessage) imserde.fromBytes(bytes);
-    assertEquals(IntermediateMessageType.of(de), 
IntermediateMessageType.WATERMARK_MESSAGE);
+    assertEquals(MessageType.of(de), MessageType.WATERMARK);
     assertEquals(de.getTaskName(), taskName);
-    assertEquals(de.getTaskCount(), 8);
     assertTrue(de.getTimestamp() > 0);
   }
 
@@ -120,12 +119,11 @@ public class TestIntermediateMessageSerde {
     IntermediateMessageSerde imserde = new IntermediateMessageSerde(new 
ObjectSerde());
     String streamId = "test-stream";
     String taskName = "task-1";
-    EndOfStreamMessage eos = new EndOfStreamMessage(taskName, 8);
+    EndOfStreamMessage eos = new EndOfStreamMessage(taskName);
     byte[] bytes = imserde.toBytes(eos);
     EndOfStreamMessage de = (EndOfStreamMessage) imserde.fromBytes(bytes);
-    assertEquals(IntermediateMessageType.of(de), 
IntermediateMessageType.END_OF_STREAM_MESSAGE);
+    assertEquals(MessageType.of(de), MessageType.END_OF_STREAM);
     assertEquals(de.getTaskName(), taskName);
-    assertEquals(de.getTaskCount(), 8);
     assertEquals(de.getVersion(), 1);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 
b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index 3d4976b..5a4b4bf 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -39,7 +39,6 @@ import org.apache.samza.container.TaskInstance;
 import org.apache.samza.container.TaskInstanceExceptionHandler;
 import org.apache.samza.container.TaskInstanceMetrics;
 import org.apache.samza.container.TaskName;
-import org.apache.samza.control.EndOfStreamManager;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemConsumer;
@@ -78,8 +77,8 @@ public class TestAsyncRunLoop {
   private final IncomingMessageEnvelope envelope0 = new 
IncomingMessageEnvelope(ssp0, "0", "key0", "value0");
   private final IncomingMessageEnvelope envelope1 = new 
IncomingMessageEnvelope(ssp1, "1", "key1", "value1");
   private final IncomingMessageEnvelope envelope3 = new 
IncomingMessageEnvelope(ssp0, "1", "key0", "value0");
-  private final IncomingMessageEnvelope ssp0EndOfStream = 
EndOfStreamManager.buildEndOfStreamEnvelope(ssp0);
-  private final IncomingMessageEnvelope ssp1EndOfStream = 
EndOfStreamManager.buildEndOfStreamEnvelope(ssp1);
+  private final IncomingMessageEnvelope ssp0EndOfStream = 
IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp0);
+  private final IncomingMessageEnvelope ssp1EndOfStream = 
IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp1);
 
   TaskInstance createTaskInstance(AsyncStreamTask task, TaskName taskName, 
SystemStreamPartition ssp, OffsetManager manager, SystemConsumers consumers) {
     TaskInstanceMetrics taskInstanceMetrics = new TaskInstanceMetrics("task", 
new MetricsRegistryMap());
@@ -575,7 +574,7 @@ public class TestAsyncRunLoop {
     SystemStreamPartition ssp2 = new SystemStreamPartition("system1", 
"stream2", p2);
     IncomingMessageEnvelope envelope1 = new IncomingMessageEnvelope(ssp2, "1", 
"key1", "message1");
     IncomingMessageEnvelope envelope2 = new IncomingMessageEnvelope(ssp2, "2", 
"key1", "message1");
-    IncomingMessageEnvelope envelope3 = 
EndOfStreamManager.buildEndOfStreamEnvelope(ssp2);
+    IncomingMessageEnvelope envelope3 = 
IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp2);
 
     Map<SystemStreamPartition, List<IncomingMessageEnvelope>> sspMap = new 
HashMap<>();
     List<IncomingMessageEnvelope> messageList = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 
b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 81f3ed1..ea11d9f 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -20,28 +20,21 @@
 package org.apache.samza.container
 
 
-import java.util
-import java.util
-import java.util.Collections
 import java.util.concurrent.ConcurrentHashMap
-import com.google.common.collect.Multimap
-import org.apache.samza.SamzaException
 
 import org.apache.samza.Partition
-import org.apache.samza.checkpoint.OffsetManager
-import org.apache.samza.config.Config
-import org.apache.samza.config.MapConfig
-import org.apache.samza.control.ControlMessageUtils
-import org.apache.samza.job.model.ContainerModel
-import org.apache.samza.job.model.JobModel
-import org.apache.samza.job.model.TaskModel
-import org.apache.samza.metrics.Counter
-import org.apache.samza.metrics.Metric
-import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.checkpoint.{Checkpoint, OffsetManager}
 import org.apache.samza.config.{Config, MapConfig}
 import org.apache.samza.metrics.{Counter, Metric, MetricsRegistryMap}
 import org.apache.samza.serializers.SerdeManager
+import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.system.SystemAdmin
+import org.apache.samza.system.SystemConsumer
+import org.apache.samza.system.SystemConsumers
+import org.apache.samza.system.SystemProducer
+import org.apache.samza.system.SystemProducers
+import org.apache.samza.system.SystemStream
+import org.apache.samza.system.SystemStreamMetadata
 import org.apache.samza.storage.TaskStorageManager
 import 
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 import org.apache.samza.system._
@@ -124,9 +117,9 @@ class TestTaskInstance {
    */
   class TroublesomeTask extends StreamTask with WindowableTask {
     def process(
-      envelope: IncomingMessageEnvelope,
-      collector: MessageCollector,
-      coordinator: TaskCoordinator) {
+                 envelope: IncomingMessageEnvelope,
+                 collector: MessageCollector,
+                 coordinator: TaskCoordinator) {
 
       envelope.getOffset().toInt match {
         case offset if offset % 2 == 0 => throw new TroublesomeException
@@ -143,8 +136,8 @@ class TestTaskInstance {
    * Helper method used to retrieve the value of a counter from a group.
    */
   private def getCount(
-    group: ConcurrentHashMap[String, Metric],
-    name: String): Long = {
+                        group: ConcurrentHashMap[String, Metric],
+                        name: String): Long = {
     group.get("exception-ignored-" + 
name.toLowerCase).asInstanceOf[Counter].getCount
   }
 
@@ -407,36 +400,6 @@ class TestTaskInstance {
     // Finally, checkpoint the inputs with the snapshotted checkpoint captured 
at the beginning of commit
     mockOrder.verify(offsetManager).writeCheckpoint(taskName, checkpoint)
   }
-
-  @Test
-  def testBuildInputToTasks = {
-    val system: String = "test-system"
-    val stream0: String = "test-stream-0"
-    val stream1: String = "test-stream-1"
-
-    val ssp0: SystemStreamPartition = new SystemStreamPartition(system, 
stream0, new Partition(0))
-    val ssp1: SystemStreamPartition = new SystemStreamPartition(system, 
stream0, new Partition(1))
-    val ssp2: SystemStreamPartition = new SystemStreamPartition(system, 
stream1, new Partition(0))
-
-    val task0: TaskName = new TaskName("Task 0")
-    val task1: TaskName = new TaskName("Task 1")
-    val ssps: util.Set[SystemStreamPartition] = new 
util.HashSet[SystemStreamPartition]
-    ssps.add(ssp0)
-    ssps.add(ssp2)
-    val tm0: TaskModel = new TaskModel(task0, ssps, new Partition(0))
-    val cm0: ContainerModel = new ContainerModel("c0", 0, 
Collections.singletonMap(task0, tm0))
-    val tm1: TaskModel = new TaskModel(task1, Collections.singleton(ssp1), new 
Partition(1))
-    val cm1: ContainerModel = new ContainerModel("c1", 1, 
Collections.singletonMap(task1, tm1))
-
-    val cms: util.Map[String, ContainerModel] = new util.HashMap[String, 
ContainerModel]
-    cms.put(cm0.getProcessorId, cm0)
-    cms.put(cm1.getProcessorId, cm1)
-
-    val jobModel: JobModel = new JobModel(new MapConfig, cms, null)
-    val streamToTasks: Multimap[SystemStream, String] = 
TaskInstance.buildInputToTasks(jobModel)
-    assertEquals(streamToTasks.get(ssp0.getSystemStream).size, 2)
-    assertEquals(streamToTasks.get(ssp2.getSystemStream).size, 1)
-  }
 }
 
 class MockSystemAdmin extends SystemAdmin {

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala 
b/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala
index 9d808cb..774230c 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala
@@ -20,8 +20,8 @@
 package org.apache.samza.serializers
 
 
-import org.apache.samza.message.EndOfStreamMessage
-import org.apache.samza.message.WatermarkMessage
+import org.apache.samza.system.EndOfStreamMessage
+import org.apache.samza.system.WatermarkMessage
 import org.junit.Assert._
 import org.junit.Assert.assertEquals
 import org.junit.Assert.assertEquals
@@ -83,18 +83,17 @@ class TestSerdeManager {
     val eosStreamId = "eos-stream"
     val taskName = "task 1"
     val taskCount = 8
-    outEnvelope = new OutgoingMessageEnvelope(intermediate, "eos", new 
EndOfStreamMessage(taskName, taskCount))
+    outEnvelope = new OutgoingMessageEnvelope(intermediate, "eos", new 
EndOfStreamMessage(taskName))
     se = serdeManager.toBytes(outEnvelope)
     inEnvelope = new IncomingMessageEnvelope(new 
SystemStreamPartition(intermediate, new Partition(0)), "offset", se.getKey, 
se.getMessage)
     de = serdeManager.fromBytes(inEnvelope)
     assertEquals(de.getKey, "eos")
     val eosMsg = de.getMessage.asInstanceOf[EndOfStreamMessage]
     assertEquals(eosMsg.getTaskName, taskName)
-    assertEquals(eosMsg.getTaskCount, taskCount)
 
     // test watermark message sent to intermediate stream
     val timestamp = System.currentTimeMillis()
-    outEnvelope = new OutgoingMessageEnvelope(intermediate, "watermark", new 
WatermarkMessage(timestamp, taskName, taskCount))
+    outEnvelope = new OutgoingMessageEnvelope(intermediate, "watermark", new 
WatermarkMessage(timestamp, taskName))
     se = serdeManager.toBytes(outEnvelope)
     inEnvelope = new IncomingMessageEnvelope(new 
SystemStreamPartition(intermediate, new Partition(0)), "offset", se.getKey, 
se.getMessage)
     de = serdeManager.fromBytes(inEnvelope)
@@ -102,6 +101,5 @@ class TestSerdeManager {
     val watermarkMsg = de.getMessage.asInstanceOf[WatermarkMessage]
     assertEquals(watermarkMsg.getTimestamp, timestamp)
     assertEquals(watermarkMsg.getTaskName, taskName)
-    assertEquals(watermarkMsg.getTaskCount, taskCount)
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
----------------------------------------------------------------------
diff --git 
a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java 
b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
index de0d1da..fb9bb56 100644
--- 
a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
+++ 
b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
@@ -36,7 +36,6 @@ import org.apache.commons.lang.Validate;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
-import org.apache.samza.control.EndOfStreamManager;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.system.IncomingMessageEnvelope;
@@ -237,7 +236,7 @@ public class HdfsSystemConsumer extends BlockingEnvelopeMap {
       consumerMetrics.incNumEvents(systemStreamPartition);
       consumerMetrics.incTotalNumEvents();
     }
-    offerMessage(systemStreamPartition, EndOfStreamManager.buildEndOfStreamEnvelope(systemStreamPartition));
+    offerMessage(systemStreamPartition, IncomingMessageEnvelope.buildEndOfStreamEnvelope(systemStreamPartition));
     reader.close();
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
index f313348..8493cf1 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
@@ -54,7 +54,7 @@ public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness {
   @Test
   public void testPipeline() throws  Exception {
     Random random = new Random();
-    int count = 100;
+    int count = 10;
     PageView[] pageviews = new PageView[count];
     for (int i = 0; i < count; i++) {
       String pagekey = PAGEKEYS[random.nextInt(PAGEKEYS.length - 1)];

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
index 2eb72fc..d9202d3 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
@@ -36,8 +36,6 @@ import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.container.TaskInstance;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
-import org.apache.samza.control.EndOfStreamManager;
-import org.apache.samza.control.WatermarkManager;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.operators.impl.InputOperatorImpl;
 import org.apache.samza.operators.impl.OperatorImpl;
@@ -85,14 +83,14 @@ public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness {
   static {
     TEST_DATA.add(createIncomingMessage(new PageView("inbox", 1), SSP0));
     TEST_DATA.add(createIncomingMessage(new PageView("home", 2), SSP1));
-    TEST_DATA.add(WatermarkManager.buildWatermarkEnvelope(1, SSP0));
-    TEST_DATA.add(WatermarkManager.buildWatermarkEnvelope(2, SSP1));
-    TEST_DATA.add(WatermarkManager.buildWatermarkEnvelope(4, SSP0));
-    TEST_DATA.add(WatermarkManager.buildWatermarkEnvelope(3, SSP1));
+    TEST_DATA.add(IncomingMessageEnvelope.buildWatermarkEnvelope(SSP0, 1));
+    TEST_DATA.add(IncomingMessageEnvelope.buildWatermarkEnvelope(SSP1, 2));
+    TEST_DATA.add(IncomingMessageEnvelope.buildWatermarkEnvelope(SSP0, 4));
+    TEST_DATA.add(IncomingMessageEnvelope.buildWatermarkEnvelope(SSP1, 3));
     TEST_DATA.add(createIncomingMessage(new PageView("search", 3), SSP0));
     TEST_DATA.add(createIncomingMessage(new PageView("pymk", 4), SSP1));
-    TEST_DATA.add(EndOfStreamManager.buildEndOfStreamEnvelope(SSP0));
-    TEST_DATA.add(EndOfStreamManager.buildEndOfStreamEnvelope(SSP1));
+    TEST_DATA.add(IncomingMessageEnvelope.buildEndOfStreamEnvelope(SSP0));
+    TEST_DATA.add(IncomingMessageEnvelope.buildEndOfStreamEnvelope(SSP1));
   }
 
   public final static class TestSystemFactory implements SystemFactory {

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java b/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java
index 9b96216..832457b 100644
--- a/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java
+++ b/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java
@@ -27,7 +27,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.samza.config.Config;
-import org.apache.samza.control.EndOfStreamManager;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemConsumer;
 import org.apache.samza.system.SystemStreamPartition;
@@ -62,7 +61,7 @@ public class ArraySystemConsumer implements SystemConsumer {
       set.forEach(ssp -> {
           List<IncomingMessageEnvelope> envelopes = Arrays.stream(getArrayObjects(ssp.getSystemStream().getStream(), config))
               .map(object -> new IncomingMessageEnvelope(ssp, null, null, object)).collect(Collectors.toList());
-          envelopes.add(EndOfStreamManager.buildEndOfStreamEnvelope(ssp));
+          envelopes.add(IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp));
           envelopeMap.put(ssp, envelopes);
         });
       done = true;

Reply via email to