SAMZA-1745: Remove all usages of StreamSpec and ApplicationRunner from the 
operator spec and impl layers.

This PR is a pre-requisite for adding support for user-provided 
SystemDescriptors and StreamDescriptors to the High Level API.

It removes all usages of StreamSpec and ApplicationRunner from the OperatorSpec 
and OperatorImpl layers. DAG specification (StreamGraphSpec, OperatorSpecs) now 
only relies on logical streamIds (and in future, will use the user-provided 
StreamDescriptors). DAG execution (i.e., StreamOperatorTask, OperatorImpls) now 
only relies on logical streamIds and their corresponding SystemStreams, which 
are obtained using StreamConfig in OperatorImplGraph.

After this change, StreamSpec can be thought of as the API between 
StreamManager and SystemAdmins for creating and validating streams. Ideally 
ExecutionPlanner shouldn't rely on StreamSpec either, but it currently does so 
extensively, so I'll leave that refactor for later.

Additional changes:
1. ApplicationRunner is no longer responsible for creating/returning StreamSpec 
instances. Instances can be created directly using the StreamSpec constructors, 
or by using one of the util methods in the new StreamUtil class.

2. StreamSpec class no longer tracks the isBroadcast and isBounded status for 
streams.
The former was being used for communicating broadcast status from the 
StreamGraphSpec to the planner so that it could write the broadcast input 
configurations. This is now done using a separate Set of broadcast streamIds in 
StreamGraphSpec.
The latter was being set by the ApplicationRunner based on a config, and then 
passed to the planner so that it could write the bounded input configs. This 
was redundant, so I removed it.

Author: Prateek Maheshwari <[email protected]>
Author: Prateek Maheshwari <[email protected]>
Author: Prateek Maheshwari <[email protected]>
Author: prateekm <[email protected]>

Reviewers: Jagadish Venkatraman <[email protected]>, Yi Pan 
<[email protected]>, Cameron Lee <[email protected]>

Closes #552 from prateekm/stream-spec-cleanup


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/440a25c9
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/440a25c9
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/440a25c9

Branch: refs/heads/master
Commit: 440a25c97e7c0e7e7339c8cb08813ef08f4cdf19
Parents: 43c36e6
Author: Prateek Maheshwari <[email protected]>
Authored: Fri Jul 27 11:24:00 2018 -0700
Committer: Prateek Maheshwari <[email protected]>
Committed: Fri Jul 27 11:24:00 2018 -0700

----------------------------------------------------------------------
 .../samza/operators/functions/MapFunction.java  |   2 +-
 .../apache/samza/runtime/ApplicationRunner.java |  22 --
 .../org/apache/samza/system/StreamSpec.java     |  56 +--
 .../org/apache/samza/config/TaskConfigJava.java |   3 +-
 .../samza/execution/ExecutionPlanner.java       |  22 +-
 .../org/apache/samza/execution/JobGraph.java    |  18 +-
 .../samza/execution/JobGraphJsonGenerator.java  |  12 +-
 .../org/apache/samza/execution/JobNode.java     |  18 +-
 .../org/apache/samza/execution/StreamEdge.java  |  34 +-
 .../apache/samza/execution/StreamManager.java   |   3 +-
 .../samza/operators/OperatorSpecGraph.java      |  15 +-
 .../apache/samza/operators/StreamGraphSpec.java |  79 ++--
 .../operators/impl/BroadcastOperatorImpl.java   |   4 +-
 .../samza/operators/impl/OperatorImplGraph.java |  62 +--
 .../operators/impl/OutputOperatorImpl.java      |   4 +-
 .../operators/impl/PartitionByOperatorImpl.java |  11 +-
 .../samza/operators/spec/InputOperatorSpec.java |  12 +-
 .../samza/operators/spec/OperatorSpecs.java     |   7 +-
 .../samza/operators/spec/OutputStreamImpl.java  |  18 +-
 .../stream/IntermediateMessageStreamImpl.java   |   7 +-
 .../runtime/AbstractApplicationRunner.java      |  64 +--
 .../samza/storage/ChangelogStreamManager.java   |   4 +-
 .../apache/samza/storage/StorageRecovery.java   |   3 +-
 .../java/org/apache/samza/util/StreamUtil.java  |  90 +++++
 .../org/apache/samza/config/StorageConfig.scala |   5 +-
 .../org/apache/samza/config/TaskConfig.scala    |   4 +-
 .../apache/samza/container/SamzaContainer.scala |   4 +-
 .../samza/job/local/ThreadJobFactory.scala      |   4 +-
 .../MetricsSnapshotReporterFactory.scala        |   5 +-
 .../main/scala/org/apache/samza/util/Util.scala |  42 --
 .../samza/execution/TestExecutionPlanner.java   |  42 +-
 .../apache/samza/execution/TestJobGraph.java    |  28 +-
 .../execution/TestJobGraphJsonGenerator.java    |  45 +--
 .../org/apache/samza/execution/TestJobNode.java |  17 +-
 .../apache/samza/execution/TestStreamEdge.java  |  16 +-
 .../samza/operators/TestJoinOperator.java       |  11 +-
 .../samza/operators/TestOperatorSpecGraph.java  |  23 +-
 .../samza/operators/TestStreamGraphSpec.java    | 336 ++++++----------
 .../operators/impl/TestOperatorImplGraph.java   | 272 +++++++------
 .../operators/impl/TestWindowOperator.java      |  16 +-
 .../operators/spec/OperatorSpecTestUtils.java   |  14 +-
 .../samza/operators/spec/TestOperatorSpec.java  |  18 +-
 .../spec/TestPartitionByOperatorSpec.java       |  14 +-
 .../runtime/TestAbstractApplicationRunner.java  | 391 -------------------
 .../apache/samza/task/TestTaskFactoryUtil.java  |   5 +-
 .../apache/samza/testUtils/StreamTestUtils.java |  39 ++
 .../org/apache/samza/util/TestStreamUtil.java   | 337 ++++++++++++++++
 .../samza/system/kafka/KafkaStreamSpec.java     |  15 +-
 .../org/apache/samza/config/KafkaConfig.scala   |   4 +-
 .../samza/config/RegExTopicGenerator.scala      |   8 +-
 .../samza/system/kafka/KafkaSystemAdmin.scala   |   4 +-
 .../samza/system/kafka/TestKafkaStreamSpec.java |   6 +-
 .../kafka/TestKafkaCheckpointManager.scala      |   4 +-
 .../sql/translator/TestQueryTranslator.java     | 235 ++++++-----
 .../test/integration/NegateNumberTask.java      |   4 +-
 .../test/performance/TestPerformanceTask.scala  |   4 +-
 .../test/operator/RepartitionJoinWindowApp.java |  13 +-
 .../operator/TestRepartitionJoinWindowApp.java  |   7 +-
 58 files changed, 1214 insertions(+), 1348 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java 
b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
index fad9cf8..e2f5d0d 100644
--- 
a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
@@ -30,7 +30,7 @@ import org.apache.samza.annotation.InterfaceStability;
  */
 @InterfaceStability.Unstable
 @FunctionalInterface
