Merge branch 'master' into 0.14.0

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/052a0570
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/052a0570
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/052a0570

Branch: refs/heads/master
Commit: 052a0570cf0f1a1020faef7c3695c0e86c6f348c
Parents: a1f0144 f16ba26
Author: Xinyu Liu <[email protected]>
Authored: Tue Oct 3 15:09:41 2017 -0700
Committer: Xinyu Liu <[email protected]>
Committed: Tue Oct 3 15:09:41 2017 -0700

----------------------------------------------------------------------
 build.gradle                                    |  12 +-
 .../versioned/jobs/configuration-table.html     |  27 +-
 .../java/org/apache/samza/operators/KV.java     |  48 +++
 .../apache/samza/operators/MessageStream.java   |  34 +-
 .../apache/samza/operators/OutputStream.java    |   6 +-
 .../org/apache/samza/operators/StreamGraph.java |  67 ++-
 .../org/apache/samza/serializers/Serde.java     |   6 +-
 .../samza/system/SystemProducerException.java   |  38 ++
 .../ByteBufferSerde.scala                       |  48 +++
 .../ByteSerde.scala                             |  36 ++
 .../DoubleSerde.scala                           |  45 ++
 .../IntegerSerde.scala                          |  45 ++
 .../JsonSerdeV2.scala                           |  91 ++++
 .../org.apache.samza.serializers/KVSerde.scala  |  55 +++
 .../LongSerde.scala                             |  45 ++
 .../NoOpSerde.scala                             |  37 ++
 .../SerializableSerde.scala                     |  67 +++
 .../StringSerde.scala                           |  49 +++
 .../UUIDSerde.scala                             |  47 ++
 .../TestByteBufferSerde.scala                   |  53 +++
 .../TestByteSerde.scala                         |  38 ++
 .../TestDoubleSerde.scala                       |  40 ++
 .../TestIntegerSerde.scala                      |  37 ++
 .../TestJsonSerdeV2.scala                       |  45 ++
 .../TestLongSerde.scala                         |  40 ++
 .../TestSerializableSerde.scala                 |  45 ++
 .../TestStringSerde.scala                       |  37 ++
 .../TestUUIDSerde.scala                         |  53 +++
 .../apache/samza/config/JavaSystemConfig.java   |  14 +-
 .../samza/config/JobCoordinatorConfig.java      |   9 +
 .../coordinator/CoordinationUtilsFactory.java   |  10 -
 .../org/apache/samza/execution/JobNode.java     |  92 +++-
 .../samza/operators/MessageStreamImpl.java      |  25 +-
 .../apache/samza/operators/StreamGraphImpl.java | 127 +++---
 .../samza/operators/impl/InputOperatorImpl.java |  16 +-
 .../samza/operators/impl/OperatorImpl.java      |  36 +-
 .../samza/operators/impl/OperatorImplGraph.java |  27 +-
 .../operators/impl/OutputOperatorImpl.java      |  58 +--
 .../operators/impl/PartitionByOperatorImpl.java | 108 +++++
 .../operators/impl/WindowOperatorImpl.java      |   3 +-
 .../samza/operators/spec/InputOperatorSpec.java |  37 +-
 .../samza/operators/spec/OperatorSpec.java      |  20 +-
 .../samza/operators/spec/OperatorSpecs.java     |  26 +-
 .../operators/spec/OutputOperatorSpec.java      |  11 +-
 .../samza/operators/spec/OutputStreamImpl.java  |  28 +-
 .../operators/spec/PartitionByOperatorSpec.java |  81 ++++
 .../stream/IntermediateMessageStreamImpl.java   |  17 +-
 .../samza/runtime/LocalApplicationRunner.java   |   5 +-
 .../serializers/IntermediateMessageSerde.java   |  39 +-
 .../apache/samza/task/StreamOperatorTask.java   |   4 +-
 .../apache/samza/config/SerializerConfig.scala  |   6 +-
 .../org/apache/samza/config/SystemConfig.scala  |  16 +-
 .../org/apache/samza/config/TaskConfig.scala    |   3 +
 .../apache/samza/container/SamzaContainer.scala |  59 ++-
 .../samza/serializers/ByteBufferSerde.scala     |  48 ---
 .../apache/samza/serializers/ByteSerde.scala    |  36 --
 .../apache/samza/serializers/DoubleSerde.scala  |  45 --
 .../apache/samza/serializers/IntegerSerde.scala |  45 --
 .../apache/samza/serializers/JsonSerde.scala    |  32 +-
 .../apache/samza/serializers/LongSerde.scala    |  45 --
 .../serializers/MetricsSnapshotSerde.scala      |   4 +-
 .../samza/serializers/SerializableSerde.scala   |  67 ---
 .../apache/samza/serializers/StringSerde.scala  |  44 --
 .../apache/samza/serializers/UUIDSerde.scala    |  47 --
 .../main/scala/org/apache/samza/util/Util.scala |  15 +-
 .../samza/config/TestJavaSystemConfig.java      |  35 +-
 .../apache/samza/example/BroadcastExample.java  |  18 +-
 .../samza/example/KeyValueStoreExample.java     |  26 +-
 .../org/apache/samza/example/MergeExample.java  |  20 +-
 .../samza/example/OrderShipmentJoinExample.java |  28 +-
 .../samza/example/PageViewCounterExample.java   |  13 +-
 .../samza/example/RepartitionExample.java       |  22 +-
 .../org/apache/samza/example/WindowExample.java |   8 +-
 .../samza/execution/TestExecutionPlanner.java   |  59 ++-
 .../execution/TestJobGraphJsonGenerator.java    |  35 +-
 .../org/apache/samza/execution/TestJobNode.java | 112 +++++
 .../samza/operators/TestJoinOperator.java       |  21 +-
 .../samza/operators/TestMessageStreamImpl.java  |  69 ++-
 .../samza/operators/TestStreamGraphImpl.java    | 407 ++++++++++++++++--
 .../samza/operators/TestWindowOperator.java     |  17 +-
 .../operators/impl/TestOperatorImplGraph.java   |  88 +++-
 .../operators/impl/TestStreamOperatorImpl.java  |   4 +-
 .../runtime/TestLocalApplicationRunner.java     |  22 +-
 .../apache/samza/config/TestSystemConfig.scala  |  67 +++
 .../samza/container/TestTaskInstance.scala      |  35 ++
 .../samza/serializers/TestByteBufferSerde.scala |  53 ---
 .../samza/serializers/TestByteSerde.scala       |  38 --
 .../samza/serializers/TestDoubleSerde.scala     |  40 --
 .../samza/serializers/TestIntegerSerde.scala    |  37 --
 .../samza/serializers/TestLongSerde.scala       |  40 --
 .../serializers/TestSerializableSerde.scala     |  45 --
 .../samza/serializers/TestStringSerde.scala     |  37 --
 .../samza/serializers/TestUUIDSerde.scala       |  53 ---
 .../scala/org/apache/samza/util/TestUtil.scala  |  13 +
 .../org/apache/samza/config/KafkaConfig.scala   |  34 +-
 .../samza/system/kafka/KafkaSystemFactory.scala |   5 +-
 .../system/kafka/KafkaSystemProducer.scala      | 256 ++++++-----
 .../samza/system/kafka/MockKafkaProducer.java   |  87 ++--
 .../kafka/TestKafkaSystemProducerJava.java      |   2 +-
 .../apache/samza/config/TestKafkaConfig.scala   |  24 +-
 .../system/kafka/TestKafkaSystemProducer.scala  | 427 ++++++++++++++++---
 .../storage/kv/TestKeyValueStorageEngine.scala  |  14 +-
 .../apache/samza/config/Log4jSystemConfig.java  |   2 +-
 .../samza/logging/log4j/StreamAppender.java     |   2 +-
 samza-test/src/main/resources/log4j.xml         |  44 +-
 .../EndOfStreamIntegrationTest.java             |  15 +-
 .../WatermarkIntegrationTest.java               |   8 +-
 .../apache/samza/test/operator/PageView.java    |  37 +-
 .../test/operator/RepartitionWindowApp.java     |  28 +-
 .../samza/test/operator/SessionWindowApp.java   |  23 +-
 .../test/operator/TestRepartitionWindowApp.java |  20 +-
 .../samza/test/operator/TumblingWindowApp.java  |  24 +-
 .../test/processor/IdentityStreamTask.java      |  16 +-
 .../test/processor/TestStreamProcessor.java     | 149 ++++---
 .../processor/TestZkLocalApplicationRunner.java |  36 +-
 .../test/integration/StreamTaskTestUtil.scala   |   1 +
 .../test/integration/TestStatefulTask.scala     |   2 +-
 117 files changed, 3580 insertions(+), 1634 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --cc samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index a86e019,fea42f2..7ff43ed
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@@ -123,8 -132,11 +132,11 @@@ public class JobNode 
      configs.put(CONFIG_INTERNAL_EXECUTION_PLAN, executionPlanJson);
  
      // write input/output streams to configs
 -    inEdges.stream().filter(StreamEdge::isIntermediate).forEach(edge -> 
addStreamConfig(edge, configs));
 +    inEdges.stream().filter(StreamEdge::isIntermediate).forEach(edge -> 
configs.putAll(edge.generateConfig()));
  
