Repository: samza
Updated Branches:
  refs/heads/master 43c36e6f2 -> 440a25c97


http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 71718b0..8d92f4d 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -113,7 +113,7 @@ class TestKafkaCheckpointManager extends 
KafkaServerTestHarness {
     Mockito.doThrow(new 
RuntimeException()).when(mockKafkaProducer).flush(taskName.getTaskName)
 
     val props = new 
org.apache.samza.config.KafkaConfig(config).getCheckpointTopicProperties()
-    val spec = new KafkaStreamSpec("id", checkpointTopic, 
checkpointSystemName, 1, 1, false, props)
+    val spec = new KafkaStreamSpec("id", checkpointTopic, 
checkpointSystemName, 1, 1, props)
     val checkPointManager = new KafkaCheckpointManager(spec, new 
MockSystemFactory, false, config, new NoOpMetricsRegistry)
     checkPointManager.MaxRetryDurationMs = 1
 
@@ -193,7 +193,7 @@ class TestKafkaCheckpointManager extends 
KafkaServerTestHarness {
 
     val systemFactory = Util.getObj(systemFactoryClassName, 
classOf[SystemFactory])
 
-    val spec = new KafkaStreamSpec("id", cpTopic, checkpointSystemName, 1, 1, 
false, props)
+    val spec = new KafkaStreamSpec("id", cpTopic, checkpointSystemName, 1, 1, 
props)
     new KafkaCheckpointManager(spec, systemFactory, failOnTopicValidation, 
config, new NoOpMetricsRegistry, serde)
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
index 65b8c8c..ede7995 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
@@ -26,11 +26,11 @@ import java.util.Map;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StreamConfig;
 import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.StreamGraphSpec;
-import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory;
@@ -86,17 +86,25 @@ public class TestQueryTranslator {
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
     OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
+
+    StreamConfig streamConfig = new StreamConfig(samzaConfig);
+    String inputStreamId = 
specGraph.getInputOperators().keySet().stream().findFirst().get();
+    String inputSystem = streamConfig.getSystem(inputStreamId);
+    String inputPhysicalName = streamConfig.getPhysicalName(inputStreamId);
+    String outputStreamId = 
specGraph.getOutputStreams().keySet().stream().findFirst().get();
+    String outputSystem = streamConfig.getSystem(outputStreamId);
+    String outputPhysicalName = streamConfig.getPhysicalName(outputStreamId);
+    
     Assert.assertEquals(1, specGraph.getOutputStreams().size());
-    Assert.assertEquals("testavro", 
specGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("outputTopic", 
specGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro", outputSystem);
+    Assert.assertEquals("outputTopic", outputPhysicalName);
     Assert.assertEquals(1, specGraph.getInputOperators().size());
-    Assert.assertEquals("testavro",
-        
specGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("SIMPLE1",
-        
specGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
+    
+    Assert.assertEquals("testavro", inputSystem);
+    Assert.assertEquals("SIMPLE1", inputPhysicalName);
 
     validatePerTaskContextInit(graphSpec, samzaConfig);
   }
@@ -130,17 +138,24 @@ public class TestQueryTranslator {
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
     OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
+
+    StreamConfig streamConfig = new StreamConfig(samzaConfig);
+    String inputStreamId = 
specGraph.getInputOperators().keySet().stream().findFirst().get();
+    String inputSystem = streamConfig.getSystem(inputStreamId);
+    String inputPhysicalName = streamConfig.getPhysicalName(inputStreamId);
+    String outputStreamId = 
specGraph.getOutputStreams().keySet().stream().findFirst().get();
+    String outputSystem = streamConfig.getSystem(outputStreamId);
+    String outputPhysicalName = streamConfig.getPhysicalName(outputStreamId);
+
     Assert.assertEquals(1, specGraph.getOutputStreams().size());
-    Assert.assertEquals("testavro", 
specGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("outputTopic", 
specGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro", outputSystem);
+    Assert.assertEquals("outputTopic", outputPhysicalName);
     Assert.assertEquals(1, specGraph.getInputOperators().size());
-    Assert.assertEquals("testavro",
-        
specGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("COMPLEX1",
-        
specGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro", inputSystem);
+    Assert.assertEquals("COMPLEX1", inputPhysicalName);
 
     validatePerTaskContextInit(graphSpec, samzaConfig);
   }
@@ -155,17 +170,24 @@ public class TestQueryTranslator {
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
     OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
+
+    StreamConfig streamConfig = new StreamConfig(samzaConfig);
+    String inputStreamId = 
specGraph.getInputOperators().keySet().stream().findFirst().get();
+    String inputSystem = streamConfig.getSystem(inputStreamId);
+    String inputPhysicalName = streamConfig.getPhysicalName(inputStreamId);
+    String outputStreamId = 
specGraph.getOutputStreams().keySet().stream().findFirst().get();
+    String outputSystem = streamConfig.getSystem(outputStreamId);
+    String outputPhysicalName = streamConfig.getPhysicalName(outputStreamId);
+
     Assert.assertEquals(1, specGraph.getOutputStreams().size());
-    Assert.assertEquals("testavro", 
specGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("outputTopic", 
specGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro", outputSystem);
+    Assert.assertEquals("outputTopic", outputPhysicalName);
     Assert.assertEquals(1, specGraph.getInputOperators().size());
-    Assert.assertEquals("testavro",
-        
specGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("COMPLEX1",
-        
specGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro", inputSystem);
+    Assert.assertEquals("COMPLEX1", inputPhysicalName);
 
     validatePerTaskContextInit(graphSpec, samzaConfig);
   }
@@ -184,7 +206,7 @@ public class TestQueryTranslator {
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
   }
 
@@ -203,7 +225,7 @@ public class TestQueryTranslator {
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
   }
 
@@ -222,7 +244,7 @@ public class TestQueryTranslator {
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
   }
 
@@ -241,7 +263,7 @@ public class TestQueryTranslator {
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
   }
 
@@ -258,7 +280,7 @@ public class TestQueryTranslator {
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
   }
 
@@ -277,7 +299,7 @@ public class TestQueryTranslator {
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
   }
 
@@ -297,7 +319,7 @@ public class TestQueryTranslator {
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
   }
 
@@ -316,7 +338,7 @@ public class TestQueryTranslator {
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
   }
 
@@ -335,7 +357,7 @@ public class TestQueryTranslator {
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
   }
 
@@ -354,7 +376,7 @@ public class TestQueryTranslator {
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
   }
 
@@ -373,7 +395,7 @@ public class TestQueryTranslator {
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
   }
 
@@ -396,7 +418,7 @@ public class TestQueryTranslator {
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
   }
 
@@ -415,7 +437,7 @@ public class TestQueryTranslator {
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
   }
 
@@ -434,29 +456,40 @@ public class TestQueryTranslator {
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
     OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
 
+    StreamConfig streamConfig = new StreamConfig(samzaConfig);
+    String input1StreamId = 
specGraph.getInputOperators().keySet().stream().findFirst().get();
+    String input1System = streamConfig.getSystem(input1StreamId);
+    String input1PhysicalName = streamConfig.getPhysicalName(input1StreamId);
+    String input2StreamId = 
specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get();
+    String input2System = streamConfig.getSystem(input2StreamId);
+    String input2PhysicalName = streamConfig.getPhysicalName(input2StreamId);
+    String input3StreamId = 
specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get();
+    String input3System = streamConfig.getSystem(input3StreamId);
+    String input3PhysicalName = streamConfig.getPhysicalName(input3StreamId);
+    String output1StreamId = 
specGraph.getOutputStreams().keySet().stream().findFirst().get();
+    String output1System = streamConfig.getSystem(output1StreamId);
+    String output1PhysicalName = streamConfig.getPhysicalName(output1StreamId);
+    String output2StreamId = 
specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get();
+    String output2System = streamConfig.getSystem(output2StreamId);
+    String output2PhysicalName = streamConfig.getPhysicalName(output2StreamId);
+
     Assert.assertEquals(2, specGraph.getOutputStreams().size());
-    Assert.assertEquals("kafka", 
specGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("sql-job-1-partition_by-stream_1", 
specGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
-    Assert.assertEquals("testavro", 
specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName());
-    Assert.assertEquals("enrichedPageViewTopic", 
specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName());
+    Assert.assertEquals("kafka", output1System);
+    Assert.assertEquals("sql-job-1-partition_by-stream_1", 
output1PhysicalName);
+    Assert.assertEquals("testavro", output2System);
+    Assert.assertEquals("enrichedPageViewTopic", output2PhysicalName);
 
     Assert.assertEquals(3, specGraph.getInputOperators().size());
-    Assert.assertEquals("testavro",
-        
specGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("PAGEVIEW",
-        
specGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
-    Assert.assertEquals("testavro",
-        
specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName());
-    Assert.assertEquals("PROFILE",
-        
specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName());
-    Assert.assertEquals("kafka",
-        
specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName());
-    Assert.assertEquals("sql-job-1-partition_by-stream_1",
-        
specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro", input1System);
+    Assert.assertEquals("PAGEVIEW", input1PhysicalName);
+    Assert.assertEquals("testavro", input2System);
+    Assert.assertEquals("PROFILE", input2PhysicalName);
+    Assert.assertEquals("kafka", input3System);
+    Assert.assertEquals("sql-job-1-partition_by-stream_1", input3PhysicalName);
 
     validatePerTaskContextInit(graphSpec, samzaConfig);
   }
@@ -476,32 +509,41 @@ public class TestQueryTranslator {
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
 
     OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
 
+    StreamConfig streamConfig = new StreamConfig(samzaConfig);
+    String input1StreamId = 
specGraph.getInputOperators().keySet().stream().findFirst().get();
+    String input1System = streamConfig.getSystem(input1StreamId);
+    String input1PhysicalName = streamConfig.getPhysicalName(input1StreamId);
+    String input2StreamId = 
specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get();
+    String input2System = streamConfig.getSystem(input2StreamId);
+    String input2PhysicalName = streamConfig.getPhysicalName(input2StreamId);
+    String input3StreamId = 
specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get();
+    String input3System = streamConfig.getSystem(input3StreamId);
+    String input3PhysicalName = streamConfig.getPhysicalName(input3StreamId);
+    String output1StreamId = 
specGraph.getOutputStreams().keySet().stream().findFirst().get();
+    String output1System = streamConfig.getSystem(output1StreamId);
+    String output1PhysicalName = streamConfig.getPhysicalName(output1StreamId);
+    String output2StreamId = 
specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get();
+    String output2System = streamConfig.getSystem(output2StreamId);
+    String output2PhysicalName = streamConfig.getPhysicalName(output2StreamId);
+
     Assert.assertEquals(2, specGraph.getOutputStreams().size());
-    Assert.assertEquals("kafka", 
specGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("sql-job-1-partition_by-stream_1",
-        
specGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
-    Assert.assertEquals("testavro", 
specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName());
-    Assert.assertEquals("enrichedPageViewTopic",
-        
specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName());
+    Assert.assertEquals("kafka", output1System);
+    Assert.assertEquals("sql-job-1-partition_by-stream_1", 
output1PhysicalName);
+    Assert.assertEquals("testavro", output2System);
+    Assert.assertEquals("enrichedPageViewTopic", output2PhysicalName);
 
     Assert.assertEquals(3, specGraph.getInputOperators().size());
-    Assert.assertEquals("testavro",
-        
specGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("PAGEVIEW",
-        
specGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
-    Assert.assertEquals("testavro",
-        
specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName());
-    Assert.assertEquals("PROFILE",
-        
specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName());
-    Assert.assertEquals("kafka",
-        
specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName());
-    Assert.assertEquals("sql-job-1-partition_by-stream_1",
-        
specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro", input1System);
+    Assert.assertEquals("PAGEVIEW", input1PhysicalName);
+    Assert.assertEquals("testavro", input2System);
+    Assert.assertEquals("PROFILE", input2PhysicalName);
+    Assert.assertEquals("kafka", input3System);
+    Assert.assertEquals("sql-job-1-partition_by-stream_1", input3PhysicalName);
 
     validatePerTaskContextInit(graphSpec, samzaConfig);
   }
@@ -521,32 +563,41 @@ public class TestQueryTranslator {
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
 
     OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
 
+    StreamConfig streamConfig = new StreamConfig(samzaConfig);
+    String input1StreamId = 
specGraph.getInputOperators().keySet().stream().findFirst().get();
+    String input1System = streamConfig.getSystem(input1StreamId);
+    String input1PhysicalName = streamConfig.getPhysicalName(input1StreamId);
+    String input2StreamId = 
specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get();
+    String input2System = streamConfig.getSystem(input2StreamId);
+    String input2PhysicalName = streamConfig.getPhysicalName(input2StreamId);
+    String input3StreamId = 
specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get();
+    String input3System = streamConfig.getSystem(input3StreamId);
+    String input3PhysicalName = streamConfig.getPhysicalName(input3StreamId);
+    String output1StreamId = 
specGraph.getOutputStreams().keySet().stream().findFirst().get();
+    String output1System = streamConfig.getSystem(output1StreamId);
+    String output1PhysicalName = streamConfig.getPhysicalName(output1StreamId);
+    String output2StreamId = 
specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get();
+    String output2System = streamConfig.getSystem(output2StreamId);
+    String output2PhysicalName = streamConfig.getPhysicalName(output2StreamId);
+
     Assert.assertEquals(2, specGraph.getOutputStreams().size());
-    Assert.assertEquals("kafka", 
specGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("sql-job-1-partition_by-stream_1",
-        
specGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName());
-    Assert.assertEquals("testavro", 
specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName());
-    Assert.assertEquals("enrichedPageViewTopic",
-        
specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName());
+    Assert.assertEquals("kafka", output1System);
+    Assert.assertEquals("sql-job-1-partition_by-stream_1", 
output1PhysicalName);
+    Assert.assertEquals("testavro", output2System);
+    Assert.assertEquals("enrichedPageViewTopic", output2PhysicalName);
 
     Assert.assertEquals(3, specGraph.getInputOperators().size());
-    Assert.assertEquals("testavro",
-        
specGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName());
-    Assert.assertEquals("PROFILE",
-        
specGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName());
-    Assert.assertEquals("testavro",
-        
specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName());
-    Assert.assertEquals("PAGEVIEW",
-        
specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName());
-    Assert.assertEquals("kafka",
-        
specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName());
-    Assert.assertEquals("sql-job-1-partition_by-stream_1",
-        
specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName());
+    Assert.assertEquals("testavro", input1System);
+    Assert.assertEquals("PROFILE", input1PhysicalName);
+    Assert.assertEquals("testavro", input2System);
+    Assert.assertEquals("PAGEVIEW", input2PhysicalName);
+    Assert.assertEquals("kafka", input3System);
+    Assert.assertEquals("sql-job-1-partition_by-stream_1", input3PhysicalName);
 
     validatePerTaskContextInit(graphSpec, samzaConfig);
   }
@@ -566,7 +617,7 @@ public class TestQueryTranslator {
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
     OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
 
@@ -590,7 +641,7 @@ public class TestQueryTranslator {
     QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig);
     SamzaSqlQueryParser.QueryInfo queryInfo = 
samzaSqlApplicationConfig.getQueryInfo().get(0);
     StreamGraphSpec
-        graphSpec = new StreamGraphSpec(new 
LocalApplicationRunner(samzaConfig), samzaConfig);
+        graphSpec = new StreamGraphSpec(samzaConfig);
     translator.translate(queryInfo, graphSpec);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java b/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java
index 617cea6..4309d92 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java
@@ -30,7 +30,7 @@ import org.apache.samza.task.StreamTask;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.task.TaskCoordinator.RequestScope;
-import org.apache.samza.util.Util;
+import org.apache.samza.util.StreamUtil;
 
 /**
  * A simple test job that reads strings, converts them to integers, multiplies
@@ -59,7 +59,7 @@ public class NegateNumberTask implements StreamTask, 
InitableTask {
     if (outputSystemStreamString == null) {
       throw new ConfigException("Missing required configuration: 
task.outputs");
     }
-    outputSystemStream = 
Util.getSystemStreamFromNames(outputSystemStreamString);
+    outputSystemStream = 
StreamUtil.getSystemStreamFromNames(outputSystemStreamString);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
index d1f1d84..99d047d 100644
--- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
@@ -27,7 +27,7 @@ import org.apache.samza.task.StreamTask
 import org.apache.samza.task.TaskCoordinator
 import org.apache.samza.task.TaskCoordinator.RequestScope
 import org.apache.samza.config.Config
-import org.apache.samza.util.{Util, Logging}
+import org.apache.samza.util.{Logging, StreamUtil}
 import org.apache.samza.system.SystemStream
 import org.apache.samza.system.OutgoingMessageEnvelope
 
@@ -85,7 +85,7 @@ class TestPerformanceTask extends StreamTask with 
InitableTask with Logging {
   def init(config: Config, context: TaskContext) {
     logInterval = config.getInt("task.log.interval", 10000)
     maxMessages = config.getInt("task.max.messages", 10000000)
-    outputSystemStream = Option(config.get("task.outputs", 
null)).map(Util.getSystemStreamFromNames(_))
+    outputSystemStream = Option(config.get("task.outputs", 
null)).map(StreamUtil.getSystemStreamFromNames)
   }
 
   def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, 
coordinator: TaskCoordinator) {

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
index 2171d07..ec9c05d 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
@@ -34,7 +34,6 @@ import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.test.operator.data.AdClick;
@@ -54,7 +53,7 @@ public class RepartitionJoinWindowApp implements StreamApplication {
   public static final String INPUT_TOPIC_NAME_2_PROP = "inputTopicName2";
   public static final String OUTPUT_TOPIC_NAME_PROP = "outputTopicName";
 
-  private final List<StreamSpec> intermediateStreams = new ArrayList<>();
+  private final List<String> intermediateStreamIds = new ArrayList<>();
 
   public static void main(String[] args) throws Exception {
     CommandLine cmdLine = new CommandLine();
@@ -106,14 +105,14 @@ public class RepartitionJoinWindowApp implements StreamApplication {
           });
 
 
-    intermediateStreams.add(((IntermediateMessageStreamImpl) pageViewsRepartitionedByViewId).getStreamSpec());
-    intermediateStreams.add(((IntermediateMessageStreamImpl) adClicksRepartitionedByViewId).getStreamSpec());
-    intermediateStreams.add(((IntermediateMessageStreamImpl) userPageAdClicksByUserId).getStreamSpec());
+    intermediateStreamIds.add(((IntermediateMessageStreamImpl) pageViewsRepartitionedByViewId).getStreamId());
+    intermediateStreamIds.add(((IntermediateMessageStreamImpl) adClicksRepartitionedByViewId).getStreamId());
+    intermediateStreamIds.add(((IntermediateMessageStreamImpl) userPageAdClicksByUserId).getStreamId());
 
   }
 
-  public List<StreamSpec> getIntermediateStreams() {
-    return intermediateStreams;
+  List<String> getIntermediateStreamIds() {
+    return intermediateStreamIds;
   }
 
   private static class UserPageViewAdClicksJoiner implements JoinFunction<String, PageView, AdClick, UserPageAdClick> {

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
index a2adb70..a9a4026 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
@@ -24,7 +24,6 @@ import java.util.HashSet;
 import java.util.Map;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.samza.Partition;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
 import org.apache.samza.system.kafka.KafkaSystemAdmin;
@@ -122,14 +121,14 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe
 
     // Verify that messages in the intermediate stream will be deleted in 10 seconds
     long startTimeMs = System.currentTimeMillis();
-    for (StreamSpec spec: app.getIntermediateStreams()) {
+    for (String streamId: app.getIntermediateStreamIds()) {
       long remainingMessageNum = -1;
 
       while (remainingMessageNum != 0 && System.currentTimeMillis() - startTimeMs < 10000) {
         remainingMessageNum = 0;
         SystemStreamMetadata metadatas = systemAdmin.getSystemStreamMetadata(
-            new HashSet<>(Arrays.asList(spec.getPhysicalName())), new ExponentialSleepStrategy.Mock(3)
-        ).get(spec.getPhysicalName()).get();
+            new HashSet<>(Arrays.asList(streamId)), new ExponentialSleepStrategy.Mock(3)
+        ).get(streamId).get();
 
         for (Map.Entry<Partition, SystemStreamPartitionMetadata> entry : metadatas.getSystemStreamPartitionMetadata().entrySet()) {
           SystemStreamPartitionMetadata metadata = entry.getValue();

Reply via email to