-public interface MapFunction<M, OM>  extends InitableFunction, 
ClosableFunction, Serializable {
+public interface MapFunction<M, OM> extends InitableFunction, 
ClosableFunction, Serializable {
 
   /**
    * Transforms the provided message into another message.

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java 
b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
index 8339429..45abb5d 100644
--- a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
+++ b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
@@ -24,7 +24,6 @@ import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.job.ApplicationStatus;
-import org.apache.samza.system.StreamSpec;
 
 import java.lang.reflect.Constructor;
 
@@ -124,25 +123,4 @@ public abstract class ApplicationRunner {
   public boolean waitForFinish(Duration timeout) {
     throw new UnsupportedOperationException(getClass().getName() + " does not 
support timed waitForFinish.");
   }
-
-  /**
-   * Constructs a {@link StreamSpec} from the configuration for the specified 
streamId.
-   *
-   * The stream configurations are read from the following properties in the 
config:
-   * {@code streams.{$streamId}.*}
-   * <br>
-   * All properties matching this pattern are assumed to be system-specific 
with two exceptions. The following two
-   * properties are Samza properties which are used to bind the stream to a 
system and a physical resource on that system.
-   *
-   * <ul>
-   *   <li>samza.system -         The name of the System on which this stream 
will be used. If this property isn't defined
-   *                              the stream will be associated with the 
System defined in {@code job.default.system}</li>
-   *   <li>samza.physical.name -  The system-specific name for this stream. It 
could be a file URN, topic name, or other identifer.
-   *                              If this property isn't defined the 
physical.name will be set to the streamId</li>
-   * </ul>
-   *
-   * @param streamId  The logical identifier for the stream in Samza.
-   * @return          The {@link StreamSpec} instance.
-   */
-  public abstract StreamSpec getStreamSpec(String streamId);
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java 
b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
index cd86426..aa71f0e 100644
--- a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
+++ b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
@@ -76,24 +76,10 @@ public class StreamSpec implements Serializable {
   private final int partitionCount;
 
   /**
-   * Bounded or unbounded stream
-   */
-  private final boolean isBounded;
-
-  /**
-   * broadcast stream to all tasks
-   */
-  private final boolean isBroadcast;
-
-  /**
    * A set of all system-specific configurations for the stream.
    */
   private final Map<String, String> config;
 
-  @Override
-  public String toString() {
-    return String.format("StreamSpec: id=%s, systemName=%s, pName=%s, 
partCount=%d.", id, systemName, physicalName, partitionCount);
-  }
   /**
    *  @param id           The application-unique logical identifier for the 
stream. It is used to distinguish between
    *                      streams in a Samza application so it must be unique 
in the context of one deployable unit.
@@ -107,7 +93,7 @@ public class StreamSpec implements Serializable {
    *                      Samza System abstraction. See {@link SystemFactory}
    */
   public StreamSpec(String id, String physicalName, String systemName) {
-    this(id, physicalName, systemName, DEFAULT_PARTITION_COUNT, false, false, 
Collections.emptyMap());
+    this(id, physicalName, systemName, DEFAULT_PARTITION_COUNT, 
Collections.emptyMap());
   }
 
   /**
@@ -126,7 +112,7 @@ public class StreamSpec implements Serializable {
    * @param partitionCount  The number of partitionts for the stream. A value 
of {@code 1} indicates unpartitioned.
    */
   public StreamSpec(String id, String physicalName, String systemName, int 
partitionCount) {
-    this(id, physicalName, systemName, partitionCount, false, false, 
Collections.emptyMap());
+    this(id, physicalName, systemName, partitionCount, Collections.emptyMap());
   }
 
   /**
@@ -141,12 +127,10 @@ public class StreamSpec implements Serializable {
    * @param systemName    The System name on which this stream will exist. 
Corresponds to a named implementation of the
    *                      Samza System abstraction. See {@link SystemFactory}
    *
-   * @param isBounded     The stream is bounded or not.
-   *
    * @param config        A map of properties for the stream. These may be 
System-specfic.
    */
-  public StreamSpec(String id, String physicalName, String systemName, boolean 
isBounded, Map<String, String> config) {
-    this(id, physicalName, systemName, DEFAULT_PARTITION_COUNT, isBounded, 
false, config);
+  public StreamSpec(String id, String physicalName, String systemName, 
Map<String, String> config) {
+    this(id, physicalName, systemName, DEFAULT_PARTITION_COUNT, config);
   }
 
   /**
@@ -161,16 +145,11 @@ public class StreamSpec implements Serializable {
    * @param systemName      The System name on which this stream will exist. 
Corresponds to a named implementation of the
    *                        Samza System abstraction. See {@link SystemFactory}
    *
-   * @param partitionCount  The number of partitionts for the stream. A value 
of {@code 1} indicates unpartitioned.
-   *
-   * @param isBounded       The stream is bounded or not.
-   *
-   * @param isBroadcast     This stream is broadcast or not.
+   * @param partitionCount  The number of partitions for the stream. A value 
of {@code 1} indicates unpartitioned.
    *
    * @param config          A map of properties for the stream. These may be 
System-specfic.
    */
-  public StreamSpec(String id, String physicalName, String systemName, int 
partitionCount,
-                    boolean isBounded, boolean isBroadcast, Map<String, 
String> config) {
+  public StreamSpec(String id, String physicalName, String systemName, int 
partitionCount, Map<String, String> config) {
     validateLogicalIdentifier("streamId", id);
     validateLogicalIdentifier("systemName", systemName);
 
@@ -183,8 +162,6 @@ public class StreamSpec implements Serializable {
     this.systemName = systemName;
     this.physicalName = physicalName;
     this.partitionCount = partitionCount;
-    this.isBounded = isBounded;
-    this.isBroadcast = isBroadcast;
 
     if (config != null) {
       this.config = Collections.unmodifiableMap(new HashMap<>(config));
@@ -202,15 +179,11 @@ public class StreamSpec implements Serializable {
    * @return                A copy of this StreamSpec with the specified 
partitionCount.
    */
   public StreamSpec copyWithPartitionCount(int partitionCount) {
-    return new StreamSpec(id, physicalName, systemName, partitionCount, 
this.isBounded, this.isBroadcast, config);
+    return new StreamSpec(id, physicalName, systemName, partitionCount, 
config);
   }
 
   public StreamSpec copyWithPhysicalName(String physicalName) {
-    return new StreamSpec(id, physicalName, systemName, partitionCount, 
this.isBounded, this.isBroadcast, config);
-  }
-
-  public StreamSpec copyWithBroadCast() {
-    return new StreamSpec(id, physicalName, systemName, partitionCount, 
this.isBounded, true, config);
+    return new StreamSpec(id, physicalName, systemName, partitionCount, 
config);
   }
 
   public String getId() {
@@ -253,14 +226,6 @@ public class StreamSpec implements Serializable {
     return id.equals(COORDINATOR_STREAM_ID);
   }
 
-  public boolean isBounded() {
-    return isBounded;
-  }
-
-  public boolean isBroadcast() {
-    return isBroadcast;
-  }
-
   private void validateLogicalIdentifier(String identifierName, String 
identifierValue) {
     if (identifierValue == null || !identifierValue.matches("[A-Za-z0-9_-]+")) 
{
       throw new IllegalArgumentException(String.format("Identifier '%s' is 
'%s'. It must match the expression [A-Za-z0-9_-]+", identifierName, 
identifierValue));
@@ -297,4 +262,9 @@ public class StreamSpec implements Serializable {
   public static StreamSpec createStreamAppenderStreamSpec(String physicalName, 
String systemName, int partitionCount) {
     return new StreamSpec(STREAM_APPENDER_ID, physicalName, systemName, 
partitionCount);
   }
+
+  @Override
+  public String toString() {
+    return String.format("StreamSpec: id=%s, systemName=%s, pName=%s, 
partCount=%d.", id, systemName, physicalName, partitionCount);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 
b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
index 29dd3ef..c5b2183 100644
--- a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
@@ -32,6 +32,7 @@ import org.apache.samza.checkpoint.CheckpointManagerFactory;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.StreamUtil;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -102,7 +103,7 @@ public class TaskConfigJava extends MapConfig {
       } else {
         String systemStreamName = systemStreamPartition.substring(0, 
hashPosition);
         String partitionSegment = systemStreamPartition.substring(hashPosition 
+ 1);
-        SystemStream systemStream = 
Util.getSystemStreamFromNames(systemStreamName);
+        SystemStream systemStream = 
StreamUtil.getSystemStreamFromNames(systemStreamName);
 
         if (Pattern.matches(BROADCAST_STREAM_PATTERN, partitionSegment)) {
           systemStreamPartitionSet.add(new SystemStreamPartition(systemStream, 
new Partition(Integer.valueOf(partitionSegment))));

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java 
b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index 48f939c..ef52e90 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -34,6 +34,7 @@ import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StreamConfig;
 import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
@@ -43,6 +44,8 @@ import org.apache.samza.table.TableSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.samza.util.StreamUtil.*;
+
 
 /**
  * The ExecutionPlanner creates the physical execution graph for the 
StreamGraph, and
@@ -93,8 +96,9 @@ public class ExecutionPlanner {
    */
   /* package private */ JobGraph createJobGraph(OperatorSpecGraph specGraph) {
     JobGraph jobGraph = new JobGraph(config, specGraph);
-    Set<StreamSpec> sourceStreams = new 
HashSet<>(specGraph.getInputOperators().keySet());
-    Set<StreamSpec> sinkStreams = new 
HashSet<>(specGraph.getOutputStreams().keySet());
+    StreamConfig streamConfig = new StreamConfig(config);
+    Set<StreamSpec> sourceStreams = 
getStreamSpecs(specGraph.getInputOperators().keySet(), streamConfig);
+    Set<StreamSpec> sinkStreams = 
getStreamSpecs(specGraph.getOutputStreams().keySet(), streamConfig);
     Set<StreamSpec> intStreams = new HashSet<>(sourceStreams);
     Set<TableSpec> tables = new HashSet<>(specGraph.getTables().keySet());
     intStreams.retainAll(sinkStreams);
@@ -128,7 +132,7 @@ public class ExecutionPlanner {
    */
   /* package private */ void calculatePartitions(JobGraph jobGraph) {
     // calculate the partitions for the input streams of join operators
-    calculateJoinInputPartitions(jobGraph);
+    calculateJoinInputPartitions(jobGraph, config);
 
     // calculate the partitions for the rest of intermediate streams
     calculateIntStreamPartitions(jobGraph, config);
@@ -172,7 +176,7 @@ public class ExecutionPlanner {
   /**
    * Calculate the partitions for the input streams of join operators
    */
-  /* package private */ static void calculateJoinInputPartitions(JobGraph 
jobGraph) {
+  /* package private */ static void calculateJoinInputPartitions(JobGraph 
jobGraph, Config config) {
     // mapping from a source stream to all join specs reachable from it
     Multimap<OperatorSpec, StreamEdge> joinSpecToStreamEdges = 
HashMultimap.create();
     // reverse mapping of the above
@@ -183,10 +187,10 @@ public class ExecutionPlanner {
     Set<OperatorSpec> visited = new HashSet<>();
 
     jobGraph.getSpecGraph().getInputOperators().entrySet().forEach(entry -> {
-        StreamEdge streamEdge = jobGraph.getOrCreateStreamEdge(entry.getKey());
+        StreamConfig streamConfig = new StreamConfig(config);
+        StreamEdge streamEdge = 
jobGraph.getOrCreateStreamEdge(getStreamSpec(entry.getKey(), streamConfig));
         // Traverses the StreamGraph to find and update mappings for all Joins 
reachable from this input StreamEdge
-        findReachableJoins(entry.getValue(), streamEdge, 
joinSpecToStreamEdges, streamEdgeToJoinSpecs,
-            joinQ, visited);
+        findReachableJoins(entry.getValue(), streamEdge, 
joinSpecToStreamEdges, streamEdgeToJoinSpecs, joinQ, visited);
       });
 
     // At this point, joinQ contains joinSpecs where at least one of the input 
stream edge partitions is known.
@@ -203,7 +207,7 @@ public class ExecutionPlanner {
           } else if (partitions != edgePartitions) {
             throw  new SamzaException(String.format(
                 "Unable to resolve input partitions of stream %s for join. 
Expected: %d, Actual: %d",
-                edge.getFormattedSystemStream(), partitions, edgePartitions));
+                edge.getName(), partitions, edgePartitions));
           }
         }
       }
@@ -282,7 +286,7 @@ public class ExecutionPlanner {
   private static void validatePartitions(JobGraph jobGraph) {
     for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) {
       if (edge.getPartitionCount() <= 0) {
-        throw new SamzaException(String.format("Failure to assign the 
partitions to Stream %s", edge.getFormattedSystemStream()));
+        throw new SamzaException(String.format("Failure to assign the 
partitions to Stream %s", edge.getName()));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java 
b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
index 843db85..2f210f2 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
@@ -87,7 +87,7 @@ import org.slf4j.LoggerFactory;
   @Override
   public List<StreamSpec> getIntermediateStreams() {
     return getIntermediateStreamEdges().stream()
-        .map(streamEdge -> streamEdge.getStreamSpec())
+        .map(StreamEdge::getStreamSpec)
         .collect(Collectors.toList());
   }
 
@@ -187,12 +187,10 @@ import org.slf4j.LoggerFactory;
     String streamId = streamSpec.getId();
     StreamEdge edge = edges.get(streamId);
     if (edge == null) {
-      edge = new StreamEdge(streamSpec, isIntermediate, config);
+      boolean isBroadcast = specGraph.getBroadcastStreams().contains(streamId);
+      edge = new StreamEdge(streamSpec, isIntermediate, isBroadcast, config);
       edges.put(streamId, edge);
     }
-    if (streamSpec.isBroadcast()) {
-      edge.setPartitionCount(1);
-    }
     return edge;
   }
 
@@ -262,11 +260,11 @@ import org.slf4j.LoggerFactory;
     sources.forEach(edge -> {
         if (!edge.getSourceNodes().isEmpty()) {
           throw new IllegalArgumentException(
-              String.format("Source stream %s should not have producers.", 
edge.getFormattedSystemStream()));
+              String.format("Source stream %s should not have producers.", 
edge.getName()));
         }
         if (edge.getTargetNodes().isEmpty()) {
           throw new IllegalArgumentException(
-              String.format("Source stream %s should have consumers.", 
edge.getFormattedSystemStream()));
+              String.format("Source stream %s should have consumers.", 
edge.getName()));
         }
       });
   }
@@ -278,11 +276,11 @@ import org.slf4j.LoggerFactory;
     sinks.forEach(edge -> {
         if (!edge.getTargetNodes().isEmpty()) {
           throw new IllegalArgumentException(
-              String.format("Sink stream %s should not have consumers", 
edge.getFormattedSystemStream()));
+              String.format("Sink stream %s should not have consumers", 
edge.getName()));
         }
         if (edge.getSourceNodes().isEmpty()) {
           throw new IllegalArgumentException(
-              String.format("Sink stream %s should have producers", 
edge.getFormattedSystemStream()));
+              String.format("Sink stream %s should have producers", 
edge.getName()));
         }
       });
   }
@@ -298,7 +296,7 @@ import org.slf4j.LoggerFactory;
     internalEdges.forEach(edge -> {
         if (edge.getSourceNodes().isEmpty() || 
edge.getTargetNodes().isEmpty()) {
           throw new IllegalArgumentException(
-              String.format("Internal stream %s should have both producers and 
consumers", edge.getFormattedSystemStream()));
+              String.format("Internal stream %s should have both producers and 
consumers", edge.getName()));
         }
       });
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
 
b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
index 298042b..4f2aa23 100644
--- 
a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
@@ -170,10 +170,10 @@ import org.codehaus.jackson.map.ObjectMapper;
   private OperatorGraphJson buildOperatorGraphJson(JobNode jobNode) {
     OperatorGraphJson opGraph = new OperatorGraphJson();
     opGraph.inputStreams = new ArrayList<>();
-    jobNode.getSpecGraph().getInputOperators().forEach((streamSpec, 
operatorSpec) -> {
+    jobNode.getSpecGraph().getInputOperators().forEach((streamId, 
operatorSpec) -> {
         StreamJson inputJson = new StreamJson();
         opGraph.inputStreams.add(inputJson);
-        inputJson.streamId = streamSpec.getId();
+        inputJson.streamId = streamId;
         Collection<OperatorSpec> specs = 
operatorSpec.getRegisteredOperatorSpecs();
         inputJson.nextOperatorIds = 
specs.stream().map(OperatorSpec::getOpId).collect(Collectors.toSet());
 
@@ -181,9 +181,9 @@ import org.codehaus.jackson.map.ObjectMapper;
       });
 
     opGraph.outputStreams = new ArrayList<>();
-    jobNode.getSpecGraph().getOutputStreams().keySet().forEach(streamSpec -> {
+    jobNode.getSpecGraph().getOutputStreams().keySet().forEach(streamId -> {
         StreamJson outputJson = new StreamJson();
-        outputJson.streamId = streamSpec.getId();
+        outputJson.streamId = streamId;
         opGraph.outputStreams.add(outputJson);
       });
     return opGraph;
@@ -219,10 +219,10 @@ import org.codehaus.jackson.map.ObjectMapper;
 
     if (spec instanceof OutputOperatorSpec) {
       OutputStreamImpl outputStream = ((OutputOperatorSpec) 
spec).getOutputStream();
-      map.put("outputStreamId", outputStream.getStreamSpec().getId());
+      map.put("outputStreamId", outputStream.getStreamId());
     } else if (spec instanceof PartitionByOperatorSpec) {
       OutputStreamImpl outputStream = ((PartitionByOperatorSpec) 
spec).getOutputStream();
-      map.put("outputStreamId", outputStream.getStreamSpec().getId());
+      map.put("outputStreamId", outputStream.getStreamId());
     }
 
     if (spec instanceof StreamTableJoinOperatorSpec) {

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java 
b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index 288b1a1..dba47e1 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -49,8 +49,8 @@ import org.apache.samza.table.TableConfigGenerator;
 import org.apache.samza.util.MathUtil;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerializableSerde;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.table.TableSpec;
+import org.apache.samza.util.StreamUtil;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -133,8 +133,8 @@ public class JobNode {
     final List<String> inputs = new ArrayList<>();
     final List<String> broadcasts = new ArrayList<>();
     for (StreamEdge inEdge : inEdges) {
-      String formattedSystemStream = inEdge.getFormattedSystemStream();
-      if (inEdge.getStreamSpec().isBroadcast()) {
+      String formattedSystemStream = inEdge.getName();
+      if (inEdge.isBroadcast()) {
         broadcasts.add(formattedSystemStream + "#0");
       } else {
         inputs.add(formattedSystemStream);
@@ -184,9 +184,9 @@ public class JobNode {
         List<String> sideInputs = tableSpec.getSideInputs();
         if (sideInputs != null && !sideInputs.isEmpty()) {
           sideInputs.stream()
-              .map(sideInput -> Util.getSystemStreamFromNameOrId(config, 
sideInput))
+              .map(sideInput -> StreamUtil.getSystemStreamFromNameOrId(config, 
sideInput))
               .forEach(systemStream -> {
-                  inputs.add(Util.getNameFromSystemStream(systemStream));
+                  inputs.add(StreamUtil.getNameFromSystemStream(systemStream));
                   configs.put(String.format(StreamConfig.STREAM_PREFIX() + 
StreamConfig.BOOTSTRAP(),
                       systemStream.getSystem(), systemStream.getStream()), 
"true");
                 });
@@ -230,17 +230,17 @@ public class JobNode {
     // collect all key and msg serde instances for streams
     Map<String, Serde> streamKeySerdes = new HashMap<>();
     Map<String, Serde> streamMsgSerdes = new HashMap<>();
-    Map<StreamSpec, InputOperatorSpec> inputOperators = 
specGraph.getInputOperators();
+    Map<String, InputOperatorSpec> inputOperators = 
specGraph.getInputOperators();
     inEdges.forEach(edge -> {
         String streamId = edge.getStreamSpec().getId();
-        InputOperatorSpec inputOperatorSpec = 
inputOperators.get(edge.getStreamSpec());
+        InputOperatorSpec inputOperatorSpec = inputOperators.get(streamId);
         streamKeySerdes.put(streamId, inputOperatorSpec.getKeySerde());
         streamMsgSerdes.put(streamId, inputOperatorSpec.getValueSerde());
       });
-    Map<StreamSpec, OutputStreamImpl> outputStreams = 
specGraph.getOutputStreams();
+    Map<String, OutputStreamImpl> outputStreams = specGraph.getOutputStreams();
     outEdges.forEach(edge -> {
         String streamId = edge.getStreamSpec().getId();
-        OutputStreamImpl outputStream = 
outputStreams.get(edge.getStreamSpec());
+        OutputStreamImpl outputStream = outputStreams.get(streamId);
         streamKeySerdes.put(streamId, outputStream.getKeySerde());
         streamMsgSerdes.put(streamId, outputStream.getValueSerde());
       });

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java 
b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
index b4c93d9..f2f0310 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
@@ -29,7 +29,7 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StreamConfig;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStream;
-import org.apache.samza.util.Util;
+import org.apache.samza.util.StreamUtil;
 
 
 /**
@@ -41,23 +41,24 @@ public class StreamEdge {
   public static final int PARTITIONS_UNKNOWN = -1;
 
   private final StreamSpec streamSpec;
+  private final boolean isBroadcast;
+  private final boolean isIntermediate;
   private final List<JobNode> sourceNodes = new ArrayList<>();
   private final List<JobNode> targetNodes = new ArrayList<>();
   private final Config config;
+  private final String name;
 
-  private String name = "";
   private int partitions = PARTITIONS_UNKNOWN;
-  private final boolean isIntermediate;
-
-  StreamEdge(StreamSpec streamSpec, Config config) {
-    this(streamSpec, false, config);
-  }
 
-  StreamEdge(StreamSpec streamSpec, boolean isIntermediate, Config config) {
+  StreamEdge(StreamSpec streamSpec, boolean isIntermediate, boolean 
isBroadcast, Config config) {
     this.streamSpec = streamSpec;
-    this.name = Util.getNameFromSystemStream(getSystemStream());
     this.isIntermediate = isIntermediate;
+    this.isBroadcast = isBroadcast;
     this.config = config;
+    if (isBroadcast) {
+      partitions = 1;
+    }
+    this.name = StreamUtil.getNameFromSystemStream(getSystemStream());
   }
 
   void addSourceNode(JobNode sourceNode) {
@@ -85,10 +86,6 @@ public class StreamEdge {
     return getStreamSpec().toSystemStream();
   }
 
-  String getFormattedSystemStream() {
-    return Util.getNameFromSystemStream(getSystemStream());
-  }
-
   List<JobNode> getSourceNodes() {
     return sourceNodes;
   }
@@ -109,10 +106,6 @@ public class StreamEdge {
     return name;
   }
 
-  void setName(String name) {
-    this.name = name;
-  }
-
   boolean isIntermediate() {
     return isIntermediate;
   }
@@ -128,12 +121,13 @@ public class StreamEdge {
       
config.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), 
spec.getId()), "oldest");
       config.put(String.format(StreamConfig.PRIORITY_FOR_STREAM_ID(), 
spec.getId()), String.valueOf(Integer.MAX_VALUE));
     }
-    if (spec.isBounded()) {
-      config.put(String.format(StreamConfig.IS_BOUNDED_FOR_STREAM_ID(), 
spec.getId()), "true");
-    }
     spec.getConfig().forEach((property, value) -> {
         config.put(String.format(StreamConfig.STREAM_ID_PREFIX(), 
spec.getId()) + property, value);
       });
     return new MapConfig(config);
   }
+
+  public boolean isBroadcast() {
+    return isBroadcast;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java 
b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
index 7f60f96..2921f3b 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
@@ -38,6 +38,7 @@ import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.util.StreamUtil;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -142,7 +143,7 @@ public class StreamManager {
             .getOrElse(defaultValue(null));
         if (changelog != null) {
           LOGGER.info("Clear store {} changelog {}", store, changelog);
-          SystemStream systemStream = Util.getSystemStreamFromNames(changelog);
+          SystemStream systemStream = 
StreamUtil.getSystemStreamFromNames(changelog);
           StreamSpec spec = 
StreamSpec.createChangeLogStreamSpec(systemStream.getStream(), 
systemStream.getSystem(), 1);
           systemAdmins.getSystemAdmin(spec.getSystemName()).clearStream(spec);
         }

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java 
b/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
index ba51c7c..b6c3dae 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
@@ -29,7 +29,6 @@ import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.serializers.SerializableSerde;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.table.TableSpec;
 
 
@@ -41,8 +40,9 @@ import org.apache.samza.table.TableSpec;
  */
 public class OperatorSpecGraph implements Serializable {
   // We use a LHM for deterministic order in initializing and closing 
operators.
-  private final Map<StreamSpec, InputOperatorSpec> inputOperators;
-  private final Map<StreamSpec, OutputStreamImpl> outputStreams;
+  private final Map<String, InputOperatorSpec> inputOperators;
+  private final Map<String, OutputStreamImpl> outputStreams;
+  private final Set<String> broadcastStreams;
   private final Map<TableSpec, TableImpl> tables;
   private final Set<OperatorSpec> allOpSpecs;
   private final boolean hasWindowOrJoins;
@@ -54,20 +54,25 @@ public class OperatorSpecGraph implements Serializable {
   OperatorSpecGraph(StreamGraphSpec graphSpec) {
     this.inputOperators = graphSpec.getInputOperators();
     this.outputStreams = graphSpec.getOutputStreams();
+    this.broadcastStreams = graphSpec.getBroadcastStreams();
     this.tables = graphSpec.getTables();
     this.allOpSpecs = Collections.unmodifiableSet(this.findAllOperatorSpecs());
     this.hasWindowOrJoins = checkWindowOrJoins();
     this.serializedOpSpecGraph = opSpecGraphSerde.toBytes(this);
   }
 
-  public Map<StreamSpec, InputOperatorSpec> getInputOperators() {
+  public Map<String, InputOperatorSpec> getInputOperators() {
     return inputOperators;
   }
 
-  public Map<StreamSpec, OutputStreamImpl> getOutputStreams() {
+  public Map<String, OutputStreamImpl> getOutputStreams() {
     return outputStreams;
   }
 
+  public Set<String> getBroadcastStreams() {
+    return broadcastStreams;
+  }
+
   public Map<TableSpec, TableImpl> getTables() {
     return tables;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java 
b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java
index ea9690b..a187b94 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java
@@ -34,11 +34,9 @@ import org.apache.samza.operators.spec.OperatorSpec.OpCode;
 import org.apache.samza.operators.spec.OperatorSpecs;
 import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
-import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.TableSpec;
 import org.slf4j.Logger;
@@ -55,13 +53,13 @@ import com.google.common.base.Preconditions;
  */
 public class StreamGraphSpec implements StreamGraph {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(StreamGraphSpec.class);
-  private static final Pattern USER_DEFINED_ID_PATTERN = 
Pattern.compile("[\\d\\w-_.]+");
+  private static final Pattern ID_PATTERN = Pattern.compile("[\\d\\w-_.]+");
 
   // We use a LHM for deterministic order in initializing and closing 
operators.
-  private final Map<StreamSpec, InputOperatorSpec> inputOperators = new 
LinkedHashMap<>();
-  private final Map<StreamSpec, OutputStreamImpl> outputStreams = new 
LinkedHashMap<>();
+  private final Map<String, InputOperatorSpec> inputOperators = new 
LinkedHashMap<>();
+  private final Map<String, OutputStreamImpl> outputStreams = new 
LinkedHashMap<>();
+  private final Set<String> broadcastStreams = new HashSet<>();
   private final Map<TableSpec, TableImpl> tables = new LinkedHashMap<>();
-  private final ApplicationRunner runner;
   private final Config config;
 
   /**
@@ -74,10 +72,7 @@ public class StreamGraphSpec implements StreamGraph {
   private Serde<?> defaultSerde = new KVSerde(new NoOpSerde(), new 
NoOpSerde());
   private ContextManager contextManager = null;
 
-  public StreamGraphSpec(ApplicationRunner runner, Config config) {
-    // TODO: SAMZA-1118 - Move StreamSpec and ApplicationRunner out of 
StreamGraphSpec once Systems
-    // can use streamId to send and receive messages.
-    this.runner = runner;
+  public StreamGraphSpec(Config config) {
     this.config = config;
   }
 
@@ -91,15 +86,15 @@ public class StreamGraphSpec implements StreamGraph {
 
   @Override
   public <M> MessageStream<M> getInputStream(String streamId, Serde<M> serde) {
-    StreamSpec streamSpec = runner.getStreamSpec(streamId);
-    Preconditions.checkState(streamSpec != null, "No StreamSpec found for 
streamId: " + streamId);
+    Preconditions.checkState(isValidId(streamId),
+        "streamId must be non-empty and must not contain spaces or special 
characters: " + streamId);
     Preconditions.checkNotNull(serde, "serde must not be null for an input 
stream.");
-    Preconditions.checkState(!inputOperators.containsKey(streamSpec),
+    Preconditions.checkState(!inputOperators.containsKey(streamId),
         "getInputStream must not be called multiple times with the same 
streamId: " + streamId);
 
     KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
-    if (outputStreams.containsKey(streamSpec)) {
-      OutputStreamImpl outputStream = outputStreams.get(streamSpec);
+    if (outputStreams.containsKey(streamId)) {
+      OutputStreamImpl outputStream = outputStreams.get(streamId);
       Serde keySerde = outputStream.getKeySerde();
       Serde valueSerde = outputStream.getValueSerde();
       Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && 
kvSerdes.getValue().equals(valueSerde),
@@ -109,10 +104,10 @@ public class StreamGraphSpec implements StreamGraph {
 
     boolean isKeyed = serde instanceof KVSerde;
     InputOperatorSpec inputOperatorSpec =
-        OperatorSpecs.createInputOperatorSpec(streamSpec, kvSerdes.getKey(), 
kvSerdes.getValue(),
+        OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), 
kvSerdes.getValue(),
             isKeyed, this.getNextOpId(OpCode.INPUT, null));
-    inputOperators.put(streamSpec, inputOperatorSpec);
-    return new MessageStreamImpl<>(this, inputOperators.get(streamSpec));
+    inputOperators.put(streamId, inputOperatorSpec);
+    return new MessageStreamImpl<>(this, inputOperators.get(streamId));
   }
 
   @Override
@@ -122,15 +117,15 @@ public class StreamGraphSpec implements StreamGraph {
 
   @Override
   public <M> OutputStream<M> getOutputStream(String streamId, Serde<M> serde) {
-    StreamSpec streamSpec = runner.getStreamSpec(streamId);
-    Preconditions.checkState(streamSpec != null, "No StreamSpec found for 
streamId: " + streamId);
+    Preconditions.checkState(isValidId(streamId),
+        "streamId must be non-empty and must not contain spaces or special 
characters: " + streamId);
     Preconditions.checkNotNull(serde, "serde must not be null for an output 
stream.");
-    Preconditions.checkState(!outputStreams.containsKey(streamSpec),
+    Preconditions.checkState(!outputStreams.containsKey(streamId),
         "getOutputStream must not be called multiple times with the same 
streamId: " + streamId);
 
     KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
-    if (inputOperators.containsKey(streamSpec)) {
-      InputOperatorSpec inputOperatorSpec = inputOperators.get(streamSpec);
+    if (inputOperators.containsKey(streamId)) {
+      InputOperatorSpec inputOperatorSpec = inputOperators.get(streamId);
       Serde keySerde = inputOperatorSpec.getKeySerde();
       Serde valueSerde = inputOperatorSpec.getValueSerde();
       Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && 
kvSerdes.getValue().equals(valueSerde),
@@ -139,8 +134,8 @@ public class StreamGraphSpec implements StreamGraph {
     }
 
     boolean isKeyed = serde instanceof KVSerde;
-    outputStreams.put(streamSpec, new OutputStreamImpl<>(streamSpec, 
kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
-    return outputStreams.get(streamSpec);
+    outputStreams.put(streamId, new OutputStreamImpl<>(streamId, 
kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
+    return outputStreams.get(streamId);
   }
 
   @Override
@@ -183,8 +178,8 @@ public class StreamGraphSpec implements StreamGraph {
    * @return the unique ID for the next operator in the graph
    */
   public String getNextOpId(OpCode opCode, String userDefinedId) {
-    if (StringUtils.isNotBlank(userDefinedId) && 
!USER_DEFINED_ID_PATTERN.matcher(userDefinedId).matches()) {
-      throw new SamzaException("Operator ID must not contain spaces and 
special characters: " + userDefinedId);
+    if (StringUtils.isNotBlank(userDefinedId) && 
!ID_PATTERN.matcher(userDefinedId).matches()) {
+      throw new SamzaException("Operator ID must not contain spaces or special 
characters: " + userDefinedId);
     }
 
     String nextOpId = String.format("%s-%s-%s-%s",
@@ -234,17 +229,10 @@ public class StreamGraphSpec implements StreamGraph {
    * @param isBroadcast whether the stream is a broadcast stream.
    * @param <M> the type of messages in the intermediate {@link MessageStream}
    * @return  the intermediate {@link MessageStreamImpl}
-   *
-   * TODO: once SAMZA-1566 is resolved, we should be able to pass in the 
StreamSpec directly.
    */
   @VisibleForTesting
   <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, 
Serde<M> serde, boolean isBroadcast) {
-    StreamSpec streamSpec = runner.getStreamSpec(streamId);
-    if (isBroadcast) {
-      streamSpec = streamSpec.copyWithBroadCast();
-    }
-
-    Preconditions.checkState(!inputOperators.containsKey(streamSpec) && 
!outputStreams.containsKey(streamSpec),
+    Preconditions.checkState(!inputOperators.containsKey(streamId) && 
!outputStreams.containsKey(streamId),
         "getIntermediateStream must not be called multiple times with the same 
streamId: " + streamId);
 
     if (serde == null) {
@@ -252,28 +240,37 @@ public class StreamGraphSpec implements StreamGraph {
       serde = (Serde<M>) defaultSerde;
     }
 
+    if (isBroadcast) broadcastStreams.add(streamId);
     boolean isKeyed = serde instanceof KVSerde;
     KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
     InputOperatorSpec inputOperatorSpec =
-        OperatorSpecs.createInputOperatorSpec(streamSpec, kvSerdes.getKey(), 
kvSerdes.getValue(),
+        OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), 
kvSerdes.getValue(),
             isKeyed, this.getNextOpId(OpCode.INPUT, null));
-    inputOperators.put(streamSpec, inputOperatorSpec);
-    outputStreams.put(streamSpec, new OutputStreamImpl(streamSpec, 
kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
-    return new IntermediateMessageStreamImpl<>(this, 
inputOperators.get(streamSpec), outputStreams.get(streamSpec));
+    inputOperators.put(streamId, inputOperatorSpec);
+    outputStreams.put(streamId, new OutputStreamImpl(streamId, 
kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
+    return new IntermediateMessageStreamImpl<>(this, 
inputOperators.get(streamId), outputStreams.get(streamId));
   }
 
-  Map<StreamSpec, InputOperatorSpec> getInputOperators() {
+  Map<String, InputOperatorSpec> getInputOperators() {
     return Collections.unmodifiableMap(inputOperators);
   }
 
-  Map<StreamSpec, OutputStreamImpl> getOutputStreams() {
+  Map<String, OutputStreamImpl> getOutputStreams() {
     return Collections.unmodifiableMap(outputStreams);
   }
 
+  Set<String> getBroadcastStreams() {
+    return Collections.unmodifiableSet(broadcastStreams);
+  }
+
   Map<TableSpec, TableImpl> getTables() {
     return Collections.unmodifiableMap(tables);
   }
 
+  private boolean isValidId(String id) {
+    return StringUtils.isNotBlank(id) && ID_PATTERN.matcher(id).matches();
+  }
+
   private KV<Serde, Serde> getKVSerdes(String streamId, Serde serde) {
     Serde keySerde, valueSerde;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java
index 8df670e..99ed089 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java
@@ -40,9 +40,9 @@ class BroadcastOperatorImpl<M> extends OperatorImpl<M, Void> {
   private final SystemStream systemStream;
   private final String taskName;
 
-  BroadcastOperatorImpl(BroadcastOperatorSpec<M> broadcastOpSpec, TaskContext 
context) {
+  BroadcastOperatorImpl(BroadcastOperatorSpec<M> broadcastOpSpec, SystemStream 
systemStream, TaskContext context) {
     this.broadcastOpSpec = broadcastOpSpec;
-    this.systemStream = broadcastOpSpec.getOutputStream().getSystemStream();
+    this.systemStream = systemStream;
     this.taskName = context.getTaskName().getTaskName();
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index df73e48..7f62e00 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -22,6 +22,7 @@ import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.StreamConfig;
 import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.operators.KV;
@@ -96,12 +97,14 @@ public class OperatorImplGraph {
    */
   public OperatorImplGraph(OperatorSpecGraph specGraph, Config config, 
TaskContext context, Clock clock) {
     this.clock = clock;
-
+    StreamConfig streamConfig = new StreamConfig(config);
     TaskContextImpl taskContext = (TaskContextImpl) context;
-    Map<SystemStream, Integer> producerTaskCounts = 
hasIntermediateStreams(specGraph) ?
-        
getProducerTaskCountForIntermediateStreams(getStreamToConsumerTasks(taskContext.getJobModel()),
-            getIntermediateToInputStreamsMap(specGraph)) :
-        Collections.EMPTY_MAP;
+    Map<SystemStream, Integer> producerTaskCounts =
+        hasIntermediateStreams(specGraph)
+            ? getProducerTaskCountForIntermediateStreams(
+                getStreamToConsumerTasks(taskContext.getJobModel()),
+                getIntermediateToInputStreamsMap(specGraph, streamConfig))
+            : Collections.EMPTY_MAP;
     producerTaskCounts.forEach((stream, count) -> {
         LOG.info("{} has {} producer tasks.", stream, count);
       });
@@ -113,8 +116,8 @@ public class OperatorImplGraph {
     taskContext.registerObject(WatermarkStates.class.getName(),
         new WatermarkStates(context.getSystemStreamPartitions(), 
producerTaskCounts));
 
-    specGraph.getInputOperators().forEach((streamSpec, inputOpSpec) -> {
-        SystemStream systemStream = new 
SystemStream(streamSpec.getSystemName(), streamSpec.getPhysicalName());
+    specGraph.getInputOperators().forEach((streamId, inputOpSpec) -> {
+        SystemStream systemStream = 
streamConfig.streamIdToSystemStream(streamId);
         InputOperatorImpl inputOperatorImpl =
             (InputOperatorImpl) createAndRegisterOperatorImpl(null, 
inputOpSpec, systemStream, config, context);
         this.inputOperators.put(systemStream, inputOperatorImpl);
@@ -210,6 +213,7 @@ public class OperatorImplGraph {
    */
   OperatorImpl createOperatorImpl(OperatorSpec prevOperatorSpec, OperatorSpec 
operatorSpec,
       Config config, TaskContext context) {
+    StreamConfig streamConfig = new StreamConfig(config);
     if (operatorSpec instanceof InputOperatorSpec) {
       return new InputOperatorImpl((InputOperatorSpec) operatorSpec);
     } else if (operatorSpec instanceof StreamOperatorSpec) {
@@ -217,9 +221,13 @@ public class OperatorImplGraph {
     } else if (operatorSpec instanceof SinkOperatorSpec) {
       return new SinkOperatorImpl((SinkOperatorSpec) operatorSpec, config, 
context);
     } else if (operatorSpec instanceof OutputOperatorSpec) {
-      return new OutputOperatorImpl((OutputOperatorSpec) operatorSpec);
+      String streamId = ((OutputOperatorSpec) 
operatorSpec).getOutputStream().getStreamId();
+      SystemStream systemStream = 
streamConfig.streamIdToSystemStream(streamId);
+      return new OutputOperatorImpl((OutputOperatorSpec) operatorSpec, 
systemStream);
     } else if (operatorSpec instanceof PartitionByOperatorSpec) {
-      return new PartitionByOperatorImpl((PartitionByOperatorSpec) 
operatorSpec, config, context);
+      String streamId = ((PartitionByOperatorSpec) 
operatorSpec).getOutputStream().getStreamId();
+      SystemStream systemStream = 
streamConfig.streamIdToSystemStream(streamId);
+      return new PartitionByOperatorImpl((PartitionByOperatorSpec) 
operatorSpec, systemStream, context);
     } else if (operatorSpec instanceof WindowOperatorSpec) {
       return new WindowOperatorImpl((WindowOperatorSpec) operatorSpec, clock);
     } else if (operatorSpec instanceof JoinOperatorSpec) {
@@ -231,7 +239,9 @@ public class OperatorImplGraph {
     } else if (operatorSpec instanceof SendToTableOperatorSpec) {
       return new SendToTableOperatorImpl((SendToTableOperatorSpec) 
operatorSpec, config, context);
     } else if (operatorSpec instanceof BroadcastOperatorSpec) {
-      return new BroadcastOperatorImpl((BroadcastOperatorSpec) operatorSpec, 
context);
+      String streamId = ((BroadcastOperatorSpec) 
operatorSpec).getOutputStream().getStreamId();
+      SystemStream systemStream = 
streamConfig.streamIdToSystemStream(streamId);
+      return new BroadcastOperatorImpl((BroadcastOperatorSpec) operatorSpec, 
systemStream, context);
     }
     throw new IllegalArgumentException(
         String.format("Unsupported OperatorSpec: %s", 
operatorSpec.getClass().getName()));
@@ -323,10 +333,6 @@ public class OperatorImplGraph {
     };
   }
 
-  private boolean hasIntermediateStreams(OperatorSpecGraph specGraph) {
-    return !Collections.disjoint(specGraph.getInputOperators().keySet(), 
specGraph.getOutputStreams().keySet());
-  }
-
   /**
    * calculate the task count that produces to each intermediate streams
    * @param streamToConsumerTasks input streams to task mapping
@@ -337,12 +343,11 @@ public class OperatorImplGraph {
       Multimap<SystemStream, String> streamToConsumerTasks,
       Multimap<SystemStream, SystemStream> intermediateToInputStreams) {
     Map<SystemStream, Integer> result = new HashMap<>();
-    intermediateToInputStreams.asMap().entrySet().forEach(entry -> {
+    intermediateToInputStreams.asMap().entrySet().forEach(entry ->
         result.put(entry.getKey(),
             entry.getValue().stream()
                 .flatMap(systemStream -> 
streamToConsumerTasks.get(systemStream).stream())
-                .collect(Collectors.toSet()).size());
-      });
+                .collect(Collectors.toSet()).size()));
     return result;
   }
 
@@ -368,25 +373,34 @@ public class OperatorImplGraph {
    * @param specGraph the user {@link OperatorSpecGraph}
    * @return mapping from output streams to input streams
    */
-  static Multimap<SystemStream, SystemStream> 
getIntermediateToInputStreamsMap(OperatorSpecGraph specGraph) {
+  static Multimap<SystemStream, SystemStream> getIntermediateToInputStreamsMap(
+      OperatorSpecGraph specGraph, StreamConfig streamConfig) {
     Multimap<SystemStream, SystemStream> outputToInputStreams = 
HashMultimap.create();
     specGraph.getInputOperators().entrySet().stream()
-        .forEach(
-            entry -> computeOutputToInput(entry.getKey().toSystemStream(), 
entry.getValue(), outputToInputStreams));
+        .forEach(entry -> {
+            SystemStream systemStream = 
streamConfig.streamIdToSystemStream(entry.getKey());
+            computeOutputToInput(systemStream, entry.getValue(), 
outputToInputStreams, streamConfig);
+          });
     return outputToInputStreams;
   }
 
   private static void computeOutputToInput(SystemStream input, OperatorSpec 
opSpec,
-      Multimap<SystemStream, SystemStream> outputToInputStreams) {
+      Multimap<SystemStream, SystemStream> outputToInputStreams, StreamConfig 
streamConfig) {
     if (opSpec instanceof PartitionByOperatorSpec) {
       PartitionByOperatorSpec spec = (PartitionByOperatorSpec) opSpec;
-      
outputToInputStreams.put(spec.getOutputStream().getStreamSpec().toSystemStream(),
 input);
+      SystemStream systemStream = 
streamConfig.streamIdToSystemStream(spec.getOutputStream().getStreamId());
+      outputToInputStreams.put(systemStream, input);
     } else if (opSpec instanceof BroadcastOperatorSpec) {
       BroadcastOperatorSpec spec = (BroadcastOperatorSpec) opSpec;
-      
outputToInputStreams.put(spec.getOutputStream().getStreamSpec().toSystemStream(),
 input);
+      SystemStream systemStream = 
streamConfig.streamIdToSystemStream(spec.getOutputStream().getStreamId());
+      outputToInputStreams.put(systemStream, input);
     } else {
       Collection<OperatorSpec> nextOperators = 
opSpec.getRegisteredOperatorSpecs();
-      nextOperators.forEach(spec -> computeOutputToInput(input, spec, 
outputToInputStreams));
+      nextOperators.forEach(spec -> computeOutputToInput(input, spec, 
outputToInputStreams, streamConfig));
     }
   }
+
+  private boolean hasIntermediateStreams(OperatorSpecGraph specGraph) {
+    return !Collections.disjoint(specGraph.getInputOperators().keySet(), 
specGraph.getOutputStreams().keySet());
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
index e625484..22fbb1b 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
@@ -42,10 +42,10 @@ class OutputOperatorImpl<M> extends OperatorImpl<M, Void> {
   private final OutputStreamImpl<M> outputStream;
   private final SystemStream systemStream;
 
-  OutputOperatorImpl(OutputOperatorSpec<M> outputOpSpec) {
+  OutputOperatorImpl(OutputOperatorSpec<M> outputOpSpec, SystemStream 
systemStream) {
     this.outputOpSpec = outputOpSpec;
     this.outputStream = outputOpSpec.getOutputStream();
-    this.systemStream = outputStream.getSystemStream();
+    this.systemStream = systemStream;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
index dd64429..63e269d 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
@@ -20,10 +20,8 @@ package org.apache.samza.operators.impl;
 
 import org.apache.samza.config.Config;
 import org.apache.samza.container.TaskContextImpl;
-import org.apache.samza.operators.KV;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.operators.spec.PartitionByOperatorSpec;
 import org.apache.samza.system.ControlMessage;
 import org.apache.samza.system.EndOfStreamMessage;
@@ -51,10 +49,10 @@ class PartitionByOperatorImpl<M, K, V> extends 
OperatorImpl<M, Void> {
   private final String taskName;
   private final ControlMessageSender controlMessageSender;
 
-  PartitionByOperatorImpl(PartitionByOperatorSpec<M, K, V> partitionByOpSpec, 
Config config, TaskContext context) {
+  PartitionByOperatorImpl(PartitionByOperatorSpec<M, K, V> partitionByOpSpec,
+      SystemStream systemStream, TaskContext context) {
     this.partitionByOpSpec = partitionByOpSpec;
-    OutputStreamImpl<KV<K, V>> outputStream = 
partitionByOpSpec.getOutputStream();
-    this.systemStream = outputStream.getSystemStream();
+    this.systemStream = systemStream;
     this.keyFunction = partitionByOpSpec.getKeyFunction();
     this.valueFunction = partitionByOpSpec.getValueFunction();
     this.taskName = context.getTaskName().getTaskName();
@@ -102,7 +100,6 @@ class PartitionByOperatorImpl<M, K, V> extends 
OperatorImpl<M, Void> {
   }
 
   private void sendControlMessage(ControlMessage message, MessageCollector 
collector) {
-    SystemStream outputStream = 
partitionByOpSpec.getOutputStream().getSystemStream();
-    controlMessageSender.send(message, outputStream, collector);
+    controlMessageSender.send(message, systemStream, collector);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
index a636ac5..922a1f9 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
@@ -22,7 +22,6 @@ import org.apache.samza.operators.KV;
 import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.StreamSpec;
 
 /**
  * The spec for an operator that receives incoming messages from an input 
stream
@@ -34,7 +33,7 @@ import org.apache.samza.system.StreamSpec;
 public class InputOperatorSpec<K, V> extends OperatorSpec<KV<K, V>, Object> { 
// Object == KV<K, V> | V
 
   private final boolean isKeyed;
-  private final StreamSpec streamSpec;
+  private final String streamId;
 
   /**
    * The following {@link Serde}s are serialized by the ExecutionPlanner when 
generating the configs for a stream, and deserialized
@@ -43,17 +42,16 @@ public class InputOperatorSpec<K, V> extends 
OperatorSpec<KV<K, V>, Object> { //
   private transient final Serde<K> keySerde;
   private transient final Serde<V> valueSerde;
 
-  public InputOperatorSpec(StreamSpec streamSpec,
-      Serde<K> keySerde, Serde<V> valueSerde, boolean isKeyed, String opId) {
+  public InputOperatorSpec(String streamId, Serde<K> keySerde, Serde<V> 
valueSerde, boolean isKeyed, String opId) {
     super(OpCode.INPUT, opId);
-    this.streamSpec = streamSpec;
+    this.streamId = streamId;
     this.keySerde = keySerde;
     this.valueSerde = valueSerde;
     this.isKeyed = isKeyed;
   }
 
-  public StreamSpec getStreamSpec() {
-    return this.streamSpec;
+  public String getStreamId() {
+    return this.streamId;
   }
 
   public Serde<K> getKeySerde() {

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java 
b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index 6e98d5a..9e788da 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -28,7 +28,6 @@ import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.functions.StreamTableJoinFunction;
 import org.apache.samza.operators.windows.internal.WindowInternal;
 import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.table.TableSpec;
 
 
@@ -42,7 +41,7 @@ public class OperatorSpecs {
   /**
    * Creates an {@link InputOperatorSpec} for consuming input.
    *
-   * @param streamSpec  the stream spec for the input stream
+   * @param streamId  the stream id for the input stream
    * @param keySerde  the serde for the input key
    * @param valueSerde  the serde for the input value
    * @param isKeyed  whether the input stream is keyed
@@ -52,8 +51,8 @@ public class OperatorSpecs {
    * @return  the {@link InputOperatorSpec}
    */
   public static <K, V> InputOperatorSpec<K, V> createInputOperatorSpec(
-    StreamSpec streamSpec, Serde<K> keySerde, Serde<V> valueSerde, boolean 
isKeyed, String opId) {
-    return new InputOperatorSpec<>(streamSpec, keySerde, valueSerde, isKeyed, 
opId);
+    String streamId, Serde<K> keySerde, Serde<V> valueSerde, boolean isKeyed, 
String opId) {
+    return new InputOperatorSpec<>(streamId, keySerde, valueSerde, isKeyed, 
opId);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
index fe0abcb..5d70e6f 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
@@ -21,13 +21,10 @@ package org.apache.samza.operators.spec;
 import java.io.Serializable;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStream;
-
 
 public class OutputStreamImpl<M> implements OutputStream<M>, Serializable {
 
-  private final StreamSpec streamSpec;
+  private final String streamId;
   private final boolean isKeyed;
 
   /**
@@ -37,16 +34,15 @@ public class OutputStreamImpl<M> implements 
OutputStream<M>, Serializable {
   private transient final Serde keySerde;
   private transient final Serde valueSerde;
 
-  public OutputStreamImpl(StreamSpec streamSpec,
-      Serde keySerde, Serde valueSerde, boolean isKeyed) {
-    this.streamSpec = streamSpec;
+  public OutputStreamImpl(String streamId, Serde keySerde, Serde valueSerde, 
boolean isKeyed) {
+    this.streamId = streamId;
     this.keySerde = keySerde;
     this.valueSerde = valueSerde;
     this.isKeyed = isKeyed;
   }
 
-  public StreamSpec getStreamSpec() {
-    return streamSpec;
+  public String getStreamId() {
+    return streamId;
   }
 
   public Serde getKeySerde() {
@@ -57,10 +53,6 @@ public class OutputStreamImpl<M> implements OutputStream<M>, 
Serializable {
     return valueSerde;
   }
 
-  public SystemStream getSystemStream() {
-    return this.streamSpec.toSystemStream();
-  }
-
   public boolean isKeyed() {
     return isKeyed;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
index 272ba63..3bb8713 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
@@ -24,7 +24,6 @@ import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
-import org.apache.samza.system.StreamSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,13 +50,13 @@ public class IntermediateMessageStreamImpl<M> extends 
MessageStreamImpl<M> imple
     this.outputStream = outputStream;
     if (inputOperatorSpec.isKeyed() != outputStream.isKeyed()) {
       LOGGER.error("Input and output streams for intermediate stream {} aren't 
keyed consistently. Input: {}, Output: {}",
-          new Object[]{inputOperatorSpec.getStreamSpec().getId(), 
inputOperatorSpec.isKeyed(), outputStream.isKeyed()});
+          new Object[]{inputOperatorSpec.getStreamId(), 
inputOperatorSpec.isKeyed(), outputStream.isKeyed()});
     }
     this.isKeyed = inputOperatorSpec.isKeyed() && outputStream.isKeyed();
   }
 
-  public StreamSpec getStreamSpec() {
-    return this.outputStream.getStreamSpec();
+  public String getStreamId() {
+    return this.outputStream.getStreamId();
   }
 
   public OutputStreamImpl<M> getOutputStream() {

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
 
b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
index 3716d2b..7cd19fb 100644
--- 
a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
+++ 
b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
@@ -38,7 +38,6 @@ import org.apache.samza.execution.ExecutionPlanner;
 import org.apache.samza.execution.StreamManager;
 import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.StreamGraphSpec;
-import org.apache.samza.system.StreamSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,58 +55,7 @@ public abstract class AbstractApplicationRunner extends 
ApplicationRunner {
 
   public AbstractApplicationRunner(Config config) {
     super(config);
-    this.graphSpec = new StreamGraphSpec(this, config);
-  }
-
-  @Override
-  public StreamSpec getStreamSpec(String streamId) {
-    StreamConfig streamConfig = new StreamConfig(config);
-    String physicalName = streamConfig.getPhysicalName(streamId);
-    return getStreamSpec(streamId, physicalName);
-  }
-
-  /**
-   * Constructs a {@link StreamSpec} from the configuration for the specified 
streamId.
-   *
-   * The stream configurations are read from the following properties in the 
config:
-   * {@code streams.{$streamId}.*}
-   * <br>
-   * All properties matching this pattern are assumed to be system-specific 
with one exception. The following
-   * property is a Samza property which is used to bind the stream to a system.
-   *
-   * <ul>
-   *   <li>samza.system - The name of the System on which this stream will be 
used. If this property isn't defined
-   *                      the stream will be associated with the System 
defined in {@code job.default.system}</li>
-   * </ul>
-   *
-   * @param streamId      The logical identifier for the stream in Samza.
-   * @param physicalName  The system-specific name for this stream. It could 
be a file URN, topic name, or other identifer.
-   * @return              The {@link StreamSpec} instance.
-   */
-  /*package private*/ StreamSpec getStreamSpec(String streamId, String 
physicalName) {
-    StreamConfig streamConfig = new StreamConfig(config);
-    String system = streamConfig.getSystem(streamId);
-
-    return getStreamSpec(streamId, physicalName, system);
-  }
-
-  /**
-   * Constructs a {@link StreamSpec} from the configuration for the specified 
streamId.
-   *
-   * The stream configurations are read from the following properties in the 
config:
-   * {@code streams.{$streamId}.*}
-   *
-   * @param streamId      The logical identifier for the stream in Samza.
-   * @param physicalName  The system-specific name for this stream. It could 
be a file URN, topic name, or other identifer.
-   * @param system        The name of the System on which this stream will be 
used.
-   * @return              The {@link StreamSpec} instance.
-   */
-  /*package private*/ StreamSpec getStreamSpec(String streamId, String 
physicalName, String system) {
-    StreamConfig streamConfig = new StreamConfig(config);
-    Map<String, String> properties = 
streamConfig.getStreamProperties(streamId);
-    boolean isBounded = streamConfig.getIsBounded(streamId);
-
-    return new StreamSpec(streamId, physicalName, system, isBounded, 
properties);
+    this.graphSpec = new StreamGraphSpec(config);
   }
 
   public ExecutionPlan getExecutionPlan(StreamApplication app, StreamManager 
streamManager) throws Exception {
@@ -118,20 +66,22 @@ public abstract class AbstractApplicationRunner extends 
ApplicationRunner {
   ExecutionPlan getExecutionPlan(StreamApplication app, String runId, 
StreamManager streamManager) throws Exception {
     // build stream graph
     app.init(graphSpec, config);
-
     OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
-    // create the physical execution plan
+
+    // update application configs
     Map<String, String> cfg = new HashMap<>(config);
     if (StringUtils.isNoneEmpty(runId)) {
       cfg.put(ApplicationConfig.APP_RUN_ID, runId);
     }
 
-    Set<StreamSpec> inputStreams = new 
HashSet<>(specGraph.getInputOperators().keySet());
+    StreamConfig streamConfig = new StreamConfig(config);
+    Set<String> inputStreams = new 
HashSet<>(specGraph.getInputOperators().keySet());
     inputStreams.removeAll(specGraph.getOutputStreams().keySet());
-    ApplicationMode mode = 
inputStreams.stream().allMatch(StreamSpec::isBounded)
+    ApplicationMode mode = 
inputStreams.stream().allMatch(streamConfig::getIsBounded)
         ? ApplicationMode.BATCH : ApplicationMode.STREAM;
     cfg.put(ApplicationConfig.APP_MODE, mode.name());
 
+    // create the physical execution plan
     ExecutionPlanner planner = new ExecutionPlanner(new MapConfig(cfg), 
streamManager);
     return planner.plan(specGraph);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java 
b/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
index 6aeb2ba..ea55fe5 100644
--- 
a/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
@@ -34,7 +34,7 @@ import 
org.apache.samza.coordinator.stream.CoordinatorStreamManager;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStream;
-import org.apache.samza.util.Util;
+import org.apache.samza.util.StreamUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -113,7 +113,7 @@ public class ChangelogStreamManager {
         .stream()
         .filter(name -> 
StringUtils.isNotBlank(storageConfig.getChangelogStream(name)))
         .collect(Collectors.toMap(name -> name,
-            name -> 
Util.getSystemStreamFromNames(storageConfig.getChangelogStream(name))));
+            name -> 
StreamUtil.getSystemStreamFromNames(storageConfig.getChangelogStream(name))));
 
     // Get SystemAdmin for changelog store's system and attempt to create the 
stream
     JavaSystemConfig systemConfig = new JavaSystemConfig(config);

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index c807b02..f9c6c0c 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -52,6 +52,7 @@ import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.Clock;
 import org.apache.samza.util.CommandLine;
 import org.apache.samza.util.ScalaJavaUtil;
+import org.apache.samza.util.StreamUtil;
 import org.apache.samza.util.SystemClock;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
@@ -151,7 +152,7 @@ public class StorageRecovery extends CommandLine {
       log.info("stream name for " + storeName + " is " + streamName);
 
       if (streamName != null) {
-        changeLogSystemStreams.put(storeName, 
Util.getSystemStreamFromNames(streamName));
+        changeLogSystemStreams.put(storeName, 
StreamUtil.getSystemStreamFromNames(streamName));
       }
 
       String factoryClass = config.getStorageFactoryClassName(storeName);

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/util/StreamUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/StreamUtil.java 
b/samza-core/src/main/java/org/apache/samza/util/StreamUtil.java
new file mode 100644
index 0000000..e7a1e54
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/util/StreamUtil.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.util;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemStream;
+
+public class StreamUtil {
+  /**
+   * Gets the {@link SystemStream} corresponding to the provided stream, which 
may be
+   * a streamId, or stream name of the format systemName.streamName.
+   *
+   * @param stream the stream name or id to get the {@link SystemStream} for.
+   * @return the {@link SystemStream} for the stream
+   */
+  public static SystemStream getSystemStreamFromNameOrId(Config config, String 
stream) {
+    String[] parts = stream.split("\\.");
+    if (parts.length == 0 || parts.length > 2) {
+      throw new SamzaException(
+          String.format("Invalid stream %s. Expected to be of the format 
streamId or systemName.streamName", stream));
+    }
+    if (parts.length == 1) {
+      return new StreamConfig(config).streamIdToSystemStream(stream);
+    } else {
+      return new SystemStream(parts[0], parts[1]);
+    }
+  }
+
+  /**
+   * Returns a SystemStream object based on the system stream name given. For
+   * example, kafka.topic would return SystemStream("kafka", "topic").
+   *
+   * @param systemStreamName name of the system stream
+   * @return the {@link SystemStream} for the {@code systemStreamName}
+   */
+  public static SystemStream getSystemStreamFromNames(String systemStreamName) 
{
+    int idx = systemStreamName.indexOf('.');
+    if (idx < 0) {
+      throw new IllegalArgumentException("No '.' in stream name '" + 
systemStreamName +
+          "'. Stream names should be in the form 'system.stream'");
+    }
+    return new SystemStream(
+        systemStreamName.substring(0, idx),
+        systemStreamName.substring(idx + 1, systemStreamName.length()));
+  }
+
+  /**
+   * Returns the period separated system stream name for the provided {@code 
systemStream}. For
+   * example, SystemStream("kafka", "topic") would return "kafka.topic".
+   *
+   * @param systemStream the {@link SystemStream} to get the name for
+   * @return the system stream name
+   */
+  public static String getNameFromSystemStream(SystemStream systemStream) {
+    return systemStream.getSystem() + "." + systemStream.getStream();
+  }
+
+  public static Set<StreamSpec> getStreamSpecs(Set<String> streamIds, 
StreamConfig streamConfig) {
+    return streamIds.stream().map(streamId -> getStreamSpec(streamId, 
streamConfig)).collect(Collectors.toSet());
+  }
+
+  public static StreamSpec getStreamSpec(String streamId, StreamConfig 
streamConfig) {
+    String physicalName = streamConfig.getPhysicalName(streamId);
+    String system = streamConfig.getSystem(streamId);
+    Map<String, String> streamProperties = 
streamConfig.getStreamProperties(streamId);
+    return new StreamSpec(streamId, physicalName, system, streamProperties);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala 
b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
index c9df3b5..42b6130 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
@@ -22,8 +22,7 @@ package org.apache.samza.config
 
 import java.util.concurrent.TimeUnit
 import scala.collection.JavaConverters._
-import org.apache.samza.util.Logging
-import org.apache.samza.util.Util
+import org.apache.samza.util.{Logging, StreamUtil}
 
 object StorageConfig {
   // stream config constants
@@ -106,7 +105,7 @@ class StorageConfig(config: Config) extends 
ScalaMapConfig(config) with Logging
       .map(getChangelogStream(_))
       .filter(_.isDefined)
       // Convert "system.stream" to systemName
-      .map(systemStreamName => 
Util.getSystemStreamFromNames(systemStreamName.get).getSystem)
+      .map(systemStreamName => 
StreamUtil.getSystemStreamFromNames(systemStreamName.get).getSystem)
       .contains(systemName)
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 
b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index ab11785..a64589f 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -22,7 +22,7 @@ package org.apache.samza.config
 import org.apache.samza.checkpoint.CheckpointManager
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.system.SystemStream
-import org.apache.samza.util.{Logging, Util}
+import org.apache.samza.util.{Logging, StreamUtil}
 
 object TaskConfig {
   // task config constants
@@ -78,7 +78,7 @@ class TaskConfig(config: Config) extends 
ScalaMapConfig(config) with Logging {
   def getInputStreams = getOption(TaskConfig.INPUT_STREAMS) match {
     case Some(streams) => if (streams.length > 0) {
       streams.split(",").map(systemStreamNames => {
-        Util.getSystemStreamFromNames(systemStreamNames.trim)
+        StreamUtil.getSystemStreamFromNames(systemStreamNames.trim)
       }).toSet
     } else {
       Set[SystemStream]()

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 35802ac..bb1b1cf 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -326,7 +326,7 @@ object SamzaContainer extends Logging {
       .getStoreNames
       .filter(config.getChangelogStream(_).isDefined)
       .map(name => (name, config.getChangelogStream(name).get)).toMap
-      .mapValues(Util.getSystemStreamFromNames(_))
+      .mapValues(StreamUtil.getSystemStreamFromNames(_))
 
     info("Got change log system streams: %s" format changeLogSystemStreams)
 
@@ -357,7 +357,7 @@ object SamzaContainer extends Logging {
     val sideInputStoresToSystemStreams = config.getStoreNames
       .map { storeName => (storeName, config.getSideInputs(storeName)) }
       .filter { case (storeName, sideInputs) => sideInputs.nonEmpty }
-      .map { case (storeName, sideInputs) => (storeName, 
sideInputs.map(Util.getSystemStreamFromNameOrId(config, _))) }
+      .map { case (storeName, sideInputs) => (storeName, 
sideInputs.map(StreamUtil.getSystemStreamFromNameOrId(config, _))) }
       .toMap
 
     info("Got side input store system streams: %s" format 
sideInputStoresToSystemStreams)

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 7b83874..0b472aa 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -28,7 +28,6 @@ import 
org.apache.samza.coordinator.stream.CoordinatorStreamManager
 import org.apache.samza.job.{StreamJob, StreamJobFactory}
 import org.apache.samza.metrics.{JmxServer, MetricsRegistryMap, 
MetricsReporter}
 import org.apache.samza.operators.StreamGraphSpec
-import org.apache.samza.runtime.LocalContainerRunner
 import org.apache.samza.storage.ChangelogStreamManager
 import org.apache.samza.task.TaskFactoryUtil
 import org.apache.samza.util.Logging
@@ -72,10 +71,9 @@ class ThreadJobFactory extends StreamJobFactory with Logging 
{
     val containerId = "0"
     val jmxServer = new JmxServer
     val streamApp = TaskFactoryUtil.createStreamApplication(config)
-    val appRunner = new LocalContainerRunner(jobModel, "0")
 
     val taskFactory = if (streamApp != null) {
-      val graphSpec = new StreamGraphSpec(appRunner, config)
+      val graphSpec = new StreamGraphSpec(config)
       streamApp.init(graphSpec, config)
       TaskFactoryUtil.createTaskFactory(graphSpec.getOperatorSpecGraph(), 
graphSpec.getContextManager)
     } else {

Reply via email to