+     // write serialized serde instances and stream serde configs to configs
+     addSerdeConfigs(configs);
+ 
      log.info("Job {} has generated configs {}", jobName, configs);
  
      String configPrefix = String.format(CONFIG_JOB_PREFIX, jobName);

http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --cc 
samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index eefd4eb,e353ac4..0c50630
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@@ -18,16 -18,9 +18,13 @@@
   */
  package org.apache.samza.operators.impl;
  
- import java.util.Collection;
- import java.util.Collections;
- import java.util.HashSet;
- import java.util.Set;
+ import org.apache.samza.SamzaException;
  import org.apache.samza.config.Config;
  import org.apache.samza.config.MetricsConfig;
 +import org.apache.samza.container.TaskContextImpl;
 +import org.apache.samza.container.TaskName;
 +import org.apache.samza.operators.functions.WatermarkFunction;
 +import org.apache.samza.system.EndOfStreamMessage;
  import org.apache.samza.metrics.Counter;
  import org.apache.samza.metrics.MetricsRegistry;
  import org.apache.samza.metrics.Timer;
@@@ -39,14 -29,20 +36,23 @@@ import org.apache.samza.task.MessageCol
  import org.apache.samza.task.TaskContext;
  import org.apache.samza.task.TaskCoordinator;
  import org.apache.samza.util.HighResolutionClock;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
  
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.Set;
+ 
+ 
  /**
   * Abstract base class for all stream operator implementations.
+  *
+  * @param <M> type of the input to this operator
+  * @param <RM> type of the results of applying this operator
   */
  public abstract class OperatorImpl<M, RM> {
 +  private static final Logger LOG = 
LoggerFactory.getLogger(OperatorImpl.class);
    private static final String METRICS_GROUP = OperatorImpl.class.getName();
  
    private boolean initialized;
@@@ -151,14 -133,9 +169,16 @@@
      long endNs = this.highResClock.nanoTime();
      this.handleMessageNs.update(endNs - startNs);
  
-     results.forEach(rm -> this.registeredOperators.forEach(op -> 
op.onMessage(rm, collector, coordinator)));
+     results.forEach(rm ->
+         this.registeredOperators.forEach(op ->
 -            op.onMessage(rm, collector, coordinator)));
++            op.onMessage(rm, collector, coordinator)));    
 +
 +    WatermarkFunction watermarkFn = getOperatorSpec().getWatermarkFn();
 +    if (watermarkFn != null) {
 +      // check whether there is new watermark emitted from the user function
 +      Long outputWm = watermarkFn.getOutputWatermark();
 +      propagateWatermark(outputWm, collector, coordinator);
 +    }
    }
  
    /**

http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
----------------------------------------------------------------------
diff --cc 
samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index 9b747bc,faedfc9..1f86975
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@@ -18,14 -18,9 +18,14 @@@
   */
  package org.apache.samza.operators.impl;
  
 +import com.google.common.collect.HashMultimap;
  import com.google.common.collect.Lists;
 +import com.google.common.collect.Multimap;
 +import java.util.stream.Collectors;
- import org.apache.commons.lang3.tuple.Pair;
  import org.apache.samza.config.Config;
 +import org.apache.samza.container.TaskContextImpl;
 +import org.apache.samza.job.model.JobModel;
+ import org.apache.samza.operators.KV;
  import org.apache.samza.operators.StreamGraphImpl;
  import org.apache.samza.operators.functions.JoinFunction;
  import org.apache.samza.operators.functions.PartialJoinFunction;
@@@ -272,70 -249,4 +275,68 @@@ public class OperatorImplGraph 
        }
      };
    }
 +
 +  private boolean hasIntermediateStreams(StreamGraphImpl streamGraph) {
 +    return !Collections.disjoint(streamGraph.getInputOperators().keySet(), 
streamGraph.getOutputStreams().keySet());
 +  }
 +
 +  /**
 +   * calculate the number of tasks that produce to each intermediate stream
 +   * @param streamToConsumerTasks input streams to task mapping
 +   * @param intermediateToInputStreams intermediate stream to input streams 
mapping
 +   * @return mapping from intermediate stream to task count
 +   */
 +  static Map<SystemStream, Integer> 
getProducerTaskCountForIntermediateStreams(
 +      Multimap<SystemStream, String> streamToConsumerTasks,
 +      Multimap<SystemStream, SystemStream> intermediateToInputStreams) {
 +    Map<SystemStream, Integer> result = new HashMap<>();
 +    intermediateToInputStreams.asMap().entrySet().forEach(entry -> {
 +        result.put(entry.getKey(),
 +            entry.getValue().stream()
 +                .flatMap(systemStream -> 
streamToConsumerTasks.get(systemStream).stream())
 +                .collect(Collectors.toSet()).size());
 +      });
 +    return result;
 +  }
 +
 +  /**
 +   * calculate the mapping from input streams to consumer tasks
 +   * @param jobModel JobModel object
 +   * @return mapping from input stream to tasks
 +   */
 +  static Multimap<SystemStream, String> getStreamToConsumerTasks(JobModel 
jobModel) {
 +    Multimap<SystemStream, String> streamToConsumerTasks = 
HashMultimap.create();
 +    jobModel.getContainers().values().forEach(containerModel -> {
 +        containerModel.getTasks().values().forEach(taskModel -> {
 +            taskModel.getSystemStreamPartitions().forEach(ssp -> {
 +                streamToConsumerTasks.put(ssp.getSystemStream(), 
taskModel.getTaskName().getTaskName());
 +              });
 +          });
 +      });
 +    return streamToConsumerTasks;
 +  }
 +
 +  /**
 +   * calculate the mapping from output streams to input streams
 +   * @param streamGraph the user {@link StreamGraphImpl} instance
 +   * @return mapping from output streams to input streams
 +   */
 +  static Multimap<SystemStream, SystemStream> 
getIntermediateToInputStreamsMap(StreamGraphImpl streamGraph) {
 +    Multimap<SystemStream, SystemStream> outputToInputStreams = 
HashMultimap.create();
 +    streamGraph.getInputOperators().entrySet().stream()
 +        .forEach(
 +            entry -> computeOutputToInput(entry.getKey().toSystemStream(), 
entry.getValue(), outputToInputStreams));
 +    return outputToInputStreams;
 +  }
 +
 +  private static void computeOutputToInput(SystemStream input, OperatorSpec 
opSpec,
 +      Multimap<SystemStream, SystemStream> outputToInputStreams) {
-     if (opSpec instanceof OutputOperatorSpec) {
-       OutputOperatorSpec outputOpSpec = (OutputOperatorSpec) opSpec;
-       if (outputOpSpec.getOpCode() == OperatorSpec.OpCode.PARTITION_BY) {
-         
outputToInputStreams.put(outputOpSpec.getOutputStream().getStreamSpec().toSystemStream(),
 input);
-       }
++    if (opSpec instanceof PartitionByOperatorSpec) {
++      PartitionByOperatorSpec spec = (PartitionByOperatorSpec) opSpec;
++      
outputToInputStreams.put(spec.getOutputStream().getStreamSpec().toSystemStream(),
 input);
 +    } else {
 +      Collection<OperatorSpec> nextOperators = 
opSpec.getRegisteredOperatorSpecs();
 +      nextOperators.forEach(spec -> computeOutputToInput(input, spec, 
outputToInputStreams));
 +    }
 +  }
  }

