http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java 
b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
deleted file mode 100644
index 7ddcd19..0000000
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.operators.spec.InputOperatorSpec;
-import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.OperatorSpec.OpCode;
-import org.apache.samza.operators.spec.OperatorSpecs;
-import org.apache.samza.operators.spec.OutputStreamImpl;
-import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
-import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.table.Table;
-import org.apache.samza.table.TableSpec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-/**
- * A {@link StreamGraph} that provides APIs for accessing {@link 
MessageStream}s to be used to
- * create the DAG of transforms.
- */
-public class StreamGraphImpl implements StreamGraph {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(StreamGraphImpl.class);
-  private static final Pattern USER_DEFINED_ID_PATTERN = 
Pattern.compile("[\\d\\w-_.]+");
-
-  // We use a LHM for deterministic order in initializing and closing 
operators.
-  private final Map<StreamSpec, InputOperatorSpec> inputOperators = new 
LinkedHashMap<>();
-  private final Map<StreamSpec, OutputStreamImpl> outputStreams = new 
LinkedHashMap<>();
-  private final Map<TableSpec, TableImpl> tables = new LinkedHashMap<>();
-  private final ApplicationRunner runner;
-  private final Config config;
-
-
-  /**
-   * The 0-based position of the next operator in the graph.
-   * Part of the unique ID for each OperatorSpec in the graph.
-   * Should only accessed and incremented via {@link #getNextOpId(OpCode, 
String)}.
-   */
-  private int nextOpNum = 0;
-  private final Set<String> operatorIds = new HashSet<>();
-  private Serde<?> defaultSerde = new KVSerde(new NoOpSerde(), new 
NoOpSerde());
-  private ContextManager contextManager = null;
-
-  public StreamGraphImpl(ApplicationRunner runner, Config config) {
-    // TODO: SAMZA-1118 - Move StreamSpec and ApplicationRunner out of 
StreamGraphImpl once Systems
-    // can use streamId to send and receive messages.
-    this.runner = runner;
-    this.config = config;
-  }
-
-  @Override
-  public void setDefaultSerde(Serde<?> serde) {
-    Preconditions.checkNotNull(serde, "Default serde must not be null");
-    Preconditions.checkState(inputOperators.isEmpty() && 
outputStreams.isEmpty(),
-        "Default serde must be set before creating any input or output 
streams.");
-    this.defaultSerde = serde;
-  }
-
-  @Override
-  public <M> MessageStream<M> getInputStream(String streamId, Serde<M> serde) {
-    StreamSpec streamSpec = runner.getStreamSpec(streamId);
-    Preconditions.checkState(streamSpec != null, "No StreamSpec found for 
streamId: " + streamId);
-    Preconditions.checkNotNull(serde, "serde must not be null for an input 
stream.");
-    Preconditions.checkState(!inputOperators.containsKey(streamSpec),
-        "getInputStream must not be called multiple times with the same 
streamId: " + streamId);
-
-    KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
-    if (outputStreams.containsKey(streamSpec)) {
-      OutputStreamImpl outputStream = outputStreams.get(streamSpec);
-      Serde keySerde = outputStream.getKeySerde();
-      Serde valueSerde = outputStream.getValueSerde();
-      Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && 
kvSerdes.getValue().equals(valueSerde),
-          String.format("Stream %s is being used both as an input and an 
output stream. Serde in Samza happens at "
-              + "stream level, so the same key and message Serde must be used 
for both.", streamId));
-    }
-
-    boolean isKeyed = serde instanceof KVSerde;
-    InputOperatorSpec inputOperatorSpec =
-        OperatorSpecs.createInputOperatorSpec(streamSpec, kvSerdes.getKey(), 
kvSerdes.getValue(),
-            isKeyed, this.getNextOpId(OpCode.INPUT, null));
-    inputOperators.put(streamSpec, inputOperatorSpec);
-    return new MessageStreamImpl<>(this, inputOperators.get(streamSpec));
-  }
-
-  @Override
-  public <M> MessageStream<M> getInputStream(String streamId) {
-    return (MessageStream<M>) getInputStream(streamId, defaultSerde);
-  }
-
-  @Override
-  public <M> OutputStream<M> getOutputStream(String streamId, Serde<M> serde) {
-    StreamSpec streamSpec = runner.getStreamSpec(streamId);
-    Preconditions.checkState(streamSpec != null, "No StreamSpec found for 
streamId: " + streamId);
-    Preconditions.checkNotNull(serde, "serde must not be null for an output 
stream.");
-    Preconditions.checkState(!outputStreams.containsKey(streamSpec),
-        "getOutputStream must not be called multiple times with the same 
streamId: " + streamId);
-
-    KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
-    if (inputOperators.containsKey(streamSpec)) {
-      InputOperatorSpec inputOperatorSpec = inputOperators.get(streamSpec);
-      Serde keySerde = inputOperatorSpec.getKeySerde();
-      Serde valueSerde = inputOperatorSpec.getValueSerde();
-      Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && 
kvSerdes.getValue().equals(valueSerde),
-          String.format("Stream %s is being used both as an input and an 
output stream. Serde in Samza happens at "
-              + "stream level, so the same key and message Serde must be used 
for both.", streamId));
-    }
-
-    boolean isKeyed = serde instanceof KVSerde;
-    outputStreams.put(streamSpec, new OutputStreamImpl<>(streamSpec, 
kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
-    return outputStreams.get(streamSpec);
-  }
-
-  @Override
-  public <M> OutputStream<M> getOutputStream(String streamId) {
-    return (OutputStream<M>) getOutputStream(streamId, defaultSerde);
-  }
-
-  @Override
-  public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDesc) {
-    TableSpec tableSpec = ((BaseTableDescriptor) tableDesc).getTableSpec();
-    if (tables.containsKey(tableSpec)) {
-      throw new IllegalStateException(String.format(
-          "getTable() invoked multiple times with the same tableId: %s",
-          tableDesc.getTableId()));
-    }
-    tables.put(tableSpec, new TableImpl(tableSpec));
-    return tables.get(tableSpec);
-  }
-
-  @Override
-  public StreamGraph withContextManager(ContextManager contextManager) {
-    this.contextManager = contextManager;
-    return this;
-  }
-
-  /**
-   * See {@link StreamGraphImpl#getIntermediateStream(String, Serde, boolean)}.
-   */
-  <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, 
Serde<M> serde) {
-    return getIntermediateStream(streamId, serde, false);
-  }
-
-  /**
-   * Internal helper for {@link MessageStreamImpl} to add an intermediate 
{@link MessageStream} to the graph.
-   * An intermediate {@link MessageStream} is both an output and an input 
stream.
-   *
-   * @param streamId the id of the stream to be created.
-   * @param serde the {@link Serde} to use for the message in the intermediate 
stream. If null, the default serde
-   *              is used.
-   * @param isBroadcast whether the stream is a broadcast stream.
-   * @param <M> the type of messages in the intermediate {@link MessageStream}
-   * @return  the intermediate {@link MessageStreamImpl}
-   *
-   * TODO: once SAMZA-1566 is resolved, we should be able to pass in the 
StreamSpec directly.
-   */
-  <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, 
Serde<M> serde, boolean isBroadcast) {
-    StreamSpec streamSpec = runner.getStreamSpec(streamId);
-    if (isBroadcast) {
-      streamSpec = streamSpec.copyWithBroadCast();
-    }
-
-    Preconditions.checkState(!inputOperators.containsKey(streamSpec) && 
!outputStreams.containsKey(streamSpec),
-        "getIntermediateStream must not be called multiple times with the same 
streamId: " + streamId);
-
-    if (serde == null) {
-      LOGGER.info("Using default serde for intermediate stream: " + streamId);
-      serde = (Serde<M>) defaultSerde;
-    }
-
-    boolean isKeyed = serde instanceof KVSerde;
-    KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
-    InputOperatorSpec inputOperatorSpec =
-        OperatorSpecs.createInputOperatorSpec(streamSpec, kvSerdes.getKey(), 
kvSerdes.getValue(),
-            isKeyed, this.getNextOpId(OpCode.INPUT, null));
-    inputOperators.put(streamSpec, inputOperatorSpec);
-    outputStreams.put(streamSpec, new OutputStreamImpl(streamSpec, 
kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
-    return new IntermediateMessageStreamImpl<>(this, 
inputOperators.get(streamSpec), outputStreams.get(streamSpec));
-  }
-
-  public Map<StreamSpec, InputOperatorSpec> getInputOperators() {
-    return Collections.unmodifiableMap(inputOperators);
-  }
-
-  public Map<StreamSpec, OutputStreamImpl> getOutputStreams() {
-    return Collections.unmodifiableMap(outputStreams);
-  }
-
-  public Map<TableSpec, TableImpl> getTables() {
-    return Collections.unmodifiableMap(tables);
-  }
-
-  public ContextManager getContextManager() {
-    return this.contextManager;
-  }
-
-  /**
-   * Gets the unique ID for the next operator in the graph. The ID is of the 
following format:
-   * jobName-jobId-opCode-(userDefinedId|nextOpNum);
-   *
-   * @param opCode the {@link OpCode} of the next operator
-   * @param userDefinedId the optional user-provided name of the next operator 
or null
-   * @return the unique ID for the next operator in the graph
-   */
-  /* package private */ String getNextOpId(OpCode opCode, String 
userDefinedId) {
-    if (StringUtils.isNotBlank(userDefinedId) && 
!USER_DEFINED_ID_PATTERN.matcher(userDefinedId).matches()) {
-      throw new SamzaException("Operator ID must not contain spaces and 
special characters: " + userDefinedId);
-    }
-
-    String nextOpId = String.format("%s-%s-%s-%s",
-        config.get(JobConfig.JOB_NAME()),
-        config.get(JobConfig.JOB_ID(), "1"),
-        opCode.name().toLowerCase(),
-        StringUtils.isNotBlank(userDefinedId) ? userDefinedId.trim() : 
String.valueOf(nextOpNum));
-    if (!operatorIds.add(nextOpId)) {
-      throw new SamzaException(
-          String.format("Found duplicate operator ID %s in the graph. Operator 
IDs must be unique.", nextOpId));
-    }
-    nextOpNum++;
-    return nextOpId;
-  }
-
-  /**
-   * Gets the unique ID for the next operator in the graph. The ID is of the 
following format:
-   * jobName-jobId-opCode-nextOpNum;
-   *
-   * @param opCode the {@link OpCode} of the next operator
-   * @return the unique ID for the next operator in the graph
-   */
-  /* package private */ String getNextOpId(OpCode opCode) {
-    return getNextOpId(opCode, null);
-  }
-
-  /**
-   * Get all {@link OperatorSpec}s available in this {@link StreamGraphImpl}
-   *
-   * @return  all available {@link OperatorSpec}s
-   */
-  public Collection<OperatorSpec> getAllOperatorSpecs() {
-    Collection<InputOperatorSpec> inputOperatorSpecs = inputOperators.values();
-    Set<OperatorSpec> operatorSpecs = new HashSet<>();
-    for (InputOperatorSpec inputOperatorSpec: inputOperatorSpecs) {
-      operatorSpecs.add(inputOperatorSpec);
-      doGetOperatorSpecs(inputOperatorSpec, operatorSpecs);
-    }
-    return operatorSpecs;
-  }
-
-  private void doGetOperatorSpecs(OperatorSpec operatorSpec, Set<OperatorSpec> 
specs) {
-    Collection<OperatorSpec> registeredOperatorSpecs = 
operatorSpec.getRegisteredOperatorSpecs();
-    for (OperatorSpec registeredOperatorSpec: registeredOperatorSpecs) {
-      specs.add(registeredOperatorSpec);
-      doGetOperatorSpecs(registeredOperatorSpec, specs);
-    }
-  }
-
-  /**
-   * Returns <tt>true</tt> iff this {@link StreamGraphImpl} contains a join or 
a window operator
-   *
-   * @return  <tt>true</tt> iff this {@link StreamGraphImpl} contains a join 
or a window operator
-   */
-  public boolean hasWindowOrJoins() {
-    // Obtain the operator specs from the streamGraph
-    Collection<OperatorSpec> operatorSpecs = getAllOperatorSpecs();
-
-    Set<OperatorSpec> windowOrJoinSpecs = operatorSpecs.stream()
-        .filter(spec -> spec.getOpCode() == OperatorSpec.OpCode.WINDOW || 
spec.getOpCode() == OperatorSpec.OpCode.JOIN)
-        .collect(Collectors.toSet());
-
-    return windowOrJoinSpecs.size() != 0;
-  }
-
-  private KV<Serde, Serde> getKVSerdes(String streamId, Serde serde) {
-    Serde keySerde, valueSerde;
-
-    if (serde instanceof KVSerde) {
-      keySerde = ((KVSerde) serde).getKeySerde();
-      valueSerde = ((KVSerde) serde).getValueSerde();
-    } else {
-      keySerde = new NoOpSerde();
-      valueSerde = serde;
-    }
-
-    if (keySerde instanceof NoOpSerde) {
-      LOGGER.info("Using NoOpSerde as the key serde for stream " + streamId +
-          ". Keys will not be (de)serialized");
-    }
-    if (valueSerde instanceof NoOpSerde) {
-      LOGGER.info("Using NoOpSerde as the value serde for stream " + streamId +
-          ". Values will not be (de)serialized");
-    }
-
-    return KV.of(keySerde, valueSerde);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java 
b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java
new file mode 100644
index 0000000..ea9690b
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec.OpCode;
+import org.apache.samza.operators.spec.OperatorSpecs;
+import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * This class defines:
+ * 1) an implementation of {@link StreamGraph} that provides APIs for accessing {@link MessageStream}s to be used to
+ * create the DAG of transforms.
+ * 2) a builder that creates a serializable {@link OperatorSpecGraph} from the user-defined DAG.
+ *
+ * <p>The class keeps mutable graph state (registered streams, tables, issued operator IDs) and performs no
+ * synchronization of its own.
+ */
+public class StreamGraphSpec implements StreamGraph {
+  private static final Logger LOGGER = LoggerFactory.getLogger(StreamGraphSpec.class);
+
+  /**
+   * Characters allowed in a user-defined operator ID: digits and word characters, '-', '_' and '.'.
+   * Whitespace and other special characters are rejected by {@link #getNextOpId(OpCode, String)}.
+   */
+  private static final Pattern USER_DEFINED_ID_PATTERN = Pattern.compile("[\\d\\w-_.]+");
+
+  // We use a LHM for deterministic order in initializing and closing operators.
+  private final Map<StreamSpec, InputOperatorSpec> inputOperators = new LinkedHashMap<>();
+  private final Map<StreamSpec, OutputStreamImpl> outputStreams = new LinkedHashMap<>();
+  private final Map<TableSpec, TableImpl> tables = new LinkedHashMap<>();
+  private final ApplicationRunner runner;
+  private final Config config;
+
+  /**
+   * The 0-based position of the next operator in the graph.
+   * Part of the unique ID for each OperatorSpec in the graph.
+   * Should only be accessed and incremented via {@link #getNextOpId(OpCode, String)}.
+   */
+  private int nextOpNum = 0;
+  // Every operator ID issued so far; used to reject duplicates.
+  private final Set<String> operatorIds = new HashSet<>();
+  // Used by the serde-less getInputStream/getOutputStream overloads and by intermediate streams given a null serde.
+  private Serde<?> defaultSerde = new KVSerde(new NoOpSerde(), new NoOpSerde());
+  private ContextManager contextManager = null;
+
+  /**
+   * Creates an empty graph.
+   *
+   * @param runner the {@link ApplicationRunner} used to resolve stream IDs into {@link StreamSpec}s
+   * @param config the job {@link Config}; the job name and job id are read from it to build operator IDs
+   */
+  public StreamGraphSpec(ApplicationRunner runner, Config config) {
+    // TODO: SAMZA-1118 - Move StreamSpec and ApplicationRunner out of StreamGraphSpec once Systems
+    // can use streamId to send and receive messages.
+    this.runner = runner;
+    this.config = config;
+  }
+
+  @Override
+  public void setDefaultSerde(Serde<?> serde) {
+    Preconditions.checkNotNull(serde, "Default serde must not be null");
+    // Streams capture their serdes at creation time, so changing the default afterwards would be silently ignored.
+    Preconditions.checkState(inputOperators.isEmpty() && outputStreams.isEmpty(),
+        "Default serde must be set before creating any input or output streams.");
+    this.defaultSerde = serde;
+  }
+
+  @Override
+  public <M> MessageStream<M> getInputStream(String streamId, Serde<M> serde) {
+    StreamSpec streamSpec = runner.getStreamSpec(streamId);
+    Preconditions.checkState(streamSpec != null, "No StreamSpec found for streamId: " + streamId);
+    Preconditions.checkNotNull(serde, "serde must not be null for an input stream.");
+    Preconditions.checkState(!inputOperators.containsKey(streamSpec),
+        "getInputStream must not be called multiple times with the same streamId: " + streamId);
+
+    KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
+    OutputStreamImpl outputStream = outputStreams.get(streamSpec);
+    if (outputStream != null) {
+      // The stream is also an output of this graph; both sides must agree on serdes.
+      checkSerdesMatch(streamId, kvSerdes, outputStream.getKeySerde(), outputStream.getValueSerde());
+    }
+
+    boolean isKeyed = serde instanceof KVSerde;
+    InputOperatorSpec inputOperatorSpec =
+        OperatorSpecs.createInputOperatorSpec(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(),
+            isKeyed, this.getNextOpId(OpCode.INPUT, null));
+    inputOperators.put(streamSpec, inputOperatorSpec);
+    return new MessageStreamImpl<>(this, inputOperatorSpec);
+  }
+
+  @Override
+  public <M> MessageStream<M> getInputStream(String streamId) {
+    return (MessageStream<M>) getInputStream(streamId, defaultSerde);
+  }
+
+  @Override
+  public <M> OutputStream<M> getOutputStream(String streamId, Serde<M> serde) {
+    StreamSpec streamSpec = runner.getStreamSpec(streamId);
+    Preconditions.checkState(streamSpec != null, "No StreamSpec found for streamId: " + streamId);
+    Preconditions.checkNotNull(serde, "serde must not be null for an output stream.");
+    Preconditions.checkState(!outputStreams.containsKey(streamSpec),
+        "getOutputStream must not be called multiple times with the same streamId: " + streamId);
+
+    KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
+    InputOperatorSpec inputOperatorSpec = inputOperators.get(streamSpec);
+    if (inputOperatorSpec != null) {
+      // The stream is also an input of this graph; both sides must agree on serdes.
+      checkSerdesMatch(streamId, kvSerdes, inputOperatorSpec.getKeySerde(), inputOperatorSpec.getValueSerde());
+    }
+
+    boolean isKeyed = serde instanceof KVSerde;
+    OutputStreamImpl outputStream =
+        new OutputStreamImpl<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed);
+    outputStreams.put(streamSpec, outputStream);
+    return outputStream;
+  }
+
+  @Override
+  public <M> OutputStream<M> getOutputStream(String streamId) {
+    return (OutputStream<M>) getOutputStream(streamId, defaultSerde);
+  }
+
+  @Override
+  public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDesc) {
+    // NOTE(review): assumes every TableDescriptor passed in is a BaseTableDescriptor; other types fail the cast.
+    TableSpec tableSpec = ((BaseTableDescriptor) tableDesc).getTableSpec();
+    if (tables.containsKey(tableSpec)) {
+      throw new IllegalStateException(String.format(
+          "getTable() invoked multiple times with the same tableId: %s",
+          tableDesc.getTableId()));
+    }
+    TableImpl table = new TableImpl(tableSpec);
+    tables.put(tableSpec, table);
+    return table;
+  }
+
+  @Override
+  public StreamGraph withContextManager(ContextManager contextManager) {
+    this.contextManager = contextManager;
+    return this;
+  }
+
+  /**
+   * @return the {@link ContextManager} set via {@link #withContextManager(ContextManager)}, or null if none was set
+   */
+  public ContextManager getContextManager() {
+    return this.contextManager;
+  }
+
+  /**
+   * @return a new {@link OperatorSpecGraph} built from this graph
+   */
+  public OperatorSpecGraph getOperatorSpecGraph() {
+    return new OperatorSpecGraph(this);
+  }
+
+  /**
+   * Gets the unique ID for the next operator in the graph. The ID is of the following format:
+   * jobName-jobId-opCode-(userDefinedId|nextOpNum);
+   *
+   * <p>The job id defaults to "1" when not configured. The operator position counter advances on every successful
+   * call, whether or not a user-defined ID is supplied.
+   *
+   * @param opCode the {@link OpCode} of the next operator
+   * @param userDefinedId the optional user-provided name of the next operator or null
+   * @return the unique ID for the next operator in the graph
+   * @throws SamzaException if {@code userDefinedId} contains disallowed characters, or if the resulting ID has
+   *         already been issued by this graph
+   */
+  public String getNextOpId(OpCode opCode, String userDefinedId) {
+    if (StringUtils.isNotBlank(userDefinedId) && !USER_DEFINED_ID_PATTERN.matcher(userDefinedId).matches()) {
+      throw new SamzaException("Operator ID must not contain spaces and special characters: " + userDefinedId);
+    }
+
+    String nextOpId = String.format("%s-%s-%s-%s",
+        config.get(JobConfig.JOB_NAME()),
+        config.get(JobConfig.JOB_ID(), "1"),
+        // Locale.ROOT: the default-locale overload maps 'I' to a dotless 'ı' under e.g. a Turkish locale,
+        // which would make IDs such as "input"/"join"/"window" depend on the JVM's locale.
+        opCode.name().toLowerCase(Locale.ROOT),
+        StringUtils.isNotBlank(userDefinedId) ? userDefinedId.trim() : String.valueOf(nextOpNum));
+    if (!operatorIds.add(nextOpId)) {
+      throw new SamzaException(
+          String.format("Found duplicate operator ID %s in the graph. Operator IDs must be unique.", nextOpId));
+    }
+    nextOpNum++;
+    return nextOpId;
+  }
+
+  /**
+   * Gets the unique ID for the next operator in the graph. The ID is of the following format:
+   * jobName-jobId-opCode-nextOpNum;
+   *
+   * @param opCode the {@link OpCode} of the next operator
+   * @return the unique ID for the next operator in the graph
+   */
+  public String getNextOpId(OpCode opCode) {
+    return getNextOpId(opCode, null);
+  }
+
+  /**
+   * See {@link StreamGraphSpec#getIntermediateStream(String, Serde, boolean)}.
+   *
+   * @param <M> type of messages in the intermediate stream
+   * @param streamId the id of the stream to be created
+   * @param serde the {@link Serde} to use for messages in the intermediate stream. If null, the default serde is used.
+   * @return  the intermediate {@link MessageStreamImpl}
+   */
+  @VisibleForTesting
+  public <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, Serde<M> serde) {
+    return getIntermediateStream(streamId, serde, false);
+  }
+
+  /**
+   * Internal helper for {@link MessageStreamImpl} to add an intermediate {@link MessageStream} to the graph.
+   * An intermediate {@link MessageStream} is both an output and an input stream.
+   *
+   * @param streamId the id of the stream to be created.
+   * @param serde the {@link Serde} to use for the message in the intermediate stream. If null, the default serde
+   *              is used.
+   * @param isBroadcast whether the stream is a broadcast stream.
+   * @param <M> the type of messages in the intermediate {@link MessageStream}
+   * @return  the intermediate {@link MessageStreamImpl}
+   * @throws IllegalStateException if no {@link StreamSpec} exists for {@code streamId}, or the stream was already
+   *         registered in this graph
+   *
+   * TODO: once SAMZA-1566 is resolved, we should be able to pass in the StreamSpec directly.
+   */
+  @VisibleForTesting
+  <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, Serde<M> serde, boolean isBroadcast) {
+    StreamSpec streamSpec = runner.getStreamSpec(streamId);
+    // Fail with a descriptive error, consistent with getInputStream/getOutputStream, instead of an NPE below.
+    Preconditions.checkState(streamSpec != null, "No StreamSpec found for streamId: " + streamId);
+    if (isBroadcast) {
+      streamSpec = streamSpec.copyWithBroadCast();
+    }
+
+    Preconditions.checkState(!inputOperators.containsKey(streamSpec) && !outputStreams.containsKey(streamSpec),
+        "getIntermediateStream must not be called multiple times with the same streamId: " + streamId);
+
+    if (serde == null) {
+      LOGGER.info("Using default serde for intermediate stream: " + streamId);
+      serde = (Serde<M>) defaultSerde;
+    }
+
+    boolean isKeyed = serde instanceof KVSerde;
+    KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
+    InputOperatorSpec inputOperatorSpec =
+        OperatorSpecs.createInputOperatorSpec(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(),
+            isKeyed, this.getNextOpId(OpCode.INPUT, null));
+    inputOperators.put(streamSpec, inputOperatorSpec);
+    OutputStreamImpl outputStream = new OutputStreamImpl(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed);
+    outputStreams.put(streamSpec, outputStream);
+    return new IntermediateMessageStreamImpl<>(this, inputOperatorSpec, outputStream);
+  }
+
+  Map<StreamSpec, InputOperatorSpec> getInputOperators() {
+    return Collections.unmodifiableMap(inputOperators);
+  }
+
+  Map<StreamSpec, OutputStreamImpl> getOutputStreams() {
+    return Collections.unmodifiableMap(outputStreams);
+  }
+
+  Map<TableSpec, TableImpl> getTables() {
+    return Collections.unmodifiableMap(tables);
+  }
+
+  /**
+   * Verifies that a stream used as both an input and an output of this graph uses the same key and value serdes on
+   * both sides, since serialization in Samza happens at the stream level.
+   *
+   * @param streamId the id of the stream, used in the error message
+   * @param kvSerdes the key/value serdes requested by the current call
+   * @param keySerde the key serde already registered for the other side of the stream
+   * @param valueSerde the value serde already registered for the other side of the stream
+   * @throws IllegalStateException if either serde differs
+   */
+  private static void checkSerdesMatch(String streamId, KV<Serde, Serde> kvSerdes, Serde keySerde, Serde valueSerde) {
+    Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
+        String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at "
+            + "stream level, so the same key and message Serde must be used for both.", streamId));
+  }
+
+  /**
+   * Splits a serde into its key and value parts. A {@link KVSerde} contributes both; any other serde is used for
+   * values, with a {@link NoOpSerde} for keys. Logs when either part is a {@link NoOpSerde}.
+   *
+   * @param streamId the id of the stream, used only for logging
+   * @param serde the serde configured for the stream
+   * @return the (key serde, value serde) pair
+   */
+  private KV<Serde, Serde> getKVSerdes(String streamId, Serde serde) {
+    Serde keySerde, valueSerde;
+
+    if (serde instanceof KVSerde) {
+      keySerde = ((KVSerde) serde).getKeySerde();
+      valueSerde = ((KVSerde) serde).getValueSerde();
+    } else {
+      keySerde = new NoOpSerde();
+      valueSerde = serde;
+    }
+
+    if (keySerde instanceof NoOpSerde) {
+      LOGGER.info("Using NoOpSerde as the key serde for stream " + streamId +
+          ". Keys will not be (de)serialized");
+    }
+    if (valueSerde instanceof NoOpSerde) {
+      LOGGER.info("Using NoOpSerde as the value serde for stream " + streamId +
+          ". Values will not be (de)serialized");
+    }
+
+    return KV.of(keySerde, valueSerde);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/TableImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/TableImpl.java 
b/samza-core/src/main/java/org/apache/samza/operators/TableImpl.java
index e671534..8ceada0 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/TableImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/TableImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.operators;
 
+import java.io.Serializable;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.TableSpec;
 
@@ -25,7 +26,7 @@ import org.apache.samza.table.TableSpec;
 /**
  * This class is the holder of a {@link TableSpec}
  */
-public class TableImpl implements Table {
+public class TableImpl implements Table, Serializable {
 
   private final TableSpec tableSpec;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java
index 269e7bc..8df670e 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java
@@ -42,7 +42,7 @@ class BroadcastOperatorImpl<M> extends OperatorImpl<M, Void> {
 
   BroadcastOperatorImpl(BroadcastOperatorSpec<M> broadcastOpSpec, TaskContext 
context) {
     this.broadcastOpSpec = broadcastOpSpec;
-    this.systemStream = 
broadcastOpSpec.getOutputStream().getStreamSpec().toSystemStream();
+    this.systemStream = broadcastOpSpec.getOutputStream().getSystemStream();
     this.taskName = context.getTaskName().getTaskName();
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java 
b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 608b2be..f0c0997 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -196,7 +196,7 @@ public abstract class OperatorImpl<M, RM> {
 
     results.forEach(rm ->
         this.registeredOperators.forEach(op ->
-            op.onMessage(rm, collector, coordinator)));    
+            op.onMessage(rm, collector, coordinator)));
 
     WatermarkFunction watermarkFn = getOperatorSpec().getWatermarkFn();
     if (watermarkFn != null) {

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index bbc8783..0f51798 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -25,7 +25,6 @@ import org.apache.samza.config.Config;
 import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.operators.KV;
-import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.TimerRegistry;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.PartialJoinFunction;
@@ -34,6 +33,7 @@ import org.apache.samza.operators.spec.BroadcastOperatorSpec;
 import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.spec.OutputOperatorSpec;
 import org.apache.samza.operators.spec.PartitionByOperatorSpec;
 import org.apache.samza.operators.spec.SendToTableOperatorSpec;
@@ -81,26 +81,26 @@ public class OperatorImplGraph {
    * the two {@link PartialJoinOperatorImpl}s for a {@link JoinOperatorSpec} 
with each other since they're
    * reached from different {@link OperatorSpec} during DAG traversals.
    */
-  private final Map<String, KV<PartialJoinFunction, PartialJoinFunction>> 
joinFunctions = new HashMap<>();
+  private final Map<String, KV<PartialJoinOperatorImpl, 
PartialJoinOperatorImpl>> joinOpImpls = new HashMap<>();
 
   private final Clock clock;
 
   /**
    * Constructs the DAG of {@link OperatorImpl}s corresponding to the the DAG 
of {@link OperatorSpec}s
-   * in the {@code streamGraph}.
+   * in the {@code specGraph}.
    *
-   * @param streamGraph  the {@link StreamGraphImpl} containing the logical 
{@link OperatorSpec} DAG
+   * @param specGraph  the {@link OperatorSpecGraph} containing the logical 
{@link OperatorSpec} DAG
    * @param config  the {@link Config} required to instantiate operators
    * @param context  the {@link TaskContext} required to instantiate operators
    * @param clock  the {@link Clock} to get current time
    */
-  public OperatorImplGraph(StreamGraphImpl streamGraph, Config config, 
TaskContext context, Clock clock) {
+  public OperatorImplGraph(OperatorSpecGraph specGraph, Config config, 
TaskContext context, Clock clock) {
     this.clock = clock;
 
     TaskContextImpl taskContext = (TaskContextImpl) context;
-    Map<SystemStream, Integer> producerTaskCounts = 
hasIntermediateStreams(streamGraph) ?
+    Map<SystemStream, Integer> producerTaskCounts = 
hasIntermediateStreams(specGraph) ?
         
getProducerTaskCountForIntermediateStreams(getStreamToConsumerTasks(taskContext.getJobModel()),
-            getIntermediateToInputStreamsMap(streamGraph)) :
+            getIntermediateToInputStreamsMap(specGraph)) :
         Collections.EMPTY_MAP;
     producerTaskCounts.forEach((stream, count) -> {
         LOG.info("{} has {} producer tasks.", stream, count);
@@ -113,7 +113,7 @@ public class OperatorImplGraph {
     taskContext.registerObject(WatermarkStates.class.getName(),
         new WatermarkStates(context.getSystemStreamPartitions(), 
producerTaskCounts));
 
-    streamGraph.getInputOperators().forEach((streamSpec, inputOpSpec) -> {
+    specGraph.getInputOperators().forEach((streamSpec, inputOpSpec) -> {
         SystemStream systemStream = new 
SystemStream(streamSpec.getSystemName(), streamSpec.getPhysicalName());
         InputOperatorImpl inputOperatorImpl =
             (InputOperatorImpl) createAndRegisterOperatorImpl(null, 
inputOpSpec, systemStream, config, context);
@@ -151,12 +151,13 @@ public class OperatorImplGraph {
    * creates the corresponding DAG of {@link OperatorImpl}s, and returns the 
root {@link OperatorImpl} node.
    *
    * @param prevOperatorSpec  the parent of the current {@code operatorSpec} 
in the traversal
-   * @param operatorSpec  the operatorSpec to create the {@link OperatorImpl} 
for
+   * @param operatorSpec  the {@link OperatorSpec} to create the {@link 
OperatorImpl} for
+   * @param inputStream  the source input stream that we traverse the {@link 
OperatorSpecGraph} from
    * @param config  the {@link Config} required to instantiate operators
    * @param context  the {@link TaskContext} required to instantiate operators
    * @return  the operator implementation for the operatorSpec
    */
-  OperatorImpl createAndRegisterOperatorImpl(OperatorSpec prevOperatorSpec, 
OperatorSpec operatorSpec,
+  private OperatorImpl createAndRegisterOperatorImpl(OperatorSpec 
prevOperatorSpec, OperatorSpec operatorSpec,
       SystemStream inputStream, Config config, TaskContext context) {
 
     if (!operatorImpls.containsKey(operatorSpec.getOpId()) || operatorSpec 
instanceof JoinOperatorSpec) {
@@ -178,7 +179,9 @@ public class OperatorImplGraph {
 
       Collection<OperatorSpec> registeredSpecs = 
operatorSpec.getRegisteredOperatorSpecs();
       registeredSpecs.forEach(registeredSpec -> {
-          OperatorImpl nextImpl = createAndRegisterOperatorImpl(operatorSpec, 
registeredSpec, inputStream, config, context);
+          LOG.debug("Creating operator {} with opCode: {}", 
registeredSpec.getOpId(), registeredSpec.getOpCode());
+          OperatorImpl nextImpl =
+              createAndRegisterOperatorImpl(operatorSpec, registeredSpec, 
inputStream, config, context);
           operatorImpl.registerNextOperator(nextImpl);
         });
       return operatorImpl;
@@ -199,7 +202,8 @@ public class OperatorImplGraph {
   /**
    * Creates a new {@link OperatorImpl} instance for the provided {@link 
OperatorSpec}.
    *
-   * @param operatorSpec  the immutable {@link OperatorSpec} definition.
+   * @param prevOperatorSpec the original {@link OperatorSpec} that produces 
output for {@code operatorSpec} from {@link OperatorSpecGraph}
+   * @param operatorSpec  the original {@link OperatorSpec} from {@link 
OperatorSpecGraph}
    * @param config  the {@link Config} required to instantiate operators
    * @param context  the {@link TaskContext} required to instantiate operators
    * @return  the {@link OperatorImpl} implementation instance
@@ -209,17 +213,19 @@ public class OperatorImplGraph {
     if (operatorSpec instanceof InputOperatorSpec) {
       return new InputOperatorImpl((InputOperatorSpec) operatorSpec);
     } else if (operatorSpec instanceof StreamOperatorSpec) {
-      return new StreamOperatorImpl((StreamOperatorSpec) operatorSpec, config, 
context);
+      return new StreamOperatorImpl((StreamOperatorSpec) operatorSpec);
     } else if (operatorSpec instanceof SinkOperatorSpec) {
       return new SinkOperatorImpl((SinkOperatorSpec) operatorSpec, config, 
context);
     } else if (operatorSpec instanceof OutputOperatorSpec) {
-      return new OutputOperatorImpl((OutputOperatorSpec) operatorSpec, config, 
context);
+      return new OutputOperatorImpl((OutputOperatorSpec) operatorSpec);
     } else if (operatorSpec instanceof PartitionByOperatorSpec) {
       return new PartitionByOperatorImpl((PartitionByOperatorSpec) 
operatorSpec, config, context);
     } else if (operatorSpec instanceof WindowOperatorSpec) {
       return new WindowOperatorImpl((WindowOperatorSpec) operatorSpec, clock);
     } else if (operatorSpec instanceof JoinOperatorSpec) {
-      return createPartialJoinOperatorImpl(prevOperatorSpec, 
(JoinOperatorSpec) operatorSpec, config, context, clock);
+      return getOrCreatePartialJoinOpImpls((JoinOperatorSpec) operatorSpec,
+          prevOperatorSpec.equals(((JoinOperatorSpec) 
operatorSpec).getLeftInputOpSpec()),
+          config, context, clock);
     } else if (operatorSpec instanceof StreamTableJoinOperatorSpec) {
       return new StreamTableJoinOperatorImpl((StreamTableJoinOperatorSpec) 
operatorSpec, config, context);
     } else if (operatorSpec instanceof SendToTableOperatorSpec) {
@@ -231,23 +237,24 @@ public class OperatorImplGraph {
         String.format("Unsupported OperatorSpec: %s", 
operatorSpec.getClass().getName()));
   }
 
-  private PartialJoinOperatorImpl createPartialJoinOperatorImpl(OperatorSpec 
prevOperatorSpec,
-      JoinOperatorSpec joinOpSpec, Config config, TaskContext context, Clock 
clock) {
-    KV<PartialJoinFunction, PartialJoinFunction> partialJoinFunctions = 
getOrCreatePartialJoinFunctions(joinOpSpec);
-    if (joinOpSpec.getLeftInputOpSpec().equals(prevOperatorSpec)) { // we got 
here from the left side of the join
-      return new PartialJoinOperatorImpl(joinOpSpec, /* isLeftSide */ true,
-          partialJoinFunctions.getKey(), partialJoinFunctions.getValue(), 
config, context, clock);
+  private PartialJoinOperatorImpl 
getOrCreatePartialJoinOpImpls(JoinOperatorSpec joinOpSpec, boolean isLeft,
+      Config config, TaskContext context, Clock clock) {
+    // get the per task pair of PartialJoinOperatorImpl for the corresponding 
{@code joinOpSpec}
+    KV<PartialJoinOperatorImpl, PartialJoinOperatorImpl> partialJoinOpImpls = 
joinOpImpls.computeIfAbsent(joinOpSpec.getOpId(),
+        joinOpId -> {
+        PartialJoinFunction leftJoinFn = createLeftJoinFn(joinOpSpec);
+        PartialJoinFunction rightJoinFn = createRightJoinFn(joinOpSpec);
+        return new KV(new PartialJoinOperatorImpl(joinOpSpec, true, 
leftJoinFn, rightJoinFn, config, context, clock),
+            new PartialJoinOperatorImpl(joinOpSpec, false, rightJoinFn, 
leftJoinFn, config, context, clock));
+      });
+
+    if (isLeft) { // we got here from the left side of the join
+      return partialJoinOpImpls.getKey();
     } else { // we got here from the right side of the join
-      return new PartialJoinOperatorImpl(joinOpSpec, /* isLeftSide */ false,
-          partialJoinFunctions.getValue(), partialJoinFunctions.getKey(), 
config, context, clock);
+      return partialJoinOpImpls.getValue();
     }
   }
 
-  private KV<PartialJoinFunction, PartialJoinFunction> 
getOrCreatePartialJoinFunctions(JoinOperatorSpec joinOpSpec) {
-    return joinFunctions.computeIfAbsent(joinOpSpec.getOpId(),
-        joinOpId -> KV.of(createLeftJoinFn(joinOpSpec), 
createRightJoinFn(joinOpSpec)));
-  }
-
   private PartialJoinFunction<Object, Object, Object, Object> 
createLeftJoinFn(JoinOperatorSpec joinOpSpec) {
     return new PartialJoinFunction<Object, Object, Object, Object>() {
       private final JoinFunction joinFn = joinOpSpec.getJoinFn();
@@ -316,8 +323,8 @@ public class OperatorImplGraph {
     };
   }
 
-  private boolean hasIntermediateStreams(StreamGraphImpl streamGraph) {
-    return !Collections.disjoint(streamGraph.getInputOperators().keySet(), 
streamGraph.getOutputStreams().keySet());
+  private boolean hasIntermediateStreams(OperatorSpecGraph specGraph) {
+    return !Collections.disjoint(specGraph.getInputOperators().keySet(), 
specGraph.getOutputStreams().keySet());
   }
 
   /**
@@ -358,12 +365,12 @@ public class OperatorImplGraph {
 
   /**
    * calculate the mapping from output streams to input streams
-   * @param streamGraph the user {@link StreamGraphImpl} instance
+   * @param specGraph the user {@link OperatorSpecGraph}
    * @return mapping from output streams to input streams
    */
-  static Multimap<SystemStream, SystemStream> 
getIntermediateToInputStreamsMap(StreamGraphImpl streamGraph) {
+  static Multimap<SystemStream, SystemStream> 
getIntermediateToInputStreamsMap(OperatorSpecGraph specGraph) {
     Multimap<SystemStream, SystemStream> outputToInputStreams = 
HashMultimap.create();
-    streamGraph.getInputOperators().entrySet().stream()
+    specGraph.getInputOperators().entrySet().stream()
         .forEach(
             entry -> computeOutputToInput(entry.getKey().toSystemStream(), 
entry.getValue(), outputToInputStreams));
     return outputToInputStreams;

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
index 27bef87..e625484 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
@@ -42,11 +42,10 @@ class OutputOperatorImpl<M> extends OperatorImpl<M, Void> {
   private final OutputStreamImpl<M> outputStream;
   private final SystemStream systemStream;
 
-  OutputOperatorImpl(OutputOperatorSpec<M> outputOpSpec, Config config, 
TaskContext context) {
+  OutputOperatorImpl(OutputOperatorSpec<M> outputOpSpec) {
     this.outputOpSpec = outputOpSpec;
     this.outputStream = outputOpSpec.getOutputStream();
-    this.systemStream = new 
SystemStream(outputStream.getStreamSpec().getSystemName(),
-        outputStream.getStreamSpec().getPhysicalName());
+    this.systemStream = outputStream.getSystemStream();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
index 9fc1e7c..dd64429 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
@@ -21,6 +21,7 @@ package org.apache.samza.operators.impl;
 import org.apache.samza.config.Config;
 import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.operators.KV;
+import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.operators.spec.PartitionByOperatorSpec;
@@ -36,7 +37,6 @@ import org.apache.samza.task.TaskCoordinator;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.function.Function;
 
 
 /**
@@ -46,17 +46,15 @@ class PartitionByOperatorImpl<M, K, V> extends 
OperatorImpl<M, Void> {
 
   private final PartitionByOperatorSpec<M, K, V> partitionByOpSpec;
   private final SystemStream systemStream;
-  private final Function<? super M, ? extends K> keyFunction;
-  private final Function<? super M, ? extends V> valueFunction;
+  private final MapFunction<? super M, ? extends K> keyFunction;
+  private final MapFunction<? super M, ? extends V> valueFunction;
   private final String taskName;
   private final ControlMessageSender controlMessageSender;
 
   PartitionByOperatorImpl(PartitionByOperatorSpec<M, K, V> partitionByOpSpec, 
Config config, TaskContext context) {
     this.partitionByOpSpec = partitionByOpSpec;
     OutputStreamImpl<KV<K, V>> outputStream = 
partitionByOpSpec.getOutputStream();
-    this.systemStream = new SystemStream(
-        outputStream.getStreamSpec().getSystemName(),
-        outputStream.getStreamSpec().getPhysicalName());
+    this.systemStream = outputStream.getSystemStream();
     this.keyFunction = partitionByOpSpec.getKeyFunction();
     this.valueFunction = partitionByOpSpec.getValueFunction();
     this.taskName = context.getTaskName().getTaskName();
@@ -66,6 +64,8 @@ class PartitionByOperatorImpl<M, K, V> extends 
OperatorImpl<M, Void> {
 
   @Override
   protected void handleInit(Config config, TaskContext context) {
+    this.keyFunction.init(config, context);
+    this.valueFunction.init(config, context);
   }
 
   @Override
@@ -80,6 +80,8 @@ class PartitionByOperatorImpl<M, K, V> extends 
OperatorImpl<M, Void> {
 
   @Override
   protected void handleClose() {
+    this.keyFunction.close();
+    this.valueFunction.close();
   }
 
   @Override
@@ -100,7 +102,7 @@ class PartitionByOperatorImpl<M, K, V> extends 
OperatorImpl<M, Void> {
   }
 
   private void sendControlMessage(ControlMessage message, MessageCollector 
collector) {
-    SystemStream outputStream = 
partitionByOpSpec.getOutputStream().getStreamSpec().toSystemStream();
+    SystemStream outputStream = 
partitionByOpSpec.getOutputStream().getSystemStream();
     controlMessageSender.send(message, outputStream, collector);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
index a51d5e6..6cd426b 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
@@ -40,8 +40,7 @@ class StreamOperatorImpl<M, RM> extends OperatorImpl<M, RM> {
   private final StreamOperatorSpec<M, RM> streamOpSpec;
   private final FlatMapFunction<M, RM> transformFn;
 
-  StreamOperatorImpl(StreamOperatorSpec<M, RM> streamOpSpec,
-      Config config, TaskContext context) {
+  StreamOperatorImpl(StreamOperatorSpec<M, RM> streamOpSpec) {
     this.streamOpSpec = streamOpSpec;
     this.transformFn = streamOpSpec.getTransformFn();
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
index 32406cb..6b5baae 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
@@ -23,6 +23,8 @@ package org.apache.samza.operators.impl;
 import com.google.common.base.Preconditions;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.functions.FoldLeftFunction;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.SupplierFunction;
 import org.apache.samza.operators.impl.store.TimeSeriesKey;
 import org.apache.samza.operators.impl.store.TimeSeriesStore;
 import org.apache.samza.operators.impl.store.TimeSeriesStoreImpl;
@@ -58,8 +60,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.function.Function;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 /**
@@ -93,8 +93,8 @@ public class WindowOperatorImpl<M, K> extends OperatorImpl<M, 
WindowPane<K, Obje
   private final Clock clock;
   private final WindowInternal<M, K, Object> window;
   private final FoldLeftFunction<M, Object> foldLeftFn;
-  private final Supplier<Object> initializer;
-  private final Function<M, K> keyFn;
+  private final SupplierFunction<Object> initializer;
+  private final MapFunction<M, K> keyFn;
 
   private final TriggerScheduler<K> triggerScheduler;
   private final Map<TriggerKey<K>, TriggerImplHandler> triggers = new 
HashMap<>();
@@ -112,11 +112,18 @@ public class WindowOperatorImpl<M, K> extends 
OperatorImpl<M, WindowPane<K, Obje
 
   @Override
   protected void handleInit(Config config, TaskContext context) {
-    WindowInternal<M, K, Object> window = windowOpSpec.getWindow();
 
     KeyValueStore<TimeSeriesKey<K>, Object> store =
         (KeyValueStore<TimeSeriesKey<K>, Object>) 
context.getStore(windowOpSpec.getOpId());
 
+    if (initializer != null) {
+      initializer.init(config, context);
+    }
+
+    if (keyFn != null) {
+      keyFn.init(config, context);
+    }
+
     // For aggregating windows, we use the store in over-write mode since we 
only retain the aggregated
     // value. Else, we use the store in append-mode.
     if (foldLeftFn != null) {
@@ -215,6 +222,12 @@ public class WindowOperatorImpl<M, K> extends 
OperatorImpl<M, WindowPane<K, Obje
     if (timeSeriesStore != null) {
       timeSeriesStore.close();
     }
+    if (initializer != null) {
+      initializer.close();
+    }
+    if (keyFn != null) {
+      keyFn.close();
+    }
   }
 
   private TriggerImplHandler getOrCreateTriggerImplHandler(TriggerKey<K> 
triggerKey, Trigger<M> trigger) {

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java
new file mode 100644
index 0000000..a5cdb82
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.functions.FilterFunction;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.WatermarkFunction;
+import org.apache.samza.task.TaskContext;
+
+
+/**
+ * The spec for an operator that filters input messages based on some 
conditions.
+ *
+ * @param <M> type of input message
+ */
+class FilterOperatorSpec<M> extends StreamOperatorSpec<M, M> {
+  private final FilterFunction<M> filterFn;
+
+  FilterOperatorSpec(FilterFunction<M> filterFn, String opId) {
+    super(new FlatMapFunction<M, M>() {
+      @Override
+      public Collection<M> apply(M message) {
+        return new ArrayList<M>() {
+          {
+            if (filterFn.apply(message)) {
+              this.add(message);
+            }
+          }
+        };
+      }
+
+      @Override
+      public void init(Config config, TaskContext context) {
+        filterFn.init(config, context);
+      }
+
+      @Override
+      public void close() {
+        filterFn.close();
+      }
+    }, OpCode.FILTER, opId);
+    this.filterFn = filterFn;
+  }
+
+  @Override
+  public WatermarkFunction getWatermarkFn() {
+    return this.filterFn instanceof WatermarkFunction ? (WatermarkFunction) 
this.filterFn : null;
+  }
+
+  @Override
+  public TimerFunction getTimerFn() {
+    return this.filterFn instanceof TimerFunction ? (TimerFunction) 
this.filterFn : null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/spec/FlatMapOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/FlatMapOperatorSpec.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/FlatMapOperatorSpec.java
new file mode 100644
index 0000000..a93a221
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/FlatMapOperatorSpec.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.WatermarkFunction;
+
+
+/**
+ * The spec for an operator that transforms each input message to a collection 
of output messages.
+ *
+ * @param <M> type of input message
+ * @param <OM> type of output messages
+ */
+class FlatMapOperatorSpec<M, OM> extends StreamOperatorSpec<M, OM> {
+
+  FlatMapOperatorSpec(FlatMapFunction<M, OM> flatMapFn, String opId) {
+    super(flatMapFn, OpCode.FLAT_MAP, opId);
+  }
+
+  @Override
+  public WatermarkFunction getWatermarkFn() {
+    return this.transformFn instanceof WatermarkFunction ? (WatermarkFunction) 
this.transformFn : null;
+  }
+
+  @Override
+  public TimerFunction getTimerFn() {
+    return this.transformFn instanceof TimerFunction ? (TimerFunction) 
this.transformFn : null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
index 2ed1e30..a636ac5 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
@@ -20,8 +20,8 @@ package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.functions.TimerFunction;
-import org.apache.samza.serializers.Serde;
 import org.apache.samza.operators.functions.WatermarkFunction;
+import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.StreamSpec;
 
 /**
@@ -33,10 +33,15 @@ import org.apache.samza.system.StreamSpec;
  */
 public class InputOperatorSpec<K, V> extends OperatorSpec<KV<K, V>, Object> { 
// Object == KV<K, V> | V
 
-  private final StreamSpec streamSpec;
-  private final Serde<K> keySerde;
-  private final Serde<V> valueSerde;
   private final boolean isKeyed;
+  private final StreamSpec streamSpec;
+
+  /**
+   * The following {@link Serde}s are serialized by the ExecutionPlanner when 
generating the configs for a stream, and deserialized
+   * once during startup in SamzaContainer. They don't need to be deserialized 
here on a per-task basis
+   */
+  private transient final Serde<K> keySerde;
+  private transient final Serde<V> valueSerde;
 
   public InputOperatorSpec(StreamSpec streamSpec,
       Serde<K> keySerde, Serde<V> valueSerde, boolean isKeyed, String opId) {

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
index 9e058ff..a218135 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
@@ -42,14 +42,20 @@ import java.util.Map;
  */
 public class JoinOperatorSpec<K, M, OM, JM> extends OperatorSpec<Object, JM> 
implements StatefulOperatorSpec { // Object == M | OM
 
-  private final OperatorSpec<?, M> leftInputOpSpec;
-  private final OperatorSpec<?, OM> rightInputOpSpec;
   private final JoinFunction<K, M, OM, JM> joinFn;
-  private final Serde<K> keySerde;
-  private final Serde<TimestampedValue<M>> messageSerde;
-  private final Serde<TimestampedValue<OM>> otherMessageSerde;
   private final long ttlMs;
 
+  private final OperatorSpec<?, M> leftInputOpSpec;
+  private final OperatorSpec<?, OM> rightInputOpSpec;
+
+  /**
+   * The following {@link Serde}s are serialized by the ExecutionPlanner when 
generating the store configs for a join, and
+   * deserialized once during startup in SamzaContainer. They don't need to be 
deserialized here on a per-task basis
+   */
+  private transient final Serde<K> keySerde;
+  private transient final Serde<TimestampedValue<M>> messageSerde;
+  private transient final Serde<TimestampedValue<OM>> otherMessageSerde;
+
   /**
    * Default constructor for a {@link JoinOperatorSpec}.
    *
@@ -126,4 +132,5 @@ public class JoinOperatorSpec<K, M, OM, JM> extends 
OperatorSpec<Object, JM> imp
   public long getTtlMs() {
     return ttlMs;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java 
b/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java
new file mode 100644
index 0000000..1e2190b
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.WatermarkFunction;
+import org.apache.samza.task.TaskContext;
+
+
+/**
+ * The spec for an operator that transforms each input message to a single 
output message.
+ *
+ * @param <M> type of input message
+ * @param <OM> type of output messages
+ */
+class MapOperatorSpec<M, OM> extends StreamOperatorSpec<M, OM> {
+
+  private final MapFunction<M, OM> mapFn;
+
+  MapOperatorSpec(MapFunction<M, OM> mapFn, String opId) {
+    super(new FlatMapFunction<M, OM>() {
+      @Override
+      public Collection<OM> apply(M message) {
+        return new ArrayList<OM>() {
+          {
+            OM r = mapFn.apply(message);
+            if (r != null) {
+              this.add(r);
+            }
+          }
+        };
+      }
+
+      @Override
+      public void init(Config config, TaskContext context) {
+        mapFn.init(config, context);
+      }
+
+      @Override
+      public void close() {
+        mapFn.close();
+      }
+    }, OpCode.MAP, opId);
+    this.mapFn = mapFn;
+  }
+
+  @Override
+  public WatermarkFunction getWatermarkFn() {
+    return this.mapFn instanceof WatermarkFunction ? (WatermarkFunction) 
this.mapFn : null;
+  }
+
+  @Override
+  public TimerFunction getTimerFn() {
+    return this.mapFn instanceof TimerFunction ? (TimerFunction) this.mapFn : 
null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/spec/MergeOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/MergeOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/MergeOperatorSpec.java
new file mode 100644
index 0000000..987f72c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/MergeOperatorSpec.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import java.util.ArrayList;
+import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.WatermarkFunction;
+
+
+/**
+ * The spec for an operator that combines messages from all input streams into a single output stream.
+ *
+ * @param <M> the type of messages in all input streams
+ */
+class MergeOperatorSpec<M> extends StreamOperatorSpec<M, M> {
+
+  MergeOperatorSpec(String opId) {
+    super((M message) ->
+        new ArrayList<M>() {
+        {
+          this.add(message);
+        }
+      }, OperatorSpec.OpCode.MERGE, opId);
+  }
+
+  @Override
+  public WatermarkFunction getWatermarkFn() {
+    return null;
+  }
+
+  @Override
+  public TimerFunction getTimerFn() {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
index 7b0a41b..e1e1c55 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -18,9 +18,9 @@
  */
 package org.apache.samza.operators.spec;
 
+import java.io.Serializable;
 import java.util.Collection;
 import java.util.LinkedHashSet;
-import java.util.Set;
 
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.operators.MessageStream;
@@ -30,14 +30,14 @@ import org.apache.samza.operators.functions.WatermarkFunction;
 
 /**
  * A stream operator specification that holds all the information required to transform
- * the input {@link org.apache.samza.operators.MessageStreamImpl} and produce the output
- * {@link org.apache.samza.operators.MessageStreamImpl}.
+ * the input {@link MessageStreamImpl} and produce the output
+ * {@link MessageStreamImpl}.
  *
  * @param <M>  the type of input message to the operator
  * @param <OM>  the type of output message from the operator
  */
 @InterfaceStability.Unstable
-public abstract class OperatorSpec<M, OM> {
+public abstract class OperatorSpec<M, OM> implements Serializable {
 
   public enum OpCode {
     INPUT,
@@ -61,9 +61,15 @@ public abstract class OperatorSpec<M, OM> {
   /**
    * The set of operators that consume the messages produced from this operator.
    * <p>
-   * We use a LinkedHashSet since we need deterministic ordering in initializing/closing operators.
+   * We use a LinkedHashSet since we need both deterministic ordering in initializing/closing operators and serializability.
    */
-  private final Set<OperatorSpec<OM, ?>> nextOperatorSpecs = new LinkedHashSet<>();
+  private final LinkedHashSet<OperatorSpec<OM, ?>> nextOperatorSpecs = new LinkedHashSet<>();
+
+  // this method is used in unit tests to verify an {@link OperatorSpec} instance is a deserialized copy of this object.
+  final boolean isClone(OperatorSpec other) {
+    return this != other && this.getClass().isAssignableFrom(other.getClass())
+        && this.opCode.equals(other.opCode) && this.opId.equals(other.opId);
+  }
 
   public OperatorSpec(OpCode opCode, String opId) {
     this.opCode = opCode;
@@ -79,6 +85,11 @@ public abstract class OperatorSpec<M, OM> {
     nextOperatorSpecs.add(nextOperatorSpec);
   }
 
+  /**
+   * Get the collection of chained {@link OperatorSpec}s that are consuming the output of this node
+   *
+   * @return the collection of chained {@link OperatorSpec}s
+   */
   public Collection<OperatorSpec<OM, ?>> getRegisteredOperatorSpecs() {
     return nextOperatorSpecs;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index c38f6e8..6e98d5a 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -19,11 +19,6 @@
 
 package org.apache.samza.operators.spec;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.function.Function;
-
-import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.FlatMapFunction;
@@ -35,7 +30,6 @@ import org.apache.samza.operators.windows.internal.WindowInternal;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.table.TableSpec;
-import org.apache.samza.task.TaskContext;
 
 
 /**
@@ -73,29 +67,7 @@ public class OperatorSpecs {
    */
   public static <M, OM> StreamOperatorSpec<M, OM> createMapOperatorSpec(
       MapFunction<? super M, ? extends OM> mapFn, String opId) {
-    return new StreamOperatorSpec<>(new FlatMapFunction<M, OM>() {
-      @Override
-      public Collection<OM> apply(M message) {
-        return new ArrayList<OM>() {
-          {
-            OM r = mapFn.apply(message);
-            if (r != null) {
-              this.add(r);
-            }
-          }
-        };
-      }
-
-      @Override
-      public void init(Config config, TaskContext context) {
-        mapFn.init(config, context);
-      }
-
-      @Override
-      public void close() {
-        mapFn.close();
-      }
-    }, mapFn, OperatorSpec.OpCode.MAP, opId);
+    return new MapOperatorSpec<>((MapFunction<M, OM>) mapFn, opId);
   }
 
   /**
@@ -108,28 +80,7 @@ public class OperatorSpecs {
    */
   public static <M> StreamOperatorSpec<M, M> createFilterOperatorSpec(
       FilterFunction<? super M> filterFn, String opId) {
-    return new StreamOperatorSpec<>(new FlatMapFunction<M, M>() {
-      @Override
-      public Collection<M> apply(M message) {
-        return new ArrayList<M>() {
-          {
-            if (filterFn.apply(message)) {
-              this.add(message);
-            }
-          }
-        };
-      }
-
-      @Override
-      public void init(Config config, TaskContext context) {
-        filterFn.init(config, context);
-      }
-
-      @Override
-      public void close() {
-        filterFn.close();
-      }
-    }, filterFn, OperatorSpec.OpCode.FILTER, opId);
+    return new FilterOperatorSpec<>((FilterFunction<M>) filterFn, opId);
   }
 
   /**
@@ -143,7 +94,7 @@ public class OperatorSpecs {
    */
   public static <M, OM> StreamOperatorSpec<M, OM> createFlatMapOperatorSpec(
       FlatMapFunction<? super M, ? extends OM> flatMapFn, String opId) {
-    return new StreamOperatorSpec<>((FlatMapFunction<M, OM>) flatMapFn, flatMapFn, OperatorSpec.OpCode.FLAT_MAP, opId);
+    return new FlatMapOperatorSpec<>((FlatMapFunction<M, OM>) flatMapFn, opId);
   }
 
   /**
@@ -183,8 +134,8 @@ public class OperatorSpecs {
    * @return  the {@link OutputOperatorSpec} for the partitionBy operator
    */
   public static <M, K, V> PartitionByOperatorSpec<M, K, V> createPartitionByOperatorSpec(
-      OutputStreamImpl<KV<K, V>> outputStream, Function<? super M, ? extends K> keyFunction,
-      Function<? super M, ? extends V> valueFunction, String opId) {
+      OutputStreamImpl<KV<K, V>> outputStream, MapFunction<? super M, ? extends K> keyFunction,
+      MapFunction<? super M, ? extends V> valueFunction, String opId) {
     return new PartitionByOperatorSpec<>(outputStream, keyFunction, valueFunction, opId);
   }
 
@@ -198,7 +149,6 @@ public class OperatorSpecs {
    * @param <WV>  the type of value in the window
    * @return  the {@link WindowOperatorSpec}
    */
-
   public static <M, WK, WV> WindowOperatorSpec<M, WK, WV> createWindowOperatorSpec(
       WindowInternal<M, WK, WV> window, String opId) {
     return new WindowOperatorSpec<>(window, opId);
@@ -236,13 +186,7 @@ public class OperatorSpecs {
    * @return  the {@link StreamOperatorSpec} for the merge
    */
   public static <M> StreamOperatorSpec<M, M> createMergeOperatorSpec(String opId) {
-    return new StreamOperatorSpec<>(message ->
-        new ArrayList<M>() {
-          {
-            this.add(message);
-          }
-        },
-        null, OperatorSpec.OpCode.MERGE, opId);
+    return new MergeOperatorSpec<>(opId);
   }
 
   /**
@@ -266,7 +210,6 @@ public class OperatorSpecs {
    * Creates a {@link SendToTableOperatorSpec} with a key extractor and a value extractor function,
    * the type of incoming message is expected to be KV&#60;K, V&#62;.
    *
-   * @param inputOpSpec the operator spec for the input stream
    * @param tableSpec the table spec for the underlying table
    * @param opId the unique ID of the operator
    * @param <K> the type of the table record key
@@ -274,8 +217,8 @@ public class OperatorSpecs {
    * @return the {@link SendToTableOperatorSpec}
    */
   public static <K, V> SendToTableOperatorSpec<K, V> createSendToTableOperatorSpec(
-      OperatorSpec<?, KV<K, V>> inputOpSpec, TableSpec tableSpec, String opId) {
-    return new SendToTableOperatorSpec(inputOpSpec, tableSpec, opId);
+     TableSpec tableSpec, String opId) {
+    return new SendToTableOperatorSpec(tableSpec, opId);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
index e439c4e..fe0abcb 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
@@ -18,18 +18,25 @@
  */
 package org.apache.samza.operators.spec;
 
+import java.io.Serializable;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemStream;
 
 
-public class OutputStreamImpl<M> implements OutputStream<M> {
+public class OutputStreamImpl<M> implements OutputStream<M>, Serializable {
 
   private final StreamSpec streamSpec;
-  private final Serde keySerde;
-  private final Serde valueSerde;
   private final boolean isKeyed;
 
+  /**
+   * The following fields are serialized by the ExecutionPlanner when generating the configs for the output stream, and
+   * deserialized once during startup in SamzaContainer. They don't need to be deserialized here on a per-task basis
+   */
+  private transient final Serde keySerde;
+  private transient final Serde valueSerde;
+
   public OutputStreamImpl(StreamSpec streamSpec,
       Serde keySerde, Serde valueSerde, boolean isKeyed) {
     this.streamSpec = streamSpec;
@@ -50,6 +57,10 @@ public class OutputStreamImpl<M> implements OutputStream<M> {
     return valueSerde;
   }
 
+  public SystemStream getSystemStream() {
+    return this.streamSpec.toSystemStream();
+  }
+
   public boolean isKeyed() {
     return isKeyed;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
index a0a9b61..d6bf3d9 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
@@ -19,10 +19,11 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.KV;
+import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 
-import java.util.function.Function;
+import static com.google.common.base.Preconditions.checkArgument;
 
 
 /**
@@ -39,21 +40,25 @@ import java.util.function.Function;
 public class PartitionByOperatorSpec<M, K, V> extends OperatorSpec<M, Void> {
 
   private final OutputStreamImpl<KV<K, V>> outputStream;
-  private final Function<? super M, ? extends K> keyFunction;
-  private final Function<? super M, ? extends V> valueFunction;
+  private final MapFunction<? super M, ? extends K> keyFunction;
+  private final MapFunction<? super M, ? extends V> valueFunction;
 
   /**
    * Constructs an {@link PartitionByOperatorSpec} to send messages to the provided {@code outputStream}
    *
    * @param outputStream the {@link OutputStreamImpl} to send messages to
-   * @param keyFunction the {@link Function} for extracting the key from the message
-   * @param valueFunction the {@link Function} for extracting the value from the message
+   * @param keyFunction the {@link MapFunction} for extracting the key from the message
+   * @param valueFunction the {@link MapFunction} for extracting the value from the message
    * @param opId the unique ID of this {@link SinkOperatorSpec} in the graph
    */
   PartitionByOperatorSpec(OutputStreamImpl<KV<K, V>> outputStream,
-      Function<? super M, ? extends K> keyFunction,
-      Function<? super M, ? extends V> valueFunction, String opId) {
+      MapFunction<? super M, ? extends K> keyFunction,
+      MapFunction<? super M, ? extends V> valueFunction, String opId) {
     super(OpCode.PARTITION_BY, opId);
+    checkArgument(!(keyFunction instanceof TimerFunction || keyFunction instanceof WatermarkFunction),
+        "keyFunction for partitionBy should not implement TimerFunction or WatermarkFunction.");
+    checkArgument(!(valueFunction instanceof TimerFunction || valueFunction instanceof WatermarkFunction),
+        "valueFunction for partitionBy should not implement TimerFunction or WatermarkFunction.");
     this.outputStream = outputStream;
     this.keyFunction = keyFunction;
     this.valueFunction = valueFunction;
@@ -67,11 +72,11 @@ public class PartitionByOperatorSpec<M, K, V> extends OperatorSpec<M, Void> {
     return this.outputStream;
   }
 
-  public Function<? super M, ? extends K> getKeyFunction() {
+  public MapFunction<? super M, ? extends K> getKeyFunction() {
     return keyFunction;
   }
 
-  public Function<? super M, ? extends V> getValueFunction() {
+  public MapFunction<? super M, ? extends V> getValueFunction() {
     return valueFunction;
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
index e1b51be..22f393e 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
@@ -35,26 +35,19 @@ import org.apache.samza.table.TableSpec;
 @InterfaceStability.Unstable
 public class SendToTableOperatorSpec<K, V> extends OperatorSpec<KV<K, V>, Void> {
 
-  private final OperatorSpec<?, KV<K, V>> inputOpSpec;
   private final TableSpec tableSpec;
 
   /**
    * Constructor for a {@link SendToTableOperatorSpec}.
    *
-   * @param inputOpSpec  the operator spec of the input stream
    * @param tableSpec  the table spec of the table written to
    * @param opId  the unique ID for this operator
    */
-  SendToTableOperatorSpec(OperatorSpec<?, KV<K, V>> inputOpSpec, TableSpec tableSpec, String opId) {
+  SendToTableOperatorSpec(TableSpec tableSpec, String opId) {
     super(OpCode.SEND_TO, opId);
-    this.inputOpSpec = inputOpSpec;
     this.tableSpec = tableSpec;
   }
 
-  public OperatorSpec<?, KV<K, V>> getInputOpSpec() {
-    return inputOpSpec;
-  }
-
   public TableSpec getTableSpec() {
     return tableSpec;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
index 644eb6c..3addbf7 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
@@ -19,46 +19,31 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.TimerFunction;
-import org.apache.samza.operators.functions.WatermarkFunction;
-
 
 /**
- * The spec for a simple stream operator that outputs 0 or more messages for each input message.
+ * The common spec for a simple stream operator that outputs 0 or more messages for each input message.
  *
  * @param <M>  the type of input message
  * @param <OM>  the type of output message
  */
-public class StreamOperatorSpec<M, OM> extends OperatorSpec<M, OM> {
+public abstract class StreamOperatorSpec<M, OM> extends OperatorSpec<M, OM> {
 
-  private final FlatMapFunction<M, OM> transformFn;
-  private final Object originalFn;
+  protected final FlatMapFunction<M, OM> transformFn;
 
   /**
    * Constructor for a {@link StreamOperatorSpec}.
    *
    * @param transformFn  the transformation function
-   * @param originalFn the original user function before wrapping to transformFn
    * @param opCode  the {@link OpCode} for this {@link StreamOperatorSpec}
    * @param opId  the unique ID for this {@link StreamOperatorSpec}
    */
-  StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, Object originalFn, OperatorSpec.OpCode opCode, String opId) {
+  protected StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, OperatorSpec.OpCode opCode, String opId) {
     super(opCode, opId);
     this.transformFn = transformFn;
-    this.originalFn = originalFn;
   }
 
   public FlatMapFunction<M, OM> getTransformFn() {
     return this.transformFn;
   }
 
-  @Override
-  public WatermarkFunction getWatermarkFn() {
-    return originalFn instanceof WatermarkFunction ? (WatermarkFunction) originalFn : null;
-  }
-
-  @Override
-  public TimerFunction getTimerFn() {
-    return originalFn instanceof TimerFunction ? (TimerFunction) originalFn : null;
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
index 73d10ff..8d1ad29 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
@@ -40,6 +40,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static com.google.common.base.Preconditions.*;
+
 
 /**
  * The spec for an operator that groups messages into finite windows for processing
@@ -61,6 +63,15 @@ public class WindowOperatorSpec<M, WK, WV> extends OperatorSpec<M, WindowPane<WK
    */
   WindowOperatorSpec(WindowInternal<M, WK, WV> window, String opId) {
     super(OpCode.WINDOW, opId);
+    checkArgument(window.getInitializer() == null ||
+        !(window.getInitializer() instanceof TimerFunction || window.getInitializer() instanceof WatermarkFunction),
+        "A window does not accepts a user-defined TimerFunction or WatermarkFunction as the initializer.");
+    checkArgument(window.getKeyExtractor() == null ||
+        !(window.getKeyExtractor() instanceof TimerFunction || window.getKeyExtractor() instanceof WatermarkFunction),
+        "A window does not accepts a user-defined TimerFunction or WatermarkFunction as the keyExtractor.");
+    checkArgument(window.getEventTimeExtractor() == null ||
+        !(window.getEventTimeExtractor() instanceof TimerFunction || window.getEventTimeExtractor() instanceof WatermarkFunction),
+        "A window does not accepts a user-defined TimerFunction or WatermarkFunction as the eventTimeExtractor.");
     this.window = window;
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
index 5eeca99..272ba63 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
@@ -20,7 +20,7 @@ package org.apache.samza.operators.stream;
 
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
@@ -45,7 +45,7 @@ public class IntermediateMessageStreamImpl<M> extends MessageStreamImpl<M> imple
   private final OutputStreamImpl<M> outputStream;
   private final boolean isKeyed;
 
-  public IntermediateMessageStreamImpl(StreamGraphImpl graph, InputOperatorSpec<?, M> inputOperatorSpec,
+  public IntermediateMessageStreamImpl(StreamGraphSpec graph, InputOperatorSpec<?, M> inputOperatorSpec,
       OutputStreamImpl<M> outputStream) {
     super(graph, (OperatorSpec<?, M>) inputOperatorSpec);
     this.outputStream = outputStream;

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/triggers/Cancellable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/Cancellable.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/Cancellable.java
index ca0ba67..96defd5 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/triggers/Cancellable.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/Cancellable.java
@@ -30,5 +30,5 @@ public interface Cancellable {
    *
    * @return the result of the cancelation
    */
-  public boolean cancel();
+  boolean cancel();
 }

Reply via email to