http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
 
b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
index 33802a1..e41d4a8 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
@@ -19,7 +19,7 @@
 
 package org.apache.samza.metrics.reporter
 
-import org.apache.samza.util.Logging
+import org.apache.samza.util.{Logging, StreamUtil, Util}
 import org.apache.samza.SamzaException
 import org.apache.samza.config.{ApplicationConfig, Config}
 import org.apache.samza.config.JobConfig.Config2Job
@@ -30,7 +30,6 @@ import 
org.apache.samza.config.SerializerConfig.Config2Serializer
 import org.apache.samza.config.TaskConfig.Config2Task
 import org.apache.samza.metrics.MetricsReporter
 import org.apache.samza.metrics.MetricsReporterFactory
-import org.apache.samza.util.Util
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.serializers.SerdeFactory
 import org.apache.samza.system.SystemFactory
@@ -68,7 +67,7 @@ class MetricsSnapshotReporterFactory extends 
MetricsReporterFactory with Logging
       .getMetricsReporterStream(name)
       .getOrElse(throw new SamzaException("No metrics stream defined in 
config."))
 
-    val systemStream = Util.getSystemStreamFromNames(metricsSystemStreamName)
+    val systemStream = 
StreamUtil.getSystemStreamFromNames(metricsSystemStreamName)
 
     info("Got system stream %s." format systemStream)
 

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala 
b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index fd06c20..53b7d19 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -22,9 +22,7 @@ package org.apache.samza.util
 
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config._
-import org.apache.samza.system.SystemStream
 import org.apache.samza.SamzaException
-
 import java.lang.management.ManagementFactory
 import java.net.Inet4Address
 import java.net.InetAddress
@@ -67,46 +65,6 @@ object Util extends Logging {
   }
 
   /**
-   * Returns a SystemStream object based on the system stream name given. For
-   * example, kafka.topic would return new SystemStream("kafka", "topic").
-   */
-  def getSystemStreamFromNames(systemStreamNames: String): SystemStream = {
-    val idx = systemStreamNames.indexOf('.')
-    if (idx < 0) {
-      throw new IllegalArgumentException("No '.' in stream name '" + 
systemStreamNames + "'. Stream names should be in the form 'system.stream'")
-    }
-    new SystemStream(systemStreamNames.substring(0, idx), 
systemStreamNames.substring(idx + 1, systemStreamNames.length))
-  }
-
-  /**
-   * Returns a SystemStream object based on the system stream name given. For
-   * example, kafka.topic would return new SystemStream("kafka", "topic").
-   */
-  def getNameFromSystemStream(systemStream: SystemStream) = {
-    systemStream.getSystem + "." + systemStream.getStream
-  }
-
-  /**
-    * Gets the [[SystemStream]] corresponding to the provided stream, which 
may be
-    * a streamId, or stream name of the format systemName.streamName.
-    *
-    * @param stream the stream name or id to get the { @link SystemStream} for.
-    * @return the [[SystemStream]] for the stream
-    */
-  def getSystemStreamFromNameOrId(config: Config, stream: String): 
SystemStream = {
-    val parts = stream.split("\\.")
-    if (parts.length == 0 || parts.length > 2) {
-      throw new SamzaException(
-        String.format("Invalid stream %s. Expected to be of the format 
streamId or systemName.streamName", stream))
-    }
-    if (parts.length == 1) {
-      new StreamConfig(config).streamIdToSystemStream(stream)
-    } else {
-      new SystemStream(parts(0), parts(1))
-    }
-  }
-
-  /**
    * Returns the the first host address which is not the loopback address, or 
[[java.net.InetAddress#getLocalHost]] as a fallback
    *
    * @return the [[java.net.InetAddress]] which represents the localhost

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java 
b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index 83fe5ad..4d25ebb 100644
--- 
a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ 
b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -38,13 +38,13 @@ import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.testUtils.StreamTestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -59,7 +59,6 @@ public class TestExecutionPlanner {
 
   private SystemAdmins systemAdmins;
   private StreamManager streamManager;
-  private ApplicationRunner runner;
   private Config config;
 
   private StreamSpec input1;
@@ -104,7 +103,7 @@ public class TestExecutionPlanner {
      * input1 -> partitionBy -> map -> output1
      *
      */
-    StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
     MessageStream<KV<Object, Object>> input1 = 
graphSpec.getInputStream("input1");
     OutputStream<KV<Object, Object>> output1 = 
graphSpec.getOutputStream("output1");
     input1
@@ -127,7 +126,7 @@ public class TestExecutionPlanner {
      *
      */
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
     MessageStream<KV<Object, Object>> messageStream1 =
         graphSpec.<KV<Object, Object>>getInputStream("input1")
             .map(m -> m);
@@ -159,7 +158,7 @@ public class TestExecutionPlanner {
 
   private StreamGraphSpec createStreamGraphWithJoinAndWindow() {
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
     MessageStream<KV<Object, Object>> messageStream1 =
         graphSpec.<KV<Object, Object>>getInputStream("input1")
             .map(m -> m);
@@ -207,7 +206,12 @@ public class TestExecutionPlanner {
     Map<String, String> configMap = new HashMap<>();
     configMap.put(JobConfig.JOB_NAME(), "test-app");
     configMap.put(JobConfig.JOB_DEFAULT_SYSTEM(), DEFAULT_SYSTEM);
-
+    StreamTestUtils.addStreamConfigs(configMap, "input1", "system1", "input1");
+    StreamTestUtils.addStreamConfigs(configMap, "input2", "system2", "input2");
+    StreamTestUtils.addStreamConfigs(configMap, "input3", "system2", "input3");
+    StreamTestUtils.addStreamConfigs(configMap, "input4", "system1", "input4");
+    StreamTestUtils.addStreamConfigs(configMap, "output1", "system1", 
"output1");
+    StreamTestUtils.addStreamConfigs(configMap, "output2", "system2", 
"output2");
     config = new MapConfig(configMap);
 
     input1 = new StreamSpec("input1", "input1", "system1");
@@ -234,22 +238,6 @@ public class TestExecutionPlanner {
     when(systemAdmins.getSystemAdmin("system1")).thenReturn(systemAdmin1);
     when(systemAdmins.getSystemAdmin("system2")).thenReturn(systemAdmin2);
     streamManager = new StreamManager(systemAdmins);
-
-    runner = mock(ApplicationRunner.class);
-    when(runner.getStreamSpec("input1")).thenReturn(input1);
-    when(runner.getStreamSpec("input2")).thenReturn(input2);
-    when(runner.getStreamSpec("input3")).thenReturn(input3);
-    when(runner.getStreamSpec("input4")).thenReturn(input4);
-    when(runner.getStreamSpec("output1")).thenReturn(output1);
-    when(runner.getStreamSpec("output2")).thenReturn(output2);
-
-    // intermediate streams used in tests
-    when(runner.getStreamSpec("test-app-1-partition_by-p1"))
-        .thenReturn(new StreamSpec("test-app-1-partition_by-p1", 
"test-app-1-partition_by-p1", "default-system"));
-    when(runner.getStreamSpec("test-app-1-partition_by-p2"))
-        .thenReturn(new StreamSpec("test-app-1-partition_by-p2", 
"test-app-1-partition_by-p2", "default-system"));
-    when(runner.getStreamSpec("test-app-1-partition_by-p3"))
-        .thenReturn(new StreamSpec("test-app-1-partition_by-p3", 
"test-app-1-partition_by-p3", "default-system"));
   }
 
   @Test
@@ -288,7 +276,7 @@ public class TestExecutionPlanner {
     JobGraph jobGraph = 
planner.createJobGraph(graphSpec.getOperatorSpecGraph());
 
     ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager);
-    ExecutionPlanner.calculateJoinInputPartitions(jobGraph);
+    ExecutionPlanner.calculateJoinInputPartitions(jobGraph, config);
 
     // the partitions should be the same as input1
     jobGraph.getIntermediateStreams().forEach(edge -> {
@@ -406,13 +394,13 @@ public class TestExecutionPlanner {
   @Test
   public void testMaxPartition() {
     Collection<StreamEdge> edges = new ArrayList<>();
-    StreamEdge edge = new StreamEdge(input1, config);
+    StreamEdge edge = new StreamEdge(input1, false, false, config);
     edge.setPartitionCount(2);
     edges.add(edge);
-    edge = new StreamEdge(input2, config);
+    edge = new StreamEdge(input2, false, false, config);
     edge.setPartitionCount(32);
     edges.add(edge);
-    edge = new StreamEdge(input3, config);
+    edge = new StreamEdge(input3, false, false, config);
     edge.setPartitionCount(16);
     edges.add(edge);
 
@@ -427,7 +415,7 @@ public class TestExecutionPlanner {
     int partitionLimit = ExecutionPlanner.MAX_INFERRED_PARTITIONS;
 
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
-    StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
 
     MessageStream<KV<Object, Object>> input1 = 
graphSpec.getInputStream("input4");
     OutputStream<KV<Object, Object>> output1 = 
graphSpec.getOutputStream("output1");

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java 
b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
index 359c422..73452d8 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
@@ -19,16 +19,20 @@
 
 package org.apache.samza.execution;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.system.StreamSpec;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 
 public class TestJobGraph {
@@ -57,7 +61,9 @@ public class TestJobGraph {
    * 2 9 10
    */
   private void createGraph1() {
-    graph1 = new JobGraph(null, null);
+    OperatorSpecGraph specGraph = mock(OperatorSpecGraph.class);
+    when(specGraph.getBroadcastStreams()).thenReturn(Collections.emptySet());
+    graph1 = new JobGraph(null, specGraph);
 
     JobNode n2 = graph1.getOrCreateJobNode("2", "1");
     JobNode n3 = graph1.getOrCreateJobNode("3", "1");
@@ -90,7 +96,9 @@ public class TestJobGraph {
    *      |<---6 <--|    <>
    */
   private void createGraph2() {
-    graph2 = new JobGraph(null, null);
+    OperatorSpecGraph specGraph = mock(OperatorSpecGraph.class);
+    when(specGraph.getBroadcastStreams()).thenReturn(Collections.emptySet());
+    graph2 = new JobGraph(null, specGraph);
 
     JobNode n1 = graph2.getOrCreateJobNode("1", "1");
     JobNode n2 = graph2.getOrCreateJobNode("2", "1");
@@ -117,7 +125,9 @@ public class TestJobGraph {
    * 1<->1 -> 2<->2
    */
   private void createGraph3() {
-    graph3 = new JobGraph(null, null);
+    OperatorSpecGraph specGraph = mock(OperatorSpecGraph.class);
+    when(specGraph.getBroadcastStreams()).thenReturn(Collections.emptySet());
+    graph3 = new JobGraph(null, specGraph);
 
     JobNode n1 = graph3.getOrCreateJobNode("1", "1");
     JobNode n2 = graph3.getOrCreateJobNode("2", "1");
@@ -133,7 +143,9 @@ public class TestJobGraph {
    * 1<->1
    */
   private void createGraph4() {
-    graph4 = new JobGraph(null, null);
+    OperatorSpecGraph specGraph = mock(OperatorSpecGraph.class);
+    when(specGraph.getBroadcastStreams()).thenReturn(Collections.emptySet());
+    graph4 = new JobGraph(null, specGraph);
 
     JobNode n1 = graph4.getOrCreateJobNode("1", "1");
 
@@ -151,7 +163,9 @@ public class TestJobGraph {
 
   @Test
   public void testAddSource() {
-    JobGraph graph = new JobGraph(null, null);
+    OperatorSpecGraph specGraph = mock(OperatorSpecGraph.class);
+    when(specGraph.getBroadcastStreams()).thenReturn(Collections.emptySet());
+    JobGraph graph = new JobGraph(null, specGraph);
 
     /**
      * s1 -> 1
@@ -192,7 +206,9 @@ public class TestJobGraph {
      * 2 -> s2
      * 2 -> s3
      */
-    JobGraph graph = new JobGraph(null, null);
+    OperatorSpecGraph specGraph = mock(OperatorSpecGraph.class);
+    when(specGraph.getBroadcastStreams()).thenReturn(Collections.emptySet());
+    JobGraph graph = new JobGraph(null, specGraph);
     JobNode n1 = graph.getOrCreateJobNode("1", "1");
     JobNode n2 = graph.getOrCreateJobNode("2", "1");
     StreamSpec s1 = genStream();

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
 
b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
index abe8969..b0f3843 100644
--- 
a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
+++ 
b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
@@ -31,15 +31,14 @@ import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.LongSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.testUtils.StreamTestUtils;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.Test;
 
@@ -76,28 +75,13 @@ public class TestJobGraphJsonGenerator {
     Map<String, String> configMap = new HashMap<>();
     configMap.put(JobConfig.JOB_NAME(), "test-app");
     configMap.put(JobConfig.JOB_DEFAULT_SYSTEM(), "test-system");
+    StreamTestUtils.addStreamConfigs(configMap, "input1", "system1", "input1");
+    StreamTestUtils.addStreamConfigs(configMap, "input2", "system2", "input2");
+    StreamTestUtils.addStreamConfigs(configMap, "input3", "system2", "input3");
+    StreamTestUtils.addStreamConfigs(configMap, "output1", "system1", 
"output1");
+    StreamTestUtils.addStreamConfigs(configMap, "output2", "system2", 
"output2");
     Config config = new MapConfig(configMap);
 
-    StreamSpec input1 = new StreamSpec("input1", "input1", "system1");
-    StreamSpec input2 = new StreamSpec("input2", "input2", "system2");
-    StreamSpec input3 = new StreamSpec("input3", "input3", "system2");
-
-    StreamSpec output1 = new StreamSpec("output1", "output1", "system1");
-    StreamSpec output2 = new StreamSpec("output2", "output2", "system2");
-
-    ApplicationRunner runner = mock(ApplicationRunner.class);
-    when(runner.getStreamSpec("input1")).thenReturn(input1);
-    when(runner.getStreamSpec("input2")).thenReturn(input2);
-    when(runner.getStreamSpec("input3")).thenReturn(input3);
-    when(runner.getStreamSpec("output1")).thenReturn(output1);
-    when(runner.getStreamSpec("output2")).thenReturn(output2);
-
-    // intermediate streams used in tests
-    when(runner.getStreamSpec("test-app-1-partition_by-p1"))
-        .thenReturn(new StreamSpec("test-app-1-partition_by-p1", 
"test-app-1-partition_by-p1", "default-system"));
-    when(runner.getStreamSpec("test-app-1-partition_by-p2"))
-        .thenReturn(new StreamSpec("test-app-1-partition_by-p2", 
"test-app-1-partition_by-p2", "default-system"));
-
     // set up external partition count
     Map<String, Integer> system1Map = new HashMap<>();
     system1Map.put("input1", 64);
@@ -114,7 +98,7 @@ public class TestJobGraphJsonGenerator {
     when(systemAdmins.getSystemAdmin("system2")).thenReturn(systemAdmin2);
     StreamManager streamManager = new StreamManager(systemAdmins);
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
     graphSpec.setDefaultSerde(KVSerde.of(new NoOpSerde<>(), new 
NoOpSerde<>()));
     MessageStream<KV<Object, Object>> messageStream1 =
         graphSpec.<KV<Object, Object>>getInputStream("input1")
@@ -163,19 +147,10 @@ public class TestJobGraphJsonGenerator {
     Map<String, String> configMap = new HashMap<>();
     configMap.put(JobConfig.JOB_NAME(), "test-app");
     configMap.put(JobConfig.JOB_DEFAULT_SYSTEM(), "test-system");
+    StreamTestUtils.addStreamConfigs(configMap, "PageView", "hdfs", 
"hdfs:/user/dummy/PageViewEvent");
+    StreamTestUtils.addStreamConfigs(configMap, "PageViewCount", "kafka", 
"PageViewCount");
     Config config = new MapConfig(configMap);
 
-    StreamSpec input = new StreamSpec("PageView", 
"hdfs:/user/dummy/PageViewEvent", "hdfs");
-    StreamSpec output = new StreamSpec("PageViewCount", "PageViewCount", 
"kafka");
-
-    ApplicationRunner runner = mock(ApplicationRunner.class);
-    when(runner.getStreamSpec("PageView")).thenReturn(input);
-    when(runner.getStreamSpec("PageViewCount")).thenReturn(output);
-
-    // intermediate streams used in tests
-    when(runner.getStreamSpec("test-app-1-partition_by-keyed-by-country"))
-        .thenReturn(new StreamSpec("test-app-1-partition_by-keyed-by-country", 
"test-app-1-partition_by-keyed-by-country", "kafka"));
-
     // set up external partition count
     Map<String, Integer> system1Map = new HashMap<>();
     system1Map.put("hdfs:/user/dummy/PageViewEvent", 512);
@@ -189,7 +164,7 @@ public class TestJobGraphJsonGenerator {
     when(systemAdmins.getSystemAdmin("kafka")).thenReturn(systemAdmin2);
     StreamManager streamManager = new StreamManager(systemAdmins);
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
     MessageStream<KV<String, PageViewEvent>> inputStream = 
graphSpec.getInputStream("PageView");
     inputStream
         .partitionBy(kv -> kv.getValue().getCountry(), kv -> kv.getValue(), 
"keyed-by-country")

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java 
b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
index c43e242..cefe128 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
@@ -29,7 +29,6 @@ import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.impl.store.TimestampedValueSerde;
-import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.Serde;
@@ -48,7 +47,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -56,22 +54,17 @@ public class TestJobNode {
 
   @Test
   public void testAddSerdeConfigs() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     StreamSpec input1Spec = new StreamSpec("input1", "input1", "input-system");
     StreamSpec input2Spec = new StreamSpec("input2", "input2", "input-system");
     StreamSpec outputSpec = new StreamSpec("output", "output", 
"output-system");
     StreamSpec partitionBySpec =
         new StreamSpec("jobName-jobId-partition_by-p1", "partition_by-p1", 
"intermediate-system");
-    doReturn(input1Spec).when(mockRunner).getStreamSpec("input1");
-    doReturn(input2Spec).when(mockRunner).getStreamSpec("input2");
-    doReturn(outputSpec).when(mockRunner).getStreamSpec("output");
-    
doReturn(partitionBySpec).when(mockRunner).getStreamSpec("jobName-jobId-partition_by-p1");
 
     Config mockConfig = mock(Config.class);
     when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName");
     when(mockConfig.get(eq(JobConfig.JOB_ID()), 
anyString())).thenReturn("jobId");
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
     graphSpec.setDefaultSerde(KVSerde.of(new StringSerde(), new 
JsonSerdeV2<>()));
     MessageStream<KV<String, Object>> input1 = 
graphSpec.getInputStream("input1");
     MessageStream<KV<String, Object>> input2 = 
graphSpec.getInputStream("input2");
@@ -86,10 +79,10 @@ public class TestJobNode {
 
     JobNode jobNode = new JobNode("jobName", "jobId", 
graphSpec.getOperatorSpecGraph(), mockConfig);
     Config config = new MapConfig();
-    StreamEdge input1Edge = new StreamEdge(input1Spec, config);
-    StreamEdge input2Edge = new StreamEdge(input2Spec, config);
-    StreamEdge outputEdge = new StreamEdge(outputSpec, config);
-    StreamEdge repartitionEdge = new StreamEdge(partitionBySpec, true, config);
+    StreamEdge input1Edge = new StreamEdge(input1Spec, false, false, config);
+    StreamEdge input2Edge = new StreamEdge(input2Spec, false, false, config);
+    StreamEdge outputEdge = new StreamEdge(outputSpec, false, false, config);
+    StreamEdge repartitionEdge = new StreamEdge(partitionBySpec, true, false, 
config);
     jobNode.addInEdge(input1Edge);
     jobNode.addInEdge(input2Edge);
     jobNode.addOutEdge(outputEdge);

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java 
b/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
index 8ad5b7e..5928db1 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
@@ -37,7 +37,7 @@ public class TestStreamEdge {
 
   @Test
   public void testGetStreamSpec() {
-    StreamEdge edge = new StreamEdge(spec, false, new MapConfig());
+    StreamEdge edge = new StreamEdge(spec, false, false, new MapConfig());
     assertEquals(edge.getStreamSpec(), spec);
     assertEquals(edge.getStreamSpec().getPartitionCount(), 1 
/*StreamSpec.DEFAULT_PARTITION_COUNT*/);
 
@@ -50,32 +50,30 @@ public class TestStreamEdge {
     Map<String, String> config = new HashMap<>();
     config.put(ApplicationConfig.APP_MODE, 
ApplicationConfig.ApplicationMode.BATCH.name());
     config.put(ApplicationConfig.APP_RUN_ID, "123");
-    StreamEdge edge = new StreamEdge(spec, true, new MapConfig(config));
+    StreamEdge edge = new StreamEdge(spec, true, false, new MapConfig(config));
     assertEquals(edge.getStreamSpec().getPhysicalName(), 
spec.getPhysicalName() + "-123");
   }
 
   @Test
   public void testGenerateConfig() {
     // an example unbounded IO stream
-    StreamSpec spec = new StreamSpec("stream-1", "physical-stream-1", 
"system-1", false, Collections.singletonMap("property1", "haha"));
-    StreamEdge edge = new StreamEdge(spec, false, new MapConfig());
+    StreamSpec spec = new StreamSpec("stream-1", "physical-stream-1", 
"system-1", Collections.singletonMap("property1", "haha"));
+    StreamEdge edge = new StreamEdge(spec, false, false, new MapConfig());
     Config config = edge.generateConfig();
     StreamConfig streamConfig = new StreamConfig(config);
     assertEquals(streamConfig.getSystem(spec.getId()), "system-1");
     assertEquals(streamConfig.getPhysicalName(spec.getId()), 
"physical-stream-1");
     assertEquals(streamConfig.getIsIntermediateStream(spec.getId()), false);
-    assertEquals(streamConfig.getIsBounded(spec.getId()), false);
     
assertEquals(streamConfig.getStreamProperties(spec.getId()).get("property1"), 
"haha");
 
     // bounded stream
-    spec = new StreamSpec("stream-1", "physical-stream-1", "system-1", true, 
Collections.singletonMap("property1", "haha"));
-    edge = new StreamEdge(spec, false, new MapConfig());
+    spec = new StreamSpec("stream-1", "physical-stream-1", "system-1", 
Collections.singletonMap("property1", "haha"));
+    edge = new StreamEdge(spec, false, false, new MapConfig());
     config = edge.generateConfig();
     streamConfig = new StreamConfig(config);
-    assertEquals(streamConfig.getIsBounded(spec.getId()), true);
 
     // intermediate stream
-    edge = new StreamEdge(spec, true, new MapConfig());
+    edge = new StreamEdge(spec, true, false, new MapConfig());
     config = edge.generateConfig();
     streamConfig = new StreamConfig(config);
     assertEquals(streamConfig.getIsIntermediateStream(spec.getId()), true);

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java 
b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index 602b595..7054727 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -28,12 +28,10 @@ import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.impl.store.TestInMemoryStore;
 import org.apache.samza.operators.impl.store.TimestampedValueSerde;
-import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.task.MessageCollector;
@@ -74,7 +72,6 @@ public class TestJoinOperator {
   @Before
   public void setUp() {
     Map<String, String> mapConfig = new HashMap<>();
-    mapConfig.put("app.runner.class", 
"org.apache.samza.runtime.LocalApplicationRunner");
     mapConfig.put("job.default.system", "insystem");
     mapConfig.put("job.name", "jobName");
     mapConfig.put("job.id", "jobId");
@@ -101,7 +98,7 @@ public class TestJoinOperator {
   public void joinWithSelfThrowsException() throws Exception {
     config.put("streams.instream.system", "insystem");
 
-    StreamGraphSpec graphSpec = new 
StreamGraphSpec(mock(ApplicationRunner.class), config);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
     IntegerSerde integerSerde = new IntegerSerde();
     KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde);
     MessageStream<KV<Integer, Integer>> inStream = 
graphSpec.getInputStream("instream", kvSerde);
@@ -320,11 +317,7 @@ public class TestJoinOperator {
   }
 
   private StreamGraphSpec getTestJoinStreamGraph(TestJoinFunction joinFn) 
throws IOException {
-    ApplicationRunner runner = mock(ApplicationRunner.class);
-    when(runner.getStreamSpec("instream")).thenReturn(new 
StreamSpec("instream", "instream", "insystem"));
-    when(runner.getStreamSpec("instream2")).thenReturn(new 
StreamSpec("instream2", "instream2", "insystem"));
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
     IntegerSerde integerSerde = new IntegerSerde();
     KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde);
     MessageStream<KV<Integer, Integer>> inStream = 
graphSpec.getInputStream("instream", kvSerde);

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
 
b/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
index 2be88ca..4cfc66a 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
@@ -38,7 +38,6 @@ import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
 import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.system.StreamSpec;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -59,8 +58,8 @@ import static org.mockito.Mockito.*;
 public class TestOperatorSpecGraph {
 
   private StreamGraphSpec mockGraph;
-  private Map<StreamSpec, InputOperatorSpec> inputOpSpecMap;
-  private Map<StreamSpec, OutputStreamImpl> outputStrmMap;
+  private Map<String, InputOperatorSpec> inputOpSpecMap;
+  private Map<String, OutputStreamImpl> outputStrmMap;
   private Set<OperatorSpec> allOpSpecs;
 
   @Before
@@ -72,26 +71,26 @@ public class TestOperatorSpecGraph {
      * 1) input1 --> filter --> sendTo
      * 2) input2 --> map --> sink
      */
-    StreamSpec testInputSpec = new StreamSpec("test-input-1", "test-input-1", 
"kafka");
-    InputOperatorSpec testInput = new InputOperatorSpec(testInputSpec, new 
NoOpSerde(), new NoOpSerde(), true, "test-input-1");
+    String inputStreamId1 = "test-input-1";
+    String outputStreamId = "test-output-1";
+    InputOperatorSpec testInput = new InputOperatorSpec(inputStreamId1, new 
NoOpSerde(), new NoOpSerde(), true, inputStreamId1);
     StreamOperatorSpec filterOp = OperatorSpecs.createFilterOperatorSpec(m -> 
true, "test-filter-2");
-    StreamSpec testOutputSpec = new StreamSpec("test-output-1", 
"test-output-1", "kafka");
-    OutputStreamImpl outputStream1 = new OutputStreamImpl(testOutputSpec, 
null, null, true);
+    OutputStreamImpl outputStream1 = new OutputStreamImpl(outputStreamId, 
null, null, true);
     OutputOperatorSpec outputSpec = 
OperatorSpecs.createSendToOperatorSpec(outputStream1, "test-output-3");
     testInput.registerNextOperatorSpec(filterOp);
     filterOp.registerNextOperatorSpec(outputSpec);
-    StreamSpec testInputSpec2 = new StreamSpec("test-input-2", "test-input-2", 
"kafka");
-    InputOperatorSpec testInput2 = new InputOperatorSpec(testInputSpec2, new 
NoOpSerde(), new NoOpSerde(), true, "test-input-4");
+    String streamId2 = "test-input-2";
+    InputOperatorSpec testInput2 = new InputOperatorSpec(streamId2, new 
NoOpSerde(), new NoOpSerde(), true, "test-input-4");
     StreamOperatorSpec testMap = OperatorSpecs.createMapOperatorSpec(m -> m, 
"test-map-5");
     SinkOperatorSpec testSink = OperatorSpecs.createSinkOperatorSpec((m, mc, 
tc) -> { }, "test-sink-6");
     testInput2.registerNextOperatorSpec(testMap);
     testMap.registerNextOperatorSpec(testSink);
 
     this.inputOpSpecMap = new LinkedHashMap<>();
-    inputOpSpecMap.put(testInputSpec, testInput);
-    inputOpSpecMap.put(testInputSpec2, testInput2);
+    inputOpSpecMap.put(inputStreamId1, testInput);
+    inputOpSpecMap.put(streamId2, testInput2);
     this.outputStrmMap = new LinkedHashMap<>();
-    outputStrmMap.put(testOutputSpec, outputStream1);
+    outputStrmMap.put(outputStreamId, outputStream1);
     
when(mockGraph.getInputOperators()).thenReturn(Collections.unmodifiableMap(inputOpSpecMap));
     
when(mockGraph.getOutputStreams()).thenReturn(Collections.unmodifiableMap(outputStrmMap));
     this.allOpSpecs = new HashSet<OperatorSpec>() { {

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java 
b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java
index e476abc..dfb4b70 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java
@@ -31,11 +31,9 @@ import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec.OpCode;
 import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
-import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.table.TableSpec;
 import org.junit.Test;
 
@@ -53,82 +51,71 @@ public class TestStreamGraphSpec {
 
   @Test
   public void testGetInputStreamWithValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, 
mock(Config.class));
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
 
+    String streamId = "test-stream-1";
     Serde mockValueSerde = mock(Serde.class);
-    MessageStream<TestMessageEnvelope> inputStream = 
graphSpec.getInputStream("test-stream-1", mockValueSerde);
+    MessageStream<TestMessageEnvelope> inputStream = 
graphSpec.getInputStream(streamId, mockValueSerde);
 
     InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
         (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) 
inputStream).getOperatorSpec();
     assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
-    assertEquals(graphSpec.getInputOperators().get(mockStreamSpec), 
inputOpSpec);
-    assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+    assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec);
+    assertEquals(streamId, inputOpSpec.getStreamId());
     assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
     assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
   }
 
   @Test
   public void testGetInputStreamWithKeyValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, 
mock(Config.class));
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
 
+    String streamId = "test-stream-1";
     KVSerde mockKVSerde = mock(KVSerde.class);
     Serde mockKeySerde = mock(Serde.class);
     Serde mockValueSerde = mock(Serde.class);
     doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
     doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
-    MessageStream<TestMessageEnvelope> inputStream = 
graphSpec.getInputStream("test-stream-1", mockKVSerde);
+    MessageStream<TestMessageEnvelope> inputStream = 
graphSpec.getInputStream(streamId, mockKVSerde);
 
     InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
         (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) 
inputStream).getOperatorSpec();
     assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
-    assertEquals(graphSpec.getInputOperators().get(mockStreamSpec), 
inputOpSpec);
-    assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+    assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec);
+    assertEquals(streamId, inputOpSpec.getStreamId());
     assertEquals(mockKeySerde, inputOpSpec.getKeySerde());
     assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
   }
 
   @Test(expected = NullPointerException.class)
   public void testGetInputStreamWithNullSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, 
mock(Config.class));
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
 
     graphSpec.getInputStream("test-stream-1", null);
   }
 
   @Test
   public void testGetInputStreamWithDefaultValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, 
mock(Config.class));
+    String streamId = "test-stream-1";
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
 
     Serde mockValueSerde = mock(Serde.class);
     graphSpec.setDefaultSerde(mockValueSerde);
-    MessageStream<TestMessageEnvelope> inputStream = 
graphSpec.getInputStream("test-stream-1");
+    MessageStream<TestMessageEnvelope> inputStream = 
graphSpec.getInputStream(streamId);
 
     InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
         (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) 
inputStream).getOperatorSpec();
     assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
-    assertEquals(graphSpec.getInputOperators().get(mockStreamSpec), 
inputOpSpec);
-    assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+    assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec);
+    assertEquals(streamId, inputOpSpec.getStreamId());
     assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
     assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
   }
 
   @Test
   public void testGetInputStreamWithDefaultKeyValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, 
mock(Config.class));
+    String streamId = "test-stream-1";
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
 
     KVSerde mockKVSerde = mock(KVSerde.class);
     Serde mockKeySerde = mock(Serde.class);
@@ -136,63 +123,56 @@ public class TestStreamGraphSpec {
     doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
     doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
     graphSpec.setDefaultSerde(mockKVSerde);
-    MessageStream<TestMessageEnvelope> inputStream = 
graphSpec.getInputStream("test-stream-1");
+    MessageStream<TestMessageEnvelope> inputStream = 
graphSpec.getInputStream(streamId);
 
     InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
         (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) 
inputStream).getOperatorSpec();
     assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
-    assertEquals(graphSpec.getInputOperators().get(mockStreamSpec), 
inputOpSpec);
-    assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+    assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec);
+    assertEquals(streamId, inputOpSpec.getStreamId());
     assertEquals(mockKeySerde, inputOpSpec.getKeySerde());
     assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
   }
 
   @Test
   public void testGetInputStreamWithDefaultDefaultSerde() {
-    // default default serde == user hasn't provided a default serde
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, 
mock(Config.class));
+    String streamId = "test-stream-1";
 
-    MessageStream<TestMessageEnvelope> inputStream = 
graphSpec.getInputStream("test-stream-1");
+    // default default serde == user hasn't provided a default serde
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
+    MessageStream<TestMessageEnvelope> inputStream = 
graphSpec.getInputStream(streamId);
 
     InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
         (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) 
inputStream).getOperatorSpec();
     assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
-    assertEquals(graphSpec.getInputOperators().get(mockStreamSpec), 
inputOpSpec);
-    assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+    assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec);
+    assertEquals(streamId, inputOpSpec.getStreamId());
     assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
     assertTrue(inputOpSpec.getValueSerde() instanceof NoOpSerde);
   }
 
   @Test
   public void testGetInputStreamWithRelaxedTypes() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, 
mock(Config.class));
+    String streamId = "test-stream-1";
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
 
-    MessageStream<TestMessageEnvelope> inputStream = 
graphSpec.getInputStream("test-stream-1");
+    MessageStream<TestMessageEnvelope> inputStream = 
graphSpec.getInputStream(streamId);
 
     InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
         (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) 
inputStream).getOperatorSpec();
     assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
-    assertEquals(graphSpec.getInputOperators().get(mockStreamSpec), 
inputOpSpec);
-    assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+    assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec);
+    assertEquals(streamId, inputOpSpec.getStreamId());
   }
 
   @Test
   public void testMultipleGetInputStreams() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec1 = mock(StreamSpec.class);
-    StreamSpec mockStreamSpec2 = mock(StreamSpec.class);
-    
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec1);
-    
when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(mockStreamSpec2);
+    String streamId1 = "test-stream-1";
+    String streamId2 = "test-stream-2";
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, 
mock(Config.class));
-    MessageStream<Object> inputStream1 = 
graphSpec.getInputStream("test-stream-1");
-    MessageStream<Object> inputStream2 = 
graphSpec.getInputStream("test-stream-2");
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
+    MessageStream<Object> inputStream1 = graphSpec.getInputStream(streamId1);
+    MessageStream<Object> inputStream2 = graphSpec.getInputStream(streamId2);
 
     InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec1 =
         (InputOperatorSpec) ((MessageStreamImpl<Object>) 
inputStream1).getOperatorSpec();
@@ -200,98 +180,83 @@ public class TestStreamGraphSpec {
         (InputOperatorSpec) ((MessageStreamImpl<Object>) 
inputStream2).getOperatorSpec();
 
     assertEquals(graphSpec.getInputOperators().size(), 2);
-    assertEquals(graphSpec.getInputOperators().get(mockStreamSpec1), 
inputOpSpec1);
-    assertEquals(graphSpec.getInputOperators().get(mockStreamSpec2), 
inputOpSpec2);
+    assertEquals(graphSpec.getInputOperators().get(streamId1), inputOpSpec1);
+    assertEquals(graphSpec.getInputOperators().get(streamId2), inputOpSpec2);
   }
 
   @Test(expected = IllegalStateException.class)
   public void testGetSameInputStreamTwice() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, 
mock(Config.class));
-    graphSpec.getInputStream("test-stream-1");
+    String streamId = "test-stream-1";
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
+    graphSpec.getInputStream(streamId);
     // should throw exception
-    graphSpec.getInputStream("test-stream-1");
+    graphSpec.getInputStream(streamId);
   }
 
   @Test
   public void testGetOutputStreamWithValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, 
mock(Config.class));
+    String streamId = "test-stream-1";
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
 
     Serde mockValueSerde = mock(Serde.class);
     OutputStream<TestMessageEnvelope> outputStream =
-        graphSpec.getOutputStream("test-stream-1", mockValueSerde);
+        graphSpec.getOutputStream(streamId, mockValueSerde);
 
     OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = 
(OutputStreamImpl) outputStream;
-    assertEquals(graphSpec.getOutputStreams().get(mockStreamSpec), 
outputStreamImpl);
-    assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+    assertEquals(graphSpec.getOutputStreams().get(streamId), outputStreamImpl);
+    assertEquals(streamId, outputStreamImpl.getStreamId());
     assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
     assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
   }
 
   @Test
   public void testGetOutputStreamWithKeyValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, 
mock(Config.class));
+    String streamId = "test-stream-1";
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
     KVSerde mockKVSerde = mock(KVSerde.class);
     Serde mockKeySerde = mock(Serde.class);
     Serde mockValueSerde = mock(Serde.class);
     doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
     doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
     graphSpec.setDefaultSerde(mockKVSerde);
-    OutputStream<TestMessageEnvelope> outputStream = 
graphSpec.getOutputStream("test-stream-1", mockKVSerde);
+    OutputStream<TestMessageEnvelope> outputStream = 
graphSpec.getOutputStream(streamId, mockKVSerde);
 
     OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = 
(OutputStreamImpl) outputStream;
-    assertEquals(graphSpec.getOutputStreams().get(mockStreamSpec), 
outputStreamImpl);
-    assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+    assertEquals(graphSpec.getOutputStreams().get(streamId), outputStreamImpl);
+    assertEquals(streamId, outputStreamImpl.getStreamId());
     assertEquals(mockKeySerde, outputStreamImpl.getKeySerde());
     assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
   }
 
   @Test(expected = NullPointerException.class)
   public void testGetOutputStreamWithNullSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, 
mock(Config.class));
+    String streamId = "test-stream-1";
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
 
-    graphSpec.getOutputStream("test-stream-1", null);
+    graphSpec.getOutputStream(streamId, null);
   }
 
   @Test
   public void testGetOutputStreamWithDefaultValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+    String streamId = "test-stream-1";
 
     Serde mockValueSerde = mock(Serde.class);
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, 
mock(Config.class));
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
     graphSpec.setDefaultSerde(mockValueSerde);
-    OutputStream<TestMessageEnvelope> outputStream = 
graphSpec.getOutputStream("test-stream-1");
+    OutputStream<TestMessageEnvelope> outputStream = 
graphSpec.getOutputStream(streamId);
 
     OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = 
(OutputStreamImpl) outputStream;
-    assertEquals(graphSpec.getOutputStreams().get(mockStreamSpec), 
outputStreamImpl);
-    assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+    assertEquals(graphSpec.getOutputStreams().get(streamId), outputStreamImpl);
+    assertEquals(streamId, outputStreamImpl.getStreamId());
     assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
     assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
   }
 
   @Test
   public void testGetOutputStreamWithDefaultKeyValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+    String streamId = "test-stream-1";
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, 
mock(Config.class));
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
     KVSerde mockKVSerde = mock(KVSerde.class);
     Serde mockKeySerde = mock(Serde.class);
     Serde mockValueSerde = mock(Serde.class);
@@ -299,89 +264,75 @@ public class TestStreamGraphSpec {
     doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
     graphSpec.setDefaultSerde(mockKVSerde);
 
-    OutputStream<TestMessageEnvelope> outputStream = 
graphSpec.getOutputStream("test-stream-1");
+    OutputStream<TestMessageEnvelope> outputStream = 
graphSpec.getOutputStream(streamId);
 
     OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = 
(OutputStreamImpl) outputStream;
-    assertEquals(graphSpec.getOutputStreams().get(mockStreamSpec), 
outputStreamImpl);
-    assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+    assertEquals(graphSpec.getOutputStreams().get(streamId), outputStreamImpl);
+    assertEquals(streamId, outputStreamImpl.getStreamId());
     assertEquals(mockKeySerde, outputStreamImpl.getKeySerde());
     assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
   }
 
   @Test
   public void testGetOutputStreamWithDefaultDefaultSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+    String streamId = "test-stream-1";
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, 
mock(Config.class));
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
 
-    OutputStream<TestMessageEnvelope> outputStream = 
graphSpec.getOutputStream("test-stream-1");
+    OutputStream<TestMessageEnvelope> outputStream = 
graphSpec.getOutputStream(streamId);
 
     OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = 
(OutputStreamImpl) outputStream;
-    assertEquals(graphSpec.getOutputStreams().get(mockStreamSpec), 
outputStreamImpl);
-    assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+    assertEquals(graphSpec.getOutputStreams().get(streamId), outputStreamImpl);
+    assertEquals(streamId, outputStreamImpl.getStreamId());
     assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
     assertTrue(outputStreamImpl.getValueSerde() instanceof NoOpSerde);
   }
 
   @Test(expected = IllegalStateException.class)
   public void testSetDefaultSerdeAfterGettingStreams() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