http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
----------------------------------------------------------------------
diff --cc 
samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
index 0000000,072b31d..28b8dba
mode 000000,100644..100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
@@@ -1,0 -1,82 +1,108 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.samza.operators.impl;
+ 
+ import org.apache.samza.SamzaException;
+ import org.apache.samza.config.Config;
++import org.apache.samza.container.TaskContextImpl;
+ import org.apache.samza.operators.KV;
+ import org.apache.samza.operators.spec.OperatorSpec;
+ import org.apache.samza.operators.spec.OutputStreamImpl;
+ import org.apache.samza.operators.spec.PartitionByOperatorSpec;
++import org.apache.samza.system.ControlMessage;
++import org.apache.samza.system.EndOfStreamMessage;
+ import org.apache.samza.system.OutgoingMessageEnvelope;
++import org.apache.samza.system.StreamMetadataCache;
+ import org.apache.samza.system.SystemStream;
++import org.apache.samza.system.WatermarkMessage;
+ import org.apache.samza.task.MessageCollector;
+ import org.apache.samza.task.TaskContext;
+ import org.apache.samza.task.TaskCoordinator;
+ 
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.function.Function;
+ 
+ 
+ /**
+  * An operator that sends messages to an output {@link SystemStream} 
for repartitioning them.
+  */
+ class PartitionByOperatorImpl<M, K, V> extends OperatorImpl<M, Void> {
+ 
+   private final PartitionByOperatorSpec<M, K, V> partitionByOpSpec;
+   private final SystemStream systemStream;
+   private final Function<? super M, ? extends K> keyFunction;
+   private final Function<? super M, ? extends V> valueFunction;
++  private final String taskName;
++  private final ControlMessageSender controlMessageSender;
+ 
+   PartitionByOperatorImpl(PartitionByOperatorSpec<M, K, V> partitionByOpSpec, 
Config config, TaskContext context) {
+     this.partitionByOpSpec = partitionByOpSpec;
+     OutputStreamImpl<KV<K, V>> outputStream = 
partitionByOpSpec.getOutputStream();
+     if (!outputStream.isKeyedOutput()) {
+       throw new SamzaException("Output stream for repartitioning must be a 
keyed stream.");
+     }
+     this.systemStream = new SystemStream(
+         outputStream.getStreamSpec().getSystemName(),
+         outputStream.getStreamSpec().getPhysicalName());
+     this.keyFunction = partitionByOpSpec.getKeyFunction();
+     this.valueFunction = partitionByOpSpec.getValueFunction();
++    this.taskName = context.getTaskName().getTaskName();
++    StreamMetadataCache streamMetadataCache = ((TaskContextImpl) 
context).getStreamMetadataCache();
++    this.controlMessageSender = new ControlMessageSender(streamMetadataCache);
+   }
+ 
+   @Override
+   protected void handleInit(Config config, TaskContext context) {
+   }
+ 
+   @Override
+   public Collection<Void> handleMessage(M message, MessageCollector collector,
+       TaskCoordinator coordinator) {
+     K key = keyFunction.apply(message);
+     V value = valueFunction.apply(message);
+     collector.send(new OutgoingMessageEnvelope(systemStream, null, key, 
value));
+     return Collections.emptyList();
+   }
+ 
+   @Override
+   protected void handleClose() {
+   }
+ 
+   @Override
+   protected OperatorSpec<M, Void> getOperatorSpec() {
+     return partitionByOpSpec;
+   }
++
++  @Override
++  protected void handleEndOfStream(MessageCollector collector, 
TaskCoordinator coordinator) {
++    sendControlMessage(new EndOfStreamMessage(taskName), collector);
++  }
++
++  @Override
++  protected Long handleWatermark(long watermark, MessageCollector collector, 
TaskCoordinator coordinator) {
++    sendControlMessage(new WatermarkMessage(watermark, taskName), collector);
++    return watermark;
++  }
++
++  private void sendControlMessage(ControlMessage message, MessageCollector 
collector) {
++    SystemStream outputStream = 
partitionByOpSpec.getOutputStream().getStreamSpec().toSystemStream();
++    controlMessageSender.send(message, outputStream, collector);
++  }
+ }

http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
----------------------------------------------------------------------
diff --cc 
samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
index 773f742,2749245..3c66ee6
--- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
@@@ -18,12 -18,10 +18,11 @@@
   */
  package org.apache.samza.operators.spec;
  
- import org.apache.commons.lang3.tuple.Pair;
+ import org.apache.samza.operators.KV;
+ import org.apache.samza.serializers.Serde;
 +import org.apache.samza.operators.functions.WatermarkFunction;
  import org.apache.samza.system.StreamSpec;
  
- import java.util.function.BiFunction;
- 
  /**
   * The spec for an operator that receives incoming messages from an input 
stream
   * and converts them to the input message.
@@@ -47,12 -49,15 +50,20 @@@ public class InputOperatorSpec<K, V> ex
      return this.streamSpec;
    }
  
-   public BiFunction<K, V, M> getMsgBuilder() {
-     return this.msgBuilder;
+   public Serde<K> getKeySerde() {
+     return keySerde;
+   }
+ 
+   public Serde<V> getValueSerde() {
+     return valueSerde;
+   }
+ 
+   public boolean isKeyedInput() {
+     return isKeyedInput;
    }
 +
 +  @Override
 +  public WatermarkFunction getWatermarkFn() {
 +    return null;
-   }
++  }  
  }

http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --cc 
samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
index 4047d92,bcb0485..71a9897
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@@ -19,7 -19,8 +19,9 @@@
  package org.apache.samza.operators.spec;
  
  import org.apache.samza.annotation.InterfaceStability;
 +import org.apache.samza.operators.functions.WatermarkFunction;
+ import org.apache.samza.operators.MessageStream;
+ import org.apache.samza.operators.MessageStreamImpl;
  
  import java.util.Collection;
  import java.util.LinkedHashSet;

http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
----------------------------------------------------------------------
diff --cc 
samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
index 9759392,fc88634..862370f
--- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
@@@ -51,12 -46,7 +48,12 @@@ public class OutputOperatorSpec<M> exte
     * The {@link OutputStreamImpl} that this operator is sending its output to.
     * @return the {@link OutputStreamImpl} for this operator if any, else null.
     */
-   public OutputStreamImpl<?, ?, M> getOutputStream() {
+   public OutputStreamImpl<M> getOutputStream() {
      return this.outputStream;
    }
 +
 +  @Override
 +  public WatermarkFunction getWatermarkFn() {
 +    return null;
 +  }
  }

http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
----------------------------------------------------------------------
diff --cc 
samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
index 0000000,a2bb5f2..42eeb4b
mode 000000,100644..100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
@@@ -1,0 -1,76 +1,81 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.samza.operators.spec;
+ 
+ import org.apache.samza.operators.KV;
++import org.apache.samza.operators.functions.WatermarkFunction;
+ 
+ import java.util.function.Function;
+ 
+ 
+ /**
+  * The spec for an operator that re-partitions a {@link 
org.apache.samza.operators.MessageStream} to a
+  * {@link org.apache.samza.system.SystemStream}. This is usually paired with 
a corresponding
+  * {@link InputOperatorSpec} that consumes the {@link 
org.apache.samza.system.SystemStream} again.
+  * <p>
+  * This is a terminal operator and does not allow further operator chaining.
+  *
+  * @param <M> the type of message
+  * @param <K> the type of key in the message
+  * @param <V> the type of value in the message
+  */
+ public class PartitionByOperatorSpec<M, K, V> extends OperatorSpec<M, Void> {
+ 
+   private final OutputStreamImpl<KV<K, V>> outputStream;
+   private final Function<? super M, ? extends K> keyFunction;
+   private final Function<? super M, ? extends V> valueFunction;
+ 
+   /**
+    * Constructs a {@link PartitionByOperatorSpec} to send messages to the 
provided {@code outputStream}
+    *
+    * @param outputStream the {@link OutputStreamImpl} to send messages to
+    * @param keyFunction the {@link Function} for extracting the key from the 
message
+    * @param valueFunction the {@link Function} for extracting the value from 
the message
+    * @param opId the unique ID of this {@link PartitionByOperatorSpec} in the graph
+    */
+   PartitionByOperatorSpec(OutputStreamImpl<KV<K, V>> outputStream,
+       Function<? super M, ? extends K> keyFunction,
+       Function<? super M, ? extends V> valueFunction, int opId) {
+     super(OpCode.PARTITION_BY, opId);
+     this.outputStream = outputStream;
+     this.keyFunction = keyFunction;
+     this.valueFunction = valueFunction;
+   }
+ 
+   /**
+    * The {@link OutputStreamImpl} that this operator is sending its output to.
+    * @return the {@link OutputStreamImpl} for this operator if any, else null.
+    */
+   public OutputStreamImpl<KV<K, V>> getOutputStream() {
+     return this.outputStream;
+   }
+ 
+   public Function<? super M, ? extends K> getKeyFunction() {
+     return keyFunction;
+   }
+ 
+   public Function<? super M, ? extends V> getValueFunction() {
+     return valueFunction;
+   }
+ 
++  @Override
++  public WatermarkFunction getWatermarkFn() {
++    return null;
++  }
+ }

http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
----------------------------------------------------------------------
diff --cc 
samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
index 2ed559f,45ce9aa..61735ad
--- 
a/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
+++ 
b/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
@@@ -20,11 -20,13 +20,13 @@@
  package org.apache.samza.serializers;
  
  import java.util.Arrays;
+ 
  import org.apache.samza.SamzaException;
 -import org.apache.samza.message.EndOfStreamMessage;
 -import org.apache.samza.message.MessageType;
 -import org.apache.samza.message.WatermarkMessage;
 +import org.apache.samza.system.EndOfStreamMessage;
 +import org.apache.samza.system.MessageType;
 +import org.apache.samza.system.WatermarkMessage;
- import org.codehaus.jackson.type.TypeReference;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
  
  
  /**

http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --cc samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index 0074e24,d7c2742..87af392
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@@ -18,12 -18,10 +18,12 @@@
   */
  package org.apache.samza.task;
  
- import org.apache.commons.lang3.tuple.Pair;
  import org.apache.samza.application.StreamApplication;
  import org.apache.samza.config.Config;
 +import org.apache.samza.system.EndOfStreamMessage;
 +import org.apache.samza.system.MessageType;
  import org.apache.samza.operators.ContextManager;
+ import org.apache.samza.operators.KV;
  import org.apache.samza.operators.StreamGraphImpl;
  import org.apache.samza.operators.impl.InputOperatorImpl;
  import org.apache.samza.operators.impl.OperatorImplGraph;
@@@ -111,21 -105,7 +111,21 @@@ public final class StreamOperatorTask i
      SystemStream systemStream = 
ime.getSystemStreamPartition().getSystemStream();
      InputOperatorImpl inputOpImpl = 
operatorImplGraph.getInputOperator(systemStream);
      if (inputOpImpl != null) {
 -      inputOpImpl.onMessage(KV.of(ime.getKey(), ime.getMessage()), collector, 
coordinator);
 +      switch (MessageType.of(ime.getMessage())) {
 +        case USER_MESSAGE:
-           inputOpImpl.onMessage(Pair.of(ime.getKey(), ime.getMessage()), 
collector, coordinator);
++          inputOpImpl.onMessage(KV.of(ime.getKey(), ime.getMessage()), 
collector, coordinator);
 +          break;
 +
 +        case END_OF_STREAM:
 +          EndOfStreamMessage eosMessage = (EndOfStreamMessage) 
ime.getMessage();
 +          inputOpImpl.aggregateEndOfStream(eosMessage, 
ime.getSystemStreamPartition(), collector, coordinator);
 +          break;
 +
 +        case WATERMARK:
 +          WatermarkMessage watermarkMessage = (WatermarkMessage) 
ime.getMessage();
 +          inputOpImpl.aggregateWatermark(watermarkMessage, 
ime.getSystemStreamPartition(), collector, coordinator);
 +          break;
 +      }
      }
    }
  

