http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java index 6fdcacc..4e77fae 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java @@ -21,6 +21,7 @@ package org.apache.samza.operators.impl; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; + import java.io.Serializable; import java.time.Duration; import java.util.ArrayList; @@ -28,6 +29,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.BiFunction; @@ -36,6 +38,7 @@ import org.apache.samza.Partition; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; +import org.apache.samza.config.StreamConfig; import org.apache.samza.container.SamzaContainerContext; import org.apache.samza.container.TaskContextImpl; import org.apache.samza.container.TaskName; @@ -45,8 +48,8 @@ import org.apache.samza.job.model.TaskModel; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.StreamGraphSpec; import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.StreamGraphSpec; import org.apache.samza.operators.functions.ClosableFunction; import org.apache.samza.operators.functions.FilterFunction; import org.apache.samza.operators.functions.InitableFunction; @@ -54,20 +57,18 @@ import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.util.TimestampedValue; import org.apache.samza.operators.spec.OperatorSpec.OpCode; -import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.serializers.Serde; import org.apache.samza.serializers.StringSerde; import org.apache.samza.storage.kv.KeyValueStore; -import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemStream; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskContext; -import java.util.List; import org.apache.samza.task.TaskCoordinator; +import org.apache.samza.testUtils.StreamTestUtils; import org.apache.samza.util.Clock; import org.apache.samza.util.SystemClock; import org.junit.After; @@ -76,7 +77,6 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -217,7 +217,7 @@ public class TestOperatorImplGraph { @Test public void testEmptyChain() { - StreamGraphSpec graphSpec = new StreamGraphSpec(mock(ApplicationRunner.class), mock(Config.class)); + StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); OperatorImplGraph opGraph = new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mock(Config.class), mock(TaskContextImpl.class), mock(Clock.class)); assertEquals(0, opGraph.getAllInputOperators().size()); @@ -225,13 +225,26 @@ public class TestOperatorImplGraph { @Test public void testLinearChain() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system")); - when(mockRunner.getStreamSpec(eq("output"))).thenReturn(mock(StreamSpec.class)); - StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class)); - - MessageStream<Object> inputStream = graphSpec.getInputStream("input"); - OutputStream<Object> outputStream = graphSpec.getOutputStream("output"); + String inputStreamId = "input"; + String inputSystem = "input-system"; + String inputPhysicalName = "input-stream"; + String outputStreamId = "output"; + String outputSystem = "output-system"; + String outputPhysicalName = "output-stream"; + String intermediateSystem = "intermediate-system"; + + HashMap<String, String> configs = new HashMap<>(); + configs.put(JobConfig.JOB_NAME(), "jobName"); + configs.put(JobConfig.JOB_ID(), "jobId"); + configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), intermediateSystem); + StreamTestUtils.addStreamConfigs(configs, inputStreamId, inputSystem, inputPhysicalName); + StreamTestUtils.addStreamConfigs(configs, outputStreamId, outputSystem, outputPhysicalName); + Config config = new MapConfig(configs); + + StreamGraphSpec graphSpec = new StreamGraphSpec(config); + + MessageStream<Object> inputStream = graphSpec.getInputStream(inputStreamId); + OutputStream<Object> outputStream = graphSpec.getOutputStream(outputStreamId); inputStream .filter(mock(FilterFunction.class)) @@ -242,9 +255,9 @@ public class TestOperatorImplGraph { when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); when(mockTaskContext.getTaskName()).thenReturn(new TaskName("task 0")); OperatorImplGraph opImplGraph = - new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mock(Config.class), mockTaskContext, mock(Clock.class)); + new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), config, mockTaskContext, mock(Clock.class)); - InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream")); + InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream(inputSystem, inputPhysicalName)); assertEquals(1, inputOpImpl.registeredOperators.size()); OperatorImpl filterOpImpl = (StreamOperatorImpl) inputOpImpl.registeredOperators.iterator().next(); @@ -262,18 +275,27 @@ public class TestOperatorImplGraph { @Test public void testPartitionByChain() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system")); - when(mockRunner.getStreamSpec(eq("output"))).thenReturn(new StreamSpec("output", "output-stream", "output-system")); - when(mockRunner.getStreamSpec(eq("jobName-jobId-partition_by-p1"))) - .thenReturn(new StreamSpec("intermediate", "intermediate-stream", "intermediate-system")); - Config mockConfig = mock(Config.class); - when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName"); - when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId"); - StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig); - MessageStream<Object> inputStream = graphSpec.getInputStream("input"); + String inputStreamId = "input"; + String inputSystem = "input-system"; + String inputPhysicalName = "input-stream"; + String outputStreamId = "output"; + String outputSystem = "output-system"; + String outputPhysicalName = "output-stream"; + String intermediateStreamId = "jobName-jobId-partition_by-p1"; + String intermediateSystem = "intermediate-system"; + + HashMap<String, String> configs = new HashMap<>(); + configs.put(JobConfig.JOB_NAME(), "jobName"); + configs.put(JobConfig.JOB_ID(), "jobId"); + configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), intermediateSystem); + StreamTestUtils.addStreamConfigs(configs, inputStreamId, inputSystem, inputPhysicalName); + StreamTestUtils.addStreamConfigs(configs, outputStreamId, outputSystem, outputPhysicalName); + Config config = new MapConfig(configs); + + StreamGraphSpec graphSpec = new StreamGraphSpec(config); + MessageStream<Object> inputStream = graphSpec.getInputStream(inputStreamId); OutputStream<KV<Integer, String>> outputStream = graphSpec - .getOutputStream("output", KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class))); + .getOutputStream(outputStreamId, KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class))); inputStream .partitionBy(Object::hashCode, Object::toString, @@ -291,12 +313,12 @@ public class TestOperatorImplGraph { when(taskModel.getSystemStreamPartitions()).thenReturn(Collections.emptySet()); when(mockTaskContext.getJobModel()).thenReturn(jobModel); SamzaContainerContext containerContext = - new SamzaContainerContext("0", mockConfig, Collections.singleton(new TaskName("task 0")), new MetricsRegistryMap()); + new SamzaContainerContext("0", config, Collections.singleton(new TaskName("task 0")), new MetricsRegistryMap()); when(mockTaskContext.getSamzaContainerContext()).thenReturn(containerContext); OperatorImplGraph opImplGraph = - new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mockConfig, mockTaskContext, mock(Clock.class)); + new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), config, mockTaskContext, mock(Clock.class)); - InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream")); + InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream(inputSystem, inputPhysicalName)); assertEquals(1, inputOpImpl.registeredOperators.size()); OperatorImpl partitionByOpImpl = (PartitionByOperatorImpl) inputOpImpl.registeredOperators.iterator().next(); @@ -304,7 +326,7 @@ public class TestOperatorImplGraph { assertEquals(OpCode.PARTITION_BY, partitionByOpImpl.getOperatorSpec().getOpCode()); InputOperatorImpl repartitionedInputOpImpl = - opImplGraph.getInputOperator(new SystemStream("intermediate-system", "intermediate-stream")); + opImplGraph.getInputOperator(new SystemStream(intermediateSystem, intermediateStreamId)); assertEquals(1, repartitionedInputOpImpl.registeredOperators.size()); OperatorImpl sendToOpImpl = (OutputOperatorImpl) repartitionedInputOpImpl.registeredOperators.iterator().next(); @@ -314,18 +336,20 @@ public class TestOperatorImplGraph { @Test public void testBroadcastChain() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system")); - StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class)); + String inputStreamId = "input"; + HashMap<String, String> configMap = new HashMap<>(); + StreamTestUtils.addStreamConfigs(configMap, "input", "input-system", "input-stream"); + Config config = new MapConfig(configMap); + StreamGraphSpec graphSpec = new StreamGraphSpec(config); - MessageStream<Object> inputStream = graphSpec.getInputStream("input"); + MessageStream<Object> inputStream = graphSpec.getInputStream(inputStreamId); inputStream.filter(mock(FilterFunction.class)); inputStream.map(mock(MapFunction.class)); TaskContextImpl mockTaskContext = mock(TaskContextImpl.class); when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); OperatorImplGraph opImplGraph = - new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mock(Config.class), mockTaskContext, mock(Clock.class)); + new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), config, mockTaskContext, mock(Clock.class)); InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream")); assertEquals(2, inputOpImpl.registeredOperators.size()); @@ -337,12 +361,10 @@ public class TestOperatorImplGraph { @Test public void testMergeChain() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - when(mockRunner.getStreamSpec(eq("input"))) - .thenReturn(new StreamSpec("input", "input-stream", "input-system")); - StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class)); + String inputStreamId = "input"; + StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class)); - MessageStream<Object> inputStream = graphSpec.getInputStream("input"); + MessageStream<Object> inputStream = graphSpec.getInputStream(inputStreamId); MessageStream<Object> stream1 = inputStream.filter(mock(FilterFunction.class)); MessageStream<Object> stream2 = inputStream.map(mock(MapFunction.class)); MessageStream<Object> mergedStream = stream1.merge(Collections.singleton(stream2)); @@ -372,20 +394,23 @@ public class TestOperatorImplGraph { @Test public void testJoinChain() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - when(mockRunner.getStreamSpec(eq("input1"))).thenReturn(new StreamSpec("input1", "input-stream1", "input-system")); - when(mockRunner.getStreamSpec(eq("input2"))).thenReturn(new StreamSpec("input2", "input-stream2", "input-system")); - Config mockConfig = mock(Config.class); - when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName"); - when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId"); - StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig); + String inputStreamId1 = "input1"; + String inputStreamId2 = "input2"; + + HashMap<String, String> configs = new HashMap<>(); + configs.put(JobConfig.JOB_NAME(), "jobName"); + configs.put(JobConfig.JOB_ID(), "jobId"); + StreamTestUtils.addStreamConfigs(configs, "input1", "input-system", "input-stream1"); + StreamTestUtils.addStreamConfigs(configs, "input2", "input-system", "input-stream2"); + Config config = new MapConfig(configs); + StreamGraphSpec graphSpec = new StreamGraphSpec(config); Integer joinKey = new Integer(1); Function<Object, Integer> keyFn = (Function & Serializable) m -> joinKey; JoinFunction testJoinFunction = new TestJoinFunction("jobName-jobId-join-j1", (BiFunction & Serializable) (m1, m2) -> KV.of(m1, m2), keyFn, keyFn); - MessageStream<Object> inputStream1 = graphSpec.getInputStream("input1", new NoOpSerde<>()); - MessageStream<Object> inputStream2 = graphSpec.getInputStream("input2", new NoOpSerde<>()); + MessageStream<Object> inputStream1 = graphSpec.getInputStream(inputStreamId1, new NoOpSerde<>()); + MessageStream<Object> inputStream2 = graphSpec.getInputStream(inputStreamId2, new NoOpSerde<>()); inputStream1.join(inputStream2, testJoinFunction, mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j1"); @@ -398,7 +423,7 @@ public class TestOperatorImplGraph { KeyValueStore mockRightStore = mock(KeyValueStore.class); when(mockTaskContext.getStore(eq("jobName-jobId-join-j1-R"))).thenReturn(mockRightStore); OperatorImplGraph opImplGraph = - new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mockConfig, mockTaskContext, mock(Clock.class)); + new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), config, mockTaskContext, mock(Clock.class)); // verify that join function is initialized once. assertEquals(TestJoinFunction.getInstanceByTaskName(mockTaskName, "jobName-jobId-join-j1").numInitCalled, 1); @@ -434,18 +459,17 @@ public class TestOperatorImplGraph { @Test public void testOperatorGraphInitAndClose() { - ApplicationRunner mockRunner = mock(ApplicationRunner.class); - when(mockRunner.getStreamSpec("input1")).thenReturn(new StreamSpec("input1", "input-stream1", "input-system")); - when(mockRunner.getStreamSpec("input2")).thenReturn(new StreamSpec("input2", "input-stream2", "input-system")); + String inputStreamId1 = "input1"; + String inputStreamId2 = "input2"; Config mockConfig = mock(Config.class); TaskName mockTaskName = mock(TaskName.class); TaskContextImpl mockContext = mock(TaskContextImpl.class); when(mockContext.getTaskName()).thenReturn(mockTaskName); when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); - StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig); + StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig); - MessageStream<Object> inputStream1 = graphSpec.getInputStream("input1"); - MessageStream<Object> inputStream2 = graphSpec.getInputStream("input2"); + MessageStream<Object> inputStream1 = graphSpec.getInputStream(inputStreamId1); + MessageStream<Object> inputStream2 = graphSpec.getInputStream(inputStreamId2); Function mapFn = (Function & Serializable) m -> m; inputStream1.map(new TestMapFunction<Object, Object>("1", mapFn)) @@ -476,12 +500,19 @@ public class TestOperatorImplGraph { @Test public void testGetStreamToConsumerTasks() { String system = "test-system"; - String stream0 = "test-stream-0"; - String stream1 = "test-stream-1"; + String streamId0 = "test-stream-0"; + String streamId1 = "test-stream-1"; + + HashMap<String, String> configs = new HashMap<>(); + configs.put(JobConfig.JOB_NAME(), "test-app"); + configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), "test-system"); + StreamTestUtils.addStreamConfigs(configs, streamId0, system, streamId0); + StreamTestUtils.addStreamConfigs(configs, streamId1, system, streamId1); + Config config = new MapConfig(configs); - SystemStreamPartition ssp0 = new SystemStreamPartition(system, stream0, new Partition(0)); - SystemStreamPartition ssp1 = new SystemStreamPartition(system, stream0, new Partition(1)); - SystemStreamPartition ssp2 = new SystemStreamPartition(system, stream1, new Partition(0)); + SystemStreamPartition ssp0 = new SystemStreamPartition(system, streamId0, new Partition(0)); + SystemStreamPartition ssp1 = new SystemStreamPartition(system, streamId0, new Partition(1)); + SystemStreamPartition ssp2 = new SystemStreamPartition(system, streamId1, new Partition(0)); TaskName task0 = new TaskName("Task 0"); TaskName task1 = new TaskName("Task 1"); @@ -497,7 +528,7 @@ public class TestOperatorImplGraph { cms.put(cm0.getProcessorId(), cm0); cms.put(cm1.getProcessorId(), cm1); - JobModel jobModel = new JobModel(new MapConfig(), cms, null); + JobModel jobModel = new JobModel(config, cms, null); Multimap<SystemStream, String> streamToTasks = OperatorImplGraph.getStreamToConsumerTasks(jobModel); assertEquals(streamToTasks.get(ssp0.getSystemStream()).size(), 2); assertEquals(streamToTasks.get(ssp2.getSystemStream()).size(), 1); @@ -505,56 +536,44 @@ public class TestOperatorImplGraph { @Test public void testGetOutputToInputStreams() { - Map<String, String> configMap = new HashMap<>(); - configMap.put(JobConfig.JOB_NAME(), "test-app"); - configMap.put(JobConfig.JOB_DEFAULT_SYSTEM(), "test-system"); - Config config = new MapConfig(configMap); - - /** - * the graph looks like the following. number of partitions in parentheses. quotes indicate expected value. - * - * input1 -> map -> join -> partitionBy (10) -> output1 - * | - * input2 -> filter -| - * | - * input3 -> filter -> partitionBy -> map -> join -> output2 - * - */ - StreamSpec input1 = new StreamSpec("input1", "input1", "system1"); - StreamSpec input2 = new StreamSpec("input2", "input2", "system2"); - StreamSpec input3 = new StreamSpec("input3", "input3", "system2"); - - StreamSpec output1 = new StreamSpec("output1", "output1", "system1"); - StreamSpec output2 = new StreamSpec("output2", "output2", "system2"); - - ApplicationRunner runner = mock(ApplicationRunner.class); - when(runner.getStreamSpec("input1")).thenReturn(input1); - when(runner.getStreamSpec("input2")).thenReturn(input2); - when(runner.getStreamSpec("input3")).thenReturn(input3); - when(runner.getStreamSpec("output1")).thenReturn(output1); - when(runner.getStreamSpec("output2")).thenReturn(output2); - - // intermediate streams used in tests - StreamSpec int1 = new StreamSpec("test-app-1-partition_by-p2", "test-app-1-partition_by-p2", "default-system"); - StreamSpec int2 = new StreamSpec("test-app-1-partition_by-p1", "test-app-1-partition_by-p1", "default-system"); - when(runner.getStreamSpec("test-app-1-partition_by-p2")).thenReturn(int1); - when(runner.getStreamSpec("test-app-1-partition_by-p1")).thenReturn(int2); - - StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config); - MessageStream messageStream1 = graphSpec.getInputStream("input1").map(m -> m); - MessageStream messageStream2 = graphSpec.getInputStream("input2").filter(m -> true); + String inputStreamId1 = "input1"; + String inputStreamId2 = "input2"; + String inputStreamId3 = "input3"; + String inputSystem = "input-system"; + + String outputStreamId1 = "output1"; + String outputStreamId2 = "output2"; + String outputSystem = "output-system"; + + String intStreamId1 = "test-app-1-partition_by-p1"; + String intStreamId2 = "test-app-1-partition_by-p2"; + String intSystem = "test-system"; + + HashMap<String, String> configs = new HashMap<>(); + configs.put(JobConfig.JOB_NAME(), "test-app"); + configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), intSystem); + StreamTestUtils.addStreamConfigs(configs, inputStreamId1, inputSystem, inputStreamId1); + StreamTestUtils.addStreamConfigs(configs, inputStreamId2, inputSystem, inputStreamId2); + StreamTestUtils.addStreamConfigs(configs, inputStreamId3, inputSystem, inputStreamId3); + StreamTestUtils.addStreamConfigs(configs, outputStreamId1, outputSystem, outputStreamId1); + StreamTestUtils.addStreamConfigs(configs, outputStreamId2, outputSystem, outputStreamId2); + Config config = new MapConfig(configs); + + StreamGraphSpec graphSpec = new StreamGraphSpec(config); + MessageStream messageStream1 = graphSpec.getInputStream(inputStreamId1).map(m -> m); + MessageStream messageStream2 = graphSpec.getInputStream(inputStreamId2).filter(m -> true); MessageStream messageStream3 = - graphSpec.getInputStream("input3") + graphSpec.getInputStream(inputStreamId3) .filter(m -> true) - .partitionBy(m -> "hehe", m -> m, "p1") + .partitionBy(m -> "m", m -> m, "p1") .map(m -> m); - OutputStream<Object> outputStream1 = graphSpec.getOutputStream("output1"); - OutputStream<Object> outputStream2 = graphSpec.getOutputStream("output2"); + OutputStream<Object> outputStream1 = graphSpec.getOutputStream(outputStreamId1); + OutputStream<Object> outputStream2 = graphSpec.getOutputStream(outputStreamId2); messageStream1 .join(messageStream2, mock(JoinFunction.class), mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1") - .partitionBy(m -> "haha", m -> m, "p2") + .partitionBy(m -> "m", m -> m, "p2") .sendTo(outputStream1); messageStream3 .join(messageStream2, mock(JoinFunction.class), @@ -562,19 +581,41 @@ public class TestOperatorImplGraph { .sendTo(outputStream2); Multimap<SystemStream, SystemStream> outputToInput = - OperatorImplGraph.getIntermediateToInputStreamsMap(graphSpec.getOperatorSpecGraph()); - Collection<SystemStream> inputs = outputToInput.get(int1.toSystemStream()); + OperatorImplGraph.getIntermediateToInputStreamsMap(graphSpec.getOperatorSpecGraph(), new StreamConfig(config)); + Collection<SystemStream> inputs = outputToInput.get(new SystemStream(intSystem, intStreamId2)); assertEquals(inputs.size(), 2); - assertTrue(inputs.contains(input1.toSystemStream())); - assertTrue(inputs.contains(input2.toSystemStream())); + assertTrue(inputs.contains(new SystemStream(inputSystem, inputStreamId1))); + assertTrue(inputs.contains(new SystemStream(inputSystem, inputStreamId2))); - inputs = outputToInput.get(int2.toSystemStream()); + inputs = outputToInput.get(new SystemStream(intSystem, intStreamId1)); assertEquals(inputs.size(), 1); - assertEquals(inputs.iterator().next(), input3.toSystemStream()); + assertEquals(inputs.iterator().next(), new SystemStream(inputSystem, inputStreamId3)); } @Test public void testGetProducerTaskCountForIntermediateStreams() { + String inputStreamId1 = "input1"; + String inputStreamId2 = "input2"; + String inputStreamId3 = "input3"; + String inputSystem1 = "system1"; + String inputSystem2 = "system2"; + + HashMap<String, String> configs = new HashMap<>(); + configs.put(JobConfig.JOB_NAME(), "test-app"); + configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), inputSystem1); + StreamTestUtils.addStreamConfigs(configs, inputStreamId1, inputSystem1, inputStreamId1); + StreamTestUtils.addStreamConfigs(configs, inputStreamId2, inputSystem2, inputStreamId2); + StreamTestUtils.addStreamConfigs(configs, inputStreamId3, inputSystem2, inputStreamId3); + Config config = new MapConfig(configs); + + SystemStream input1 = new SystemStream("system1", "intput1"); + SystemStream input2 = new SystemStream("system2", "intput2"); + SystemStream input3 = new SystemStream("system2", "intput3"); + + SystemStream int1 = new SystemStream("system1", "int1"); + SystemStream int2 = new SystemStream("system1", "int2"); + + /** * the task assignment looks like the following: * @@ -585,15 +626,6 @@ public class TestOperatorImplGraph { * input3 ------> task1 -----------> int2 * */ - - SystemStream input1 = new SystemStream("system1", "intput1"); - SystemStream input2 = new SystemStream("system2", "intput2"); - SystemStream input3 = new SystemStream("system2", "intput3"); - - SystemStream int1 = new SystemStream("system1", "int1"); - SystemStream int2 = new SystemStream("system1", "int2"); - - String task0 = "Task 0"; String task1 = "Task 1"; String task2 = "Task 2";
http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java index 9741fc4..0ef6680 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java @@ -42,13 +42,11 @@ import org.apache.samza.operators.triggers.Triggers; import org.apache.samza.operators.windows.AccumulationMode; import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.Windows; -import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.Serde; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.OutgoingMessageEnvelope; -import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemStream; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.task.MessageCollector; @@ -80,7 +78,6 @@ public class TestWindowOperator { private final List<Integer> integers = ImmutableList.of(1, 2, 1, 2, 1, 2, 1, 2, 3); private Config config; private TaskContextImpl taskContext; - private ApplicationRunner runner; @Before public void setup() throws Exception { @@ -88,19 +85,16 @@ public class TestWindowOperator { when(config.get(JobConfig.JOB_NAME())).thenReturn("jobName"); when(config.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId"); taskContext = mock(TaskContextImpl.class); - runner = mock(ApplicationRunner.class); Serde storeKeySerde = new TimeSeriesKeySerde(new IntegerSerde()); Serde storeValSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde()); when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet - .of(new SystemStreamPartition("kafka", "integers", new Partition(0)))); + .of(new SystemStreamPartition("kafka", "integTestExecutionPlannerers", new Partition(0)))); when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); when(taskContext.getStore("jobName-jobId-window-w1")) .thenReturn(new TestInMemoryStore<>(storeKeySerde, storeValSerde)); - when(runner.getStreamSpec("integers")).thenReturn(new StreamSpec("integers", "integers", "kafka")); Map<String, String> mapConfig = new HashMap<>(); - mapConfig.put("app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner"); mapConfig.put("job.default.system", "kafka"); mapConfig.put("job.name", "jobName"); mapConfig.put("job.id", "jobId"); @@ -552,7 +546,7 @@ public class TestWindowOperator { private StreamGraphSpec getKeyedTumblingWindowStreamGraph(AccumulationMode mode, Duration duration, Trigger<KV<Integer, Integer>> earlyTrigger) throws IOException { - StreamGraphSpec graph = new StreamGraphSpec(runner, config); + StreamGraphSpec graph = new StreamGraphSpec(config); KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde()); graph.getInputStream("integers", kvSerde) @@ -568,7 +562,7 @@ public class TestWindowOperator { private StreamGraphSpec getTumblingWindowStreamGraph(AccumulationMode mode, Duration duration, Trigger<KV<Integer, Integer>> earlyTrigger) throws IOException { - StreamGraphSpec graph = new StreamGraphSpec(runner, config); + StreamGraphSpec graph = new StreamGraphSpec(config); KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde()); graph.getInputStream("integers", kvSerde) @@ -582,7 +576,7 @@ public class TestWindowOperator { } private StreamGraphSpec getKeyedSessionWindowStreamGraph(AccumulationMode mode, Duration duration) throws IOException { - StreamGraphSpec graph = new StreamGraphSpec(runner, config); + StreamGraphSpec graph = new StreamGraphSpec(config); KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde()); graph.getInputStream("integers", kvSerde) @@ -597,7 +591,7 @@ public class TestWindowOperator { private StreamGraphSpec getAggregateTumblingWindowStreamGraph(AccumulationMode mode, Duration timeDuration, Trigger<IntegerEnvelope> earlyTrigger) throws IOException { - StreamGraphSpec graph = new StreamGraphSpec(runner, config); + StreamGraphSpec graph = new StreamGraphSpec(config); MessageStream<KV<Integer, Integer>> integers = graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde())); http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java b/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java index b39b0d0..7704a5b 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java +++ b/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java @@ -31,7 +31,6 @@ import org.apache.samza.operators.TableImpl; import org.apache.samza.operators.functions.TimerFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.serializers.SerializableSerde; -import org.apache.samza.system.StreamSpec; import org.apache.samza.table.TableSpec; import static org.junit.Assert.*; @@ -105,8 +104,8 @@ public class OperatorSpecTestUtils { assertEquals(oTable.getTableSpec(), nTable.getTableSpec()); } - private static void assertClonedOutputs(Map<StreamSpec, OutputStreamImpl> originalOutputs, - Map<StreamSpec, OutputStreamImpl> clonedOutputs) { + private static void assertClonedOutputs(Map<String, OutputStreamImpl> originalOutputs, + Map<String, OutputStreamImpl> clonedOutputs) { assertEquals(originalOutputs.size(), clonedOutputs.size()); assertEquals(originalOutputs.keySet(), clonedOutputs.keySet()); Iterator<OutputStreamImpl> oIter = originalOutputs.values().iterator(); @@ -117,12 +116,11 @@ public class OperatorSpecTestUtils { private static void assertClonedOutputImpl(OutputStreamImpl oOutput, OutputStreamImpl nOutput) { assertNotEquals(oOutput, nOutput); assertEquals(oOutput.isKeyed(), nOutput.isKeyed()); - assertEquals(oOutput.getSystemStream(), nOutput.getSystemStream()); - assertEquals(oOutput.getStreamSpec(), nOutput.getStreamSpec()); + assertEquals(oOutput.getStreamId(), nOutput.getStreamId()); } - private static void assertClonedInputs(Map<StreamSpec, InputOperatorSpec> originalInputs, - Map<StreamSpec, InputOperatorSpec> clonedInputs) { + private static void assertClonedInputs(Map<String, InputOperatorSpec> originalInputs, + Map<String, InputOperatorSpec> clonedInputs) { assertEquals(originalInputs.size(), clonedInputs.size()); assertEquals(originalInputs.keySet(), clonedInputs.keySet()); Iterator<InputOperatorSpec> oIter = originalInputs.values().iterator(); @@ -134,7 +132,7 @@ public class OperatorSpecTestUtils { assertNotEquals(originalInput, clonedInput); assertEquals(originalInput.getOpId(), clonedInput.getOpId()); assertEquals(originalInput.getOpCode(), clonedInput.getOpCode()); - assertEquals(originalInput.getStreamSpec(), clonedInput.getStreamSpec()); + assertEquals(originalInput.getStreamId(), clonedInput.getStreamId()); assertEquals(originalInput.isKeyed(), clonedInput.isKeyed()); } http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java index cb221b0..b27c944 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java +++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java @@ -40,7 +40,6 @@ import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.serializers.Serde; import org.apache.samza.serializers.StringSerde; -import org.apache.samza.system.StreamSpec; import org.apache.samza.table.TableSpec; import org.junit.Test; import org.mockito.internal.util.reflection.Whitebox; @@ -50,7 +49,6 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; /** @@ -230,9 +228,8 @@ public class TestOperatorSpec { } }; - StreamSpec mockStreamSpec = mock(StreamSpec.class); InputOperatorSpec<String, Object> inputOperatorSpec = new InputOperatorSpec<>( - mockStreamSpec, new StringSerde("UTF-8"), objSerde, true, "op0"); + "mockStreamId", new StringSerde("UTF-8"), objSerde, true, "op0"); InputOperatorSpec<String, Object> inputOpCopy = (InputOperatorSpec<String, Object>) OperatorSpecTestUtils.copyOpSpec(inputOperatorSpec); assertNotEquals("Expected deserialized copy of operator spec should not be the same as the original operator spec", inputOperatorSpec, inputOpCopy); @@ -254,9 +251,8 @@ public class TestOperatorSpec { return new byte[0]; } }; - StreamSpec mockStreamSpec = mock(StreamSpec.class); - OutputStreamImpl<KV<String, Object>> outputStrmImpl = new OutputStreamImpl<>(mockStreamSpec, new StringSerde("UTF-8"), objSerde, true); - OutputOperatorSpec<KV<String, Object>> outputOperatorSpec = new OutputOperatorSpec<KV<String, Object>>(outputStrmImpl, "op0"); + OutputStreamImpl<KV<String, Object>> outputStrmImpl = new OutputStreamImpl<>("mockStreamId", new StringSerde("UTF-8"), objSerde, true); + OutputOperatorSpec<KV<String, Object>> outputOperatorSpec = new OutputOperatorSpec<>(outputStrmImpl, "op0"); OutputOperatorSpec<KV<String, Object>> outputOpCopy = (OutputOperatorSpec<KV<String, Object>>) OperatorSpecTestUtils .copyOpSpec(outputOperatorSpec); assertNotEquals("Expected deserialized copy of operator spec should not be the same as the original operator spec", outputOperatorSpec, outputOpCopy); @@ -276,10 +272,10 @@ public class TestOperatorSpec { public void testJoinOperatorSpec() { InputOperatorSpec<TestMessageEnvelope, Object> leftOpSpec = new InputOperatorSpec<>( - new StreamSpec("test-input-1", "test-input-1", "kafka"), new NoOpSerde<>(), + "test-input-1", new NoOpSerde<>(), new NoOpSerde<>(), false, "op0"); InputOperatorSpec<TestMessageEnvelope, Object> rightOpSpec = new InputOperatorSpec<>( - new StreamSpec("test-input-2", "test-input-2", "kafka"), new NoOpSerde<>(), + "test-input-2", new NoOpSerde<>(), new NoOpSerde<>(), false, "op1"); Serde<Object> objSerde = new Serde<Object>() { @@ -341,14 +337,14 @@ public class TestOperatorSpec { @Test public void testBroadcastOperatorSpec() { OutputStreamImpl<TestOutputMessageEnvelope> outputStream = - new OutputStreamImpl<>(new StreamSpec("output-0", "outputStream-0", "kafka"), new StringSerde("UTF-8"), new JsonSerdeV2<TestOutputMessageEnvelope>(), true); + new OutputStreamImpl<>("output-0", new StringSerde("UTF-8"), new JsonSerdeV2<TestOutputMessageEnvelope>(), true); BroadcastOperatorSpec<TestOutputMessageEnvelope> broadcastOpSpec = new BroadcastOperatorSpec<>(outputStream, "broadcast-1"); BroadcastOperatorSpec<TestOutputMessageEnvelope> broadcastOpCopy = (BroadcastOperatorSpec<TestOutputMessageEnvelope>) OperatorSpecTestUtils .copyOpSpec(broadcastOpSpec); assertNotEquals(broadcastOpCopy, broadcastOpSpec); assertEquals(broadcastOpCopy.getOpId(), broadcastOpSpec.getOpId()); assertTrue(broadcastOpCopy.getOutputStream() != broadcastOpSpec.getOutputStream()); - assertEquals(broadcastOpCopy.getOutputStream().getSystemStream(), broadcastOpSpec.getOutputStream().getSystemStream()); + assertEquals(broadcastOpCopy.getOutputStream().getStreamId(), broadcastOpSpec.getOutputStream().getStreamId()); assertEquals(broadcastOpCopy.getOutputStream().isKeyed(), broadcastOpSpec.getOutputStream().isKeyed()); } http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java index 00ec176..9bbcbfa 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java +++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java @@ -29,9 +29,7 @@ import org.apache.samza.operators.TimerRegistry; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.functions.TimerFunction; import org.apache.samza.operators.functions.WatermarkFunction; -import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.serializers.NoOpSerde; -import org.apache.samza.system.StreamSpec; import org.junit.Before; import org.junit.Test; import org.mockito.internal.util.reflection.Whitebox; @@ -45,7 +43,6 @@ import static org.mockito.Mockito.*; */ public class TestPartitionByOperatorSpec { - private final ApplicationRunner mockRunner = mock(ApplicationRunner.class); private final Config mockConfig = mock(Config.class); private final String testInputId = "test-input-1"; private final String testJobName = "testJob"; @@ -93,12 +90,7 @@ public class TestPartitionByOperatorSpec { public void setup() { when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn(testJobName); when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn(testJobId); - StreamSpec inputSpec1 = new StreamSpec(testInputId, testInputId, "kafka"); - when(mockRunner.getStreamSpec(testInputId)).thenReturn(inputSpec1); - String intermediateStreamName = String.format("%s-%s-partition_by-%s", testJobName, testJobId, testReparStreamName); - StreamSpec intermediateSpec1 = new StreamSpec(intermediateStreamName, intermediateStreamName, "kafka"); - when(mockRunner.getStreamSpec(intermediateStreamName)).thenReturn(intermediateSpec1); - graphSpec = new StreamGraphSpec(mockRunner, mockConfig); + graphSpec = new StreamGraphSpec(mockConfig); } @Test @@ -109,7 +101,7 @@ public class TestPartitionByOperatorSpec { MessageStream<KV<String, Object>> reparStream = inputStream.partitionBy(keyFn, valueFn, testReparStreamName); InputOperatorSpec inputOpSpec = (InputOperatorSpec) Whitebox.getInternalState(reparStream, "operatorSpec"); - assertEquals(inputOpSpec.getStreamSpec().getId(), String.format("%s-%s-partition_by-%s", testJobName, testJobId, testReparStreamName)); + assertEquals(inputOpSpec.getStreamId(), String.format("%s-%s-partition_by-%s", testJobName, testJobId, testReparStreamName)); assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde); assertTrue(inputOpSpec.getValueSerde() instanceof NoOpSerde); assertTrue(inputOpSpec.isKeyed()); @@ -121,7 +113,7 @@ public class TestPartitionByOperatorSpec { assertEquals(reparOpSpec.getOpId(), String.format("%s-%s-partition_by-%s", testJobName, testJobId, testReparStreamName)); assertEquals(reparOpSpec.getKeyFunction(), keyFn); assertEquals(reparOpSpec.getValueFunction(), valueFn); - assertEquals(reparOpSpec.getOutputStream().getStreamSpec(), new StreamSpec(reparOpSpec.getOpId(), reparOpSpec.getOpId(), "kafka")); + assertEquals(reparOpSpec.getOutputStream().getStreamId(), reparOpSpec.getOpId()); assertNull(reparOpSpec.getTimerFn()); assertNull(reparOpSpec.getWatermarkFn()); } http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java deleted file mode 100644 index b8d3f15..0000000 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java +++ /dev/null @@ -1,391 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.runtime; - -import java.util.HashMap; -import java.util.Map; -import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.Config; -import org.apache.samza.config.JobConfig; -import org.apache.samza.config.MapConfig; -import org.apache.samza.config.StreamConfig; -import org.apache.samza.job.ApplicationStatus; -import org.apache.samza.system.StreamSpec; -import org.junit.Test; - -import static org.junit.Assert.*; - -public class TestAbstractApplicationRunner { - private static final String STREAM_ID = "t3st-Stream_Id"; - private static final String STREAM_ID_INVALID = "test#Str3amId!"; - - private static final String TEST_PHYSICAL_NAME = "t3st-Physical_Name"; - private static final String TEST_PHYSICAL_NAME2 = "testPhysicalName2"; - private static final String TEST_PHYSICAL_NAME_SPECIAL_CHARS = "test://Physical.Name?"; - - private static final String TEST_SYSTEM = "t3st-System_Name"; - private static final String TEST_SYSTEM2 = "testSystemName2"; - private static final String TEST_SYSTEM_INVALID = "test:System!Name@"; - - private static final String TEST_DEFAULT_SYSTEM = "testDefaultSystemName"; - - - @Test(expected = NullPointerException.class) - public void testConfigValidation() { - new TestAbstractApplicationRunnerImpl(null); - } - - // The physical name should be pulled from the StreamConfig.PHYSICAL_NAME property value. - @Test - public void testgetStreamWithPhysicalNameInConfig() { - Config config = buildStreamConfig(STREAM_ID, - StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, - StreamConfig.SYSTEM(), TEST_SYSTEM); - - AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); - StreamSpec spec = runner.getStreamSpec(STREAM_ID); - - assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName()); - } - - // The streamId should be used as the physicalName when the physical name is not specified. - // NOTE: its either this, set to null, or exception. This seems better for backward compatibility and API brevity. - @Test - public void testgetStreamWithoutPhysicalNameInConfig() { - Config config = buildStreamConfig(STREAM_ID, - StreamConfig.SYSTEM(), TEST_SYSTEM); - - AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); - StreamSpec spec = runner.getStreamSpec(STREAM_ID); - - assertEquals(STREAM_ID, spec.getPhysicalName()); - } - - // If the system is specified at the stream scope, use it - @Test - public void testgetStreamWithSystemAtStreamScopeInConfig() { - Config config = buildStreamConfig(STREAM_ID, - StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, - StreamConfig.SYSTEM(), TEST_SYSTEM); - - AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); - StreamSpec spec = runner.getStreamSpec(STREAM_ID); - - assertEquals(TEST_SYSTEM, spec.getSystemName()); - } - - // If system isn't specified at stream scope, use the default system - @Test - public void testgetStreamWithSystemAtDefaultScopeInConfig() { - Config config = addConfigs(buildStreamConfig(STREAM_ID, - StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME), - JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM); - - AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); - StreamSpec spec = runner.getStreamSpec(STREAM_ID); - - assertEquals(TEST_DEFAULT_SYSTEM, spec.getSystemName()); - } - - // Stream scope should override default scope - @Test - public void testgetStreamWithSystemAtBothScopesInConfig() { - Config config = addConfigs(buildStreamConfig(STREAM_ID, - StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, - StreamConfig.SYSTEM(), TEST_SYSTEM), - JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM); - - AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); - StreamSpec spec = runner.getStreamSpec(STREAM_ID); - - assertEquals(TEST_SYSTEM, spec.getSystemName()); - } - - // System is required. Throw if it cannot be determined. - @Test(expected = IllegalArgumentException.class) - public void testgetStreamWithOutSystemInConfig() { - Config config = buildStreamConfig(STREAM_ID, - StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME); - - AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); - StreamSpec spec = runner.getStreamSpec(STREAM_ID); - - assertEquals(TEST_SYSTEM, spec.getSystemName()); - } - - // The properties in the config "streams.{streamId}.*" should be passed through to the spec. - @Test - public void testgetStreamPropertiesPassthrough() { - Config config = buildStreamConfig(STREAM_ID, - StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, - StreamConfig.SYSTEM(), TEST_SYSTEM, - "systemProperty1", "systemValue1", - "systemProperty2", "systemValue2", - "systemProperty3", "systemValue3"); - - AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); - StreamSpec spec = runner.getStreamSpec(STREAM_ID); - - Map<String, String> properties = spec.getConfig(); - assertEquals(3, properties.size()); - assertEquals("systemValue1", properties.get("systemProperty1")); - assertEquals("systemValue2", properties.get("systemProperty2")); - assertEquals("systemValue3", properties.get("systemProperty3")); - assertEquals("systemValue1", spec.get("systemProperty1")); - assertEquals("systemValue2", spec.get("systemProperty2")); - assertEquals("systemValue3", spec.get("systemProperty3")); - } - - // The samza properties (which are invalid for the underlying system) should be filtered out. - @Test - public void testGetStreamSamzaPropertiesOmitted() { - Config config = buildStreamConfig(STREAM_ID, - StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, - StreamConfig.SYSTEM(), TEST_SYSTEM, - "systemProperty1", "systemValue1", - "systemProperty2", "systemValue2", - "systemProperty3", "systemValue3"); - - AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); - StreamSpec spec = runner.getStreamSpec(STREAM_ID); - - Map<String, String> properties = spec.getConfig(); - assertEquals(3, properties.size()); - assertNull(properties.get(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID))); - assertNull(properties.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID))); - assertNull(spec.get(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID))); - assertNull(spec.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID))); - } - - @Test - public void testStreamConfigOverrides() { - final String sysStreamPrefix = String.format("systems.%s.streams.%s.", TEST_SYSTEM, TEST_PHYSICAL_NAME); - Config config = addConfigs(buildStreamConfig(STREAM_ID, - StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, - StreamConfig.SYSTEM(), TEST_SYSTEM, - "systemProperty1", "systemValue1", - "systemProperty2", "systemValue2", - "systemProperty3", "systemValue3"), - sysStreamPrefix + "systemProperty4", "systemValue4", - sysStreamPrefix + "systemProperty2", "systemValue8"); - - AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); - StreamSpec spec = env.getStreamSpec(STREAM_ID); - - Map<String, String> properties = spec.getConfig(); - assertEquals(4, properties.size()); - assertEquals("systemValue4", properties.get("systemProperty4")); - assertEquals("systemValue2", properties.get("systemProperty2")); - } - - // Verify that we use a default specified with systems.x.default.stream.*, if specified - @Test - public void testStreamConfigOverridesWithSystemDefaults() { - Config config = addConfigs(buildStreamConfig(STREAM_ID, - StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, - StreamConfig.SYSTEM(), TEST_SYSTEM, - "segment.bytes", "5309"), - String.format("systems.%s.default.stream.replication.factor", TEST_SYSTEM), "4", // System default property - String.format("systems.%s.default.stream.segment.bytest", TEST_SYSTEM), "867" - ); - - AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); - StreamSpec spec = env.getStreamSpec(STREAM_ID); - - Map<String, String> properties = spec.getConfig(); - assertEquals(3, properties.size()); - assertEquals("4", properties.get("replication.factor")); // Uses system default - assertEquals("5309", properties.get("segment.bytes")); // Overrides system default - } - - // When the physicalName argument is passed explicitly it should be used, regardless of whether it is also in the config - @Test - public void testGetStreamPhysicalNameArgSimple() { - Config config = buildStreamConfig(STREAM_ID, - StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg - StreamConfig.SYSTEM(), TEST_SYSTEM); - - AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); - StreamSpec spec = runner.getStreamSpec(STREAM_ID); - - assertEquals(STREAM_ID, spec.getId()); - assertEquals(TEST_PHYSICAL_NAME2, spec.getPhysicalName()); - assertEquals(TEST_SYSTEM, spec.getSystemName()); - } - - // Special characters are allowed for the physical name - @Test - public void testGetStreamPhysicalNameArgSpecialCharacters() { - Config config = buildStreamConfig(STREAM_ID, - StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME_SPECIAL_CHARS, - StreamConfig.SYSTEM(), TEST_SYSTEM); - - AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); - StreamSpec spec = runner.getStreamSpec(STREAM_ID); - assertEquals(TEST_PHYSICAL_NAME_SPECIAL_CHARS, spec.getPhysicalName()); - } - - // Null is allowed for the physical name - @Test - public void testGetStreamPhysicalNameArgNull() { - Config config = buildStreamConfig(STREAM_ID, - StreamConfig.PHYSICAL_NAME(), null, - StreamConfig.SYSTEM(), TEST_SYSTEM); - - AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); - StreamSpec spec = runner.getStreamSpec(STREAM_ID); - assertNull(spec.getPhysicalName()); - } - - // When the system name is provided explicitly, it should be used, regardless of whether it's also in the config - @Test - public void testGetStreamSystemNameArgValid() { - Config config = buildStreamConfig(STREAM_ID, - StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, // This should be ignored because of the explicit arg - StreamConfig.SYSTEM(), TEST_SYSTEM); // This too - - AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); - StreamSpec spec = runner.getStreamSpec(STREAM_ID); - - assertEquals(STREAM_ID, spec.getId()); - assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName()); - assertEquals(TEST_SYSTEM, spec.getSystemName()); - } - - // Special characters are NOT allowed for system name, because it's used as an identifier in the config. - @Test(expected = IllegalArgumentException.class) - public void testGetStreamSystemNameArgInvalid() { - Config config = buildStreamConfig(STREAM_ID, - StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, - StreamConfig.SYSTEM(), TEST_SYSTEM_INVALID); - - AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); - runner.getStreamSpec(STREAM_ID); - } - - // Empty strings are NOT allowed for system name, because it's used as an identifier in the config. - @Test(expected = IllegalArgumentException.class) - public void testGetStreamSystemNameArgEmpty() { - Config config = buildStreamConfig(STREAM_ID, - StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, - StreamConfig.SYSTEM(), ""); - - AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); - runner.getStreamSpec(STREAM_ID); - } - - // Null is not allowed IllegalArgumentException system name. - @Test(expected = IllegalArgumentException.class) - public void testGetStreamSystemNameArgNull() { - Config config = buildStreamConfig(STREAM_ID, - StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, - StreamConfig.SYSTEM(), null); - - AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); - runner.getStreamSpec(STREAM_ID); - } - - // Special characters are NOT allowed for streamId, because it's used as an identifier in the config. - @Test(expected = IllegalArgumentException.class) - public void testGetStreamStreamIdInvalid() { - Config config = buildStreamConfig(STREAM_ID_INVALID, - StreamConfig.SYSTEM(), TEST_SYSTEM); - - AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); - runner.getStreamSpec(STREAM_ID_INVALID); - } - - // Empty strings are NOT allowed for streamId, because it's used as an identifier in the config. - @Test(expected = IllegalArgumentException.class) - public void testGetStreamStreamIdEmpty() { - Config config = buildStreamConfig("", - StreamConfig.SYSTEM(), TEST_SYSTEM); - - AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); - runner.getStreamSpec(""); - } - - // Null is not allowed for streamId. - @Test(expected = IllegalArgumentException.class) - public void testGetStreamStreamIdNull() { - Config config = buildStreamConfig(null, - StreamConfig.SYSTEM(), TEST_SYSTEM); - - AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config); - runner.getStreamSpec(null); - } - - - // Helper methods - - private Config buildStreamConfig(String streamId, String... kvs) { - // inject streams.x. into each key - for (int i = 0; i < kvs.length - 1; i += 2) { - kvs[i] = String.format(StreamConfig.STREAM_ID_PREFIX(), streamId) + kvs[i]; - } - return buildConfig(kvs); - } - - private Config buildConfig(String... kvs) { - if (kvs.length % 2 != 0) { - throw new IllegalArgumentException("There must be parity between the keys and values"); - } - - Map<String, String> configMap = new HashMap<>(); - for (int i = 0; i < kvs.length - 1; i += 2) { - configMap.put(kvs[i], kvs[i + 1]); - } - return new MapConfig(configMap); - } - - private Config addConfigs(Config original, String... kvs) { - Map<String, String> result = new HashMap<>(); - result.putAll(original); - result.putAll(buildConfig(kvs)); - return new MapConfig(result); - } - - private class TestAbstractApplicationRunnerImpl extends AbstractApplicationRunner { - - public TestAbstractApplicationRunnerImpl(Config config) { - super(config); - } - - @Override - public void runTask() { - throw new UnsupportedOperationException("runTask is not supported in this test"); - } - - @Override - public void run(StreamApplication streamApp) { - // do nothing. We're only testing the stream creation methods at this point. - } - - @Override - public void kill(StreamApplication streamApp) { - // do nothing. We're only testing the stream creation methods at this point. - } - - @Override - public ApplicationStatus status(StreamApplication streamApp) { - // do nothing. We're only testing the stream creation methods at this point. - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java b/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java index e207772..0b91315 100644 --- a/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java +++ b/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java @@ -27,7 +27,6 @@ import org.apache.samza.config.Config; import org.apache.samza.config.ConfigException; import org.apache.samza.config.MapConfig; import org.apache.samza.operators.StreamGraphSpec; -import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.testUtils.TestAsyncStreamTask; import org.apache.samza.testUtils.TestStreamTask; import org.junit.Test; @@ -46,8 +45,6 @@ import static org.mockito.Mockito.mock; */ public class TestTaskFactoryUtil { - private final ApplicationRunner mockRunner = mock(ApplicationRunner.class); - @Test public void testStreamTaskClass() { Config config = new MapConfig(new HashMap<String, String>() { @@ -81,7 +78,7 @@ public class TestTaskFactoryUtil { }); StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config); assertNotNull(streamApp); - StreamGraphSpec graph = new StreamGraphSpec(mockRunner, config); + StreamGraphSpec graph = new StreamGraphSpec(config); streamApp.init(graph, config); Object retFactory = TaskFactoryUtil.createTaskFactory(graph.getOperatorSpecGraph(), null); assertTrue(retFactory instanceof StreamTaskFactory); http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/test/java/org/apache/samza/testUtils/StreamTestUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/StreamTestUtils.java b/samza-core/src/test/java/org/apache/samza/testUtils/StreamTestUtils.java new file mode 100644 index 0000000..9b1fa4d --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/testUtils/StreamTestUtils.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.testUtils; + +import java.util.Map; +import org.apache.samza.config.StreamConfig$; + +public class StreamTestUtils { + + /** + * Adds the stream.stream-id.* configurations for the provided {@code streamId} to the provided {@code configs} Map. + * + * @param configs the configs Map to add the stream configurations to + * @param streamId the id of the stream + * @param systemName the system for the stream + * @param physicalName the physical name for the stream + */ + public static void addStreamConfigs(Map<String, String> configs, + String streamId, String systemName, String physicalName) { + configs.put(String.format(StreamConfig$.MODULE$.SYSTEM_FOR_STREAM_ID(), streamId), systemName); + configs.put(String.format(StreamConfig$.MODULE$.PHYSICAL_NAME_FOR_STREAM_ID(), streamId), physicalName); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/test/java/org/apache/samza/util/TestStreamUtil.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/util/TestStreamUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestStreamUtil.java new file mode 100644 index 0000000..ac075cd --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/util/TestStreamUtil.java @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.util; + +import java.util.HashMap; +import java.util.Map; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.StreamConfig; +import org.apache.samza.system.StreamSpec; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + + +public class TestStreamUtil { + private static final String STREAM_ID = "t3st-Stream_Id"; + private static final String STREAM_ID_INVALID = "test#Str3amId!"; + + private static final String TEST_PHYSICAL_NAME = "t3st-Physical_Name"; + private static final String TEST_PHYSICAL_NAME2 = "testPhysicalName2"; + private static final String TEST_PHYSICAL_NAME_SPECIAL_CHARS = "test://Physical.Name?"; + + private static final String TEST_SYSTEM = "t3st-System_Name"; + private static final String TEST_SYSTEM_INVALID = "test:System!Name@"; + + private static final String TEST_DEFAULT_SYSTEM = "testDefaultSystemName"; + + + // The physical name should be pulled from the StreamConfig.PHYSICAL_NAME property value. + @Test + public void testGetStreamWithPhysicalNameInConfig() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, + StreamConfig.SYSTEM(), TEST_SYSTEM); + + StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config)); + + assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName()); + } + + // The streamId should be used as the physicalName when the physical name is not specified. + // NOTE: its either this, set to null, or exception. This seems better for backward compatibility and API brevity. + @Test + public void testGetStreamWithoutPhysicalNameInConfig() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.SYSTEM(), TEST_SYSTEM); + + StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config)); + + assertEquals(STREAM_ID, spec.getPhysicalName()); + } + + // If the system is specified at the stream scope, use it + @Test + public void testGetStreamWithSystemAtStreamScopeInConfig() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, + StreamConfig.SYSTEM(), TEST_SYSTEM); + + StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config)); + + assertEquals(TEST_SYSTEM, spec.getSystemName()); + } + + // If system isn't specified at stream scope, use the default system + @Test + public void testGetStreamWithSystemAtDefaultScopeInConfig() { + Config config = addConfigs(buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME), + JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM); + + StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config)); + + assertEquals(TEST_DEFAULT_SYSTEM, spec.getSystemName()); + } + + // Stream scope should override default scope + @Test + public void testGetStreamWithSystemAtBothScopesInConfig() { + Config config = addConfigs(buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, + StreamConfig.SYSTEM(), TEST_SYSTEM), + JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM); + + StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config)); + + assertEquals(TEST_SYSTEM, spec.getSystemName()); + } + + // System is required. Throw if it cannot be determined. + @Test(expected = IllegalArgumentException.class) + public void testGetStreamWithOutSystemInConfig() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME); + + StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config)); + + assertEquals(TEST_SYSTEM, spec.getSystemName()); + } + + // The properties in the config "streams.{streamId}.*" should be passed through to the spec. + @Test + public void testGetStreamPropertiesPassthrough() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, + StreamConfig.SYSTEM(), TEST_SYSTEM, + "systemProperty1", "systemValue1", + "systemProperty2", "systemValue2", + "systemProperty3", "systemValue3"); + + StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config)); + + Map<String, String> properties = spec.getConfig(); + assertEquals(3, properties.size()); + assertEquals("systemValue1", properties.get("systemProperty1")); + assertEquals("systemValue2", properties.get("systemProperty2")); + assertEquals("systemValue3", properties.get("systemProperty3")); + assertEquals("systemValue1", spec.get("systemProperty1")); + assertEquals("systemValue2", spec.get("systemProperty2")); + assertEquals("systemValue3", spec.get("systemProperty3")); + } + + // The samza properties (which are invalid for the underlying system) should be filtered out. + @Test + public void testGetStreamSamzaPropertiesOmitted() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, + StreamConfig.SYSTEM(), TEST_SYSTEM, + "systemProperty1", "systemValue1", + "systemProperty2", "systemValue2", + "systemProperty3", "systemValue3"); + + StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config)); + + Map<String, String> properties = spec.getConfig(); + assertEquals(3, properties.size()); + assertNull(properties.get(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID))); + assertNull(properties.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID))); + assertNull(spec.get(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID))); + assertNull(spec.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID))); + } + + @Test + public void testStreamConfigOverrides() { + final String sysStreamPrefix = String.format("systems.%s.streams.%s.", TEST_SYSTEM, TEST_PHYSICAL_NAME); + Config config = addConfigs(buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, + StreamConfig.SYSTEM(), TEST_SYSTEM, + "systemProperty1", "systemValue1", + "systemProperty2", "systemValue2", + "systemProperty3", "systemValue3"), + sysStreamPrefix + "systemProperty4", "systemValue4", + sysStreamPrefix + "systemProperty2", "systemValue8"); + + StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config)); + + Map<String, String> properties = spec.getConfig(); + assertEquals(4, properties.size()); + assertEquals("systemValue4", properties.get("systemProperty4")); + assertEquals("systemValue2", properties.get("systemProperty2")); + } + + // Verify that we use a default specified with systems.x.default.stream.*, if specified + @Test + public void testStreamConfigOverridesWithSystemDefaults() { + Config config = addConfigs(buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, + StreamConfig.SYSTEM(), TEST_SYSTEM, + "segment.bytes", "5309"), + String.format("systems.%s.default.stream.replication.factor", TEST_SYSTEM), "4", // System default property + String.format("systems.%s.default.stream.segment.bytest", TEST_SYSTEM), "867" + ); + + StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config)); + + Map<String, String> properties = spec.getConfig(); + assertEquals(3, properties.size()); + assertEquals("4", properties.get("replication.factor")); // Uses system default + assertEquals("5309", properties.get("segment.bytes")); // Overrides system default + } + + // When the physicalName argument is passed explicitly it should be used, regardless of whether it is also in the config + @Test + public void testGetStreamPhysicalNameArgSimple() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg + StreamConfig.SYSTEM(), TEST_SYSTEM); + + StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config)); + + assertEquals(STREAM_ID, spec.getId()); + assertEquals(TEST_PHYSICAL_NAME2, spec.getPhysicalName()); + assertEquals(TEST_SYSTEM, spec.getSystemName()); + } + + // Special characters are allowed for the physical name + @Test + public void testGetStreamPhysicalNameArgSpecialCharacters() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME_SPECIAL_CHARS, + StreamConfig.SYSTEM(), TEST_SYSTEM); + + StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config)); + assertEquals(TEST_PHYSICAL_NAME_SPECIAL_CHARS, spec.getPhysicalName()); + } + + // Null is allowed for the physical name + @Test + public void testGetStreamPhysicalNameArgNull() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), null, + StreamConfig.SYSTEM(), TEST_SYSTEM); + + StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config)); + assertNull(spec.getPhysicalName()); + } + + // When the system name is provided explicitly, it should be used, regardless of whether it's also in the config + @Test + public void testGetStreamSystemNameArgValid() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, // This should be ignored because of the explicit arg + StreamConfig.SYSTEM(), TEST_SYSTEM); // This too + + StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config)); + + assertEquals(STREAM_ID, spec.getId()); + assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName()); + assertEquals(TEST_SYSTEM, spec.getSystemName()); + } + + // Special characters are NOT allowed for system name, because it's used as an identifier in the config. + @Test(expected = IllegalArgumentException.class) + public void testGetStreamSystemNameArgInvalid() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, + StreamConfig.SYSTEM(), TEST_SYSTEM_INVALID); + + StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config)); + } + + // Empty strings are NOT allowed for system name, because it's used as an identifier in the config. + @Test(expected = IllegalArgumentException.class) + public void testGetStreamSystemNameArgEmpty() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, + StreamConfig.SYSTEM(), ""); + + StreamSpec spec = StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config)); + } + + // Null is not allowed IllegalArgumentException system name. + @Test(expected = IllegalArgumentException.class) + public void testGetStreamSystemNameArgNull() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, + StreamConfig.SYSTEM(), null); + + StreamUtil.getStreamSpec(STREAM_ID, new StreamConfig(config)); + } + + // Special characters are NOT allowed for streamId, because it's used as an identifier in the config. + @Test(expected = IllegalArgumentException.class) + public void testGetStreamStreamIdInvalid() { + Config config = buildStreamConfig(STREAM_ID_INVALID, + StreamConfig.SYSTEM(), TEST_SYSTEM); + + StreamUtil.getStreamSpec(STREAM_ID_INVALID, new StreamConfig(config)); + } + + // Empty strings are NOT allowed for streamId, because it's used as an identifier in the config. + @Test(expected = IllegalArgumentException.class) + public void testGetStreamStreamIdEmpty() { + Config config = buildStreamConfig("", + StreamConfig.SYSTEM(), TEST_SYSTEM); + + StreamUtil.getStreamSpec("", new StreamConfig(config)); + } + + // Null is not allowed for streamId. + @Test(expected = IllegalArgumentException.class) + public void testGetStreamStreamIdNull() { + Config config = buildStreamConfig(null, + StreamConfig.SYSTEM(), TEST_SYSTEM); + + StreamUtil.getStreamSpec(null, new StreamConfig(config)); + } + + + // Helper methods + + private Config buildStreamConfig(String streamId, String... kvs) { + // inject streams.x. into each key + for (int i = 0; i < kvs.length - 1; i += 2) { + kvs[i] = String.format(StreamConfig.STREAM_ID_PREFIX(), streamId) + kvs[i]; + } + return buildConfig(kvs); + } + + private Config buildConfig(String... kvs) { + if (kvs.length % 2 != 0) { + throw new IllegalArgumentException("There must be parity between the keys and values"); + } + + Map<String, String> configMap = new HashMap<>(); + for (int i = 0; i < kvs.length - 1; i += 2) { + configMap.put(kvs[i], kvs[i + 1]); + } + return new MapConfig(configMap); + } + + private Config addConfigs(Config original, String... kvs) { + Map<String, String> result = new HashMap<>(); + result.putAll(original); + result.putAll(buildConfig(kvs)); + return new MapConfig(result); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java index 217248d..113dced 100644 --- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java +++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java @@ -110,7 +110,6 @@ public class KafkaStreamSpec extends StreamSpec { originalSpec.getSystemName(), originalSpec.getPartitionCount(), replicationFactor, - originalSpec.isBroadcast(), mapToProperties(filterUnsupportedProperties(originalSpec.getConfig()))); } @@ -125,7 +124,7 @@ public class KafkaStreamSpec extends StreamSpec { * @param partitionCount The number of partitions. */ public KafkaStreamSpec(String id, String topicName, String systemName, int partitionCount) { - this(id, topicName, systemName, partitionCount, DEFAULT_REPLICATION_FACTOR, false, new Properties()); + this(id, topicName, systemName, partitionCount, DEFAULT_REPLICATION_FACTOR, new Properties()); } /** @@ -146,13 +145,11 @@ public class KafkaStreamSpec extends StreamSpec { * * @param replicationFactor The number of topic replicas in the Kafka cluster for durability. * - * @param isBroadcast The stream is broadcast or not. - * * @param properties A set of properties for the stream. These may be System-specfic. */ public KafkaStreamSpec(String id, String topicName, String systemName, int partitionCount, int replicationFactor, - Boolean isBroadcast, Properties properties) { - super(id, topicName, systemName, partitionCount, false, isBroadcast, propertiesToMap(properties)); + Properties properties) { + super(id, topicName, systemName, partitionCount, propertiesToMap(properties)); if (partitionCount < 1) { throw new IllegalArgumentException("Parameter 'partitionCount' must be > 0"); @@ -168,12 +165,12 @@ public class KafkaStreamSpec extends StreamSpec { @Override public StreamSpec copyWithPartitionCount(int partitionCount) { return new KafkaStreamSpec(getId(), getPhysicalName(), getSystemName(), partitionCount, getReplicationFactor(), - isBroadcast(), getProperties()); + getProperties()); } public KafkaStreamSpec copyWithReplicationFactor(int replicationFactor) { return new KafkaStreamSpec(getId(), getPhysicalName(), getSystemName(), getPartitionCount(), replicationFactor, - isBroadcast(), getProperties()); + getProperties()); } /** @@ -183,7 +180,7 @@ public class KafkaStreamSpec extends StreamSpec { */ public KafkaStreamSpec copyWithProperties(Properties properties) { return new KafkaStreamSpec(getId(), getPhysicalName(), getSystemName(), getPartitionCount(), getReplicationFactor(), - isBroadcast(), properties); + properties); } public int getReplicationFactor() { http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index 07f4710..26664ea 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -32,7 +32,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer import org.apache.samza.SamzaException import org.apache.samza.config.ApplicationConfig.ApplicationMode import org.apache.samza.config.SystemConfig.Config2System -import org.apache.samza.util.{Logging, Util} +import org.apache.samza.util.{Logging, StreamUtil} import scala.collection.JavaConverters._ @@ -237,7 +237,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { val storeName = if (matcher.find()) matcher.group(1) else throw new SamzaException("Unable to find store name in the changelog configuration: " + changelogConfig + " with SystemStream: " + cn) storageConfig.getChangelogStream(storeName).foreach(changelogName => { - val systemStream = Util.getSystemStreamFromNames(changelogName) + val systemStream = StreamUtil.getSystemStreamFromNames(changelogName) val factoryName = config.getSystemFactory(systemStream.getSystem).getOrElse(new SamzaException("Unable to determine factory for system: " + systemStream.getSystem)) storeToChangelog += storeName -> systemStream.getStream }) http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala index 6dc2f82..ce4544b 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala @@ -21,11 +21,11 @@ package org.apache.samza.config import org.I0Itec.zkclient.ZkClient import kafka.utils.ZkUtils -import org.apache.samza.config.KafkaConfig.{ Config2Kafka, REGEX_RESOLVED_STREAMS } +import org.apache.samza.config.KafkaConfig.{Config2Kafka, REGEX_RESOLVED_STREAMS} import org.apache.samza.SamzaException -import org.apache.samza.util.Util +import org.apache.samza.util.{Logging, StreamUtil} + import collection.JavaConverters._ -import org.apache.samza.util.Logging import scala.collection._ import org.apache.samza.config.TaskConfig.Config2Task import org.apache.samza.system.SystemStream @@ -87,7 +87,7 @@ class RegExTopicGenerator extends ConfigRewriter with Logging { info("Generated config values for %d new topics" format newInputStreams.size) val inputStreams = TaskConfig.INPUT_STREAMS -> (existingInputStreams ++ newInputStreams) - .map(Util.getNameFromSystemStream) + .map(StreamUtil.getNameFromSystemStream) .toArray .sortWith(_ < _) .mkString(",") http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala index a63db03..6ab4d32 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala @@ -486,10 +486,10 @@ class KafkaSystemAdmin( val topicName = spec.getPhysicalName val topicMeta = topicMetaInformation.getOrElse(topicName, throw new StreamValidationException("Unable to find topic information for topic " + topicName)) new KafkaStreamSpec(spec.getId, topicName, systemName, spec.getPartitionCount, topicMeta.replicationFactor, - spec.isBroadcast, topicMeta.kafkaProps) + topicMeta.kafkaProps) } else if (spec.isCoordinatorStream){ new KafkaStreamSpec(spec.getId, spec.getPhysicalName, systemName, 1, coordinatorStreamReplicationFactor, - spec.isBroadcast, coordinatorStreamProperties) + coordinatorStreamProperties) } else if (intermediateStreamProperties.contains(spec.getId)) { KafkaStreamSpec.fromSpec(spec).copyWithProperties(intermediateStreamProperties(spec.getId)) } else { http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java index c00ed2d..14d2fe6 100644 --- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java +++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java @@ -21,20 +21,20 @@ package org.apache.samza.system.kafka; import com.google.common.collect.ImmutableMap; import java.util.Map; import java.util.Properties; -import org.apache.samza.runtime.TestAbstractApplicationRunner; import org.apache.samza.system.StreamSpec; +import org.apache.samza.util.TestStreamUtil; import org.junit.Test; import static org.junit.Assert.*; /** - * See also the general StreamSpec tests in {@link TestAbstractApplicationRunner} + * See also the general StreamSpec tests in {@link TestStreamUtil} */ public class TestKafkaStreamSpec { @Test public void testUnsupportedConfigStrippedFromProperties() { - StreamSpec original = new StreamSpec("dummyId","dummyPhysicalName", "dummySystemName", false, ImmutableMap.of("segment.bytes", "4", "replication.factor", "7")); + StreamSpec original = new StreamSpec("dummyId","dummyPhysicalName", "dummySystemName", ImmutableMap.of("segment.bytes", "4", "replication.factor", "7")); // First verify the original assertEquals("7", original.get("replication.factor"));