+    String streamId = "test-stream-1";
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, 
mock(Config.class));
-    graphSpec.getInputStream("test-stream-1");
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
+    graphSpec.getInputStream(streamId);
     graphSpec.setDefaultSerde(mock(Serde.class)); // should throw exception
   }
 
   @Test(expected = IllegalStateException.class)
   public void testSetDefaultSerdeAfterGettingOutputStream() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, 
mock(Config.class));
-    graphSpec.getOutputStream("test-stream-1");
+    String streamId = "test-stream-1";
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
+    graphSpec.getOutputStream(streamId);
     graphSpec.setDefaultSerde(mock(Serde.class)); // should throw exception
   }
 
   @Test(expected = IllegalStateException.class)
   public void testSetDefaultSerdeAfterGettingIntermediateStream() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, 
mock(Config.class));
-    graphSpec.getIntermediateStream("test-stream-1", null);
+    String streamId = "test-stream-1";
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
+    graphSpec.getIntermediateStream(streamId, null);
     graphSpec.setDefaultSerde(mock(Serde.class)); // should throw exception
   }
 
   @Test(expected = IllegalStateException.class)
   public void testGetSameOutputStreamTwice() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, 
mock(Config.class));
-    graphSpec.getOutputStream("test-stream-1");
-    graphSpec.getOutputStream("test-stream-1"); // should throw exception
+    String streamId = "test-stream-1";
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
+    graphSpec.getOutputStream(streamId);
+    graphSpec.getOutputStream(streamId); // should throw exception
   }
 
   @Test
   public void testGetIntermediateStreamWithValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    Config mockConfig = mock(Config.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    String mockStreamName = "mockStreamName";
-    when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+    String streamId = "stream-1";
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
 
     Serde mockValueSerde = mock(Serde.class);
     IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        graphSpec.getIntermediateStream(mockStreamName, mockValueSerde);
+        graphSpec.getIntermediateStream(streamId, mockValueSerde);
 
-    assertEquals(graphSpec.getInputOperators().get(mockStreamSpec), 
intermediateStreamImpl.getOperatorSpec());
-    assertEquals(graphSpec.getOutputStreams().get(mockStreamSpec), 
intermediateStreamImpl.getOutputStream());
-    assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+    assertEquals(graphSpec.getInputOperators().get(streamId), 
intermediateStreamImpl.getOperatorSpec());
+    assertEquals(graphSpec.getOutputStreams().get(streamId), 
intermediateStreamImpl.getOutputStream());
+    assertEquals(streamId, intermediateStreamImpl.getStreamId());
     assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() 
instanceof NoOpSerde);
     assertEquals(mockValueSerde, 
intermediateStreamImpl.getOutputStream().getValueSerde());
     assertTrue(((InputOperatorSpec) 
intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
@@ -390,13 +341,8 @@ public class TestStreamGraphSpec {
 
   @Test
   public void testGetIntermediateStreamWithKeyValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    Config mockConfig = mock(Config.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    String mockStreamName = "mockStreamName";
-    when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+    String streamId = "streamId";
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
 
     KVSerde mockKVSerde = mock(KVSerde.class);
     Serde mockKeySerde = mock(Serde.class);
@@ -404,11 +350,11 @@ public class TestStreamGraphSpec {
     doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
     doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
     IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        graphSpec.getIntermediateStream(mockStreamName, mockKVSerde);
+        graphSpec.getIntermediateStream(streamId, mockKVSerde);
 
-    assertEquals(graphSpec.getInputOperators().get(mockStreamSpec), 
intermediateStreamImpl.getOperatorSpec());
-    assertEquals(graphSpec.getOutputStreams().get(mockStreamSpec), 
intermediateStreamImpl.getOutputStream());
-    assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+    assertEquals(graphSpec.getInputOperators().get(streamId), 
intermediateStreamImpl.getOperatorSpec());
+    assertEquals(graphSpec.getOutputStreams().get(streamId), 
intermediateStreamImpl.getOutputStream());
+    assertEquals(streamId, intermediateStreamImpl.getStreamId());
     assertEquals(mockKeySerde, 
intermediateStreamImpl.getOutputStream().getKeySerde());
     assertEquals(mockValueSerde, 
intermediateStreamImpl.getOutputStream().getValueSerde());
     assertEquals(mockKeySerde, ((InputOperatorSpec) 
intermediateStreamImpl.getOperatorSpec()).getKeySerde());
@@ -417,22 +363,17 @@ public class TestStreamGraphSpec {
 
   @Test
   public void testGetIntermediateStreamWithDefaultValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    Config mockConfig = mock(Config.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    String mockStreamName = "mockStreamName";
-    when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
-
-    StreamGraphSpec graph = new StreamGraphSpec(mockRunner, mockConfig);
+    String streamId = "streamId";
+    StreamGraphSpec graph = new StreamGraphSpec(mock(Config.class));
 
     Serde mockValueSerde = mock(Serde.class);
     graph.setDefaultSerde(mockValueSerde);
     IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        graph.getIntermediateStream(mockStreamName, null);
+        graph.getIntermediateStream(streamId, null);
 
-    assertEquals(graph.getInputOperators().get(mockStreamSpec), 
intermediateStreamImpl.getOperatorSpec());
-    assertEquals(graph.getOutputStreams().get(mockStreamSpec), 
intermediateStreamImpl.getOutputStream());
-    assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+    assertEquals(graph.getInputOperators().get(streamId), 
intermediateStreamImpl.getOperatorSpec());
+    assertEquals(graph.getOutputStreams().get(streamId), 
intermediateStreamImpl.getOutputStream());
+    assertEquals(streamId, intermediateStreamImpl.getStreamId());
     assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() 
instanceof NoOpSerde);
     assertEquals(mockValueSerde, 
intermediateStreamImpl.getOutputStream().getValueSerde());
     assertTrue(((InputOperatorSpec) 
intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
@@ -441,13 +382,10 @@ public class TestStreamGraphSpec {
 
   @Test
   public void testGetIntermediateStreamWithDefaultKeyValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     Config mockConfig = mock(Config.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    String mockStreamName = "mockStreamName";
-    when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+    String streamId = "streamId";
+    
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
 
     KVSerde mockKVSerde = mock(KVSerde.class);
     Serde mockKeySerde = mock(Serde.class);
@@ -456,11 +394,11 @@ public class TestStreamGraphSpec {
     doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
     graphSpec.setDefaultSerde(mockKVSerde);
     IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        graphSpec.getIntermediateStream(mockStreamName, null);
+        graphSpec.getIntermediateStream(streamId, null);
 
-    assertEquals(graphSpec.getInputOperators().get(mockStreamSpec), 
intermediateStreamImpl.getOperatorSpec());
-    assertEquals(graphSpec.getOutputStreams().get(mockStreamSpec), 
intermediateStreamImpl.getOutputStream());
-    assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+    assertEquals(graphSpec.getInputOperators().get(streamId), 
intermediateStreamImpl.getOperatorSpec());
+    assertEquals(graphSpec.getOutputStreams().get(streamId), 
intermediateStreamImpl.getOutputStream());
+    assertEquals(streamId, intermediateStreamImpl.getStreamId());
     assertEquals(mockKeySerde, 
intermediateStreamImpl.getOutputStream().getKeySerde());
     assertEquals(mockValueSerde, 
intermediateStreamImpl.getOutputStream().getValueSerde());
     assertEquals(mockKeySerde, ((InputOperatorSpec) 
intermediateStreamImpl.getOperatorSpec()).getKeySerde());
@@ -469,19 +407,16 @@ public class TestStreamGraphSpec {
 
   @Test
   public void testGetIntermediateStreamWithDefaultDefaultSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     Config mockConfig = mock(Config.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    String mockStreamName = "mockStreamName";
-    when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+    String streamId = "streamId";
+    
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
     IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        graphSpec.getIntermediateStream(mockStreamName, null);
+        graphSpec.getIntermediateStream(streamId, null);
 
-    assertEquals(graphSpec.getInputOperators().get(mockStreamSpec), 
intermediateStreamImpl.getOperatorSpec());
-    assertEquals(graphSpec.getOutputStreams().get(mockStreamSpec), 
intermediateStreamImpl.getOutputStream());
-    assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+    assertEquals(graphSpec.getInputOperators().get(streamId), 
intermediateStreamImpl.getOperatorSpec());
+    assertEquals(graphSpec.getOutputStreams().get(streamId), 
intermediateStreamImpl.getOutputStream());
+    assertEquals(streamId, intermediateStreamImpl.getStreamId());
     assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() 
instanceof NoOpSerde);
     assertTrue(intermediateStreamImpl.getOutputStream().getValueSerde() 
instanceof NoOpSerde);
     assertTrue(((InputOperatorSpec) 
intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
@@ -490,22 +425,18 @@ public class TestStreamGraphSpec {
 
   @Test(expected = IllegalStateException.class)
   public void testGetSameIntermediateStreamTwice() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, 
mock(Config.class));
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
     graphSpec.getIntermediateStream("test-stream-1", mock(Serde.class));
     graphSpec.getIntermediateStream("test-stream-1", mock(Serde.class));
   }
 
   @Test
   public void testGetNextOpIdIncrementsId() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     Config mockConfig = mock(Config.class);
     when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
     when(mockConfig.get(eq(JobConfig.JOB_ID()), 
anyString())).thenReturn("1234");
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
     assertEquals("jobName-1234-merge-0", graphSpec.getNextOpId(OpCode.MERGE, 
null));
     assertEquals("jobName-1234-join-customName", 
graphSpec.getNextOpId(OpCode.JOIN, "customName"));
     assertEquals("jobName-1234-map-2", graphSpec.getNextOpId(OpCode.MAP, 
null));
@@ -513,24 +444,22 @@ public class TestStreamGraphSpec {
 
   @Test(expected = SamzaException.class)
   public void testGetNextOpIdRejectsDuplicates() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     Config mockConfig = mock(Config.class);
     when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
     when(mockConfig.get(eq(JobConfig.JOB_ID()), 
anyString())).thenReturn("1234");
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
     assertEquals("jobName-1234-join-customName", 
graphSpec.getNextOpId(OpCode.JOIN, "customName"));
     graphSpec.getNextOpId(OpCode.JOIN, "customName"); // should throw
   }
 
   @Test
   public void testUserDefinedIdValidation() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     Config mockConfig = mock(Config.class);
     when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
     when(mockConfig.get(eq(JobConfig.JOB_ID()), 
anyString())).thenReturn("1234");
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
 
     // null and empty userDefinedIDs should fall back to autogenerated IDs.
     try {
@@ -562,36 +491,29 @@ public class TestStreamGraphSpec {
 
   @Test
   public void testGetInputStreamPreservesInsertionOrder() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     Config mockConfig = mock(Config.class);
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
-
-    StreamSpec testStreamSpec1 = new StreamSpec("test-stream-1", 
"physical-stream-1", "test-system");
-    
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec1);
-
-    StreamSpec testStreamSpec2 = new StreamSpec("test-stream-2", 
"physical-stream-2", "test-system");
-    
when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(testStreamSpec2);
-
-    StreamSpec testStreamSpec3 = new StreamSpec("test-stream-3", 
"physical-stream-3", "test-system");
-    
when(mockRunner.getStreamSpec("test-stream-3")).thenReturn(testStreamSpec3);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
 
+    String testStreamId1 = "test-stream-1";
+    String testStreamId2 = "test-stream-2";
+    String testStreamId3 = "test-stream-3";
+    
     graphSpec.getInputStream("test-stream-1");
     graphSpec.getInputStream("test-stream-2");
     graphSpec.getInputStream("test-stream-3");
 
     List<InputOperatorSpec> inputSpecs = new 
ArrayList<>(graphSpec.getInputOperators().values());
     assertEquals(inputSpecs.size(), 3);
-    assertEquals(inputSpecs.get(0).getStreamSpec(), testStreamSpec1);
-    assertEquals(inputSpecs.get(1).getStreamSpec(), testStreamSpec2);
-    assertEquals(inputSpecs.get(2).getStreamSpec(), testStreamSpec3);
+    assertEquals(inputSpecs.get(0).getStreamId(), testStreamId1);
+    assertEquals(inputSpecs.get(1).getStreamId(), testStreamId2);
+    assertEquals(inputSpecs.get(2).getStreamId(), testStreamId3);
   }
 
   @Test
   public void testGetTable() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     Config mockConfig = mock(Config.class);
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
 
     BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
     when(mockTableDescriptor.getTableSpec()).thenReturn(

Reply via email to