http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
----------------------------------------------------------------------
diff --cc samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
index 0000000,c59c0cc..918da26
mode 000000,100644..100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
@@@ -1,0 -1,111 +1,112 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ 
+ package org.apache.samza.execution;
+ 
+ import org.apache.samza.config.Config;
+ import org.apache.samza.config.MapConfig;
+ import org.apache.samza.config.SerializerConfig;
+ import org.apache.samza.operators.KV;
+ import org.apache.samza.operators.MessageStream;
+ import org.apache.samza.operators.OutputStream;
+ import org.apache.samza.operators.StreamGraphImpl;
+ import org.apache.samza.runtime.ApplicationRunner;
+ import org.apache.samza.serializers.JsonSerdeV2;
+ import org.apache.samza.serializers.KVSerde;
+ import org.apache.samza.serializers.Serde;
+ import org.apache.samza.serializers.SerializableSerde;
+ import org.apache.samza.serializers.StringSerde;
+ import org.apache.samza.system.StreamSpec;
+ import org.junit.Test;
+ 
+ import java.util.Base64;
+ import java.util.HashMap;
+ import java.util.Map;
+ import java.util.stream.Collectors;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertTrue;
+ import static org.mockito.Mockito.doReturn;
+ import static org.mockito.Mockito.mock;
+ 
+ public class TestJobNode {
+ 
+   @Test
+   public void testAddSerdeConfigs() {
+     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+     StreamSpec inputSpec = new StreamSpec("input", "input", "input-system");
+     StreamSpec outputSpec = new StreamSpec("output", "output", 
"output-system");
+     StreamSpec partitionBySpec = new StreamSpec("null-null-partition_by-1", 
"partition_by-1", "intermediate-system");
+     doReturn(inputSpec).when(mockRunner).getStreamSpec("input");
+     doReturn(outputSpec).when(mockRunner).getStreamSpec("output");
+     
doReturn(partitionBySpec).when(mockRunner).getStreamSpec("null-null-partition_by-1");
+ 
+     StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, 
mock(Config.class));
+     streamGraph.setDefaultSerde(KVSerde.of(new StringSerde(), new 
JsonSerdeV2<>()));
+     MessageStream<KV<String, Object>> input = 
streamGraph.getInputStream("input");
+     OutputStream<KV<String, Object>> output = 
streamGraph.getOutputStream("output");
+     input.partitionBy(KV::getKey, KV::getValue).sendTo(output);
+ 
+     JobNode jobNode = new JobNode("jobName", "jobId", streamGraph, 
mock(Config.class));
 -    StreamEdge inputEdge = new StreamEdge(inputSpec);
 -    StreamEdge outputEdge = new StreamEdge(outputSpec);
 -    StreamEdge repartitionEdge = new StreamEdge(partitionBySpec, true);
++    Config config = new MapConfig();
++    StreamEdge inputEdge = new StreamEdge(inputSpec, config);
++    StreamEdge outputEdge = new StreamEdge(outputSpec, config);
++    StreamEdge repartitionEdge = new StreamEdge(partitionBySpec, true, 
config);
+     jobNode.addInEdge(inputEdge);
+     jobNode.addOutEdge(outputEdge);
+     jobNode.addInEdge(repartitionEdge);
+     jobNode.addOutEdge(repartitionEdge);
+ 
+     Map<String, String> configs = new HashMap<>();
+     jobNode.addSerdeConfigs(configs);
+ 
+     MapConfig mapConfig = new MapConfig(configs);
+     Config serializers = mapConfig.subset("serializers.registry.", true);
+ 
+     // make sure that the serializers deserialize correctly
+     SerializableSerde<Serde> serializableSerde = new SerializableSerde<>();
+     Map<String, Serde> deserializedSerdes = 
serializers.entrySet().stream().collect(Collectors.toMap(
+         e -> 
e.getKey().replace(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX(), ""),
+         e -> 
serializableSerde.fromBytes(Base64.getDecoder().decode(e.getValue().getBytes()))
+     ));
+     assertEquals(2, serializers.size());
+ 
+     String inputKeySerde = mapConfig.get("streams.input.samza.key.serde");
+     String inputMsgSerde = mapConfig.get("streams.input.samza.msg.serde");
+     assertTrue(deserializedSerdes.containsKey(inputKeySerde));
+     assertTrue(inputKeySerde.startsWith(StringSerde.class.getSimpleName()));
+     assertTrue(deserializedSerdes.containsKey(inputMsgSerde));
+     assertTrue(inputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+ 
+     String outputKeySerde = mapConfig.get("streams.output.samza.key.serde");
+     String outputMsgSerde = mapConfig.get("streams.output.samza.msg.serde");
+     assertTrue(deserializedSerdes.containsKey(outputKeySerde));
+     assertTrue(outputKeySerde.startsWith(StringSerde.class.getSimpleName()));
+     assertTrue(deserializedSerdes.containsKey(outputMsgSerde));
+     assertTrue(outputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+ 
+     String partitionByKeySerde = 
mapConfig.get("streams.null-null-partition_by-1.samza.key.serde");
+     String partitionByMsgSerde = 
mapConfig.get("streams.null-null-partition_by-1.samza.msg.serde");
+     assertTrue(deserializedSerdes.containsKey(partitionByKeySerde));
+     
assertTrue(partitionByKeySerde.startsWith(StringSerde.class.getSimpleName()));
+     assertTrue(deserializedSerdes.containsKey(partitionByMsgSerde));
+     
assertTrue(partitionByMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+   }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --cc 
samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index 9fab1b7,68b4ce0..d73c545
--- 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@@ -19,24 -19,9 +19,24 @@@
  
  package org.apache.samza.operators.impl;
  
 +import com.google.common.collect.HashMultimap;
 +import com.google.common.collect.Multimap;
 +import java.util.Collection;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Map;
 +import java.util.Set;
- import org.apache.commons.lang3.tuple.Pair;
 +import org.apache.samza.Partition;
  import org.apache.samza.config.Config;
 +import org.apache.samza.config.JobConfig;
 +import org.apache.samza.config.MapConfig;
 +import org.apache.samza.container.TaskContextImpl;
 +import org.apache.samza.container.TaskName;
 +import org.apache.samza.job.model.ContainerModel;
 +import org.apache.samza.job.model.JobModel;
 +import org.apache.samza.job.model.TaskModel;
  import org.apache.samza.metrics.MetricsRegistryMap;
+ import org.apache.samza.operators.KV;
  import org.apache.samza.operators.MessageStream;
  import org.apache.samza.operators.OutputStream;
  import org.apache.samza.operators.StreamGraphImpl;
@@@ -45,9 -30,12 +45,14 @@@ import org.apache.samza.operators.funct
  import org.apache.samza.operators.functions.MapFunction;
  import org.apache.samza.operators.spec.OperatorSpec.OpCode;
  import org.apache.samza.runtime.ApplicationRunner;
+ import org.apache.samza.serializers.IntegerSerde;
+ import org.apache.samza.serializers.KVSerde;
+ import org.apache.samza.serializers.NoOpSerde;
++import org.apache.samza.serializers.Serde;
+ import org.apache.samza.serializers.StringSerde;
  import org.apache.samza.system.StreamSpec;
  import org.apache.samza.system.SystemStream;
 +import org.apache.samza.system.SystemStreamPartition;
  import org.apache.samza.task.MessageCollector;
  import org.apache.samza.task.TaskContext;
  import org.apache.samza.task.TaskCoordinator;
@@@ -121,6 -105,43 +123,47 @@@ public class TestOperatorImplGraph 
    }
  
    @Test
+   public void testPartitionByChain() {
+     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+     when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new 
StreamSpec("input", "input-stream", "input-system"));
+     when(mockRunner.getStreamSpec(eq("output"))).thenReturn(new 
StreamSpec("output", "output-stream", "output-system"));
+     when(mockRunner.getStreamSpec(eq("null-null-partition_by-1")))
+         .thenReturn(new StreamSpec("intermediate", "intermediate-stream", 
"intermediate-system"));
+     StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, 
mock(Config.class));
+     MessageStream<Object> inputStream = streamGraph.getInputStream("input");
+     OutputStream<KV<Integer, String>> outputStream = streamGraph
+         .getOutputStream("output", KVSerde.of(mock(IntegerSerde.class), 
mock(StringSerde.class)));
+ 
+     inputStream
+         .partitionBy(Object::hashCode, Object::toString, 
KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class)))
+         .sendTo(outputStream);
+ 
 -    TaskContext mockTaskContext = mock(TaskContext.class);
