http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index 6fdcacc..4e77fae 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -21,6 +21,7 @@ package org.apache.samza.operators.impl;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
+
 import java.io.Serializable;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -28,6 +29,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.BiFunction;
@@ -36,6 +38,7 @@ import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StreamConfig;
 import org.apache.samza.container.SamzaContainerContext;
 import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.container.TaskName;
@@ -45,8 +48,8 @@ import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.operators.functions.ClosableFunction;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.InitableFunction;
@@ -54,20 +57,18 @@ import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.util.TimestampedValue;
 import org.apache.samza.operators.spec.OperatorSpec.OpCode;
-import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
-import java.util.List;
 import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.testUtils.StreamTestUtils;
 import org.apache.samza.util.Clock;
 import org.apache.samza.util.SystemClock;
 import org.junit.After;
@@ -76,7 +77,6 @@ import org.junit.Test;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -217,7 +217,7 @@ public class TestOperatorImplGraph {
 
   @Test
   public void testEmptyChain() {
-    StreamGraphSpec graphSpec = new 
StreamGraphSpec(mock(ApplicationRunner.class), mock(Config.class));
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
     OperatorImplGraph opGraph =
         new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), 
mock(Config.class), mock(TaskContextImpl.class), mock(Clock.class));
     assertEquals(0, opGraph.getAllInputOperators().size());
@@ -225,13 +225,26 @@ public class TestOperatorImplGraph {
 
   @Test
   public void testLinearChain() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new 
StreamSpec("input", "input-stream", "input-system"));
-    
when(mockRunner.getStreamSpec(eq("output"))).thenReturn(mock(StreamSpec.class));
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, 
mock(Config.class));
-
-    MessageStream<Object> inputStream = graphSpec.getInputStream("input");
-    OutputStream<Object> outputStream = graphSpec.getOutputStream("output");
+    String inputStreamId = "input";
+    String inputSystem = "input-system";
+    String inputPhysicalName = "input-stream";
+    String outputStreamId = "output";
+    String outputSystem = "output-system";
+    String outputPhysicalName = "output-stream";
+    String intermediateSystem = "intermediate-system";
+
+    HashMap<String, String> configs = new HashMap<>();
+    configs.put(JobConfig.JOB_NAME(), "jobName");
+    configs.put(JobConfig.JOB_ID(), "jobId");
+    configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), intermediateSystem);
+    StreamTestUtils.addStreamConfigs(configs, inputStreamId, inputSystem, 
inputPhysicalName);
+    StreamTestUtils.addStreamConfigs(configs, outputStreamId, outputSystem, 
outputPhysicalName);
+    Config config = new MapConfig(configs);
+
+    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
+
+    MessageStream<Object> inputStream = 
graphSpec.getInputStream(inputStreamId);
+    OutputStream<Object> outputStream = 
graphSpec.getOutputStream(outputStreamId);
 
     inputStream
         .filter(mock(FilterFunction.class))
@@ -242,9 +255,9 @@ public class TestOperatorImplGraph {
     when(mockTaskContext.getMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
     when(mockTaskContext.getTaskName()).thenReturn(new TaskName("task 0"));
     OperatorImplGraph opImplGraph =
-        new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), 
mock(Config.class), mockTaskContext, mock(Clock.class));
+        new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), config, 
mockTaskContext, mock(Clock.class));
 
-    InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new 
SystemStream("input-system", "input-stream"));
+    InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new 
SystemStream(inputSystem, inputPhysicalName));
     assertEquals(1, inputOpImpl.registeredOperators.size());
 
     OperatorImpl filterOpImpl = (StreamOperatorImpl) 
inputOpImpl.registeredOperators.iterator().next();
@@ -262,18 +275,27 @@ public class TestOperatorImplGraph {
 
   @Test
   public void testPartitionByChain() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new 
StreamSpec("input", "input-stream", "input-system"));
-    when(mockRunner.getStreamSpec(eq("output"))).thenReturn(new 
StreamSpec("output", "output-stream", "output-system"));
-    when(mockRunner.getStreamSpec(eq("jobName-jobId-partition_by-p1")))
-        .thenReturn(new StreamSpec("intermediate", "intermediate-stream", 
"intermediate-system"));
-    Config mockConfig = mock(Config.class);
-    when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName");
-    when(mockConfig.get(eq(JobConfig.JOB_ID()), 
anyString())).thenReturn("jobId");
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
-    MessageStream<Object> inputStream = graphSpec.getInputStream("input");
+    String inputStreamId = "input";
+    String inputSystem = "input-system";
+    String inputPhysicalName = "input-stream";
+    String outputStreamId = "output";
+    String outputSystem = "output-system";
+    String outputPhysicalName = "output-stream";
+    String intermediateStreamId = "jobName-jobId-partition_by-p1";
+    String intermediateSystem = "intermediate-system";
+
+    HashMap<String, String> configs = new HashMap<>();
+    configs.put(JobConfig.JOB_NAME(), "jobName");
+    configs.put(JobConfig.JOB_ID(), "jobId");
+    configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), intermediateSystem);
+    StreamTestUtils.addStreamConfigs(configs, inputStreamId, inputSystem, 
inputPhysicalName);
+    StreamTestUtils.addStreamConfigs(configs, outputStreamId, outputSystem, 
outputPhysicalName);
+    Config config = new MapConfig(configs);
+
+    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
+    MessageStream<Object> inputStream = 
graphSpec.getInputStream(inputStreamId);
     OutputStream<KV<Integer, String>> outputStream = graphSpec
-        .getOutputStream("output", KVSerde.of(mock(IntegerSerde.class), 
mock(StringSerde.class)));
+        .getOutputStream(outputStreamId, KVSerde.of(mock(IntegerSerde.class), 
mock(StringSerde.class)));
 
     inputStream
         .partitionBy(Object::hashCode, Object::toString,
@@ -291,12 +313,12 @@ public class TestOperatorImplGraph {
     
when(taskModel.getSystemStreamPartitions()).thenReturn(Collections.emptySet());
     when(mockTaskContext.getJobModel()).thenReturn(jobModel);
     SamzaContainerContext containerContext =
-        new SamzaContainerContext("0", mockConfig, Collections.singleton(new 
TaskName("task 0")), new MetricsRegistryMap());
+        new SamzaContainerContext("0", config, Collections.singleton(new 
TaskName("task 0")), new MetricsRegistryMap());
     
when(mockTaskContext.getSamzaContainerContext()).thenReturn(containerContext);
     OperatorImplGraph opImplGraph =
-        new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mockConfig, 
mockTaskContext, mock(Clock.class));
+        new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), config, 
mockTaskContext, mock(Clock.class));
 
-    InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new 
SystemStream("input-system", "input-stream"));
+    InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new 
SystemStream(inputSystem, inputPhysicalName));
     assertEquals(1, inputOpImpl.registeredOperators.size());
 
     OperatorImpl partitionByOpImpl = (PartitionByOperatorImpl) 
inputOpImpl.registeredOperators.iterator().next();
@@ -304,7 +326,7 @@ public class TestOperatorImplGraph {
     assertEquals(OpCode.PARTITION_BY, 
partitionByOpImpl.getOperatorSpec().getOpCode());
 
     InputOperatorImpl repartitionedInputOpImpl =
-        opImplGraph.getInputOperator(new SystemStream("intermediate-system", 
"intermediate-stream"));
+        opImplGraph.getInputOperator(new SystemStream(intermediateSystem, 
intermediateStreamId));
     assertEquals(1, repartitionedInputOpImpl.registeredOperators.size());
 
     OperatorImpl sendToOpImpl = (OutputOperatorImpl) 
repartitionedInputOpImpl.registeredOperators.iterator().next();
@@ -314,18 +336,20 @@ public class TestOperatorImplGraph {
 
   @Test
   public void testBroadcastChain() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new 
StreamSpec("input", "input-stream", "input-system"));
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, 
mock(Config.class));
+    String inputStreamId = "input";
+    HashMap<String, String> configMap = new HashMap<>();
+    StreamTestUtils.addStreamConfigs(configMap, "input", "input-system", 
"input-stream");
+    Config config = new MapConfig(configMap);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
 
-    MessageStream<Object> inputStream = graphSpec.getInputStream("input");
+    MessageStream<Object> inputStream = 
graphSpec.getInputStream(inputStreamId);
     inputStream.filter(mock(FilterFunction.class));
     inputStream.map(mock(MapFunction.class));
 
     TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
     when(mockTaskContext.getMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
     OperatorImplGraph opImplGraph =
-        new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), 
mock(Config.class), mockTaskContext, mock(Clock.class));
+        new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), config, 
mockTaskContext, mock(Clock.class));
 
     InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new 
SystemStream("input-system", "input-stream"));
     assertEquals(2, inputOpImpl.registeredOperators.size());
@@ -337,12 +361,10 @@ public class TestOperatorImplGraph {
 
   @Test
   public void testMergeChain() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    when(mockRunner.getStreamSpec(eq("input")))
-        .thenReturn(new StreamSpec("input", "input-stream", "input-system"));
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, 
mock(Config.class));
+    String inputStreamId = "input";
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
 
-    MessageStream<Object> inputStream = graphSpec.getInputStream("input");
+    MessageStream<Object> inputStream = 
graphSpec.getInputStream(inputStreamId);
     MessageStream<Object> stream1 = 
