Repository: samza
Updated Branches:
  refs/heads/master 43c36e6f2 -> 440a25c97

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala index 71718b0..8d92f4d 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala @@ -113,7 +113,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { Mockito.doThrow(new RuntimeException()).when(mockKafkaProducer).flush(taskName.getTaskName) val props = new org.apache.samza.config.KafkaConfig(config).getCheckpointTopicProperties() - val spec = new KafkaStreamSpec("id", checkpointTopic, checkpointSystemName, 1, 1, false, props) + val spec = new KafkaStreamSpec("id", checkpointTopic, checkpointSystemName, 1, 1, props) val checkPointManager = new KafkaCheckpointManager(spec, new MockSystemFactory, false, config, new NoOpMetricsRegistry) checkPointManager.MaxRetryDurationMs = 1 @@ -193,7 +193,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { val systemFactory = Util.getObj(systemFactoryClassName, classOf[SystemFactory]) - val spec = new KafkaStreamSpec("id", cpTopic, checkpointSystemName, 1, 1, false, props) + val spec = new KafkaStreamSpec("id", cpTopic, checkpointSystemName, 1, 1, props) new KafkaCheckpointManager(spec, systemFactory, failOnTopicValidation, config, new NoOpMetricsRegistry, serde) } http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java ---------------------------------------------------------------------- diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java index 65b8c8c..ede7995 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java @@ -26,11 +26,11 @@ import java.util.Map; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; +import org.apache.samza.config.StreamConfig; import org.apache.samza.container.TaskContextImpl; import org.apache.samza.container.TaskName; import org.apache.samza.operators.OperatorSpecGraph; import org.apache.samza.operators.StreamGraphSpec; -import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.sql.data.SamzaSqlExecutionContext; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory; @@ -86,17 +86,25 @@ public class TestQueryTranslator { QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); StreamGraphSpec - graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig); + graphSpec = new StreamGraphSpec(samzaConfig); translator.translate(queryInfo, graphSpec); OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph(); + + StreamConfig streamConfig = new StreamConfig(samzaConfig); + String inputStreamId = specGraph.getInputOperators().keySet().stream().findFirst().get(); + String inputSystem = streamConfig.getSystem(inputStreamId); + String inputPhysicalName = streamConfig.getPhysicalName(inputStreamId); + String outputStreamId = specGraph.getOutputStreams().keySet().stream().findFirst().get(); + String outputSystem = streamConfig.getSystem(outputStreamId); + String outputPhysicalName = 
streamConfig.getPhysicalName(outputStreamId); + Assert.assertEquals(1, specGraph.getOutputStreams().size()); - Assert.assertEquals("testavro", specGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName()); - Assert.assertEquals("outputTopic", specGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName()); + Assert.assertEquals("testavro", outputSystem); + Assert.assertEquals("outputTopic", outputPhysicalName); Assert.assertEquals(1, specGraph.getInputOperators().size()); - Assert.assertEquals("testavro", - specGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName()); - Assert.assertEquals("SIMPLE1", - specGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName()); + + Assert.assertEquals("testavro", inputSystem); + Assert.assertEquals("SIMPLE1", inputPhysicalName); validatePerTaskContextInit(graphSpec, samzaConfig); } @@ -130,17 +138,24 @@ public class TestQueryTranslator { QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); StreamGraphSpec - graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig); + graphSpec = new StreamGraphSpec(samzaConfig); translator.translate(queryInfo, graphSpec); OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph(); + + StreamConfig streamConfig = new StreamConfig(samzaConfig); + String inputStreamId = specGraph.getInputOperators().keySet().stream().findFirst().get(); + String inputSystem = streamConfig.getSystem(inputStreamId); + String inputPhysicalName = streamConfig.getPhysicalName(inputStreamId); + String outputStreamId = specGraph.getOutputStreams().keySet().stream().findFirst().get(); + String outputSystem = streamConfig.getSystem(outputStreamId); + String outputPhysicalName = streamConfig.getPhysicalName(outputStreamId); + Assert.assertEquals(1, specGraph.getOutputStreams().size()); - 
Assert.assertEquals("testavro", specGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName()); - Assert.assertEquals("outputTopic", specGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName()); + Assert.assertEquals("testavro", outputSystem); + Assert.assertEquals("outputTopic", outputPhysicalName); Assert.assertEquals(1, specGraph.getInputOperators().size()); - Assert.assertEquals("testavro", - specGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName()); - Assert.assertEquals("COMPLEX1", - specGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName()); + Assert.assertEquals("testavro", inputSystem); + Assert.assertEquals("COMPLEX1", inputPhysicalName); validatePerTaskContextInit(graphSpec, samzaConfig); } @@ -155,17 +170,24 @@ public class TestQueryTranslator { QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); StreamGraphSpec - graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig); + graphSpec = new StreamGraphSpec(samzaConfig); translator.translate(queryInfo, graphSpec); OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph(); + + StreamConfig streamConfig = new StreamConfig(samzaConfig); + String inputStreamId = specGraph.getInputOperators().keySet().stream().findFirst().get(); + String inputSystem = streamConfig.getSystem(inputStreamId); + String inputPhysicalName = streamConfig.getPhysicalName(inputStreamId); + String outputStreamId = specGraph.getOutputStreams().keySet().stream().findFirst().get(); + String outputSystem = streamConfig.getSystem(outputStreamId); + String outputPhysicalName = streamConfig.getPhysicalName(outputStreamId); + Assert.assertEquals(1, specGraph.getOutputStreams().size()); - Assert.assertEquals("testavro", specGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName()); - 
Assert.assertEquals("outputTopic", specGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName()); + Assert.assertEquals("testavro", outputSystem); + Assert.assertEquals("outputTopic", outputPhysicalName); Assert.assertEquals(1, specGraph.getInputOperators().size()); - Assert.assertEquals("testavro", - specGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName()); - Assert.assertEquals("COMPLEX1", - specGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName()); + Assert.assertEquals("testavro", inputSystem); + Assert.assertEquals("COMPLEX1", inputPhysicalName); validatePerTaskContextInit(graphSpec, samzaConfig); } @@ -184,7 +206,7 @@ public class TestQueryTranslator { QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); StreamGraphSpec - graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig); + graphSpec = new StreamGraphSpec(samzaConfig); translator.translate(queryInfo, graphSpec); } @@ -203,7 +225,7 @@ public class TestQueryTranslator { QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); StreamGraphSpec - graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig); + graphSpec = new StreamGraphSpec(samzaConfig); translator.translate(queryInfo, graphSpec); } @@ -222,7 +244,7 @@ public class TestQueryTranslator { QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); StreamGraphSpec - graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig); + graphSpec = new StreamGraphSpec(samzaConfig); translator.translate(queryInfo, graphSpec); } @@ -241,7 +263,7 @@ public class 
TestQueryTranslator { QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); StreamGraphSpec - graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig); + graphSpec = new StreamGraphSpec(samzaConfig); translator.translate(queryInfo, graphSpec); } @@ -258,7 +280,7 @@ public class TestQueryTranslator { QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); StreamGraphSpec - graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig); + graphSpec = new StreamGraphSpec(samzaConfig); translator.translate(queryInfo, graphSpec); } @@ -277,7 +299,7 @@ public class TestQueryTranslator { QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); StreamGraphSpec - graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig); + graphSpec = new StreamGraphSpec(samzaConfig); translator.translate(queryInfo, graphSpec); } @@ -297,7 +319,7 @@ public class TestQueryTranslator { QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); StreamGraphSpec - graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig); + graphSpec = new StreamGraphSpec(samzaConfig); translator.translate(queryInfo, graphSpec); } @@ -316,7 +338,7 @@ public class TestQueryTranslator { QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); StreamGraphSpec - graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig); + graphSpec = 
new StreamGraphSpec(samzaConfig); translator.translate(queryInfo, graphSpec); } @@ -335,7 +357,7 @@ public class TestQueryTranslator { QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); StreamGraphSpec - graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig); + graphSpec = new StreamGraphSpec(samzaConfig); translator.translate(queryInfo, graphSpec); } @@ -354,7 +376,7 @@ public class TestQueryTranslator { QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); StreamGraphSpec - graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig); + graphSpec = new StreamGraphSpec(samzaConfig); translator.translate(queryInfo, graphSpec); } @@ -373,7 +395,7 @@ public class TestQueryTranslator { QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); StreamGraphSpec - graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig); + graphSpec = new StreamGraphSpec(samzaConfig); translator.translate(queryInfo, graphSpec); } @@ -396,7 +418,7 @@ public class TestQueryTranslator { QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); StreamGraphSpec - graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig); + graphSpec = new StreamGraphSpec(samzaConfig); translator.translate(queryInfo, graphSpec); } @@ -415,7 +437,7 @@ public class TestQueryTranslator { QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); 
StreamGraphSpec - graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig); + graphSpec = new StreamGraphSpec(samzaConfig); translator.translate(queryInfo, graphSpec); } @@ -434,29 +456,40 @@ public class TestQueryTranslator { QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); StreamGraphSpec - graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig); + graphSpec = new StreamGraphSpec(samzaConfig); translator.translate(queryInfo, graphSpec); OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph(); + StreamConfig streamConfig = new StreamConfig(samzaConfig); + String input1StreamId = specGraph.getInputOperators().keySet().stream().findFirst().get(); + String input1System = streamConfig.getSystem(input1StreamId); + String input1PhysicalName = streamConfig.getPhysicalName(input1StreamId); + String input2StreamId = specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get(); + String input2System = streamConfig.getSystem(input2StreamId); + String input2PhysicalName = streamConfig.getPhysicalName(input2StreamId); + String input3StreamId = specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get(); + String input3System = streamConfig.getSystem(input3StreamId); + String input3PhysicalName = streamConfig.getPhysicalName(input3StreamId); + String output1StreamId = specGraph.getOutputStreams().keySet().stream().findFirst().get(); + String output1System = streamConfig.getSystem(output1StreamId); + String output1PhysicalName = streamConfig.getPhysicalName(output1StreamId); + String output2StreamId = specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get(); + String output2System = streamConfig.getSystem(output2StreamId); + String output2PhysicalName = streamConfig.getPhysicalName(output2StreamId); + Assert.assertEquals(2, 
specGraph.getOutputStreams().size()); - Assert.assertEquals("kafka", specGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName()); - Assert.assertEquals("sql-job-1-partition_by-stream_1", specGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName()); - Assert.assertEquals("testavro", specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName()); - Assert.assertEquals("enrichedPageViewTopic", specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName()); + Assert.assertEquals("kafka", output1System); + Assert.assertEquals("sql-job-1-partition_by-stream_1", output1PhysicalName); + Assert.assertEquals("testavro", output2System); + Assert.assertEquals("enrichedPageViewTopic", output2PhysicalName); Assert.assertEquals(3, specGraph.getInputOperators().size()); - Assert.assertEquals("testavro", - specGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName()); - Assert.assertEquals("PAGEVIEW", - specGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName()); - Assert.assertEquals("testavro", - specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName()); - Assert.assertEquals("PROFILE", - specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName()); - Assert.assertEquals("kafka", - specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName()); - Assert.assertEquals("sql-job-1-partition_by-stream_1", - specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName()); + Assert.assertEquals("testavro", input1System); + Assert.assertEquals("PAGEVIEW", input1PhysicalName); + Assert.assertEquals("testavro", input2System); + Assert.assertEquals("PROFILE", input2PhysicalName); + Assert.assertEquals("kafka", input3System); + Assert.assertEquals("sql-job-1-partition_by-stream_1", input3PhysicalName); 
validatePerTaskContextInit(graphSpec, samzaConfig); } @@ -476,32 +509,41 @@ public class TestQueryTranslator { QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); StreamGraphSpec - graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig); + graphSpec = new StreamGraphSpec(samzaConfig); translator.translate(queryInfo, graphSpec); OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph(); + StreamConfig streamConfig = new StreamConfig(samzaConfig); + String input1StreamId = specGraph.getInputOperators().keySet().stream().findFirst().get(); + String input1System = streamConfig.getSystem(input1StreamId); + String input1PhysicalName = streamConfig.getPhysicalName(input1StreamId); + String input2StreamId = specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get(); + String input2System = streamConfig.getSystem(input2StreamId); + String input2PhysicalName = streamConfig.getPhysicalName(input2StreamId); + String input3StreamId = specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get(); + String input3System = streamConfig.getSystem(input3StreamId); + String input3PhysicalName = streamConfig.getPhysicalName(input3StreamId); + String output1StreamId = specGraph.getOutputStreams().keySet().stream().findFirst().get(); + String output1System = streamConfig.getSystem(output1StreamId); + String output1PhysicalName = streamConfig.getPhysicalName(output1StreamId); + String output2StreamId = specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get(); + String output2System = streamConfig.getSystem(output2StreamId); + String output2PhysicalName = streamConfig.getPhysicalName(output2StreamId); + Assert.assertEquals(2, specGraph.getOutputStreams().size()); - Assert.assertEquals("kafka", specGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName()); - 
Assert.assertEquals("sql-job-1-partition_by-stream_1", - specGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName()); - Assert.assertEquals("testavro", specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName()); - Assert.assertEquals("enrichedPageViewTopic", - specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName()); + Assert.assertEquals("kafka", output1System); + Assert.assertEquals("sql-job-1-partition_by-stream_1", output1PhysicalName); + Assert.assertEquals("testavro", output2System); + Assert.assertEquals("enrichedPageViewTopic", output2PhysicalName); Assert.assertEquals(3, specGraph.getInputOperators().size()); - Assert.assertEquals("testavro", - specGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName()); - Assert.assertEquals("PAGEVIEW", - specGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName()); - Assert.assertEquals("testavro", - specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName()); - Assert.assertEquals("PROFILE", - specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName()); - Assert.assertEquals("kafka", - specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName()); - Assert.assertEquals("sql-job-1-partition_by-stream_1", - specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName()); + Assert.assertEquals("testavro", input1System); + Assert.assertEquals("PAGEVIEW", input1PhysicalName); + Assert.assertEquals("testavro", input2System); + Assert.assertEquals("PROFILE", input2PhysicalName); + Assert.assertEquals("kafka", input3System); + Assert.assertEquals("sql-job-1-partition_by-stream_1", input3PhysicalName); validatePerTaskContextInit(graphSpec, samzaConfig); } @@ -521,32 +563,41 @@ public class TestQueryTranslator { QueryTranslator translator = new 
QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); StreamGraphSpec - graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig); + graphSpec = new StreamGraphSpec(samzaConfig); translator.translate(queryInfo, graphSpec); OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph(); + StreamConfig streamConfig = new StreamConfig(samzaConfig); + String input1StreamId = specGraph.getInputOperators().keySet().stream().findFirst().get(); + String input1System = streamConfig.getSystem(input1StreamId); + String input1PhysicalName = streamConfig.getPhysicalName(input1StreamId); + String input2StreamId = specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get(); + String input2System = streamConfig.getSystem(input2StreamId); + String input2PhysicalName = streamConfig.getPhysicalName(input2StreamId); + String input3StreamId = specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get(); + String input3System = streamConfig.getSystem(input3StreamId); + String input3PhysicalName = streamConfig.getPhysicalName(input3StreamId); + String output1StreamId = specGraph.getOutputStreams().keySet().stream().findFirst().get(); + String output1System = streamConfig.getSystem(output1StreamId); + String output1PhysicalName = streamConfig.getPhysicalName(output1StreamId); + String output2StreamId = specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get(); + String output2System = streamConfig.getSystem(output2StreamId); + String output2PhysicalName = streamConfig.getPhysicalName(output2StreamId); + Assert.assertEquals(2, specGraph.getOutputStreams().size()); - Assert.assertEquals("kafka", specGraph.getOutputStreams().keySet().stream().findFirst().get().getSystemName()); - Assert.assertEquals("sql-job-1-partition_by-stream_1", - specGraph.getOutputStreams().keySet().stream().findFirst().get().getPhysicalName()); - 
Assert.assertEquals("testavro", specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getSystemName()); - Assert.assertEquals("enrichedPageViewTopic", - specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get().getPhysicalName()); + Assert.assertEquals("kafka", output1System); + Assert.assertEquals("sql-job-1-partition_by-stream_1", output1PhysicalName); + Assert.assertEquals("testavro", output2System); + Assert.assertEquals("enrichedPageViewTopic", output2PhysicalName); Assert.assertEquals(3, specGraph.getInputOperators().size()); - Assert.assertEquals("testavro", - specGraph.getInputOperators().keySet().stream().findFirst().get().getSystemName()); - Assert.assertEquals("PROFILE", - specGraph.getInputOperators().keySet().stream().findFirst().get().getPhysicalName()); - Assert.assertEquals("testavro", - specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getSystemName()); - Assert.assertEquals("PAGEVIEW", - specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get().getPhysicalName()); - Assert.assertEquals("kafka", - specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getSystemName()); - Assert.assertEquals("sql-job-1-partition_by-stream_1", - specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get().getPhysicalName()); + Assert.assertEquals("testavro", input1System); + Assert.assertEquals("PROFILE", input1PhysicalName); + Assert.assertEquals("testavro", input2System); + Assert.assertEquals("PAGEVIEW", input2PhysicalName); + Assert.assertEquals("kafka", input3System); + Assert.assertEquals("sql-job-1-partition_by-stream_1", input3PhysicalName); validatePerTaskContextInit(graphSpec, samzaConfig); } @@ -566,7 +617,7 @@ public class TestQueryTranslator { QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); StreamGraphSpec - graphSpec = new 
StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig); + graphSpec = new StreamGraphSpec(samzaConfig); translator.translate(queryInfo, graphSpec); OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph(); @@ -590,7 +641,7 @@ public class TestQueryTranslator { QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); StreamGraphSpec - graphSpec = new StreamGraphSpec(new LocalApplicationRunner(samzaConfig), samzaConfig); + graphSpec = new StreamGraphSpec(samzaConfig); translator.translate(queryInfo, graphSpec); } } http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java b/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java index 617cea6..4309d92 100644 --- a/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java +++ b/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java @@ -30,7 +30,7 @@ import org.apache.samza.task.StreamTask; import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.apache.samza.task.TaskCoordinator.RequestScope; -import org.apache.samza.util.Util; +import org.apache.samza.util.StreamUtil; /** * A simple test job that reads strings, converts them to integers, multiplies @@ -59,7 +59,7 @@ public class NegateNumberTask implements StreamTask, InitableTask { if (outputSystemStreamString == null) { throw new ConfigException("Missing required configuration: task.outputs"); } - outputSystemStream = Util.getSystemStreamFromNames(outputSystemStreamString); + outputSystemStream = StreamUtil.getSystemStreamFromNames(outputSystemStreamString); } @Override 
http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala index d1f1d84..99d047d 100644 --- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala +++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala @@ -27,7 +27,7 @@ import org.apache.samza.task.StreamTask import org.apache.samza.task.TaskCoordinator import org.apache.samza.task.TaskCoordinator.RequestScope import org.apache.samza.config.Config -import org.apache.samza.util.{Util, Logging} +import org.apache.samza.util.{Logging, StreamUtil} import org.apache.samza.system.SystemStream import org.apache.samza.system.OutgoingMessageEnvelope @@ -85,7 +85,7 @@ class TestPerformanceTask extends StreamTask with InitableTask with Logging { def init(config: Config, context: TaskContext) { logInterval = config.getInt("task.log.interval", 10000) maxMessages = config.getInt("task.max.messages", 10000000) - outputSystemStream = Option(config.get("task.outputs", null)).map(Util.getSystemStreamFromNames(_)) + outputSystemStream = Option(config.get("task.outputs", null)).map(StreamUtil.getSystemStreamFromNames) } def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) { http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java index 
2171d07..ec9c05d 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java @@ -34,7 +34,6 @@ import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; import org.apache.samza.system.OutgoingMessageEnvelope; -import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemStream; import org.apache.samza.task.TaskCoordinator; import org.apache.samza.test.operator.data.AdClick; @@ -54,7 +53,7 @@ public class RepartitionJoinWindowApp implements StreamApplication { public static final String INPUT_TOPIC_NAME_2_PROP = "inputTopicName2"; public static final String OUTPUT_TOPIC_NAME_PROP = "outputTopicName"; - private final List<StreamSpec> intermediateStreams = new ArrayList<>(); + private final List<String> intermediateStreamIds = new ArrayList<>(); public static void main(String[] args) throws Exception { CommandLine cmdLine = new CommandLine(); @@ -106,14 +105,14 @@ public class RepartitionJoinWindowApp implements StreamApplication { }); - intermediateStreams.add(((IntermediateMessageStreamImpl) pageViewsRepartitionedByViewId).getStreamSpec()); - intermediateStreams.add(((IntermediateMessageStreamImpl) adClicksRepartitionedByViewId).getStreamSpec()); - intermediateStreams.add(((IntermediateMessageStreamImpl) userPageAdClicksByUserId).getStreamSpec()); + intermediateStreamIds.add(((IntermediateMessageStreamImpl) pageViewsRepartitionedByViewId).getStreamId()); + intermediateStreamIds.add(((IntermediateMessageStreamImpl) adClicksRepartitionedByViewId).getStreamId()); + intermediateStreamIds.add(((IntermediateMessageStreamImpl) userPageAdClicksByUserId).getStreamId()); } - public List<StreamSpec> getIntermediateStreams() { - return intermediateStreams; + List<String> getIntermediateStreamIds() { + return intermediateStreamIds; } private static 
class UserPageViewAdClicksJoiner implements JoinFunction<String, PageView, AdClick, UserPageAdClick> { http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java index a2adb70..a9a4026 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java @@ -24,7 +24,6 @@ import java.util.HashSet; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.samza.Partition; -import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemStreamMetadata; import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata; import org.apache.samza.system.kafka.KafkaSystemAdmin; @@ -122,14 +121,14 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe // Verify that messages in the intermediate stream will be deleted in 10 seconds long startTimeMs = System.currentTimeMillis(); - for (StreamSpec spec: app.getIntermediateStreams()) { + for (String streamId: app.getIntermediateStreamIds()) { long remainingMessageNum = -1; while (remainingMessageNum != 0 && System.currentTimeMillis() - startTimeMs < 10000) { remainingMessageNum = 0; SystemStreamMetadata metadatas = systemAdmin.getSystemStreamMetadata( - new HashSet<>(Arrays.asList(spec.getPhysicalName())), new ExponentialSleepStrategy.Mock(3) - ).get(spec.getPhysicalName()).get(); + new HashSet<>(Arrays.asList(streamId)), new ExponentialSleepStrategy.Mock(3) + ).get(streamId).get(); for (Map.Entry<Partition, SystemStreamPartitionMetadata> entry : 
metadatas.getSystemStreamPartitionMetadata().entrySet()) { SystemStreamPartitionMetadata metadata = entry.getValue();