++    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
+     when(mockTaskContext.getMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
++    when(mockTaskContext.getTaskName()).thenReturn(new TaskName("task 0"));
++    JobModel jobModel = mock(JobModel.class);
++    when(jobModel.getContainers()).thenReturn(Collections.EMPTY_MAP);
++    when(mockTaskContext.getJobModel()).thenReturn(jobModel);
+     OperatorImplGraph opImplGraph =
+         new OperatorImplGraph(streamGraph, mock(Config.class), 
mockTaskContext, mock(Clock.class));
+ 
+     InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new 
SystemStream("input-system", "input-stream"));
+     assertEquals(1, inputOpImpl.registeredOperators.size());
+ 
+     OperatorImpl partitionByOpImpl = (PartitionByOperatorImpl) 
inputOpImpl.registeredOperators.iterator().next();
+     assertEquals(0, partitionByOpImpl.registeredOperators.size()); // is 
terminal but paired with an input operator
+     assertEquals(OpCode.PARTITION_BY, 
partitionByOpImpl.getOperatorSpec().getOpCode());
+ 
+     InputOperatorImpl repartitionedInputOpImpl =
+         opImplGraph.getInputOperator(new SystemStream("intermediate-system", 
"intermediate-stream"));
+     assertEquals(1, repartitionedInputOpImpl.registeredOperators.size());
+ 
+     OperatorImpl sendToOpImpl = (OutputOperatorImpl) 
repartitionedInputOpImpl.registeredOperators.iterator().next();
+     assertEquals(0, sendToOpImpl.registeredOperators.size());
+     assertEquals(OpCode.SEND_TO, sendToOpImpl.getOperatorSpec().getOpCode());
+   }
+ 
+   @Test
    public void testBroadcastChain() {
      ApplicationRunner mockRunner = mock(ApplicationRunner.class);
      when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new 
StreamSpec("input", "input-stream", "input-system"));
@@@ -173,11 -194,11 +216,11 @@@
      StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, 
mock(Config.class));
  
      JoinFunction mockJoinFunction = mock(JoinFunction.class);
-     MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", 
(k, v) -> v);
-     MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", 
(k, v) -> v);
+     MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", 
new NoOpSerde<>());
+     MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", 
new NoOpSerde<>());
      inputStream1.join(inputStream2, mockJoinFunction, Duration.ofHours(1));
  
 -    TaskContext mockTaskContext = mock(TaskContext.class);
 +    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
      when(mockTaskContext.getMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
      OperatorImplGraph opImplGraph =
          new OperatorImplGraph(streamGraph, mock(Config.class), 
mockTaskContext, mock(Clock.class));
@@@ -272,144 -293,4 +315,143 @@@
        }
      };
    }
 +
 +  @Test
 +  public void testGetStreamToConsumerTasks() {
 +    String system = "test-system";
 +    String stream0 = "test-stream-0";
 +    String stream1 = "test-stream-1";
 +
 +    SystemStreamPartition ssp0 = new SystemStreamPartition(system, stream0, 
new Partition(0));
 +    SystemStreamPartition ssp1 = new SystemStreamPartition(system, stream0, 
new Partition(1));
 +    SystemStreamPartition ssp2 = new SystemStreamPartition(system, stream1, 
new Partition(0));
 +
 +    TaskName task0 = new TaskName("Task 0");
 +    TaskName task1 = new TaskName("Task 1");
 +    Set<SystemStreamPartition> ssps = new HashSet<>();
 +    ssps.add(ssp0);
 +    ssps.add(ssp2);
 +    TaskModel tm0 = new TaskModel(task0, ssps, new Partition(0));
 +    ContainerModel cm0 = new ContainerModel("c0", 0, 
Collections.singletonMap(task0, tm0));
 +    TaskModel tm1 = new TaskModel(task1, Collections.singleton(ssp1), new 
Partition(1));
 +    ContainerModel cm1 = new ContainerModel("c1", 1, 
Collections.singletonMap(task1, tm1));
 +
 +    Map<String, ContainerModel> cms = new HashMap<>();
 +    cms.put(cm0.getProcessorId(), cm0);
 +    cms.put(cm1.getProcessorId(), cm1);
 +
 +    JobModel jobModel = new JobModel(new MapConfig(), cms, null);
 +    Multimap<SystemStream, String> streamToTasks = 
OperatorImplGraph.getStreamToConsumerTasks(jobModel);
 +    assertEquals(streamToTasks.get(ssp0.getSystemStream()).size(), 2);
 +    assertEquals(streamToTasks.get(ssp2.getSystemStream()).size(), 1);
 +  }
 +
 +  @Test
 +  public void testGetOutputToInputStreams() {
 +    Map<String, String> configMap = new HashMap<>();
 +    configMap.put(JobConfig.JOB_NAME(), "test-app");
 +    configMap.put(JobConfig.JOB_DEFAULT_SYSTEM(), "test-system");
 +    Config config = new MapConfig(configMap);
 +
 +    /**
 +     * the graph looks like the following. number of partitions in 
parentheses. quotes indicate expected value.
 +     *
 +     *                                    input1 -> map -> join -> 
partitionBy (10) -> output1
 +     *                                                       |
 +     *                                     input2 -> filter -|
 +     *                                                       |
 +     *           input3 -> filter -> partitionBy -> map -> join -> output2
 +     *
 +     */
 +    StreamSpec input1 = new StreamSpec("input1", "input1", "system1");
 +    StreamSpec input2 = new StreamSpec("input2", "input2", "system2");
 +    StreamSpec input3 = new StreamSpec("input3", "input3", "system2");
 +
 +    StreamSpec output1 = new StreamSpec("output1", "output1", "system1");
 +    StreamSpec output2 = new StreamSpec("output2", "output2", "system2");
 +
 +    ApplicationRunner runner = mock(ApplicationRunner.class);
 +    when(runner.getStreamSpec("input1")).thenReturn(input1);
 +    when(runner.getStreamSpec("input2")).thenReturn(input2);
 +    when(runner.getStreamSpec("input3")).thenReturn(input3);
 +    when(runner.getStreamSpec("output1")).thenReturn(output1);
 +    when(runner.getStreamSpec("output2")).thenReturn(output2);
 +
 +    // intermediate streams used in tests
 +    StreamSpec int1 = new StreamSpec("test-app-1-partition_by-10", 
"test-app-1-partition_by-10", "default-system");
 +    StreamSpec int2 = new StreamSpec("test-app-1-partition_by-6", 
"test-app-1-partition_by-6", "default-system");
 +    when(runner.getStreamSpec("test-app-1-partition_by-10"))
 +        .thenReturn(int1);
 +    when(runner.getStreamSpec("test-app-1-partition_by-6"))
 +        .thenReturn(int2);
 +
 +    StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
-     BiFunction msgBuilder = mock(BiFunction.class);
-     MessageStream m1 = streamGraph.getInputStream("input1", msgBuilder).map(m 
-> m);
-     MessageStream m2 = streamGraph.getInputStream("input2", 
msgBuilder).filter(m -> true);
-     MessageStream m3 = streamGraph.getInputStream("input3", 
msgBuilder).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
-     Function mockFn = mock(Function.class);
-     OutputStream<Object, Object, Object> om1 = 
streamGraph.getOutputStream("output1", mockFn, mockFn);
-     OutputStream<Object, Object, Object> om2 = 
streamGraph.getOutputStream("output2", mockFn, mockFn);
- 
-     m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).partitionBy(m 
-> "haha").sendTo(om1);
++    Serde inputSerde = new NoOpSerde<>();
++    MessageStream m1 = streamGraph.getInputStream("input1", inputSerde).map(m 
-> m);
++    MessageStream m2 = streamGraph.getInputStream("input2", 
inputSerde).filter(m -> true);
++    MessageStream m3 = streamGraph.getInputStream("input3", 
inputSerde).filter(m -> true).partitionBy(m -> "hehe", m -> m).map(m -> m);
++    OutputStream<Object> om1 = streamGraph.getOutputStream("output1");
++    OutputStream<Object> om2 = streamGraph.getOutputStream("output2");
++
++    m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).partitionBy(m 
-> "haha", m -> m).sendTo(om1);
 +    m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(om2);
 +
 +    Multimap<SystemStream, SystemStream> outputToInput = 