inputStream.filter(mock(FilterFunction.class));
     MessageStream<Object> stream2 = inputStream.map(mock(MapFunction.class));
     MessageStream<Object> mergedStream = 
stream1.merge(Collections.singleton(stream2));
@@ -372,20 +394,23 @@ public class TestOperatorImplGraph {
 
   @Test
   public void testJoinChain() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    when(mockRunner.getStreamSpec(eq("input1"))).thenReturn(new 
StreamSpec("input1", "input-stream1", "input-system"));
-    when(mockRunner.getStreamSpec(eq("input2"))).thenReturn(new 
StreamSpec("input2", "input-stream2", "input-system"));
-    Config mockConfig = mock(Config.class);
-    when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName");
-    when(mockConfig.get(eq(JobConfig.JOB_ID()), 
anyString())).thenReturn("jobId");
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+    String inputStreamId1 = "input1";
+    String inputStreamId2 = "input2";
+
+    HashMap<String, String> configs = new HashMap<>();
+    configs.put(JobConfig.JOB_NAME(), "jobName");
+    configs.put(JobConfig.JOB_ID(), "jobId");
+    StreamTestUtils.addStreamConfigs(configs, "input1", "input-system", 
"input-stream1");
+    StreamTestUtils.addStreamConfigs(configs, "input2", "input-system", 
"input-stream2");
+    Config config = new MapConfig(configs);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
 
     Integer joinKey = new Integer(1);
     Function<Object, Integer> keyFn = (Function & Serializable) m -> joinKey;
     JoinFunction testJoinFunction = new 
TestJoinFunction("jobName-jobId-join-j1",
         (BiFunction & Serializable) (m1, m2) -> KV.of(m1, m2), keyFn, keyFn);
-    MessageStream<Object> inputStream1 = graphSpec.getInputStream("input1", 
new NoOpSerde<>());
-    MessageStream<Object> inputStream2 = graphSpec.getInputStream("input2", 
new NoOpSerde<>());
+    MessageStream<Object> inputStream1 = 
graphSpec.getInputStream(inputStreamId1, new NoOpSerde<>());
+    MessageStream<Object> inputStream2 = 
graphSpec.getInputStream(inputStreamId2, new NoOpSerde<>());
     inputStream1.join(inputStream2, testJoinFunction,
         mock(Serde.class), mock(Serde.class), mock(Serde.class), 
Duration.ofHours(1), "j1");
 
@@ -398,7 +423,7 @@ public class TestOperatorImplGraph {
     KeyValueStore mockRightStore = mock(KeyValueStore.class);
     
when(mockTaskContext.getStore(eq("jobName-jobId-join-j1-R"))).thenReturn(mockRightStore);
     OperatorImplGraph opImplGraph =
-        new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mockConfig, 
mockTaskContext, mock(Clock.class));
+        new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), config, 
mockTaskContext, mock(Clock.class));
 
     // verify that join function is initialized once.
     assertEquals(TestJoinFunction.getInstanceByTaskName(mockTaskName, 
"jobName-jobId-join-j1").numInitCalled, 1);
@@ -434,18 +459,17 @@ public class TestOperatorImplGraph {
 
   @Test
   public void testOperatorGraphInitAndClose() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    when(mockRunner.getStreamSpec("input1")).thenReturn(new 
StreamSpec("input1", "input-stream1", "input-system"));
-    when(mockRunner.getStreamSpec("input2")).thenReturn(new 
StreamSpec("input2", "input-stream2", "input-system"));
+    String inputStreamId1 = "input1";
+    String inputStreamId2 = "input2";
     Config mockConfig = mock(Config.class);
     TaskName mockTaskName = mock(TaskName.class);
     TaskContextImpl mockContext = mock(TaskContextImpl.class);
     when(mockContext.getTaskName()).thenReturn(mockTaskName);
     when(mockContext.getMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
 
-    MessageStream<Object> inputStream1 = graphSpec.getInputStream("input1");
-    MessageStream<Object> inputStream2 = graphSpec.getInputStream("input2");
+    MessageStream<Object> inputStream1 = 
graphSpec.getInputStream(inputStreamId1);
+    MessageStream<Object> inputStream2 = 
graphSpec.getInputStream(inputStreamId2);
 
     Function mapFn = (Function & Serializable) m -> m;
     inputStream1.map(new TestMapFunction<Object, Object>("1", mapFn))
@@ -476,12 +500,19 @@ public class TestOperatorImplGraph {
   @Test
   public void testGetStreamToConsumerTasks() {
     String system = "test-system";
-    String stream0 = "test-stream-0";
-    String stream1 = "test-stream-1";
+    String streamId0 = "test-stream-0";
+    String streamId1 = "test-stream-1";
+
+    HashMap<String, String> configs = new HashMap<>();
+    configs.put(JobConfig.JOB_NAME(), "test-app");
+    configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), "test-system");
+    StreamTestUtils.addStreamConfigs(configs, streamId0, system, streamId0);
+    StreamTestUtils.addStreamConfigs(configs, streamId1, system, streamId1);
+    Config config = new MapConfig(configs);
 
-    SystemStreamPartition ssp0 = new SystemStreamPartition(system, stream0, 
new Partition(0));
-    SystemStreamPartition ssp1 = new SystemStreamPartition(system, stream0, 
new Partition(1));
-    SystemStreamPartition ssp2 = new SystemStreamPartition(system, stream1, 
new Partition(0));
+    SystemStreamPartition ssp0 = new SystemStreamPartition(system, streamId0, 
new Partition(0));
+    SystemStreamPartition ssp1 = new SystemStreamPartition(system, streamId0, 
new Partition(1));
+    SystemStreamPartition ssp2 = new SystemStreamPartition(system, streamId1, 
new Partition(0));
 
     TaskName task0 = new TaskName("Task 0");
     TaskName task1 = new TaskName("Task 1");
@@ -497,7 +528,7 @@ public class TestOperatorImplGraph {
     cms.put(cm0.getProcessorId(), cm0);
     cms.put(cm1.getProcessorId(), cm1);
 
-    JobModel jobModel = new JobModel(new MapConfig(), cms, null);
+    JobModel jobModel = new JobModel(config, cms, null);
     Multimap<SystemStream, String> streamToTasks = 
OperatorImplGraph.getStreamToConsumerTasks(jobModel);
     assertEquals(streamToTasks.get(ssp0.getSystemStream()).size(), 2);
     assertEquals(streamToTasks.get(ssp2.getSystemStream()).size(), 1);
@@ -505,56 +536,44 @@ public class TestOperatorImplGraph {
 
   @Test
   public void testGetOutputToInputStreams() {
-    Map<String, String> configMap = new HashMap<>();
-    configMap.put(JobConfig.JOB_NAME(), "test-app");
-    configMap.put(JobConfig.JOB_DEFAULT_SYSTEM(), "test-system");
-    Config config = new MapConfig(configMap);
-
-    /**
-     * the graph looks like the following. number of partitions in 
parentheses. quotes indicate expected value.
-     *
-     *                                    input1 -> map -> join -> partitionBy 
(10) -> output1
-     *                                                       |
-     *                                     input2 -> filter -|
-     *                                                       |
-     *           input3 -> filter -> partitionBy -> map -> join -> output2
-     *
-     */
-    StreamSpec input1 = new StreamSpec("input1", "input1", "system1");
-    StreamSpec input2 = new StreamSpec("input2", "input2", "system2");
-    StreamSpec input3 = new StreamSpec("input3", "input3", "system2");
-
-    StreamSpec output1 = new StreamSpec("output1", "output1", "system1");
-    StreamSpec output2 = new StreamSpec("output2", "output2", "system2");
-
-    ApplicationRunner runner = mock(ApplicationRunner.class);
-    when(runner.getStreamSpec("input1")).thenReturn(input1);
-    when(runner.getStreamSpec("input2")).thenReturn(input2);
-    when(runner.getStreamSpec("input3")).thenReturn(input3);
-    when(runner.getStreamSpec("output1")).thenReturn(output1);
-    when(runner.getStreamSpec("output2")).thenReturn(output2);
-
-    // intermediate streams used in tests
-    StreamSpec int1 = new StreamSpec("test-app-1-partition_by-p2", 
"test-app-1-partition_by-p2", "default-system");
-    StreamSpec int2 = new StreamSpec("test-app-1-partition_by-p1", 
"test-app-1-partition_by-p1", "default-system");
-    when(runner.getStreamSpec("test-app-1-partition_by-p2")).thenReturn(int1);
-    when(runner.getStreamSpec("test-app-1-partition_by-p1")).thenReturn(int2);
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config);
-    MessageStream messageStream1 = graphSpec.getInputStream("input1").map(m -> 
m);
-    MessageStream messageStream2 = graphSpec.getInputStream("input2").filter(m 
-> true);
+    String inputStreamId1 = "input1";
+    String inputStreamId2 = "input2";
+    String inputStreamId3 = "input3";
+    String inputSystem = "input-system";
+
+    String outputStreamId1 = "output1";
+    String outputStreamId2 = "output2";
+    String outputSystem = "output-system";
+
+    String intStreamId1 = "test-app-1-partition_by-p1";
+    String intStreamId2 = "test-app-1-partition_by-p2";
+    String intSystem = "test-system";
+
+    HashMap<String, String> configs = new HashMap<>();
+    configs.put(JobConfig.JOB_NAME(), "test-app");
+    configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), intSystem);
+    StreamTestUtils.addStreamConfigs(configs, inputStreamId1, inputSystem, 
inputStreamId1);
+    StreamTestUtils.addStreamConfigs(configs, inputStreamId2, inputSystem, 
inputStreamId2);
+    StreamTestUtils.addStreamConfigs(configs, inputStreamId3, inputSystem, 
inputStreamId3);
+    StreamTestUtils.addStreamConfigs(configs, outputStreamId1, outputSystem, 
outputStreamId1);
+    StreamTestUtils.addStreamConfigs(configs, outputStreamId2, outputSystem, 
outputStreamId2);
+    Config config = new MapConfig(configs);
+
+    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
+    MessageStream messageStream1 = 
graphSpec.getInputStream(inputStreamId1).map(m -> m);
+    MessageStream messageStream2 = 
graphSpec.getInputStream(inputStreamId2).filter(m -> true);
     MessageStream messageStream3 =
-        graphSpec.getInputStream("input3")
+        graphSpec.getInputStream(inputStreamId3)
             .filter(m -> true)
-            .partitionBy(m -> "hehe", m -> m, "p1")
+            .partitionBy(m -> "m", m -> m, "p1")
             .map(m -> m);
-    OutputStream<Object> outputStream1 = graphSpec.getOutputStream("output1");
-    OutputStream<Object> outputStream2 = graphSpec.getOutputStream("output2");
+    OutputStream<Object> outputStream1 = 
graphSpec.getOutputStream(outputStreamId1);
+    OutputStream<Object> outputStream2 = 
graphSpec.getOutputStream(outputStreamId2);
 
     messageStream1
         .join(messageStream2, mock(JoinFunction.class),
             mock(Serde.class), mock(Serde.class), mock(Serde.class), 
Duration.ofHours(2), "j1")
-        .partitionBy(m -> "haha", m -> m, "p2")
+        .partitionBy(m -> "m", m -> m, "p2")
         .sendTo(outputStream1);
     messageStream3
         .join(messageStream2, mock(JoinFunction.class),
@@ -562,19 +581,41 @@ public class TestOperatorImplGraph {
         .sendTo(outputStream2);
 
     Multimap<SystemStream, SystemStream> outputToInput =
-        
OperatorImplGraph.getIntermediateToInputStreamsMap(graphSpec.getOperatorSpecGraph());
-    Collection<SystemStream> inputs = outputToInput.get(int1.toSystemStream());
+        
OperatorImplGraph.getIntermediateToInputStreamsMap(graphSpec.getOperatorSpecGraph(),
 new StreamConfig(config));
+    Collection<SystemStream> inputs = outputToInput.get(new 
SystemStream(intSystem, intStreamId2));
     assertEquals(inputs.size(), 2);
-    assertTrue(inputs.contains(input1.toSystemStream()));
-    assertTrue(inputs.contains(input2.toSystemStream()));
+    assertTrue(inputs.contains(new SystemStream(inputSystem, inputStreamId1)));
+    assertTrue(inputs.contains(new SystemStream(inputSystem, inputStreamId2)));
 
-    inputs = outputToInput.get(int2.toSystemStream());
+    inputs = outputToInput.get(new SystemStream(intSystem, intStreamId1));
     assertEquals(inputs.size(), 1);
-    assertEquals(inputs.iterator().next(), input3.toSystemStream());
+    assertEquals(inputs.iterator().next(), new SystemStream(inputSystem, 
inputStreamId3));
   }
 
   @Test
   public void testGetProducerTaskCountForIntermediateStreams() {
+    String inputStreamId1 = "input1";
+    String inputStreamId2 = "input2";
+    String inputStreamId3 = "input3";
+    String inputSystem1 = "system1";
+    String inputSystem2 = "system2";
+
+    HashMap<String, String> configs = new HashMap<>();
+    configs.put(JobConfig.JOB_NAME(), "test-app");
+    configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), inputSystem1);
+    StreamTestUtils.addStreamConfigs(configs, inputStreamId1, inputSystem1, 
inputStreamId1);
+    StreamTestUtils.addStreamConfigs(configs, inputStreamId2, inputSystem2, 
inputStreamId2);
+    StreamTestUtils.addStreamConfigs(configs, inputStreamId3, inputSystem2, 
inputStreamId3);
+    Config config = new MapConfig(configs);
+
+    SystemStream input1 = new SystemStream("system1", "intput1");
+    SystemStream input2 = new SystemStream("system2", "intput2");
+    SystemStream input3 = new SystemStream("system2", "intput3");
+
+    SystemStream int1 = new SystemStream("system1", "int1");
+    SystemStream int2 = new SystemStream("system1", "int2");
+
+
     /**
      * the task assignment looks like the following:
      *
@@ -585,15 +626,6 @@ public class TestOperatorImplGraph {
      * input3 ------> task1 -----------> int2
      *
      */
-
-    SystemStream input1 = new SystemStream("system1", "intput1");
-    SystemStream input2 = new SystemStream("system2", "intput2");
-    SystemStream input3 = new SystemStream("system2", "intput3");
-
-    SystemStream int1 = new SystemStream("system1", "int1");
-    SystemStream int2 = new SystemStream("system1", "int2");
-
-
     String task0 = "Task 0";
     String task1 = "Task 1";
     String task2 = "Task 2";

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
index 9741fc4..0ef6680 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
@@ -42,13 +42,11 @@ import org.apache.samza.operators.triggers.Triggers;
 import org.apache.samza.operators.windows.AccumulationMode;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.task.MessageCollector;
@@ -80,7 +78,6 @@ public class TestWindowOperator {
   private final List<Integer> integers = ImmutableList.of(1, 2, 1, 2, 1, 2, 1, 
2, 3);
   private Config config;
   private TaskContextImpl taskContext;
-  private ApplicationRunner runner;
 
   @Before
   public void setup() throws Exception {
@@ -88,19 +85,16 @@ public class TestWindowOperator {
     when(config.get(JobConfig.JOB_NAME())).thenReturn("jobName");
     when(config.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");
     taskContext = mock(TaskContextImpl.class);
-    runner = mock(ApplicationRunner.class);
     Serde storeKeySerde = new TimeSeriesKeySerde(new IntegerSerde());
     Serde storeValSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
 
     when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
         .of(new SystemStreamPartition("kafka", "integers", new Partition(0))));
     when(taskContext.getMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
     when(taskContext.getStore("jobName-jobId-window-w1"))
         .thenReturn(new TestInMemoryStore<>(storeKeySerde, storeValSerde));
-    when(runner.getStreamSpec("integers")).thenReturn(new 
StreamSpec("integers", "integers", "kafka"));
 
     Map<String, String> mapConfig = new HashMap<>();
-    mapConfig.put("app.runner.class", 
"org.apache.samza.runtime.LocalApplicationRunner");
     mapConfig.put("job.default.system", "kafka");
     mapConfig.put("job.name", "jobName");
     mapConfig.put("job.id", "jobId");
@@ -552,7 +546,7 @@ public class TestWindowOperator {
 
   private StreamGraphSpec getKeyedTumblingWindowStreamGraph(AccumulationMode 
mode,
       Duration duration, Trigger<KV<Integer, Integer>> earlyTrigger) throws 
IOException {
-    StreamGraphSpec graph = new StreamGraphSpec(runner, config);
+    StreamGraphSpec graph = new StreamGraphSpec(config);
 
     KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new 
IntegerSerde());
     graph.getInputStream("integers", kvSerde)
@@ -568,7 +562,7 @@ public class TestWindowOperator {
 
   private StreamGraphSpec getTumblingWindowStreamGraph(AccumulationMode mode,
       Duration duration, Trigger<KV<Integer, Integer>> earlyTrigger) throws 
IOException {
-    StreamGraphSpec graph = new StreamGraphSpec(runner, config);
+    StreamGraphSpec graph = new StreamGraphSpec(config);
 
     KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new 
IntegerSerde());
     graph.getInputStream("integers", kvSerde)
@@ -582,7 +576,7 @@ public class TestWindowOperator {
   }
 
   private StreamGraphSpec getKeyedSessionWindowStreamGraph(AccumulationMode 
mode, Duration duration) throws IOException {
-    StreamGraphSpec graph = new StreamGraphSpec(runner, config);
+    StreamGraphSpec graph = new StreamGraphSpec(config);
 
     KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new 
IntegerSerde());
     graph.getInputStream("integers", kvSerde)
@@ -597,7 +591,7 @@ public class TestWindowOperator {
 
   private StreamGraphSpec 
getAggregateTumblingWindowStreamGraph(AccumulationMode mode, Duration 
timeDuration,
         Trigger<IntegerEnvelope> earlyTrigger) throws IOException {
-    StreamGraphSpec graph = new StreamGraphSpec(runner, config);
+    StreamGraphSpec graph = new StreamGraphSpec(config);
 
     MessageStream<KV<Integer, Integer>> integers = 
graph.getInputStream("integers",
         KVSerde.of(new IntegerSerde(), new IntegerSerde()));

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java
 
b/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java
index b39b0d0..7704a5b 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java
@@ -31,7 +31,6 @@ import org.apache.samza.operators.TableImpl;
 import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.serializers.SerializableSerde;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.table.TableSpec;
 
 import static org.junit.Assert.*;
@@ -105,8 +104,8 @@ public class OperatorSpecTestUtils {
     assertEquals(oTable.getTableSpec(), nTable.getTableSpec());
   }
 
-  private static void assertClonedOutputs(Map<StreamSpec, OutputStreamImpl> originalOutputs,
-      Map<StreamSpec, OutputStreamImpl> clonedOutputs) {
+  private static void assertClonedOutputs(Map<String, OutputStreamImpl> originalOutputs,
+      Map<String, OutputStreamImpl> clonedOutputs) {
     assertEquals(originalOutputs.size(), clonedOutputs.size());
     assertEquals(originalOutputs.keySet(), clonedOutputs.keySet());
     Iterator<OutputStreamImpl> oIter = originalOutputs.values().iterator();
@@ -117,12 +116,11 @@ public class OperatorSpecTestUtils {
   private static void assertClonedOutputImpl(OutputStreamImpl oOutput, OutputStreamImpl nOutput) {
     assertNotEquals(oOutput, nOutput);
     assertEquals(oOutput.isKeyed(), nOutput.isKeyed());
-    assertEquals(oOutput.getSystemStream(), nOutput.getSystemStream());
-    assertEquals(oOutput.getStreamSpec(), nOutput.getStreamSpec());
+    assertEquals(oOutput.getStreamId(), nOutput.getStreamId());
   }
 
-  private static void assertClonedInputs(Map<StreamSpec, InputOperatorSpec> originalInputs,
-      Map<StreamSpec, InputOperatorSpec> clonedInputs) {
+  private static void assertClonedInputs(Map<String, InputOperatorSpec> originalInputs,
+      Map<String, InputOperatorSpec> clonedInputs) {
     assertEquals(originalInputs.size(), clonedInputs.size());
     assertEquals(originalInputs.keySet(), clonedInputs.keySet());
     Iterator<InputOperatorSpec> oIter = originalInputs.values().iterator();
@@ -134,7 +132,7 @@ public class OperatorSpecTestUtils {
     assertNotEquals(originalInput, clonedInput);
     assertEquals(originalInput.getOpId(), clonedInput.getOpId());
     assertEquals(originalInput.getOpCode(), clonedInput.getOpCode());
-    assertEquals(originalInput.getStreamSpec(), clonedInput.getStreamSpec());
+    assertEquals(originalInput.getStreamId(), clonedInput.getStreamId());
     assertEquals(originalInput.isKeyed(), clonedInput.isKeyed());
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
 
b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
index cb221b0..b27c944 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
@@ -40,7 +40,6 @@ import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.table.TableSpec;
 import org.junit.Test;
 import org.mockito.internal.util.reflection.Whitebox;
@@ -50,7 +49,6 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
 
 
 /**
@@ -230,9 +228,8 @@ public class TestOperatorSpec {
       }
     };
 
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
     InputOperatorSpec<String, Object> inputOperatorSpec = new 
InputOperatorSpec<>(
-        mockStreamSpec, new StringSerde("UTF-8"), objSerde, true, "op0");
+        "mockStreamId", new StringSerde("UTF-8"), objSerde, true, "op0");
     InputOperatorSpec<String, Object> inputOpCopy = (InputOperatorSpec<String, 
Object>) OperatorSpecTestUtils.copyOpSpec(inputOperatorSpec);
 
     assertNotEquals("Expected deserialized copy of operator spec should not be 
the same as the original operator spec", inputOperatorSpec, inputOpCopy);
@@ -254,9 +251,8 @@ public class TestOperatorSpec {
         return new byte[0];
       }
     };
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    OutputStreamImpl<KV<String, Object>> outputStrmImpl = new 
OutputStreamImpl<>(mockStreamSpec, new StringSerde("UTF-8"), objSerde, true);
-    OutputOperatorSpec<KV<String, Object>> outputOperatorSpec = new 
OutputOperatorSpec<KV<String, Object>>(outputStrmImpl, "op0");
+    OutputStreamImpl<KV<String, Object>> outputStrmImpl = new 
OutputStreamImpl<>("mockStreamId", new StringSerde("UTF-8"), objSerde, true);
+    OutputOperatorSpec<KV<String, Object>> outputOperatorSpec = new 
OutputOperatorSpec<>(outputStrmImpl, "op0");
     OutputOperatorSpec<KV<String, Object>> outputOpCopy = 
(OutputOperatorSpec<KV<String, Object>>) OperatorSpecTestUtils
         .copyOpSpec(outputOperatorSpec);
     assertNotEquals("Expected deserialized copy of operator spec should not be 
the same as the original operator spec", outputOperatorSpec, outputOpCopy);
@@ -276,10 +272,10 @@ public class TestOperatorSpec {
   public void testJoinOperatorSpec() {
 
     InputOperatorSpec<TestMessageEnvelope, Object> leftOpSpec = new 
InputOperatorSpec<>(
-        new StreamSpec("test-input-1", "test-input-1", "kafka"), new 
NoOpSerde<>(),
+        "test-input-1", new NoOpSerde<>(),
         new NoOpSerde<>(), false, "op0");
     InputOperatorSpec<TestMessageEnvelope, Object> rightOpSpec = new 
InputOperatorSpec<>(
-        new StreamSpec("test-input-2", "test-input-2", "kafka"), new 
NoOpSerde<>(),
+        "test-input-2", new NoOpSerde<>(),
         new NoOpSerde<>(), false, "op1");
 
     Serde<Object> objSerde = new Serde<Object>() {
@@ -341,14 +337,14 @@ public class TestOperatorSpec {
   @Test
   public void testBroadcastOperatorSpec() {
     OutputStreamImpl<TestOutputMessageEnvelope> outputStream =
-        new OutputStreamImpl<>(new StreamSpec("output-0", "outputStream-0", 
"kafka"), new StringSerde("UTF-8"), new 
JsonSerdeV2<TestOutputMessageEnvelope>(), true);
+        new OutputStreamImpl<>("output-0", new StringSerde("UTF-8"), new 
JsonSerdeV2<TestOutputMessageEnvelope>(), true);
     BroadcastOperatorSpec<TestOutputMessageEnvelope> broadcastOpSpec = new 
BroadcastOperatorSpec<>(outputStream, "broadcast-1");
     BroadcastOperatorSpec<TestOutputMessageEnvelope> broadcastOpCopy = 
(BroadcastOperatorSpec<TestOutputMessageEnvelope>) OperatorSpecTestUtils
         .copyOpSpec(broadcastOpSpec);
     assertNotEquals(broadcastOpCopy, broadcastOpSpec);
     assertEquals(broadcastOpCopy.getOpId(), broadcastOpSpec.getOpId());
     assertTrue(broadcastOpCopy.getOutputStream() != 
broadcastOpSpec.getOutputStream());
-    assertEquals(broadcastOpCopy.getOutputStream().getSystemStream(), 
broadcastOpSpec.getOutputStream().getSystemStream());
+    assertEquals(broadcastOpCopy.getOutputStream().getStreamId(), 
broadcastOpSpec.getOutputStream().getStreamId());
     assertEquals(broadcastOpCopy.getOutputStream().isKeyed(), 
broadcastOpSpec.getOutputStream().isKeyed());
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
 
b/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
index 00ec176..9bbcbfa 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
@@ -29,9 +29,7 @@ import org.apache.samza.operators.TimerRegistry;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
-import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.system.StreamSpec;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.internal.util.reflection.Whitebox;
@@ -45,7 +43,6 @@ import static org.mockito.Mockito.*;
  */
 public class TestPartitionByOperatorSpec {
 
-  private final ApplicationRunner mockRunner = mock(ApplicationRunner.class);
   private final Config mockConfig = mock(Config.class);
   private final String testInputId = "test-input-1";
   private final String testJobName = "testJob";
@@ -93,12 +90,7 @@ public class TestPartitionByOperatorSpec {
   public void setup() {
     when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn(testJobName);
     when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn(testJobId);
-    StreamSpec inputSpec1 = new StreamSpec(testInputId, testInputId, "kafka");
-    when(mockRunner.getStreamSpec(testInputId)).thenReturn(inputSpec1);
-    String intermediateStreamName = String.format("%s-%s-partition_by-%s", 
testJobName, testJobId, testReparStreamName);
-    StreamSpec intermediateSpec1 = new StreamSpec(intermediateStreamName, 
intermediateStreamName, "kafka");
-    
when(mockRunner.getStreamSpec(intermediateStreamName)).thenReturn(intermediateSpec1);
-    graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+    graphSpec = new StreamGraphSpec(mockConfig);
   }
 
   @Test
@@ -109,7 +101,7 @@ public class TestPartitionByOperatorSpec {
     MessageStream<KV<String, Object>>
         reparStream = inputStream.partitionBy(keyFn, valueFn, 
testReparStreamName);
     InputOperatorSpec inputOpSpec = (InputOperatorSpec) 
Whitebox.getInternalState(reparStream, "operatorSpec");
-    assertEquals(inputOpSpec.getStreamSpec().getId(), 
String.format("%s-%s-partition_by-%s", testJobName, testJobId, 
testReparStreamName));
+    assertEquals(inputOpSpec.getStreamId(), 
String.format("%s-%s-partition_by-%s", testJobName, testJobId, 
testReparStreamName));
     assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
     assertTrue(inputOpSpec.getValueSerde() instanceof NoOpSerde);
     assertTrue(inputOpSpec.isKeyed());
@@ -121,7 +113,7 @@ public class TestPartitionByOperatorSpec {
     assertEquals(reparOpSpec.getOpId(), String.format("%s-%s-partition_by-%s", 
testJobName, testJobId, testReparStreamName));
     assertEquals(reparOpSpec.getKeyFunction(), keyFn);
     assertEquals(reparOpSpec.getValueFunction(), valueFn);
-    assertEquals(reparOpSpec.getOutputStream().getStreamSpec(), new 
StreamSpec(reparOpSpec.getOpId(), reparOpSpec.getOpId(), "kafka"));
+    assertEquals(reparOpSpec.getOutputStream().getStreamId(), 
reparOpSpec.getOpId());
     assertNull(reparOpSpec.getTimerFn());
     assertNull(reparOpSpec.getWatermarkFn());
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java
 
b/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java
deleted file mode 100644
index b8d3f15..0000000
--- 
a/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.runtime;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.StreamConfig;
-import org.apache.samza.job.ApplicationStatus;
-import org.apache.samza.system.StreamSpec;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class TestAbstractApplicationRunner {
-  private static final String STREAM_ID = "t3st-Stream_Id";
-  private static final String STREAM_ID_INVALID = "test#Str3amId!";
-
-  private static final String TEST_PHYSICAL_NAME = "t3st-Physical_Name";
-  private static final String TEST_PHYSICAL_NAME2 = "testPhysicalName2";
-  private static final String TEST_PHYSICAL_NAME_SPECIAL_CHARS = 
"test://Physical.Name?";
-
-  private static final String TEST_SYSTEM = "t3st-System_Name";
-  private static final String TEST_SYSTEM2 = "testSystemName2";
-  private static final String TEST_SYSTEM_INVALID = "test:System!Name@";
-
-  private static final String TEST_DEFAULT_SYSTEM = "testDefaultSystemName";
-
-
-  @Test(expected = NullPointerException.class)
-  public void testConfigValidation() {
-    new TestAbstractApplicationRunnerImpl(null);
-  }
-
-  // The physical name should be pulled from the StreamConfig.PHYSICAL_NAME 
property value.
-  @Test
-  public void testgetStreamWithPhysicalNameInConfig() {
-    Config config = buildStreamConfig(STREAM_ID,
-                                      StreamConfig.PHYSICAL_NAME(), 
TEST_PHYSICAL_NAME,
-                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
-
-    AbstractApplicationRunner runner = new 
TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
-
-    assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
-  }
-
-  // The streamId should be used as the physicalName when the physical name is 
not specified.
-  // NOTE: its either this, set to null, or exception. This seems better for 
backward compatibility and API brevity.
-  @Test
-  public void testgetStreamWithoutPhysicalNameInConfig() {
-    Config config = buildStreamConfig(STREAM_ID,
-                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
-
-    AbstractApplicationRunner runner = new 
TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
-
-    assertEquals(STREAM_ID, spec.getPhysicalName());
-  }
-
-  // If the system is specified at the stream scope, use it
-  @Test
-  public void testgetStreamWithSystemAtStreamScopeInConfig() {
-    Config config = buildStreamConfig(STREAM_ID,
-                                      StreamConfig.PHYSICAL_NAME(), 
TEST_PHYSICAL_NAME,
-                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
-
-    AbstractApplicationRunner runner = new 
TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
-
-    assertEquals(TEST_SYSTEM, spec.getSystemName());
-  }
-
-  // If system isn't specified at stream scope, use the default system
-  @Test
-  public void testgetStreamWithSystemAtDefaultScopeInConfig() {
-    Config config = addConfigs(buildStreamConfig(STREAM_ID,
-                                                  
StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME),
-                                JobConfig.JOB_DEFAULT_SYSTEM(), 
TEST_DEFAULT_SYSTEM);
-
-    AbstractApplicationRunner runner = new 
TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
-
-    assertEquals(TEST_DEFAULT_SYSTEM, spec.getSystemName());
-  }
-
-  // Stream scope should override default scope
-  @Test
-  public void testgetStreamWithSystemAtBothScopesInConfig() {
-    Config config = addConfigs(buildStreamConfig(STREAM_ID,
-                                                StreamConfig.PHYSICAL_NAME(), 
TEST_PHYSICAL_NAME,
-                                                StreamConfig.SYSTEM(), 
TEST_SYSTEM),
-                                JobConfig.JOB_DEFAULT_SYSTEM(), 
TEST_DEFAULT_SYSTEM);
-
-    AbstractApplicationRunner runner = new 
TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
-
-    assertEquals(TEST_SYSTEM, spec.getSystemName());
-  }
-
-  // System is required. Throw if it cannot be determined.
-  @Test(expected = IllegalArgumentException.class)
-  public void testgetStreamWithOutSystemInConfig() {
-    Config config = buildStreamConfig(STREAM_ID,
-                                      StreamConfig.PHYSICAL_NAME(), 
TEST_PHYSICAL_NAME);
-
-    AbstractApplicationRunner runner = new 
TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
-
-    assertEquals(TEST_SYSTEM, spec.getSystemName());
-  }
-
-  // The properties in the config "streams.{streamId}.*" should be passed 
through to the spec.
-  @Test
-  public void testgetStreamPropertiesPassthrough() {
-    Config config = buildStreamConfig(STREAM_ID,
-                                    StreamConfig.PHYSICAL_NAME(), 
TEST_PHYSICAL_NAME,
-                                    StreamConfig.SYSTEM(), TEST_SYSTEM,
-                                    "systemProperty1", "systemValue1",
-                                    "systemProperty2", "systemValue2",
-                                    "systemProperty3", "systemValue3");
-
-    AbstractApplicationRunner runner = new 
TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
-
-    Map<String, String> properties = spec.getConfig();
-    assertEquals(3, properties.size());
-    assertEquals("systemValue1", properties.get("systemProperty1"));
-    assertEquals("systemValue2", properties.get("systemProperty2"));
-    assertEquals("systemValue3", properties.get("systemProperty3"));
-    assertEquals("systemValue1", spec.get("systemProperty1"));
-    assertEquals("systemValue2", spec.get("systemProperty2"));
-    assertEquals("systemValue3", spec.get("systemProperty3"));
-  }
-
-  // The samza properties (which are invalid for the underlying system) should 
be filtered out.
-  @Test
-  public void testGetStreamSamzaPropertiesOmitted() {
-    Config config = buildStreamConfig(STREAM_ID,
-                              StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
-                                    StreamConfig.SYSTEM(), TEST_SYSTEM,
-                                    "systemProperty1", "systemValue1",
-                                    "systemProperty2", "systemValue2",
-                                    "systemProperty3", "systemValue3");
-
-    AbstractApplicationRunner runner = new 
TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
-
-    Map<String, String> properties = spec.getConfig();
-    assertEquals(3, properties.size());
-    
assertNull(properties.get(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(),
 STREAM_ID)));
-    
assertNull(properties.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), 
STREAM_ID)));
-    
assertNull(spec.get(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), 
STREAM_ID)));
-    assertNull(spec.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), 
STREAM_ID)));
-  }
-
-  @Test
-  public void testStreamConfigOverrides() {
-    final String sysStreamPrefix = String.format("systems.%s.streams.%s.", 
TEST_SYSTEM, TEST_PHYSICAL_NAME);
-    Config config = addConfigs(buildStreamConfig(STREAM_ID,
-        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
-        StreamConfig.SYSTEM(), TEST_SYSTEM,
-        "systemProperty1", "systemValue1",
-        "systemProperty2", "systemValue2",
-        "systemProperty3", "systemValue3"),
-        sysStreamPrefix + "systemProperty4", "systemValue4",
-        sysStreamPrefix + "systemProperty2", "systemValue8");
-
-    AbstractApplicationRunner env = new 
TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = env.getStreamSpec(STREAM_ID);
-
-    Map<String, String> properties = spec.getConfig();
-    assertEquals(4, properties.size());
-    assertEquals("systemValue4", properties.get("systemProperty4"));
-    assertEquals("systemValue2", properties.get("systemProperty2"));
-  }
-
-  // Verify that we use a default specified with systems.x.default.stream.*, 
if specified
-  @Test
-  public void testStreamConfigOverridesWithSystemDefaults() {
-    Config config = addConfigs(buildStreamConfig(STREAM_ID,
-        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
-        StreamConfig.SYSTEM(), TEST_SYSTEM,
-        "segment.bytes", "5309"),
-        String.format("systems.%s.default.stream.replication.factor", 
TEST_SYSTEM), "4", // System default property
-        String.format("systems.%s.default.stream.segment.bytest", 
TEST_SYSTEM), "867"
-        );
-
-    AbstractApplicationRunner env = new 
TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = env.getStreamSpec(STREAM_ID);
-
-    Map<String, String> properties = spec.getConfig();
-    assertEquals(3, properties.size());
-    assertEquals("4", properties.get("replication.factor")); // Uses system 
default
-    assertEquals("5309", properties.get("segment.bytes")); // Overrides system 
default
-  }
-
-  // When the physicalName argument is passed explicitly it should be used, 
regardless of whether it is also in the config
-  @Test
-  public void testGetStreamPhysicalNameArgSimple() {
-    Config config = buildStreamConfig(STREAM_ID,
-                                      StreamConfig.PHYSICAL_NAME(), 
TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg
-                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
-
-    AbstractApplicationRunner runner = new 
TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
-
-    assertEquals(STREAM_ID, spec.getId());
-    assertEquals(TEST_PHYSICAL_NAME2, spec.getPhysicalName());
-    assertEquals(TEST_SYSTEM, spec.getSystemName());
-  }
-
-  // Special characters are allowed for the physical name
-  @Test
-  public void testGetStreamPhysicalNameArgSpecialCharacters() {
-    Config config = buildStreamConfig(STREAM_ID,
-                                      StreamConfig.PHYSICAL_NAME(), 
TEST_PHYSICAL_NAME_SPECIAL_CHARS,
-                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
-
-    AbstractApplicationRunner runner = new 
TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
-    assertEquals(TEST_PHYSICAL_NAME_SPECIAL_CHARS, spec.getPhysicalName());
-  }
-
-  // Null is allowed for the physical name
-  @Test
-  public void testGetStreamPhysicalNameArgNull() {
-    Config config = buildStreamConfig(STREAM_ID,
-                                      StreamConfig.PHYSICAL_NAME(), null,
-                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
-
-    AbstractApplicationRunner runner = new 
TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
-    assertNull(spec.getPhysicalName());
-  }
-
-  // When the system name is provided explicitly, it should be used, 
regardless of whether it's also in the config
-  @Test
-  public void testGetStreamSystemNameArgValid() {
-    Config config = buildStreamConfig(STREAM_ID,
-                                      StreamConfig.PHYSICAL_NAME(), 
TEST_PHYSICAL_NAME, // This should be ignored because of the explicit arg
-                                      StreamConfig.SYSTEM(), TEST_SYSTEM);     
         // This too
-
-    AbstractApplicationRunner runner = new 
TestAbstractApplicationRunnerImpl(config);
-    StreamSpec spec = runner.getStreamSpec(STREAM_ID);
-
-    assertEquals(STREAM_ID, spec.getId());
-    assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
-    assertEquals(TEST_SYSTEM, spec.getSystemName());
-  }
-
-  // Special characters are NOT allowed for system name, because it's used as 
an identifier in the config.
-  @Test(expected = IllegalArgumentException.class)
-  public void testGetStreamSystemNameArgInvalid() {
-    Config config = buildStreamConfig(STREAM_ID,
-                                      StreamConfig.PHYSICAL_NAME(), 
TEST_PHYSICAL_NAME,
-                                      StreamConfig.SYSTEM(), 
TEST_SYSTEM_INVALID);
-
-    AbstractApplicationRunner runner = new 
TestAbstractApplicationRunnerImpl(config);
-    runner.getStreamSpec(STREAM_ID);
-  }
-
-  // Empty strings are NOT allowed for system name, because it's used as an 
identifier in the config.
-  @Test(expected = IllegalArgumentException.class)
-  public void testGetStreamSystemNameArgEmpty() {
-    Config config = buildStreamConfig(STREAM_ID,
-        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
-        StreamConfig.SYSTEM(), "");
-
-    AbstractApplicationRunner runner = new 
TestAbstractApplicationRunnerImpl(config);
-    runner.getStreamSpec(STREAM_ID);
-  }
-
-  // Null is not allowed IllegalArgumentException system name.
-  @Test(expected = IllegalArgumentException.class)
-  public void testGetStreamSystemNameArgNull() {
-    Config config = buildStreamConfig(STREAM_ID,
-                                      StreamConfig.PHYSICAL_NAME(), 
TEST_PHYSICAL_NAME,
-                                      StreamConfig.SYSTEM(), null);
-
-    AbstractApplicationRunner runner = new 
TestAbstractApplicationRunnerImpl(config);
-    runner.getStreamSpec(STREAM_ID);
-  }
-
-  // Special characters are NOT allowed for streamId, because it's used as an 
identifier in the config.
-  @Test(expected = IllegalArgumentException.class)
-  public void testGetStreamStreamIdInvalid() {
-    Config config = buildStreamConfig(STREAM_ID_INVALID,
-        StreamConfig.SYSTEM(), TEST_SYSTEM);
-
-    AbstractApplicationRunner runner = new 
TestAbstractApplicationRunnerImpl(config);
-    runner.getStreamSpec(STREAM_ID_INVALID);
-  }
-
-  // Empty strings are NOT allowed for streamId, because it's used as an 
identifier in the config.
-  @Test(expected = IllegalArgumentException.class)
-  public void testGetStreamStreamIdEmpty() {
-    Config config = buildStreamConfig("",
-        StreamConfig.SYSTEM(), TEST_SYSTEM);
-
-    AbstractApplicationRunner runner = new 
TestAbstractApplicationRunnerImpl(config);
-    runner.getStreamSpec("");
-  }
-
-  // Null is not allowed for streamId.
-  @Test(expected = IllegalArgumentException.class)
-  public void testGetStreamStreamIdNull() {
-    Config config = buildStreamConfig(null,
-        StreamConfig.SYSTEM(), TEST_SYSTEM);
-
-    AbstractApplicationRunner runner = new 
TestAbstractApplicationRunnerImpl(config);
-    runner.getStreamSpec(null);
-  }
-
-
-  // Helper methods
-
-  private Config buildStreamConfig(String streamId, String... kvs) {
-    // inject streams.x. into each key
-    for (int i = 0; i < kvs.length - 1; i += 2) {
-      kvs[i] = String.format(StreamConfig.STREAM_ID_PREFIX(), streamId) + 
kvs[i];
-    }
-    return buildConfig(kvs);
-  }
-
-  private Config buildConfig(String... kvs) {
-    if (kvs.length % 2 != 0) {
-      throw new IllegalArgumentException("There must be parity between the 
keys and values");
-    }
-
-    Map<String, String> configMap = new HashMap<>();
-    for (int i = 0; i < kvs.length - 1; i += 2) {
-      configMap.put(kvs[i], kvs[i + 1]);
-    }
-    return new MapConfig(configMap);
-  }
-
-  private Config addConfigs(Config original, String... kvs) {
-    Map<String, String> result = new HashMap<>();
-    result.putAll(original);
-    result.putAll(buildConfig(kvs));
-    return new MapConfig(result);
-  }
-
-  private class TestAbstractApplicationRunnerImpl extends 
AbstractApplicationRunner {
-
-    public TestAbstractApplicationRunnerImpl(Config config) {
-      super(config);
-    }
-
-    @Override
-    public void runTask() {
-      throw new UnsupportedOperationException("runTask is not supported in 
this test");
-    }
-
-    @Override
-    public void run(StreamApplication streamApp) {
-      // do nothing. We're only testing the stream creation methods at this 
point.
-    }
-
-    @Override
-    public void kill(StreamApplication streamApp) {
-      // do nothing. We're only testing the stream creation methods at this 
point.
-    }
-
-    @Override
-    public ApplicationStatus status(StreamApplication streamApp) {
-      // do nothing. We're only testing the stream creation methods at this 
point.
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java b/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java
index e207772..0b91315 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java
@@ -27,7 +27,6 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.operators.StreamGraphSpec;
-import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.testUtils.TestAsyncStreamTask;
 import org.apache.samza.testUtils.TestStreamTask;
 import org.junit.Test;
@@ -46,8 +45,6 @@ import static org.mockito.Mockito.mock;
  */
 public class TestTaskFactoryUtil {
 
-  private final ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-
   @Test
   public void testStreamTaskClass() {
     Config config = new MapConfig(new HashMap<String, String>() {
@@ -81,7 +78,7 @@ public class TestTaskFactoryUtil {
     });
     StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config);
     assertNotNull(streamApp);
-    StreamGraphSpec graph = new StreamGraphSpec(mockRunner, config);
+    StreamGraphSpec graph = new StreamGraphSpec(config);
     streamApp.init(graph, config);
     Object retFactory = TaskFactoryUtil.createTaskFactory(graph.getOperatorSpecGraph(), null);
     assertTrue(retFactory instanceof StreamTaskFactory);

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/test/java/org/apache/samza/testUtils/StreamTestUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/StreamTestUtils.java b/samza-core/src/test/java/org/apache/samza/testUtils/StreamTestUtils.java
new file mode 100644
index 0000000..9b1fa4d
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/testUtils/StreamTestUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.testUtils;
+
+import java.util.Map;
+import org.apache.samza.config.StreamConfig$;
+
+public class StreamTestUtils {
+
+  /**
+   * Adds the streams.stream-id.* configurations for the provided {@code streamId} to the provided {@code configs} Map.
+   *
+   * @param configs the configs Map to add the stream configurations to
+   * @param streamId the id of the stream
+   * @param systemName the system for the stream
+   * @param physicalName the physical name for the stream
+   */
+  public static void addStreamConfigs(Map<String, String> configs,
+      String streamId, String systemName, String physicalName) {
+    configs.put(String.format(StreamConfig$.MODULE$.SYSTEM_FOR_STREAM_ID(), 
streamId), systemName);
+    
configs.put(String.format(StreamConfig$.MODULE$.PHYSICAL_NAME_FOR_STREAM_ID(), 
streamId), physicalName);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/test/java/org/apache/samza/util/TestStreamUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestStreamUtil.java 
b/samza-core/src/test/java/org/apache/samza/util/TestStreamUtil.java
new file mode 100644
index 0000000..ac075cd
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/util/TestStreamUtil.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.util;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.system.StreamSpec;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+
+public class TestStreamUtil {
+  private static final String STREAM_ID = "t3st-Stream_Id";
+  private static final String STREAM_ID_INVALID = "test#Str3amId!";
+
+  private static final String TEST_PHYSICAL_NAME = "t3st-Physical_Name";
+  private static final String TEST_PHYSICAL_NAME2 = "testPhysicalName2";
+  private static final String TEST_PHYSICAL_NAME_SPECIAL_CHARS = 
"test://Physical.Name?";
+
+  private static final String TEST_SYSTEM = "t3st-System_Name";
+  private static final String TEST_SYSTEM_INVALID = "test:System!Name@";
+
+  private static final String TEST_DEFAULT_SYSTEM = "testDefaultSystemName";
+
+
+  // The physical name should be pulled from the StreamConfig.PHYSICAL_NAME 
property value.
+  @Test
+  public void testGetStreamWithPhysicalNameInConfig() {
+    Config config = buildStreamConfig(STREAM_ID,
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+        StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new 
StreamConfig(config));
+
+    assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
+  }
+
+  // The streamId should be used as the physicalName when the physical name is 
not specified.
+  // NOTE: it's either this, set to null, or exception. This seems better for backward compatibility and API brevity.
+  @Test
+  public void testGetStreamWithoutPhysicalNameInConfig() {
+    Config config = buildStreamConfig(STREAM_ID,
+        StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new 
StreamConfig(config));
+
+    assertEquals(STREAM_ID, spec.getPhysicalName());
+  }
+
+  // If the system is specified at the stream scope, use it
+  @Test
+  public void testGetStreamWithSystemAtStreamScopeInConfig() {
+    Config config = buildStreamConfig(STREAM_ID,
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+        StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new 
StreamConfig(config));
+
+    assertEquals(TEST_SYSTEM, spec.getSystemName());
+  }
+
+  // If system isn't specified at stream scope, use the default system
+  @Test
+  public void testGetStreamWithSystemAtDefaultScopeInConfig() {
+    Config config = addConfigs(buildStreamConfig(STREAM_ID,
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME),
+        JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM);
+
+    StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new 
StreamConfig(config));
+
+    assertEquals(TEST_DEFAULT_SYSTEM, spec.getSystemName());
+  }
+
+  // Stream scope should override default scope
+  @Test
+  public void testGetStreamWithSystemAtBothScopesInConfig() {
+    Config config = addConfigs(buildStreamConfig(STREAM_ID,
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+        StreamConfig.SYSTEM(), TEST_SYSTEM),
+        JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM);
+
+    StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new 
StreamConfig(config));
+
+    assertEquals(TEST_SYSTEM, spec.getSystemName());
+  }
+
+  // System is required. Throw if it cannot be determined.
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetStreamWithOutSystemInConfig() {
+    Config config = buildStreamConfig(STREAM_ID,
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME);
+
+    StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new 
StreamConfig(config));
+
+    assertEquals(TEST_SYSTEM, spec.getSystemName());
+  }
+
+  // The properties in the config "streams.{streamId}.*" should be passed 
through to the spec.
+  @Test
+  public void testGetStreamPropertiesPassthrough() {
+    Config config = buildStreamConfig(STREAM_ID,
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+        StreamConfig.SYSTEM(), TEST_SYSTEM,
+        "systemProperty1", "systemValue1",
+        "systemProperty2", "systemValue2",
+        "systemProperty3", "systemValue3");
+
+    StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new 
StreamConfig(config));
+
+    Map<String, String> properties = spec.getConfig();
+    assertEquals(3, properties.size());
+    assertEquals("systemValue1", properties.get("systemProperty1"));
+    assertEquals("systemValue2", properties.get("systemProperty2"));
+    assertEquals("systemValue3", properties.get("systemProperty3"));
+    assertEquals("systemValue1", spec.get("systemProperty1"));
+    assertEquals("systemValue2", spec.get("systemProperty2"));
+    assertEquals("systemValue3", spec.get("systemProperty3"));
+  }
+
+  // The samza properties (which are invalid for the underlying system) should 
be filtered out.
+  @Test
+  public void testGetStreamSamzaPropertiesOmitted() {
+    Config config = buildStreamConfig(STREAM_ID,
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+        StreamConfig.SYSTEM(), TEST_SYSTEM,
+        "systemProperty1", "systemValue1",
+        "systemProperty2", "systemValue2",
+        "systemProperty3", "systemValue3");
+
+    StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new 
StreamConfig(config));
+
+    Map<String, String> properties = spec.getConfig();
+    assertEquals(3, properties.size());
+    
assertNull(properties.get(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(),
 STREAM_ID)));
+    
assertNull(properties.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), 
STREAM_ID)));
+    
assertNull(spec.get(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), 
STREAM_ID)));
+    assertNull(spec.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), 
STREAM_ID)));
+  }
+
+  @Test
+  public void testStreamConfigOverrides() {
+    final String sysStreamPrefix = String.format("systems.%s.streams.%s.", 
TEST_SYSTEM, TEST_PHYSICAL_NAME);
+    Config config = addConfigs(buildStreamConfig(STREAM_ID,
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+        StreamConfig.SYSTEM(), TEST_SYSTEM,
+        "systemProperty1", "systemValue1",
+        "systemProperty2", "systemValue2",
+        "systemProperty3", "systemValue3"),
+        sysStreamPrefix + "systemProperty4", "systemValue4",
+        sysStreamPrefix + "systemProperty2", "systemValue8");
+
+    StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new 
StreamConfig(config));
+
+    Map<String, String> properties = spec.getConfig();
+    assertEquals(4, properties.size());
+    assertEquals("systemValue4", properties.get("systemProperty4"));
+    assertEquals("systemValue2", properties.get("systemProperty2"));
+  }
+
+  // Verify that we use a default specified with systems.x.default.stream.*, 
if specified
+  @Test
+  public void testStreamConfigOverridesWithSystemDefaults() {
+    Config config = addConfigs(buildStreamConfig(STREAM_ID,
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+        StreamConfig.SYSTEM(), TEST_SYSTEM,
+        "segment.bytes", "5309"),
+        String.format("systems.%s.default.stream.replication.factor", 
TEST_SYSTEM), "4", // System default property
+        String.format("systems.%s.default.stream.segment.bytest", 
TEST_SYSTEM), "867"
+    );
+
+    StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new 
StreamConfig(config));
+
+    Map<String, String> properties = spec.getConfig();
+    assertEquals(3, properties.size());
+    assertEquals("4", properties.get("replication.factor")); // Uses system 
default
+    assertEquals("5309", properties.get("segment.bytes")); // Overrides system 
default
+  }
+
+  // The physical name specified in the config should be used in the resulting spec, along with the streamId and system.
+  @Test
+  public void testGetStreamPhysicalNameArgSimple() {
+    Config config = buildStreamConfig(STREAM_ID,
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be used as the physical name
+        StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new 
StreamConfig(config));
+
+    assertEquals(STREAM_ID, spec.getId());
+    assertEquals(TEST_PHYSICAL_NAME2, spec.getPhysicalName());
+    assertEquals(TEST_SYSTEM, spec.getSystemName());
+  }
+
+  // Special characters are allowed for the physical name
+  @Test
+  public void testGetStreamPhysicalNameArgSpecialCharacters() {
+    Config config = buildStreamConfig(STREAM_ID,
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME_SPECIAL_CHARS,
+        StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new 
StreamConfig(config));
+    assertEquals(TEST_PHYSICAL_NAME_SPECIAL_CHARS, spec.getPhysicalName());
+  }
+
+  // Null is allowed for the physical name
+  @Test
+  public void testGetStreamPhysicalNameArgNull() {
+    Config config = buildStreamConfig(STREAM_ID,
+        StreamConfig.PHYSICAL_NAME(), null,
+        StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new 
StreamConfig(config));
+    assertNull(spec.getPhysicalName());
+  }
+
+  // The system name specified in the config at the stream scope should be used in the resulting spec.
+  @Test
+  public void testGetStreamSystemNameArgValid() {
+    Config config = buildStreamConfig(STREAM_ID,
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, // This should be used as the physical name
+        StreamConfig.SYSTEM(), TEST_SYSTEM);              // This should be used as the system name
+
+    StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new 
StreamConfig(config));
+
+    assertEquals(STREAM_ID, spec.getId());
+    assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
+    assertEquals(TEST_SYSTEM, spec.getSystemName());
+  }
+
+  // Special characters are NOT allowed for system name, because it's used as 
an identifier in the config.
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetStreamSystemNameArgInvalid() {
+    Config config = buildStreamConfig(STREAM_ID,
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+        StreamConfig.SYSTEM(), TEST_SYSTEM_INVALID);
+
+    StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config));
+  }
+
+  // Empty strings are NOT allowed for system name, because it's used as an 
identifier in the config.
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetStreamSystemNameArgEmpty() {
+    Config config = buildStreamConfig(STREAM_ID,
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+        StreamConfig.SYSTEM(), "");
+
+    StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new 
StreamConfig(config));
+  }
+
+  // Null is not allowed for system name.
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetStreamSystemNameArgNull() {
+    Config config = buildStreamConfig(STREAM_ID,
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+        StreamConfig.SYSTEM(), null);
+
+    StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config));
+  }
+
+  // Special characters are NOT allowed for streamId, because it's used as an 
identifier in the config.
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetStreamStreamIdInvalid() {
+    Config config = buildStreamConfig(STREAM_ID_INVALID,
+        StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    StreamUtil.getStreamSpec(STREAM_ID_INVALID, new StreamConfig(config));
+  }
+
+  // Empty strings are NOT allowed for streamId, because it's used as an 
identifier in the config.
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetStreamStreamIdEmpty() {
+    Config config = buildStreamConfig("",
+        StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    StreamUtil.getStreamSpec("", new StreamConfig(config));
+  }
+
+  // Null is not allowed for streamId.
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetStreamStreamIdNull() {
+    Config config = buildStreamConfig(null,
+        StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    StreamUtil.getStreamSpec(null, new StreamConfig(config));
+  }
+
+
+  // Helper methods
+
+  private Config buildStreamConfig(String streamId, String... kvs) {
+    // inject streams.x. into each key
+    for (int i = 0; i < kvs.length - 1; i += 2) {
+      kvs[i] = String.format(StreamConfig.STREAM_ID_PREFIX(), streamId) + 
kvs[i];
+    }
+    return buildConfig(kvs);
+  }
+
+  private Config buildConfig(String... kvs) {
+    if (kvs.length % 2 != 0) {
+      throw new IllegalArgumentException("There must be parity between the 
keys and values");
+    }
+
+    Map<String, String> configMap = new HashMap<>();
+    for (int i = 0; i < kvs.length - 1; i += 2) {
+      configMap.put(kvs[i], kvs[i + 1]);
+    }
+    return new MapConfig(configMap);
+  }
+
+  private Config addConfigs(Config original, String... kvs) {
+    Map<String, String> result = new HashMap<>();
+    result.putAll(original);
+    result.putAll(buildConfig(kvs));
+    return new MapConfig(result);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java 
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
index 217248d..113dced 100644
--- 
a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
+++ 
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
@@ -110,7 +110,6 @@ public class KafkaStreamSpec extends StreamSpec {
                                 originalSpec.getSystemName(),
                                 originalSpec.getPartitionCount(),
                                 replicationFactor,
-                                originalSpec.isBroadcast(),
                                 
mapToProperties(filterUnsupportedProperties(originalSpec.getConfig())));
   }
 
@@ -125,7 +124,7 @@ public class KafkaStreamSpec extends StreamSpec {
    * @param partitionCount  The number of partitions.
    */
   public KafkaStreamSpec(String id, String topicName, String systemName, int 
partitionCount) {
-    this(id, topicName, systemName, partitionCount, 
DEFAULT_REPLICATION_FACTOR, false, new Properties());
+    this(id, topicName, systemName, partitionCount, 
DEFAULT_REPLICATION_FACTOR, new Properties());
   }
 
   /**
@@ -146,13 +145,11 @@ public class KafkaStreamSpec extends StreamSpec {
    *
    * @param replicationFactor The number of topic replicas in the Kafka 
cluster for durability.
    *
-   * @param isBroadcast       The stream is broadcast or not.
-   *
    * @param properties        A set of properties for the stream. These may be 
System-specfic.
    */
   public KafkaStreamSpec(String id, String topicName, String systemName, int 
partitionCount, int replicationFactor,
-      Boolean isBroadcast, Properties properties) {
-    super(id, topicName, systemName, partitionCount, false, isBroadcast, 
propertiesToMap(properties));
+      Properties properties) {
+    super(id, topicName, systemName, partitionCount, 
propertiesToMap(properties));
 
     if (partitionCount < 1) {
       throw new IllegalArgumentException("Parameter 'partitionCount' must be > 
0");
@@ -168,12 +165,12 @@ public class KafkaStreamSpec extends StreamSpec {
   @Override
   public StreamSpec copyWithPartitionCount(int partitionCount) {
     return new KafkaStreamSpec(getId(), getPhysicalName(), getSystemName(), 
partitionCount, getReplicationFactor(),
-        isBroadcast(), getProperties());
+        getProperties());
   }
 
   public KafkaStreamSpec copyWithReplicationFactor(int replicationFactor) {
     return new KafkaStreamSpec(getId(), getPhysicalName(), getSystemName(), 
getPartitionCount(), replicationFactor,
-        isBroadcast(), getProperties());
+        getProperties());
   }
 
   /**
@@ -183,7 +180,7 @@ public class KafkaStreamSpec extends StreamSpec {
    */
   public KafkaStreamSpec copyWithProperties(Properties properties) {
     return new KafkaStreamSpec(getId(), getPhysicalName(), getSystemName(), 
getPartitionCount(), getReplicationFactor(),
-        isBroadcast(), properties);
+        properties);
   }
 
   public int getReplicationFactor() {

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 
b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 07f4710..26664ea 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -32,7 +32,7 @@ import 
org.apache.kafka.common.serialization.ByteArraySerializer
 import org.apache.samza.SamzaException
 import org.apache.samza.config.ApplicationConfig.ApplicationMode
 import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.util.{Logging, Util}
+import org.apache.samza.util.{Logging, StreamUtil}
 
 import scala.collection.JavaConverters._
 
@@ -237,7 +237,7 @@ class KafkaConfig(config: Config) extends 
ScalaMapConfig(config) {
       val storeName = if (matcher.find()) matcher.group(1) else throw new 
SamzaException("Unable to find store name in the changelog configuration: " + 
changelogConfig + " with SystemStream: " + cn)
 
       storageConfig.getChangelogStream(storeName).foreach(changelogName => {
-        val systemStream = Util.getSystemStreamFromNames(changelogName)
+        val systemStream = StreamUtil.getSystemStreamFromNames(changelogName)
         val factoryName = 
config.getSystemFactory(systemStream.getSystem).getOrElse(new 
SamzaException("Unable to determine factory for system: " + 
systemStream.getSystem))
         storeToChangelog += storeName -> systemStream.getStream
       })

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala 
b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
index 6dc2f82..ce4544b 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
@@ -21,11 +21,11 @@ package org.apache.samza.config
 
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils.ZkUtils
-import org.apache.samza.config.KafkaConfig.{ Config2Kafka, 
REGEX_RESOLVED_STREAMS }
+import org.apache.samza.config.KafkaConfig.{Config2Kafka, 
REGEX_RESOLVED_STREAMS}
 import org.apache.samza.SamzaException
-import org.apache.samza.util.Util
+import org.apache.samza.util.{Logging, StreamUtil}
+
 import collection.JavaConverters._
-import org.apache.samza.util.Logging
 import scala.collection._
 import org.apache.samza.config.TaskConfig.Config2Task
 import org.apache.samza.system.SystemStream
@@ -87,7 +87,7 @@ class RegExTopicGenerator extends ConfigRewriter with Logging 
{
     info("Generated config values for %d new topics" format 
newInputStreams.size)
 
     val inputStreams = TaskConfig.INPUT_STREAMS -> (existingInputStreams ++ 
newInputStreams)
-      .map(Util.getNameFromSystemStream)
+      .map(StreamUtil.getNameFromSystemStream)
       .toArray
       .sortWith(_ < _)
       .mkString(",")

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index a63db03..6ab4d32 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -486,10 +486,10 @@ class KafkaSystemAdmin(
       val topicName = spec.getPhysicalName
       val topicMeta = topicMetaInformation.getOrElse(topicName, throw new 
StreamValidationException("Unable to find topic information for topic " + 
topicName))
       new KafkaStreamSpec(spec.getId, topicName, systemName, 
spec.getPartitionCount, topicMeta.replicationFactor,
-        spec.isBroadcast, topicMeta.kafkaProps)
+        topicMeta.kafkaProps)
     } else if (spec.isCoordinatorStream){
       new KafkaStreamSpec(spec.getId, spec.getPhysicalName, systemName, 1, 
coordinatorStreamReplicationFactor,
-        spec.isBroadcast, coordinatorStreamProperties)
+        coordinatorStreamProperties)
     } else if (intermediateStreamProperties.contains(spec.getId)) {
       
KafkaStreamSpec.fromSpec(spec).copyWithProperties(intermediateStreamProperties(spec.getId))
     } else {

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java
 
b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java
index c00ed2d..14d2fe6 100644
--- 
a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java
+++ 
b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java
@@ -21,20 +21,20 @@ package org.apache.samza.system.kafka;
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import java.util.Properties;
-import org.apache.samza.runtime.TestAbstractApplicationRunner;
 import org.apache.samza.system.StreamSpec;
+import org.apache.samza.util.TestStreamUtil;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
 
 /**
- * See also the general StreamSpec tests in {@link 
TestAbstractApplicationRunner}
+ * See also the general StreamSpec tests in {@link TestStreamUtil}
  */
 public class TestKafkaStreamSpec {
 
   @Test
   public void testUnsupportedConfigStrippedFromProperties() {
-    StreamSpec original = new StreamSpec("dummyId","dummyPhysicalName", 
"dummySystemName", false, ImmutableMap.of("segment.bytes", "4", 
"replication.factor", "7"));
+    StreamSpec original = new StreamSpec("dummyId","dummyPhysicalName", 
"dummySystemName", ImmutableMap.of("segment.bytes", "4", "replication.factor", 
"7"));
 
     // First verify the original
     assertEquals("7", original.get("replication.factor"));

Reply via email to