OperatorImplGraph.getIntermediateToInputStreamsMap(streamGraph);
 +    Collection<SystemStream> inputs = 
outputToInput.get(int1.toSystemStream());
 +    assertEquals(inputs.size(), 2);
 +    assertTrue(inputs.contains(input1.toSystemStream()));
 +    assertTrue(inputs.contains(input2.toSystemStream()));
 +
 +    inputs = outputToInput.get(int2.toSystemStream());
 +    assertEquals(inputs.size(), 1);
 +    assertEquals(inputs.iterator().next(), input3.toSystemStream());
 +  }
 +
 +  @Test
 +  public void testGetProducerTaskCountForIntermediateStreams() {
 +    /**
 +     * the task assignment looks like the following:
 +     *
 +     * input1 -----> task0, task1 -----> int1
 +     *                                    ^
 +     * input2 ------> task1, task2--------|
 +     *                                    v
 +     * input3 ------> task1 -----------> int2
 +     *
 +     */
 +
 +    SystemStream input1 = new SystemStream("system1", "intput1");
 +    SystemStream input2 = new SystemStream("system2", "intput2");
 +    SystemStream input3 = new SystemStream("system2", "intput3");
 +
 +    SystemStream int1 = new SystemStream("system1", "int1");
 +    SystemStream int2 = new SystemStream("system1", "int2");
 +
 +
 +    String task0 = "Task 0";
 +    String task1 = "Task 1";
 +    String task2 = "Task 2";
 +
 +    Multimap<SystemStream, String> streamToConsumerTasks = 
HashMultimap.create();
 +    streamToConsumerTasks.put(input1, task0);
 +    streamToConsumerTasks.put(input1, task1);
 +    streamToConsumerTasks.put(input2, task1);
 +    streamToConsumerTasks.put(input2, task2);
 +    streamToConsumerTasks.put(input3, task1);
 +    streamToConsumerTasks.put(int1, task0);
 +    streamToConsumerTasks.put(int1, task1);
 +    streamToConsumerTasks.put(int2, task0);
 +
 +    Multimap<SystemStream, SystemStream> intermediateToInputStreams = 
HashMultimap.create();
 +    intermediateToInputStreams.put(int1, input1);
 +    intermediateToInputStreams.put(int1, input2);
 +
 +    intermediateToInputStreams.put(int2, input2);
 +    intermediateToInputStreams.put(int2, input3);
 +
 +    Map<SystemStream, Integer> counts = 
OperatorImplGraph.getProducerTaskCountForIntermediateStreams(
 +        streamToConsumerTasks, intermediateToInputStreams);
 +    assertTrue(counts.get(int1) == 3);
 +    assertTrue(counts.get(int2) == 2);
 +  }
  }

http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
----------------------------------------------------------------------
diff --cc 
samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index d2094b4,1816380..a23e513
--- 
a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ 
b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@@ -20,9 -20,12 +20,12 @@@
  package org.apache.samza.runtime;
  
  import com.google.common.collect.ImmutableList;
- import java.util.*;
 -import java.lang.reflect.Field;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
++import java.util.Set;
  import java.util.stream.Collectors;
- 
  import org.apache.samza.application.StreamApplication;
  import org.apache.samza.config.ApplicationConfig;
  import org.apache.samza.config.JobConfig;
@@@ -49,8 -55,6 +54,7 @@@ import static org.junit.Assert.assertNo
  import static org.mockito.Matchers.anyObject;
  import static org.mockito.Matchers.anyString;
  import static org.mockito.Mockito.*;
 +import static org.powermock.api.mockito.PowerMockito.doReturn;
- import static org.powermock.api.mockito.PowerMockito.mockStatic;
  
  
  @RunWith(PowerMockRunner.class)
@@@ -72,20 -76,44 +76,21 @@@ public class TestLocalApplicationRunne
      StreamApplication app = mock(StreamApplication.class);
      doNothing().when(app).init(anyObject(), anyObject());
  
 -    ExecutionPlanner planner = mock(ExecutionPlanner.class);
 -    Field plannerField = 
runner.getClass().getSuperclass().getDeclaredField("planner");
 -    plannerField.setAccessible(true);
 -    plannerField.set(runner, planner);
 -
      StreamManager streamManager = mock(StreamManager.class);
 -    Field streamManagerField = 
runner.getClass().getSuperclass().getDeclaredField("streamManager");
 -    streamManagerField.setAccessible(true);
 -    streamManagerField.set(runner, streamManager);
 -    ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
 +    doReturn(streamManager).when(runner).getStreamManager();
  
 -    ExecutionPlan plan = new ExecutionPlan() {
 -      @Override
 -      public List<JobConfig> getJobConfigs() {
 -        return Collections.emptyList();
 -      }
 -
 -      @Override
 -      public List<StreamSpec> getIntermediateStreams() {
 -        return Collections.singletonList(new StreamSpec("test-stream", 
"test-stream", "test-system"));
 -      }
 -
 -      @Override
 -      public String getPlanAsJson()
 -          throws Exception {
 -        return "";
 -      }
 -    };
 -    when(planner.plan(anyObject())).thenReturn(plan);
 +    ExecutionPlan plan = mock(ExecutionPlan.class);
 +    
when(plan.getIntermediateStreams()).thenReturn(Collections.singletonList(new 
StreamSpec("test-stream", "test-stream", "test-system")));
 +    when(plan.getPlanAsJson()).thenReturn("");
 +    doReturn(plan).when(runner).getExecutionPlan(any(), any());
  
-     mockStatic(CoordinationUtilsFactory.class);
      CoordinationUtilsFactory coordinationUtilsFactory = 
mock(CoordinationUtilsFactory.class);
-     
when(CoordinationUtilsFactory.getCoordinationUtilsFactory(anyObject())).thenReturn(coordinationUtilsFactory);
+     JobCoordinatorConfig mockJcConfig = mock(JobCoordinatorConfig.class);
+     
when(mockJcConfig.getCoordinationUtilsFactory()).thenReturn(coordinationUtilsFactory);
+     
PowerMockito.whenNew(JobCoordinatorConfig.class).withAnyArguments().thenReturn(mockJcConfig);
  
 -    LocalApplicationRunner spy = spy(runner);
      try {
 -      spy.run(app);
 +      runner.run(app);
      } catch (Throwable t) {
        assertNotNull(t); //no jobs exception
      }

http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
----------------------------------------------------------------------
diff --cc 
samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
index 8493cf1,0000000..d2f0184
mode 100644,000000..100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
@@@ -1,103 -1,0 +1,112 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.samza.test.controlmessages;
 +
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Random;
 +import org.apache.samza.application.StreamApplication;
 +import org.apache.samza.config.JobConfig;
 +import org.apache.samza.config.JobCoordinatorConfig;
 +import org.apache.samza.config.MapConfig;
 +import org.apache.samza.config.TaskConfig;
 +import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
++import org.apache.samza.operators.KV;
++import org.apache.samza.operators.functions.MapFunction;
 +import org.apache.samza.runtime.LocalApplicationRunner;
 +import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
 +import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
 +import org.apache.samza.test.controlmessages.TestData.PageView;
 +import 
org.apache.samza.test.controlmessages.TestData.PageViewJsonSerdeFactory;
 +import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
 +import org.apache.samza.test.util.ArraySystemFactory;
 +import org.apache.samza.test.util.Base64Serializer;
 +import org.junit.Test;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +
 +/**
 + * This test uses an array as a bounded input source, and does a 
partitionBy() and sink() after reading the input.
 + * It verifies the pipeline will stop and the number of output messages 
should equal to the input.
 + */
 +public class EndOfStreamIntegrationTest extends 
AbstractIntegrationTestHarness {
 +
 +  private static final String[] PAGEKEYS = {"inbox", "home", "search", 
"pymk", "group", "job"};
 +
 +  @Test
 +  public void testPipeline() throws  Exception {
 +    Random random = new Random();
 +    int count = 10;
 +    PageView[] pageviews = new PageView[count];
 +    for (int i = 0; i < count; i++) {
 +      String pagekey = PAGEKEYS[random.nextInt(PAGEKEYS.length - 1)];
 +      int memberId = random.nextInt(10);
 +      pageviews[i] = new PageView(pagekey, memberId);
 +    }
 +
 +    int partitionCount = 4;
 +    Map<String, String> configs = new HashMap<>();
 +    configs.put("systems.test.samza.factory", 
ArraySystemFactory.class.getName());
 +    configs.put("streams.PageView.samza.system", "test");
 +    configs.put("streams.PageView.source", 
Base64Serializer.serialize(pageviews));
 +    configs.put("streams.PageView.partitionCount", 
String.valueOf(partitionCount));
 +
 +    configs.put(JobConfig.JOB_NAME(), "test-eos-job");
 +    configs.put(JobConfig.PROCESSOR_ID(), "1");
 +    configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, 
PassthroughCoordinationUtilsFactory.class.getName());
 +    configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
PassthroughJobCoordinatorFactory.class.getName());
 +    configs.put(TaskConfig.GROUPER_FACTORY(), 
SingleContainerGrouperFactory.class.getName());
 +
 +    configs.put("systems.kafka.samza.factory", 
"org.apache.samza.system.kafka.KafkaSystemFactory");
 +    configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
 +    configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect());
 +    configs.put("systems.kafka.samza.key.serde", "int");
 +    configs.put("systems.kafka.samza.msg.serde", "json");
 +    configs.put("systems.kafka.default.stream.replication.factor", "1");
 +    configs.put("job.default.system", "kafka");
 +
 +    configs.put("serializers.registry.int.class", 
"org.apache.samza.serializers.IntegerSerdeFactory");
 +    configs.put("serializers.registry.json.class", 
PageViewJsonSerdeFactory.class.getName());
 +
 +    final LocalApplicationRunner runner = new LocalApplicationRunner(new 
MapConfig(configs));
 +    List<PageView> received = new ArrayList<>();
 +    final StreamApplication app = (streamGraph, cfg) -> {
-       streamGraph.getInputStream("PageView", (k, v) -> (PageView) v)
-         .partitionBy(PageView::getMemberId)
++      streamGraph.<KV<String, PageView>>getInputStream("PageView")
++        .map(Values.create())
++        .partitionBy(pv -> pv.getMemberId(), pv -> pv)
 +        .sink((m, collector, coordinator) -> {
-             received.add(m);
++            received.add(m.getValue());
 +          });
 +    };
 +    runner.run(app);
 +    runner.waitForFinish();
 +
 +    assertEquals(received.size(), count * partitionCount);
 +  }
++
++  public static final class Values {
++    public static <K, V, M extends KV<K, V>> MapFunction<M, V> create() {
++      return (M m) -> m.getValue();
++    }
++  }
 +}

http://git-wip-us.apache.org/repos/asf/samza/blob/052a0570/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
----------------------------------------------------------------------
diff --cc 
samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
index d9202d3,0000000..7da0e77
mode 100644,000000..100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
@@@ -1,204 -1,0 +1,206 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.samza.test.controlmessages;
 +
 +import java.lang.reflect.Field;
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import org.apache.samza.Partition;
 +import org.apache.samza.application.StreamApplication;
 +import org.apache.samza.config.Config;
 +import org.apache.samza.config.JobConfig;
 +import org.apache.samza.config.JobCoordinatorConfig;
 +import org.apache.samza.config.MapConfig;
 +import org.apache.samza.config.TaskConfig;
 +import org.apache.samza.container.SamzaContainer;
 +import org.apache.samza.container.TaskInstance;
 +import org.apache.samza.container.TaskName;
 +import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
 +import org.apache.samza.metrics.MetricsRegistry;
++import org.apache.samza.operators.KV;
 +import org.apache.samza.operators.impl.InputOperatorImpl;
 +import org.apache.samza.operators.impl.OperatorImpl;
 +import org.apache.samza.operators.impl.OperatorImplGraph;
 +import org.apache.samza.operators.impl.TestOperatorImpl;
 +import org.apache.samza.operators.spec.OperatorSpec;
 +import org.apache.samza.processor.StreamProcessor;
 +import org.apache.samza.processor.TestStreamProcessorUtil;
 +import org.apache.samza.runtime.LocalApplicationRunner;
 +import org.apache.samza.runtime.TestLocalApplicationRunner;
 +import org.apache.samza.serializers.IntegerSerdeFactory;
 +import org.apache.samza.serializers.StringSerdeFactory;
 +import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
 +import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
 +import org.apache.samza.system.IncomingMessageEnvelope;
 +import org.apache.samza.system.SystemAdmin;
 +import org.apache.samza.system.SystemConsumer;
 +import org.apache.samza.system.SystemFactory;
 +import org.apache.samza.system.SystemProducer;
 +import org.apache.samza.system.SystemStreamPartition;
 +import org.apache.samza.task.AsyncStreamTaskAdapter;
 +import org.apache.samza.task.StreamOperatorTask;
 +import org.apache.samza.task.TestStreamOperatorTask;
 +import org.apache.samza.test.controlmessages.TestData.PageView;
 +import 
org.apache.samza.test.controlmessages.TestData.PageViewJsonSerdeFactory;
 +import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
 +import org.apache.samza.test.util.SimpleSystemAdmin;
 +import org.apache.samza.test.util.TestStreamConsumer;
 +import org.junit.Test;
 +import scala.collection.JavaConverters;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +
 +public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness {
 +
 +  private static int offset = 1; // running counter; createIncomingMessage reads it and post-increments it
 +  private static final String TEST_SYSTEM = "test"; // system name used to build SSP0 and SSP1
 +  private static final String TEST_STREAM = "PageView"; // stream name used to build SSP0 and SSP1
 +  private static final int PARTITION_COUNT = 2; // written to the streams.PageView.partitionCount config in testWatermark
 +  private static final SystemStreamPartition SSP0 = new 
SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0)); // partition 0 of the test stream
 +  private static final SystemStreamPartition SSP1 = new 
SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1)); // partition 1 of the test stream
 +
 +  private final static List<IncomingMessageEnvelope> TEST_DATA = new 
ArrayList<>(); // envelopes handed to TestStreamConsumer by TestSystemFactory.getConsumer
 +  static { // fills TEST_DATA in order: two data messages, four watermark envelopes, two data messages, end-of-stream per SSP
 +    TEST_DATA.add(createIncomingMessage(new PageView("inbox", 1), SSP0));
 +    TEST_DATA.add(createIncomingMessage(new PageView("home", 2), SSP1));
 +    TEST_DATA.add(IncomingMessageEnvelope.buildWatermarkEnvelope(SSP0, 1));
 +    TEST_DATA.add(IncomingMessageEnvelope.buildWatermarkEnvelope(SSP1, 2));
 +    TEST_DATA.add(IncomingMessageEnvelope.buildWatermarkEnvelope(SSP0, 4)); // last watermark envelope built for SSP0
 +    TEST_DATA.add(IncomingMessageEnvelope.buildWatermarkEnvelope(SSP1, 3)); // last watermark envelope built for SSP1
 +    TEST_DATA.add(createIncomingMessage(new PageView("search", 3), SSP0));
 +    TEST_DATA.add(createIncomingMessage(new PageView("pymk", 4), SSP1));
 +    TEST_DATA.add(IncomingMessageEnvelope.buildEndOfStreamEnvelope(SSP0));
 +    TEST_DATA.add(IncomingMessageEnvelope.buildEndOfStreamEnvelope(SSP1));
 +  }
 +
 +  public final static class TestSystemFactory implements SystemFactory { // stub factory; testWatermark registers it under systems.test.samza.factory
 +    @Override
 +    public SystemConsumer getConsumer(String systemName, Config config, 
MetricsRegistry registry) { // all three arguments are ignored
 +      return new TestStreamConsumer(TEST_DATA); // every consumer is built over the same static TEST_DATA list
 +    }
 +
 +    @Override
 +    public SystemProducer getProducer(String systemName, Config config, 
MetricsRegistry registry) {
 +      return null; // no producer is supplied by this stub
 +    }
 +
 +    @Override
 +    public SystemAdmin getAdmin(String systemName, Config config) {
 +      return new SimpleSystemAdmin(config); // admin is built from the passed config; systemName is ignored
 +    }
 +  }
 +
 +  private static IncomingMessageEnvelope createIncomingMessage(Object 
message, SystemStreamPartition ssp) { // builds an envelope carrying message for the given ssp
 +    return new IncomingMessageEnvelope(ssp, String.valueOf(offset++), "", 
message); // second argument is the current static counter as a string (counter then incremented); third argument is an empty string, presumably the key - TODO confirm
 +  }
 +
 +  @Test
 +  public void testWatermark() throws Exception { // runs the PageView application on a LocalApplicationRunner, then checks the watermarks reported for the PARTITION_BY and SINK operators of both tasks
 +    Map<String, String> configs = new HashMap<>();
 +    configs.put("systems.test.samza.factory", 
TestSystemFactory.class.getName()); // the test system is served by the stub factory declared in this class
 +    configs.put("streams.PageView.samza.system", "test");
 +    configs.put("streams.PageView.partitionCount", 
String.valueOf(PARTITION_COUNT));
 +
 +    configs.put(JobConfig.JOB_NAME(), "test-watermark-job");
 +    configs.put(JobConfig.PROCESSOR_ID(), "1");
 +    configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, 
PassthroughCoordinationUtilsFactory.class.getName());
 +    configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
PassthroughJobCoordinatorFactory.class.getName());
 +    configs.put(TaskConfig.GROUPER_FACTORY(), 
SingleContainerGrouperFactory.class.getName());
 +
 +    configs.put("systems.kafka.samza.factory", 
"org.apache.samza.system.kafka.KafkaSystemFactory");
 +    configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
 +    configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect());
 +    configs.put("systems.kafka.samza.key.serde", "int"); // serde names refer to the serializers.registry entries registered below
 +    configs.put("systems.kafka.samza.msg.serde", "json");
 +    configs.put("systems.kafka.default.stream.replication.factor", "1");
 +    configs.put("job.default.system", "kafka"); // kafka is the default system; presumably used for the partitionBy intermediate stream - TODO confirm
 +
 +    configs.put("serializers.registry.int.class", 
IntegerSerdeFactory.class.getName());
 +    configs.put("serializers.registry.string.class", 
StringSerdeFactory.class.getName());
 +    configs.put("serializers.registry.json.class", 
PageViewJsonSerdeFactory.class.getName());
 +
 +    final LocalApplicationRunner runner = new LocalApplicationRunner(new 
MapConfig(configs));
 +    List<PageView> received = new ArrayList<>(); // filled by the sink below; NOTE(review): never asserted on in this test
 +    final StreamApplication app = (streamGraph, cfg) -> {
-       streamGraph.getInputStream("PageView", (k, v) -> (PageView) v)
-           .partitionBy(PageView::getMemberId)
++      streamGraph.<KV<String, PageView>>getInputStream("PageView") // input messages are typed as KV<String, PageView>
++          .map(EndOfStreamIntegrationTest.Values.create()) // unwraps each KV to its PageView value
++          .partitionBy(pv -> pv.getMemberId(), pv -> pv) // first function yields getMemberId(), second the page view itself; presumably key and value extractors - TODO confirm
 +          .sink((m, collector, coordinator) -> {
-               received.add(m);
++              received.add(m.getValue()); // collector and coordinator are unused
 +            });
 +    };
 +    runner.run(app);
 +    Map<String, StreamOperatorTask> tasks = getTaskOperationGraphs(runner); // task objects are captured after run() and before waitForFinish()
 +
 +    runner.waitForFinish();
 +
 +    // NOTE(review): JUnit assertEquals is declared as (expected, actual); the calls below pass the observed value first, which affects only the failure message
 +    StreamOperatorTask task0 = tasks.get("Partition 0");
 +    OperatorImplGraph graph = 
TestStreamOperatorTask.getOperatorImplGraph(task0);
 +    OperatorImpl pb = getOperator(graph, OperatorSpec.OpCode.PARTITION_BY);
 +    assertEquals(TestOperatorImpl.getInputWatermark(pb), 4); // 4 matches the last watermark envelope built for SSP0 in TEST_DATA
 +    assertEquals(TestOperatorImpl.getOutputWatermark(pb), 4);
 +    OperatorImpl sink = getOperator(graph, OperatorSpec.OpCode.SINK);
 +    assertEquals(TestOperatorImpl.getInputWatermark(sink), 3); // presumably the minimum of the two partition watermarks (4 and 3) - TODO confirm
 +    assertEquals(TestOperatorImpl.getOutputWatermark(sink), 3);
 +
 +    StreamOperatorTask task1 = tasks.get("Partition 1");
 +    graph = TestStreamOperatorTask.getOperatorImplGraph(task1);
 +    pb = getOperator(graph, OperatorSpec.OpCode.PARTITION_BY);
 +    assertEquals(TestOperatorImpl.getInputWatermark(pb), 3); // 3 matches the last watermark envelope built for SSP1 in TEST_DATA
 +    assertEquals(TestOperatorImpl.getOutputWatermark(pb), 3);
 +    sink = getOperator(graph, OperatorSpec.OpCode.SINK);
 +    assertEquals(TestOperatorImpl.getInputWatermark(sink), 3);
 +    assertEquals(TestOperatorImpl.getOutputWatermark(sink), 3);
 +  }
 +
 +  Map<String, StreamOperatorTask> 
getTaskOperationGraphs(LocalApplicationRunner runner) throws Exception { // returns a map from task name to the StreamOperatorTask wrapped by each task instance of the runner
 +    StreamProcessor processor = 
TestLocalApplicationRunner.getProcessors(runner).iterator().next(); // only the first processor returned by the iterator is inspected
 +    SamzaContainer container = 
TestStreamProcessorUtil.getContainer(processor);
 +    Map<TaskName, TaskInstance> taskInstances = 
JavaConverters.mapAsJavaMapConverter(container.getTaskInstances()).asJava(); // converted through the Scala JavaConverters helper to a java.util.Map
 +    Map<String, StreamOperatorTask> tasks = new HashMap<>();
 +    for (Map.Entry<TaskName, TaskInstance> entry : taskInstances.entrySet()) {
 +      AsyncStreamTaskAdapter adapter = (AsyncStreamTaskAdapter) 
entry.getValue().task(); // cast fails with ClassCastException if the task is not an AsyncStreamTaskAdapter
 +      Field field = 
AsyncStreamTaskAdapter.class.getDeclaredField("wrappedTask"); // the wrapped task is read reflectively from the adapter field named wrappedTask
 +      field.setAccessible(true); // suppress Java access checks so the field can be read from this class
 +      StreamOperatorTask task = (StreamOperatorTask) field.get(adapter);
 +      tasks.put(entry.getKey().getTaskName(), task); // keyed by the task name string
 +    }
 +    return tasks;
 +  }
 +
 +  OperatorImpl getOperator(OperatorImplGraph graph, OperatorSpec.OpCode 
opCode) { // returns the first operator found with the given op code, or null if none is reached
 +    for (InputOperatorImpl input : graph.getAllInputOperators()) { // one walk per input operator of the graph
 +      Set<OperatorImpl> nextOps = TestOperatorImpl.getNextOperators(input);
 +      while (!nextOps.isEmpty()) { // walk ends when an operator has no next operators
 +        OperatorImpl op = nextOps.iterator().next(); // NOTE(review): only the first element of the set is followed; other successors are never visited
 +        if (TestOperatorImpl.getOpCode(op) == opCode) {
 +          return op;
 +        } else {
 +          nextOps = TestOperatorImpl.getNextOperators(op); // descend to the successors of the operator just checked
 +        }
 +      }
 +    }
 +    return null; // no operator with opCode on any walked chain
 +  }
 +}

Reply via email to