http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java index 779d299..61289af 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java @@ -24,12 +24,18 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import org.apache.samza.Partition; import org.apache.samza.SamzaException; +import org.apache.samza.application.ApplicationDescriptor; +import org.apache.samza.application.LegacyTaskApplication; +import org.apache.samza.application.SamzaApplication; import org.apache.samza.application.StreamApplicationDescriptorImpl; +import org.apache.samza.application.TaskApplicationDescriptorImpl; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; @@ -37,9 +43,13 @@ import org.apache.samza.config.TaskConfig; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.TableDescriptor; import org.apache.samza.operators.descriptors.GenericInputDescriptor; import org.apache.samza.operators.descriptors.GenericOutputDescriptor; import org.apache.samza.operators.descriptors.GenericSystemDescriptor; +import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; +import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor; +import 
org.apache.samza.operators.descriptors.base.system.SystemDescriptor; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.windows.Windows; import org.apache.samza.serializers.KVSerde; @@ -54,8 +64,12 @@ import org.apache.samza.testUtils.StreamTestUtils; import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TestExecutionPlanner { @@ -63,6 +77,11 @@ public class TestExecutionPlanner { private static final String DEFAULT_SYSTEM = "test-system"; private static final int DEFAULT_PARTITIONS = 10; + private final Set<SystemDescriptor> systemDescriptors = new HashSet<>(); + private final Map<String, InputDescriptor> inputDescriptors = new HashMap<>(); + private final Map<String, OutputDescriptor> outputDescriptors = new HashMap<>(); + private final Set<TableDescriptor> tableDescriptors = new HashSet<>(); + private SystemAdmins systemAdmins; private StreamManager streamManager; private Config config; @@ -78,6 +97,8 @@ public class TestExecutionPlanner { private GenericOutputDescriptor<KV<Object, Object>> output1Descriptor; private StreamSpec output2Spec; private GenericOutputDescriptor<KV<Object, Object>> output2Descriptor; + private GenericSystemDescriptor system1Descriptor; + private GenericSystemDescriptor system2Descriptor; static SystemAdmin createSystemAdmin(Map<String, Integer> streamToPartitions) { @@ -236,20 +257,35 @@ public class TestExecutionPlanner { KVSerde<Object, Object> kvSerde = new KVSerde<>(new NoOpSerde(), new NoOpSerde()); String mockSystemFactoryClass = "factory.class.name"; - GenericSystemDescriptor system1 = new GenericSystemDescriptor("system1", mockSystemFactoryClass); - 
GenericSystemDescriptor system2 = new GenericSystemDescriptor("system2", mockSystemFactoryClass); - input1Descriptor = system1.getInputDescriptor("input1", kvSerde); - input2Descriptor = system2.getInputDescriptor("input2", kvSerde); - input3Descriptor = system2.getInputDescriptor("input3", kvSerde); - input4Descriptor = system1.getInputDescriptor("input4", kvSerde); - output1Descriptor = system1.getOutputDescriptor("output1", kvSerde); - output2Descriptor = system2.getOutputDescriptor("output2", kvSerde); + system1Descriptor = new GenericSystemDescriptor("system1", mockSystemFactoryClass); + system2Descriptor = new GenericSystemDescriptor("system2", mockSystemFactoryClass); + input1Descriptor = system1Descriptor.getInputDescriptor("input1", kvSerde); + input2Descriptor = system2Descriptor.getInputDescriptor("input2", kvSerde); + input3Descriptor = system2Descriptor.getInputDescriptor("input3", kvSerde); + input4Descriptor = system1Descriptor.getInputDescriptor("input4", kvSerde); + output1Descriptor = system1Descriptor.getOutputDescriptor("output1", kvSerde); + output2Descriptor = system2Descriptor.getOutputDescriptor("output2", kvSerde); + + // clean and set up sets and maps of descriptors + systemDescriptors.clear(); + inputDescriptors.clear(); + outputDescriptors.clear(); + tableDescriptors.clear(); + systemDescriptors.add(system1Descriptor); + systemDescriptors.add(system2Descriptor); + inputDescriptors.put(input1Descriptor.getStreamId(), input1Descriptor); + inputDescriptors.put(input2Descriptor.getStreamId(), input2Descriptor); + inputDescriptors.put(input3Descriptor.getStreamId(), input3Descriptor); + inputDescriptors.put(input4Descriptor.getStreamId(), input4Descriptor); + outputDescriptors.put(output1Descriptor.getStreamId(), output1Descriptor); + outputDescriptors.put(output2Descriptor.getStreamId(), output2Descriptor); + // set up external partition count Map<String, Integer> system1Map = new HashMap<>(); system1Map.put("input1", 64); 
system1Map.put("output1", 8); - system1Map.put("input4", ExecutionPlanner.MAX_INFERRED_PARTITIONS * 2); + system1Map.put("input4", IntermediateStreamManager.MAX_INFERRED_PARTITIONS * 2); Map<String, Integer> system2Map = new HashMap<>(); system2Map.put("input2", 16); system2Map.put("input3", 32); @@ -268,7 +304,7 @@ public class TestExecutionPlanner { ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin(); - JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph()); + JobGraph jobGraph = planner.createJobGraph(graphSpec.getConfig(), graphSpec); assertTrue(jobGraph.getInputStreams().size() == 3); assertTrue(jobGraph.getOutputStreams().size() == 2); assertTrue(jobGraph.getIntermediateStreams().size() == 2); // two streams generated by partitionBy @@ -278,9 +314,9 @@ public class TestExecutionPlanner { public void testFetchExistingStreamPartitions() { ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin(); - JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph()); + JobGraph jobGraph = planner.createJobGraph(graphSpec.getConfig(), graphSpec); - planner.fetchInputAndOutputStreamPartitions(jobGraph); + ExecutionPlanner.setInputAndOutputStreamPartitionCount(jobGraph, streamManager); assertTrue(jobGraph.getOrCreateStreamEdge(input1Spec).getPartitionCount() == 64); assertTrue(jobGraph.getOrCreateStreamEdge(input2Spec).getPartitionCount() == 16); assertTrue(jobGraph.getOrCreateStreamEdge(input3Spec).getPartitionCount() == 32); @@ -296,7 +332,10 @@ public class TestExecutionPlanner { public void testCalculateJoinInputPartitions() { ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin(); - JobGraph jobGraph = (JobGraph) planner.plan(graphSpec.getOperatorSpecGraph()); + 
JobGraph jobGraph = planner.createJobGraph(graphSpec.getConfig(), graphSpec); + + ExecutionPlanner.setInputAndOutputStreamPartitionCount(jobGraph, streamManager); + new IntermediateStreamManager(config, graphSpec).calculatePartitions(jobGraph); // the partitions should be the same as input1 jobGraph.getIntermediateStreams().forEach(edge -> { @@ -309,7 +348,7 @@ public class TestExecutionPlanner { ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithInvalidJoin(); - planner.plan(graphSpec.getOperatorSpecGraph()); + planner.plan(graphSpec); } @Test @@ -320,7 +359,7 @@ public class TestExecutionPlanner { ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager); StreamApplicationDescriptorImpl graphSpec = createSimpleGraph(); - JobGraph jobGraph = (JobGraph) planner.plan(graphSpec.getOperatorSpecGraph()); + JobGraph jobGraph = (JobGraph) planner.plan(graphSpec); // the partitions should be the same as input1 jobGraph.getIntermediateStreams().forEach(edge -> { @@ -336,7 +375,7 @@ public class TestExecutionPlanner { ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager); StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin(); - ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph()); + ExecutionPlan plan = planner.plan(graphSpec); List<JobConfig> jobConfigs = plan.getJobConfigs(); for (JobConfig config : jobConfigs) { System.out.println(config); @@ -351,7 +390,7 @@ public class TestExecutionPlanner { ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager); StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoinAndWindow(); - ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph()); + ExecutionPlan plan = planner.plan(graphSpec); List<JobConfig> jobConfigs = plan.getJobConfigs(); assertEquals(1, jobConfigs.size()); @@ -368,7 +407,7 @@ public class TestExecutionPlanner { ExecutionPlanner planner = new 
ExecutionPlanner(cfg, streamManager); StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoinAndWindow(); - ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph()); + ExecutionPlan plan = planner.plan(graphSpec); List<JobConfig> jobConfigs = plan.getJobConfigs(); assertEquals(1, jobConfigs.size()); @@ -384,7 +423,7 @@ public class TestExecutionPlanner { ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager); StreamApplicationDescriptorImpl graphSpec = createSimpleGraph(); - ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph()); + ExecutionPlan plan = planner.plan(graphSpec); List<JobConfig> jobConfigs = plan.getJobConfigs(); assertEquals(1, jobConfigs.size()); assertFalse(jobConfigs.get(0).containsKey(TaskConfig.WINDOW_MS())); @@ -399,7 +438,7 @@ public class TestExecutionPlanner { ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager); StreamApplicationDescriptorImpl graphSpec = createSimpleGraph(); - ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph()); + ExecutionPlan plan = planner.plan(graphSpec); List<JobConfig> jobConfigs = plan.getJobConfigs(); assertEquals(1, jobConfigs.size()); assertEquals("2000", jobConfigs.get(0).get(TaskConfig.WINDOW_MS())); @@ -409,7 +448,7 @@ public class TestExecutionPlanner { public void testCalculateIntStreamPartitions() { ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); StreamApplicationDescriptorImpl graphSpec = createSimpleGraph(); - JobGraph jobGraph = (JobGraph) planner.plan(graphSpec.getOperatorSpecGraph()); + JobGraph jobGraph = (JobGraph) planner.plan(graphSpec); // the partitions should be the same as input1 jobGraph.getIntermediateStreams().forEach(edge -> { @@ -430,15 +469,15 @@ public class TestExecutionPlanner { edge.setPartitionCount(16); edges.add(edge); - assertEquals(32, ExecutionPlanner.maxPartitions(edges)); + assertEquals(32, IntermediateStreamManager.maxPartitions(edges)); edges = Collections.emptyList(); - 
assertEquals(StreamEdge.PARTITIONS_UNKNOWN, ExecutionPlanner.maxPartitions(edges)); + assertEquals(StreamEdge.PARTITIONS_UNKNOWN, IntermediateStreamManager.maxPartitions(edges)); } @Test public void testMaxPartitionLimit() throws Exception { - int partitionLimit = ExecutionPlanner.MAX_INFERRED_PARTITIONS; + int partitionLimit = IntermediateStreamManager.MAX_INFERRED_PARTITIONS; ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> { @@ -447,11 +486,99 @@ public class TestExecutionPlanner { input1.partitionBy(m -> m.key, m -> m.value, "p1").map(kv -> kv).sendTo(output1); }, config); - JobGraph jobGraph = (JobGraph) planner.plan(graphSpec.getOperatorSpecGraph()); + JobGraph jobGraph = (JobGraph) planner.plan(graphSpec); // the partitions should be the same as input1 jobGraph.getIntermediateStreams().forEach(edge -> { assertEquals(partitionLimit, edge.getPartitionCount()); // max of input1 and output1 }); } + + @Test + public void testCreateJobGraphForTaskApplication() { + TaskApplicationDescriptorImpl taskAppDesc = mock(TaskApplicationDescriptorImpl.class); + // add intermediate streams + String intermediateStream1 = "intermediate-stream1"; + String intermediateBroadcast = "intermediate-broadcast1"; + // intermediate stream1, not broadcast + GenericInputDescriptor<KV<Object, Object>> intermediateInput1 = system1Descriptor.getInputDescriptor( + intermediateStream1, new KVSerde<>(new NoOpSerde(), new NoOpSerde())); + GenericOutputDescriptor<KV<Object, Object>> intermediateOutput1 = system1Descriptor.getOutputDescriptor( + intermediateStream1, new KVSerde<>(new NoOpSerde(), new NoOpSerde())); + // intermediate stream2, broadcast + GenericInputDescriptor<KV<Object, Object>> intermediateBroacastInput1 = system1Descriptor.getInputDescriptor( + intermediateBroadcast, new KVSerde<>(new NoOpSerde<>(), new NoOpSerde<>())); + GenericOutputDescriptor<KV<Object, Object>> 
intermediateBroacastOutput1 = system1Descriptor.getOutputDescriptor( + intermediateBroadcast, new KVSerde<>(new NoOpSerde<>(), new NoOpSerde<>())); + inputDescriptors.put(intermediateStream1, intermediateInput1); + outputDescriptors.put(intermediateStream1, intermediateOutput1); + inputDescriptors.put(intermediateBroadcast, intermediateBroacastInput1); + outputDescriptors.put(intermediateBroadcast, intermediateBroacastOutput1); + Set<String> broadcastStreams = new HashSet<>(); + broadcastStreams.add(intermediateBroadcast); + + when(taskAppDesc.getInputDescriptors()).thenReturn(inputDescriptors); + when(taskAppDesc.getInputStreamIds()).thenReturn(inputDescriptors.keySet()); + when(taskAppDesc.getOutputDescriptors()).thenReturn(outputDescriptors); + when(taskAppDesc.getOutputStreamIds()).thenReturn(outputDescriptors.keySet()); + when(taskAppDesc.getTableDescriptors()).thenReturn(Collections.emptySet()); + when(taskAppDesc.getSystemDescriptors()).thenReturn(systemDescriptors); + when(taskAppDesc.getBroadcastStreams()).thenReturn(broadcastStreams); + doReturn(MockTaskApplication.class).when(taskAppDesc).getAppClass(); + + Map<String, String> systemStreamConfigs = new HashMap<>(); + inputDescriptors.forEach((key, value) -> systemStreamConfigs.putAll(value.toConfig())); + outputDescriptors.forEach((key, value) -> systemStreamConfigs.putAll(value.toConfig())); + systemDescriptors.forEach(sd -> systemStreamConfigs.putAll(sd.toConfig())); + + ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); + JobGraph jobGraph = planner.createJobGraph(config, taskAppDesc); + assertEquals(1, jobGraph.getJobNodes().size()); + assertTrue(jobGraph.getInputStreams().stream().map(edge -> edge.getName()) + .filter(streamId -> inputDescriptors.containsKey(streamId)).collect(Collectors.toList()).isEmpty()); + Set<String> intermediateStreams = new HashSet<>(inputDescriptors.keySet()); + jobGraph.getInputStreams().forEach(edge -> { + if 
(intermediateStreams.contains(edge.getStreamSpec().getId())) { + intermediateStreams.remove(edge.getStreamSpec().getId()); + } + }); + assertEquals(new HashSet<String>() { { this.add(intermediateStream1); this.add(intermediateBroadcast); } }.toArray(), + intermediateStreams.toArray()); + } + + @Test + public void testCreateJobGraphForLegacyTaskApplication() { + TaskApplicationDescriptorImpl taskAppDesc = mock(TaskApplicationDescriptorImpl.class); + + when(taskAppDesc.getInputDescriptors()).thenReturn(new HashMap<>()); + when(taskAppDesc.getOutputDescriptors()).thenReturn(new HashMap<>()); + when(taskAppDesc.getTableDescriptors()).thenReturn(new HashSet<>()); + when(taskAppDesc.getSystemDescriptors()).thenReturn(new HashSet<>()); + when(taskAppDesc.getBroadcastStreams()).thenReturn(new HashSet<>()); + doReturn(LegacyTaskApplication.class).when(taskAppDesc).getAppClass(); + + Map<String, String> systemStreamConfigs = new HashMap<>(); + inputDescriptors.forEach((key, value) -> systemStreamConfigs.putAll(value.toConfig())); + outputDescriptors.forEach((key, value) -> systemStreamConfigs.putAll(value.toConfig())); + systemDescriptors.forEach(sd -> systemStreamConfigs.putAll(sd.toConfig())); + + ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); + JobGraph jobGraph = planner.createJobGraph(config, taskAppDesc); + assertEquals(1, jobGraph.getJobNodes().size()); + JobNode jobNode = jobGraph.getJobNodes().get(0); + assertEquals("test-app", jobNode.getJobName()); + assertEquals("test-app-1", jobNode.getJobNameAndId()); + assertEquals(0, jobNode.getInEdges().size()); + assertEquals(0, jobNode.getOutEdges().size()); + assertEquals(0, jobNode.getTables().size()); + assertEquals(config, jobNode.getConfig()); + } + + public static class MockTaskApplication implements SamzaApplication { + + @Override + public void describe(ApplicationDescriptor appDesc) { + + } + } }
http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/test/java/org/apache/samza/execution/TestIntermediateStreamManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestIntermediateStreamManager.java b/samza-core/src/test/java/org/apache/samza/execution/TestIntermediateStreamManager.java new file mode 100644 index 0000000..bc15709 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/execution/TestIntermediateStreamManager.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.execution; + +import org.apache.samza.application.StreamApplicationDescriptorImpl; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + +/** + * Unit tests for {@link IntermediateStreamManager} + */ +public class TestIntermediateStreamManager extends ExecutionPlannerTestBase { + + @Test + public void testCalculateRepartitionJoinTopicPartitions() { + mockStreamAppDesc = new StreamApplicationDescriptorImpl(getRepartitionJoinStreamApplication(), mockConfig); + IntermediateStreamManager partitionPlanner = new IntermediateStreamManager(mockConfig, mockStreamAppDesc); + JobGraph mockGraph = new ExecutionPlanner(mockConfig, mock(StreamManager.class)) + .createJobGraph(mockConfig, mockStreamAppDesc); + // set the input stream partitions + mockGraph.getInputStreams().forEach(inEdge -> { + if (inEdge.getStreamSpec().getId().equals(input1Descriptor.getStreamId())) { + inEdge.setPartitionCount(6); + } else if (inEdge.getStreamSpec().getId().equals(input2Descriptor.getStreamId())) { + inEdge.setPartitionCount(5); + } + }); + partitionPlanner.calculatePartitions(mockGraph); + assertEquals(1, mockGraph.getIntermediateStreamEdges().size()); + assertEquals(5, mockGraph.getIntermediateStreamEdges().stream() + .filter(inEdge -> inEdge.getStreamSpec().getId().equals(intermediateInputDescriptor.getStreamId())) + .findFirst().get().getPartitionCount()); + } + + @Test + public void testCalculateRepartitionIntermediateTopicPartitions() { + mockStreamAppDesc = new StreamApplicationDescriptorImpl(getRepartitionOnlyStreamApplication(), mockConfig); + IntermediateStreamManager partitionPlanner = new IntermediateStreamManager(mockConfig, mockStreamAppDesc); + JobGraph mockGraph = new ExecutionPlanner(mockConfig, mock(StreamManager.class)) + .createJobGraph(mockConfig, mockStreamAppDesc); + // set the input stream partitions + mockGraph.getInputStreams().forEach(inEdge -> inEdge.setPartitionCount(7)); + 
partitionPlanner.calculatePartitions(mockGraph); + assertEquals(1, mockGraph.getIntermediateStreamEdges().size()); + assertEquals(7, mockGraph.getIntermediateStreamEdges().stream() + .filter(inEdge -> inEdge.getStreamSpec().getId().equals(intermediateInputDescriptor.getStreamId())) + .findFirst().get().getPartitionCount()); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java index ed35d67..4de0485 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java @@ -19,12 +19,11 @@ package org.apache.samza.execution; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.samza.operators.OperatorSpecGraph; +import org.apache.samza.application.StreamApplicationDescriptorImpl; import org.apache.samza.system.StreamSpec; import org.junit.Before; import org.junit.Test; @@ -32,7 +31,6 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class TestJobGraph { @@ -61,9 +59,8 @@ public class TestJobGraph { * 2 9 10 */ private void createGraph1() { - OperatorSpecGraph specGraph = mock(OperatorSpecGraph.class); - when(specGraph.getBroadcastStreams()).thenReturn(Collections.emptySet()); - graph1 = new JobGraph(null, specGraph); + StreamApplicationDescriptorImpl appDesc = mock(StreamApplicationDescriptorImpl.class); + graph1 = new JobGraph(null, appDesc); JobNode n2 = graph1.getOrCreateJobNode("2", "1"); JobNode n3 = graph1.getOrCreateJobNode("3", 
"1"); @@ -96,9 +93,8 @@ public class TestJobGraph { * |<---6 <--| <> */ private void createGraph2() { - OperatorSpecGraph specGraph = mock(OperatorSpecGraph.class); - when(specGraph.getBroadcastStreams()).thenReturn(Collections.emptySet()); - graph2 = new JobGraph(null, specGraph); + StreamApplicationDescriptorImpl appDesc = mock(StreamApplicationDescriptorImpl.class); + graph2 = new JobGraph(null, appDesc); JobNode n1 = graph2.getOrCreateJobNode("1", "1"); JobNode n2 = graph2.getOrCreateJobNode("2", "1"); @@ -125,9 +121,8 @@ public class TestJobGraph { * 1<->1 -> 2<->2 */ private void createGraph3() { - OperatorSpecGraph specGraph = mock(OperatorSpecGraph.class); - when(specGraph.getBroadcastStreams()).thenReturn(Collections.emptySet()); - graph3 = new JobGraph(null, specGraph); + StreamApplicationDescriptorImpl appDesc = mock(StreamApplicationDescriptorImpl.class); + graph3 = new JobGraph(null, appDesc); JobNode n1 = graph3.getOrCreateJobNode("1", "1"); JobNode n2 = graph3.getOrCreateJobNode("2", "1"); @@ -143,9 +138,8 @@ public class TestJobGraph { * 1<->1 */ private void createGraph4() { - OperatorSpecGraph specGraph = mock(OperatorSpecGraph.class); - when(specGraph.getBroadcastStreams()).thenReturn(Collections.emptySet()); - graph4 = new JobGraph(null, specGraph); + StreamApplicationDescriptorImpl appDesc = mock(StreamApplicationDescriptorImpl.class); + graph4 = new JobGraph(null, appDesc); JobNode n1 = graph4.getOrCreateJobNode("1", "1"); @@ -163,9 +157,8 @@ public class TestJobGraph { @Test public void testAddSource() { - OperatorSpecGraph specGraph = mock(OperatorSpecGraph.class); - when(specGraph.getBroadcastStreams()).thenReturn(Collections.emptySet()); - JobGraph graph = new JobGraph(null, specGraph); + StreamApplicationDescriptorImpl appDesc = mock(StreamApplicationDescriptorImpl.class); + JobGraph graph = new JobGraph(null, appDesc); /** * s1 -> 1 @@ -206,9 +199,8 @@ public class TestJobGraph { * 2 -> s2 * 2 -> s3 */ - OperatorSpecGraph specGraph = 
mock(OperatorSpecGraph.class); - when(specGraph.getBroadcastStreams()).thenReturn(Collections.emptySet()); - JobGraph graph = new JobGraph(null, specGraph); + StreamApplicationDescriptorImpl appDesc = mock(StreamApplicationDescriptorImpl.class); + JobGraph graph = new JobGraph(null, appDesc); JobNode n1 = graph.getOrCreateJobNode("1", "1"); JobNode n2 = graph.getOrCreateJobNode("2", "1"); StreamSpec s1 = genStream(); http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java index ae6e25e..c207118 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java @@ -20,9 +20,14 @@ package org.apache.samza.execution; import java.time.Duration; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.samza.application.StreamApplicationDescriptorImpl; +import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; @@ -40,10 +45,13 @@ import org.apache.samza.serializers.LongSerde; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.serializers.Serde; import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.SystemAdmins; import org.apache.samza.testUtils.StreamTestUtils; import org.codehaus.jackson.map.ObjectMapper; +import org.hamcrest.Matchers; +import 
org.junit.Before; import org.junit.Test; import static org.apache.samza.execution.TestExecutionPlanner.*; @@ -51,16 +59,68 @@ import static org.junit.Assert.*; import static org.mockito.Mockito.*; +/** + * Unit test for {@link JobGraphJsonGenerator} + */ public class TestJobGraphJsonGenerator { + private Config mockConfig; + private JobNode mockJobNode; + private StreamSpec input1Spec; + private StreamSpec input2Spec; + private StreamSpec outputSpec; + private StreamSpec repartitionSpec; + private KVSerde<String, Object> defaultSerde; + private GenericSystemDescriptor inputSystemDescriptor; + private GenericSystemDescriptor outputSystemDescriptor; + private GenericSystemDescriptor intermediateSystemDescriptor; + private GenericInputDescriptor<KV<String, Object>> input1Descriptor; + private GenericInputDescriptor<KV<String, Object>> input2Descriptor; + private GenericOutputDescriptor<KV<String, Object>> outputDescriptor; - public class PageViewEvent { - String getCountry() { - return ""; - } + @Before + public void setUp() { + input1Spec = new StreamSpec("input1", "input1", "input-system"); + input2Spec = new StreamSpec("input2", "input2", "input-system"); + outputSpec = new StreamSpec("output", "output", "output-system"); + repartitionSpec = + new StreamSpec("jobName-jobId-partition_by-p1", "partition_by-p1", "intermediate-system"); + + + defaultSerde = KVSerde.of(new StringSerde(), new JsonSerdeV2<>()); + inputSystemDescriptor = new GenericSystemDescriptor("input-system", "mockSystemFactoryClassName"); + outputSystemDescriptor = new GenericSystemDescriptor("output-system", "mockSystemFactoryClassName"); + intermediateSystemDescriptor = new GenericSystemDescriptor("intermediate-system", "mockSystemFactoryClassName"); + input1Descriptor = inputSystemDescriptor.getInputDescriptor("input1", defaultSerde); + input2Descriptor = inputSystemDescriptor.getInputDescriptor("input2", defaultSerde); + outputDescriptor = outputSystemDescriptor.getOutputDescriptor("output", 
defaultSerde); + + Map<String, String> configs = new HashMap<>(); + configs.put(JobConfig.JOB_NAME(), "jobName"); + configs.put(JobConfig.JOB_ID(), "jobId"); + mockConfig = spy(new MapConfig(configs)); + + mockJobNode = mock(JobNode.class); + StreamEdge input1Edge = new StreamEdge(input1Spec, false, false, mockConfig); + StreamEdge input2Edge = new StreamEdge(input2Spec, false, false, mockConfig); + StreamEdge outputEdge = new StreamEdge(outputSpec, false, false, mockConfig); + StreamEdge repartitionEdge = new StreamEdge(repartitionSpec, true, false, mockConfig); + Map<String, StreamEdge> inputEdges = new HashMap<>(); + inputEdges.put(input1Descriptor.getStreamId(), input1Edge); + inputEdges.put(input2Descriptor.getStreamId(), input2Edge); + inputEdges.put(repartitionSpec.getId(), repartitionEdge); + Map<String, StreamEdge> outputEdges = new HashMap<>(); + outputEdges.put(outputDescriptor.getStreamId(), outputEdge); + outputEdges.put(repartitionSpec.getId(), repartitionEdge); + when(mockJobNode.getInEdges()).thenReturn(inputEdges); + when(mockJobNode.getOutEdges()).thenReturn(outputEdges); + when(mockJobNode.getConfig()).thenReturn(mockConfig); + when(mockJobNode.getJobName()).thenReturn("jobName"); + when(mockJobNode.getJobId()).thenReturn("jobId"); + when(mockJobNode.getJobNameAndId()).thenReturn(JobNode.createJobNameAndId("jobName", "jobId")); } @Test - public void test() throws Exception { + public void testRepartitionedJoinStreamApplication() throws Exception { /** * the graph looks like the following. 
@@ -142,7 +202,7 @@ public class TestJobGraphJsonGenerator { }, config); ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); - ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph()); + ExecutionPlan plan = planner.plan(graphSpec); String json = plan.getPlanAsJson(); System.out.println(json); @@ -157,7 +217,7 @@ public class TestJobGraphJsonGenerator { } @Test - public void test2() throws Exception { + public void testRepartitionedWindowStreamApplication() throws Exception { Map<String, String> configMap = new HashMap<>(); configMap.put(JobConfig.JOB_NAME(), "test-app"); configMap.put(JobConfig.JOB_DEFAULT_SYSTEM(), "test-system"); @@ -202,7 +262,7 @@ public class TestJobGraphJsonGenerator { }, config); ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); - ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph()); + ExecutionPlan plan = planner.plan(graphSpec); String json = plan.getPlanAsJson(); System.out.println(json); @@ -222,4 +282,75 @@ public class TestJobGraphJsonGenerator { assertEquals(operatorGraphJson.operators.get("test-app-1-send_to-5").get("outputStreamId"), "PageViewCount"); } + + @Test + public void testTaskApplication() throws Exception { + JobGraphJsonGenerator jsonGenerator = new JobGraphJsonGenerator(); + JobGraph mockJobGraph = mock(JobGraph.class); + ApplicationConfig mockAppConfig = mock(ApplicationConfig.class); + when(mockAppConfig.getAppName()).thenReturn("testTaskApp"); + when(mockAppConfig.getAppId()).thenReturn("testTaskAppId"); + when(mockJobGraph.getApplicationConfig()).thenReturn(mockAppConfig); + // compute the three disjoint sets of the JobGraph: input only, output only, and intermediate streams + Set<StreamEdge> inEdges = new HashSet<>(mockJobNode.getInEdges().values()); + Set<StreamEdge> outEdges = new HashSet<>(mockJobNode.getOutEdges().values()); + Set<StreamEdge> intermediateEdges = new HashSet<>(inEdges); + // intermediate streams are the intersection between 
input and output + intermediateEdges.retainAll(outEdges); + // remove all intermediate streams from input + inEdges.removeAll(intermediateEdges); + // remove all intermediate streams from output + outEdges.removeAll(intermediateEdges); + // set the return values for mockJobGraph + when(mockJobGraph.getInputStreams()).thenReturn(inEdges); + when(mockJobGraph.getOutputStreams()).thenReturn(outEdges); + when(mockJobGraph.getIntermediateStreamEdges()).thenReturn(intermediateEdges); + when(mockJobGraph.getJobNodes()).thenReturn(Collections.singletonList(mockJobNode)); + String graphJson = jsonGenerator.toJson(mockJobGraph); + ObjectMapper objectMapper = new ObjectMapper(); + JobGraphJsonGenerator.JobGraphJson jsonObject = objectMapper.readValue(graphJson.getBytes(), JobGraphJsonGenerator.JobGraphJson.class); + assertEquals("testTaskAppId", jsonObject.applicationId); + assertEquals("testTaskApp", jsonObject.applicationName); + Set<String> inStreamIds = inEdges.stream().map(stream -> stream.getStreamSpec().getId()).collect(Collectors.toSet()); + assertThat(jsonObject.sourceStreams.keySet(), Matchers.containsInAnyOrder(inStreamIds.toArray())); + Set<String> outStreamIds = outEdges.stream().map(stream -> stream.getStreamSpec().getId()).collect(Collectors.toSet()); + assertThat(jsonObject.sinkStreams.keySet(), Matchers.containsInAnyOrder(outStreamIds.toArray())); + Set<String> intStreamIds = intermediateEdges.stream().map(stream -> stream.getStreamSpec().getId()).collect(Collectors.toSet()); + assertThat(jsonObject.intermediateStreams.keySet(), Matchers.containsInAnyOrder(intStreamIds.toArray())); + JobGraphJsonGenerator.JobNodeJson expectedNodeJson = new JobGraphJsonGenerator.JobNodeJson(); + expectedNodeJson.jobId = mockJobNode.getJobId(); + expectedNodeJson.jobName = mockJobNode.getJobName(); + assertEquals(1, jsonObject.jobs.size()); + JobGraphJsonGenerator.JobNodeJson actualNodeJson = jsonObject.jobs.get(0); + assertEquals(expectedNodeJson.jobId, actualNodeJson.jobId); 
+ assertEquals(expectedNodeJson.jobName, actualNodeJson.jobName); + assertEquals(3, actualNodeJson.operatorGraph.inputStreams.size()); + assertEquals(2, actualNodeJson.operatorGraph.outputStreams.size()); + assertEquals(0, actualNodeJson.operatorGraph.operators.size()); + } + + @Test + public void testLegacyTaskApplication() throws Exception { + JobGraphJsonGenerator jsonGenerator = new JobGraphJsonGenerator(); + JobGraph mockJobGraph = mock(JobGraph.class); + ApplicationConfig mockAppConfig = mock(ApplicationConfig.class); + when(mockAppConfig.getAppName()).thenReturn("testTaskApp"); + when(mockAppConfig.getAppId()).thenReturn("testTaskAppId"); + when(mockJobGraph.getApplicationConfig()).thenReturn(mockAppConfig); + String graphJson = jsonGenerator.toJson(mockJobGraph); + ObjectMapper objectMapper = new ObjectMapper(); + JobGraphJsonGenerator.JobGraphJson jsonObject = objectMapper.readValue(graphJson.getBytes(), JobGraphJsonGenerator.JobGraphJson.class); + assertEquals("testTaskAppId", jsonObject.applicationId); + assertEquals("testTaskApp", jsonObject.applicationName); + JobGraphJsonGenerator.JobNodeJson expectedNodeJson = new JobGraphJsonGenerator.JobNodeJson(); + expectedNodeJson.jobId = mockJobNode.getJobId(); + expectedNodeJson.jobName = mockJobNode.getJobName(); + assertEquals(0, jsonObject.jobs.size()); + } + + public class PageViewEvent { + String getCountry() { + return ""; + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java deleted file mode 100644 index 163b094..0000000 --- a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or 
more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.execution; - -import java.time.Duration; -import java.util.Base64; -import java.util.HashMap; -import java.util.Map; -import java.util.stream.Collectors; -import org.apache.samza.application.StreamApplicationDescriptorImpl; -import org.apache.samza.config.Config; -import org.apache.samza.config.JobConfig; -import org.apache.samza.config.MapConfig; -import org.apache.samza.config.SerializerConfig; -import org.apache.samza.operators.KV; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.descriptors.GenericInputDescriptor; -import org.apache.samza.operators.descriptors.GenericOutputDescriptor; -import org.apache.samza.operators.descriptors.GenericSystemDescriptor; -import org.apache.samza.operators.functions.JoinFunction; -import org.apache.samza.operators.impl.store.TimestampedValueSerde; -import org.apache.samza.serializers.JsonSerdeV2; -import org.apache.samza.serializers.KVSerde; -import org.apache.samza.serializers.Serde; -import org.apache.samza.serializers.SerializableSerde; -import org.apache.samza.serializers.StringSerde; -import org.apache.samza.system.StreamSpec; -import org.junit.Test; - -import static 
org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.*; - -public class TestJobNode { - - @Test - public void testAddSerdeConfigs() { - StreamSpec input1Spec = new StreamSpec("input1", "input1", "input-system"); - StreamSpec input2Spec = new StreamSpec("input2", "input2", "input-system"); - StreamSpec outputSpec = new StreamSpec("output", "output", "output-system"); - StreamSpec partitionBySpec = - new StreamSpec("jobName-jobId-partition_by-p1", "partition_by-p1", "intermediate-system"); - - Config mockConfig = mock(Config.class); - when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName"); - when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId"); - - StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> { - KVSerde<String, Object> serde = KVSerde.of(new StringSerde(), new JsonSerdeV2<>()); - GenericSystemDescriptor sd = new GenericSystemDescriptor("system1", "mockSystemFactoryClass"); - GenericInputDescriptor<KV<String, Object>> inputDescriptor1 = sd.getInputDescriptor("input1", serde); - GenericInputDescriptor<KV<String, Object>> inputDescriptor2 = sd.getInputDescriptor("input2", serde); - GenericOutputDescriptor<KV<String, Object>> outputDescriptor = sd.getOutputDescriptor("output", serde); - MessageStream<KV<String, Object>> input1 = appDesc.getInputStream(inputDescriptor1); - MessageStream<KV<String, Object>> input2 = appDesc.getInputStream(inputDescriptor2); - OutputStream<KV<String, Object>> output = appDesc.getOutputStream(outputDescriptor); - JoinFunction<String, Object, Object, KV<String, Object>> mockJoinFn = mock(JoinFunction.class); - input1 - .partitionBy(KV::getKey, KV::getValue, serde, "p1") - .map(kv -> kv.value) - .join(input2.map(kv -> kv.value), mockJoinFn, - new StringSerde(), new JsonSerdeV2<>(Object.class), new 
JsonSerdeV2<>(Object.class), - Duration.ofHours(1), "j1") - .sendTo(output); - }, mockConfig); - - JobNode jobNode = new JobNode("jobName", "jobId", graphSpec.getOperatorSpecGraph(), mockConfig); - Config config = new MapConfig(); - StreamEdge input1Edge = new StreamEdge(input1Spec, false, false, config); - StreamEdge input2Edge = new StreamEdge(input2Spec, false, false, config); - StreamEdge outputEdge = new StreamEdge(outputSpec, false, false, config); - StreamEdge repartitionEdge = new StreamEdge(partitionBySpec, true, false, config); - jobNode.addInEdge(input1Edge); - jobNode.addInEdge(input2Edge); - jobNode.addOutEdge(outputEdge); - jobNode.addInEdge(repartitionEdge); - jobNode.addOutEdge(repartitionEdge); - - Map<String, String> configs = new HashMap<>(); - jobNode.addSerdeConfigs(configs); - - MapConfig mapConfig = new MapConfig(configs); - Config serializers = mapConfig.subset("serializers.registry.", true); - - // make sure that the serializers deserialize correctly - SerializableSerde<Serde> serializableSerde = new SerializableSerde<>(); - Map<String, Serde> deserializedSerdes = serializers.entrySet().stream().collect(Collectors.toMap( - e -> e.getKey().replace(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX(), ""), - e -> serializableSerde.fromBytes(Base64.getDecoder().decode(e.getValue().getBytes())) - )); - assertEquals(5, serializers.size()); // 2 default + 3 specific for join - - String input1KeySerde = mapConfig.get("streams.input1.samza.key.serde"); - String input1MsgSerde = mapConfig.get("streams.input1.samza.msg.serde"); - assertTrue("Serialized serdes should contain input1 key serde", - deserializedSerdes.containsKey(input1KeySerde)); - assertTrue("Serialized input1 key serde should be a StringSerde", - input1KeySerde.startsWith(StringSerde.class.getSimpleName())); - assertTrue("Serialized serdes should contain input1 msg serde", - deserializedSerdes.containsKey(input1MsgSerde)); - assertTrue("Serialized input1 msg serde should be a JsonSerdeV2", - 
input1MsgSerde.startsWith(JsonSerdeV2.class.getSimpleName())); - - String input2KeySerde = mapConfig.get("streams.input2.samza.key.serde"); - String input2MsgSerde = mapConfig.get("streams.input2.samza.msg.serde"); - assertTrue("Serialized serdes should contain input2 key serde", - deserializedSerdes.containsKey(input2KeySerde)); - assertTrue("Serialized input2 key serde should be a StringSerde", - input2KeySerde.startsWith(StringSerde.class.getSimpleName())); - assertTrue("Serialized serdes should contain input2 msg serde", - deserializedSerdes.containsKey(input2MsgSerde)); - assertTrue("Serialized input2 msg serde should be a JsonSerdeV2", - input2MsgSerde.startsWith(JsonSerdeV2.class.getSimpleName())); - - String outputKeySerde = mapConfig.get("streams.output.samza.key.serde"); - String outputMsgSerde = mapConfig.get("streams.output.samza.msg.serde"); - assertTrue("Serialized serdes should contain output key serde", - deserializedSerdes.containsKey(outputKeySerde)); - assertTrue("Serialized output key serde should be a StringSerde", - outputKeySerde.startsWith(StringSerde.class.getSimpleName())); - assertTrue("Serialized serdes should contain output msg serde", - deserializedSerdes.containsKey(outputMsgSerde)); - assertTrue("Serialized output msg serde should be a JsonSerdeV2", - outputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName())); - - String partitionByKeySerde = mapConfig.get("streams.jobName-jobId-partition_by-p1.samza.key.serde"); - String partitionByMsgSerde = mapConfig.get("streams.jobName-jobId-partition_by-p1.samza.msg.serde"); - assertTrue("Serialized serdes should contain intermediate stream key serde", - deserializedSerdes.containsKey(partitionByKeySerde)); - assertTrue("Serialized intermediate stream key serde should be a StringSerde", - partitionByKeySerde.startsWith(StringSerde.class.getSimpleName())); - assertTrue("Serialized serdes should contain intermediate stream msg serde", - deserializedSerdes.containsKey(partitionByMsgSerde)); - 
assertTrue( - "Serialized intermediate stream msg serde should be a JsonSerdeV2", - partitionByMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName())); - - String leftJoinStoreKeySerde = mapConfig.get("stores.jobName-jobId-join-j1-L.key.serde"); - String leftJoinStoreMsgSerde = mapConfig.get("stores.jobName-jobId-join-j1-L.msg.serde"); - assertTrue("Serialized serdes should contain left join store key serde", - deserializedSerdes.containsKey(leftJoinStoreKeySerde)); - assertTrue("Serialized left join store key serde should be a StringSerde", - leftJoinStoreKeySerde.startsWith(StringSerde.class.getSimpleName())); - assertTrue("Serialized serdes should contain left join store msg serde", - deserializedSerdes.containsKey(leftJoinStoreMsgSerde)); - assertTrue("Serialized left join store msg serde should be a TimestampedValueSerde", - leftJoinStoreMsgSerde.startsWith(TimestampedValueSerde.class.getSimpleName())); - - String rightJoinStoreKeySerde = mapConfig.get("stores.jobName-jobId-join-j1-R.key.serde"); - String rightJoinStoreMsgSerde = mapConfig.get("stores.jobName-jobId-join-j1-R.msg.serde"); - assertTrue("Serialized serdes should contain right join store key serde", - deserializedSerdes.containsKey(rightJoinStoreKeySerde)); - assertTrue("Serialized right join store key serde should be a StringSerde", - rightJoinStoreKeySerde.startsWith(StringSerde.class.getSimpleName())); - assertTrue("Serialized serdes should contain right join store msg serde", - deserializedSerdes.containsKey(rightJoinStoreMsgSerde)); - assertTrue("Serialized right join store msg serde should be a TimestampedValueSerde", - rightJoinStoreMsgSerde.startsWith(TimestampedValueSerde.class.getSimpleName())); - } - - @Test - public void testAddSerdeConfigsForRepartitionWithNoDefaultSystem() { - StreamSpec inputSpec = new StreamSpec("input", "input", "input-system"); - StreamSpec partitionBySpec = - new StreamSpec("jobName-jobId-partition_by-p1", "partition_by-p1", "intermediate-system"); - - Config 
mockConfig = mock(Config.class); - when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName"); - when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId"); - - StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> { - GenericSystemDescriptor sd = new GenericSystemDescriptor("system1", "mockSystemFactoryClassName"); - GenericInputDescriptor<KV<String, Object>> inputDescriptor1 = - sd.getInputDescriptor("input", KVSerde.of(new StringSerde(), new JsonSerdeV2<>())); - MessageStream<KV<String, Object>> input = appDesc.getInputStream(inputDescriptor1); - input.partitionBy(KV::getKey, KV::getValue, "p1"); - }, mockConfig); - - JobNode jobNode = new JobNode("jobName", "jobId", graphSpec.getOperatorSpecGraph(), mockConfig); - Config config = new MapConfig(); - StreamEdge input1Edge = new StreamEdge(inputSpec, false, false, config); - StreamEdge repartitionEdge = new StreamEdge(partitionBySpec, true, false, config); - jobNode.addInEdge(input1Edge); - jobNode.addInEdge(repartitionEdge); - jobNode.addOutEdge(repartitionEdge); - - Map<String, String> configs = new HashMap<>(); - jobNode.addSerdeConfigs(configs); - - MapConfig mapConfig = new MapConfig(configs); - Config serializers = mapConfig.subset("serializers.registry.", true); - - // make sure that the serializers deserialize correctly - SerializableSerde<Serde> serializableSerde = new SerializableSerde<>(); - Map<String, Serde> deserializedSerdes = serializers.entrySet().stream().collect(Collectors.toMap( - e -> e.getKey().replace(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX(), ""), - e -> serializableSerde.fromBytes(Base64.getDecoder().decode(e.getValue().getBytes())) - )); - assertEquals(2, serializers.size()); // 2 input stream - - String partitionByKeySerde = mapConfig.get("streams.jobName-jobId-partition_by-p1.samza.key.serde"); - String partitionByMsgSerde = mapConfig.get("streams.jobName-jobId-partition_by-p1.samza.msg.serde"); - assertTrue("Serialized 
serdes should not contain intermediate stream key serde", - !deserializedSerdes.containsKey(partitionByKeySerde)); - assertTrue("Serialized serdes should not contain intermediate stream msg serde", - !deserializedSerdes.containsKey(partitionByMsgSerde)); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java new file mode 100644 index 0000000..f351c44 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java @@ -0,0 +1,509 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.execution; + +import com.google.common.base.Joiner; +import java.util.ArrayList; +import java.util.Base64; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.samza.application.StreamApplicationDescriptorImpl; +import org.apache.samza.application.TaskApplicationDescriptorImpl; +import org.apache.samza.config.Config; +import org.apache.samza.config.ConfigRewriter; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.SerializerConfig; +import org.apache.samza.config.TaskConfig; +import org.apache.samza.config.TaskConfigJava; +import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.operators.BaseTableDescriptor; +import org.apache.samza.operators.KV; +import org.apache.samza.operators.TableDescriptor; +import org.apache.samza.operators.descriptors.GenericInputDescriptor; +import org.apache.samza.operators.impl.store.TimestampedValueSerde; +import org.apache.samza.serializers.JsonSerdeV2; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.Serde; +import org.apache.samza.serializers.SerializableSerde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.StreamSpec; +import org.apache.samza.table.Table; +import org.apache.samza.table.TableProvider; +import org.apache.samza.table.TableProviderFactory; +import org.apache.samza.table.TableSpec; +import org.apache.samza.task.TaskContext; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + + +/** + * Unit test for {@link JobNodeConfigurationGenerator} + */ +public class TestJobNodeConfigurationGenerator extends ExecutionPlannerTestBase 
{ + + @Test + public void testConfigureSerdesWithRepartitionJoinApplication() { + mockStreamAppDesc = new StreamApplicationDescriptorImpl(getRepartitionJoinStreamApplication(), mockConfig); + configureJobNode(mockStreamAppDesc); + // create the JobGraphConfigureGenerator and generate the jobConfig for the jobNode + JobNodeConfigurationGenerator configureGenerator = new JobNodeConfigurationGenerator(); + JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, "testJobGraphJson"); + + // Verify the results + Config expectedJobConfig = getExpectedJobConfig(mockConfig, mockJobNode.getInEdges()); + validateJobConfig(expectedJobConfig, jobConfig); + // additionally, check the computed window.ms for join + assertEquals("3600000", jobConfig.get(TaskConfig.WINDOW_MS())); + Map<String, Serde> deserializedSerdes = validateAndGetDeserializedSerdes(jobConfig, 5); + validateStreamConfigures(jobConfig, deserializedSerdes); + validateJoinStoreConfigures(jobConfig, deserializedSerdes); + } + + @Test + public void testConfigureSerdesForRepartitionWithNoDefaultSystem() { + // set the application to RepartitionOnlyStreamApplication + mockStreamAppDesc = new StreamApplicationDescriptorImpl(getRepartitionOnlyStreamApplication(), mockConfig); + configureJobNode(mockStreamAppDesc); + + // create the JobGraphConfigureGenerator and generate the jobConfig for the jobNode + JobNodeConfigurationGenerator configureGenerator = new JobNodeConfigurationGenerator(); + JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, "testJobGraphJson"); + + // Verify the results + Config expectedJobConfig = getExpectedJobConfig(mockConfig, mockJobNode.getInEdges()); + validateJobConfig(expectedJobConfig, jobConfig); + + Map<String, Serde> deserializedSerdes = validateAndGetDeserializedSerdes(jobConfig, 2); + validateStreamConfigures(jobConfig, null); + + String partitionByKeySerde = jobConfig.get("streams.jobName-jobId-partition_by-p1.samza.key.serde"); + String 
partitionByMsgSerde = jobConfig.get("streams.jobName-jobId-partition_by-p1.samza.msg.serde"); + assertTrue("Serialized serdes should not contain intermediate stream key serde", + !deserializedSerdes.containsKey(partitionByKeySerde)); + assertTrue("Serialized serdes should not contain intermediate stream msg serde", + !deserializedSerdes.containsKey(partitionByMsgSerde)); + } + + @Test + public void testGenerateJobConfigWithTaskApplication() { + // set the application to TaskApplication, which still wires up all input/output/intermediate streams + TaskApplicationDescriptorImpl taskAppDesc = new TaskApplicationDescriptorImpl(getTaskApplication(), mockConfig); + configureJobNode(taskAppDesc); + // create the JobGraphConfigureGenerator and generate the jobConfig for the jobNode + JobNodeConfigurationGenerator configureGenerator = new JobNodeConfigurationGenerator(); + JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, "testJobGraphJson"); + + // Verify the results + Config expectedJobConfig = getExpectedJobConfig(mockConfig, mockJobNode.getInEdges()); + validateJobConfig(expectedJobConfig, jobConfig); + Map<String, Serde> deserializedSerdes = validateAndGetDeserializedSerdes(jobConfig, 2); + validateStreamConfigures(jobConfig, deserializedSerdes); + } + + @Test + public void testGenerateJobConfigWithLegacyTaskApplication() { + TaskApplicationDescriptorImpl taskAppDesc = new TaskApplicationDescriptorImpl(getLegacyTaskApplication(), mockConfig); + configureJobNode(taskAppDesc); + Map<String, String> originConfig = new HashMap<>(mockConfig); + + // create the JobGraphConfigureGenerator and generate the jobConfig for the jobNode + JobNodeConfigurationGenerator configureGenerator = new JobNodeConfigurationGenerator(); + JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, ""); + // jobConfig should be exactly the same as original config + Map<String, String> generatedConfig = new HashMap<>(jobConfig); + assertEquals(originConfig, 
generatedConfig); + } + + @Test + public void testBroadcastStreamApplication() { + // set the application to BroadcastStreamApplication + mockStreamAppDesc = new StreamApplicationDescriptorImpl(getBroadcastOnlyStreamApplication(defaultSerde), mockConfig); + configureJobNode(mockStreamAppDesc); + + // create the JobGraphConfigureGenerator and generate the jobConfig for the jobNode + JobNodeConfigurationGenerator configureGenerator = new JobNodeConfigurationGenerator(); + JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, "testJobGraphJson"); + Config expectedJobConfig = getExpectedJobConfig(mockConfig, mockJobNode.getInEdges()); + validateJobConfig(expectedJobConfig, jobConfig); + Map<String, Serde> deserializedSerdes = validateAndGetDeserializedSerdes(jobConfig, 2); + validateStreamSerdeConfigure(broadcastInputDesriptor.getStreamId(), jobConfig, deserializedSerdes); + validateIntermediateStreamConfigure(broadcastInputDesriptor.getStreamId(), broadcastInputDesriptor.getPhysicalName().get(), jobConfig); + } + + @Test + public void testBroadcastStreamApplicationWithoutSerde() { + // set the application to BroadcastStreamApplication withoutSerde + mockStreamAppDesc = new StreamApplicationDescriptorImpl(getBroadcastOnlyStreamApplication(null), mockConfig); + configureJobNode(mockStreamAppDesc); + + // create the JobGraphConfigureGenerator and generate the jobConfig for the jobNode + JobNodeConfigurationGenerator configureGenerator = new JobNodeConfigurationGenerator(); + JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, "testJobGraphJson"); + Config expectedJobConfig = getExpectedJobConfig(mockConfig, mockJobNode.getInEdges()); + validateJobConfig(expectedJobConfig, jobConfig); + Map<String, Serde> deserializedSerdes = validateAndGetDeserializedSerdes(jobConfig, 2); + validateIntermediateStreamConfigure(broadcastInputDesriptor.getStreamId(), broadcastInputDesriptor.getPhysicalName().get(), jobConfig); + + String keySerde = 
jobConfig.get(String.format("streams.%s.samza.key.serde", broadcastInputDesriptor.getStreamId())); + String msgSerde = jobConfig.get(String.format("streams.%s.samza.msg.serde", broadcastInputDesriptor.getStreamId())); + assertTrue("Serialized serdes should not contain intermediate stream key serde", + !deserializedSerdes.containsKey(keySerde)); + assertTrue("Serialized serdes should not contain intermediate stream msg serde", + !deserializedSerdes.containsKey(msgSerde)); + } + + @Test + public void testStreamApplicationWithTableAndSideInput() { + mockStreamAppDesc = new StreamApplicationDescriptorImpl(getRepartitionJoinStreamApplication(), mockConfig); + // add table to the RepartitionJoinStreamApplication + GenericInputDescriptor<KV<String, Object>> sideInput1 = inputSystemDescriptor.getInputDescriptor("sideInput1", defaultSerde); + BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class); + TableSpec mockTableSpec = mock(TableSpec.class); + when(mockTableSpec.getId()).thenReturn("testTable"); + when(mockTableSpec.getSerde()).thenReturn((KVSerde) defaultSerde); + when(mockTableSpec.getTableProviderFactoryClassName()).thenReturn(MockTableProviderFactory.class.getName()); + List<String> sideInputs = new ArrayList<>(); + sideInputs.add(sideInput1.getStreamId()); + when(mockTableSpec.getSideInputs()).thenReturn(sideInputs); + when(mockTableDescriptor.getTableId()).thenReturn("testTable"); + when(mockTableDescriptor.getTableSpec()).thenReturn(mockTableSpec); + when(mockTableDescriptor.getSerde()).thenReturn(defaultSerde); + // add side input and terminate at table in the application + mockStreamAppDesc.getInputStream(sideInput1).sendTo(mockStreamAppDesc.getTable(mockTableDescriptor)); + StreamEdge sideInputEdge = new StreamEdge(new StreamSpec(sideInput1.getStreamId(), "sideInput1", + inputSystemDescriptor.getSystemName()), false, false, mockConfig); + // need to put the sideInput related stream configuration to the original config + // TODO: this is 
confusing since part of the system and stream related configuration is generated outside the JobGraphConfigureGenerator + // It would be nice if all system and stream related configuration is generated in one place and only intermediate stream + // configuration is generated by JobGraphConfigureGenerator + Map<String, String> configs = new HashMap<>(mockConfig); + configs.putAll(sideInputEdge.generateConfig()); + mockConfig = spy(new MapConfig(configs)); + configureJobNode(mockStreamAppDesc); + + // create the JobGraphConfigureGenerator and generate the jobConfig for the jobNode + JobNodeConfigurationGenerator configureGenerator = new JobNodeConfigurationGenerator(); + JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, "testJobGraphJson"); + Config expectedJobConfig = getExpectedJobConfig(mockConfig, mockJobNode.getInEdges()); + validateJobConfig(expectedJobConfig, jobConfig); + Map<String, Serde> deserializedSerdes = validateAndGetDeserializedSerdes(jobConfig, 5); + validateTableConfigure(jobConfig, deserializedSerdes, mockTableDescriptor); + } + + @Test + public void testTaskApplicationWithTableAndSideInput() { + // set up the side input descriptor and the mock table descriptor used by this test + GenericInputDescriptor<KV<String, Object>> sideInput1 = inputSystemDescriptor.getInputDescriptor("sideInput1", defaultSerde); + BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class); + TableSpec mockTableSpec = mock(TableSpec.class); + when(mockTableSpec.getId()).thenReturn("testTable"); + when(mockTableSpec.getSerde()).thenReturn((KVSerde) defaultSerde); + when(mockTableSpec.getTableProviderFactoryClassName()).thenReturn(MockTableProviderFactory.class.getName()); + List<String> sideInputs = new ArrayList<>(); + sideInputs.add(sideInput1.getStreamId()); + when(mockTableSpec.getSideInputs()).thenReturn(sideInputs); + when(mockTableDescriptor.getTableId()).thenReturn("testTable"); + when(mockTableDescriptor.getTableSpec()).thenReturn(mockTableSpec); + 
when(mockTableDescriptor.getSerde()).thenReturn(defaultSerde); + StreamEdge sideInputEdge = new StreamEdge(new StreamSpec(sideInput1.getStreamId(), "sideInput1", + inputSystemDescriptor.getSystemName()), false, false, mockConfig); + // need to put the sideInput related stream configuration to the original config + // TODO: this is confusing since part of the system and stream related configuration is generated outside the JobGraphConfigureGenerator + // It would be nice if all system and stream related configuration is generated in one place and only intermediate stream + // configuration is generated by JobGraphConfigureGenerator + Map<String, String> configs = new HashMap<>(mockConfig); + configs.putAll(sideInputEdge.generateConfig()); + mockConfig = spy(new MapConfig(configs)); + + // set the application to TaskApplication, which still wires up all input/output/intermediate streams + TaskApplicationDescriptorImpl taskAppDesc = new TaskApplicationDescriptorImpl(getTaskApplication(), mockConfig); + // add table to the task application + taskAppDesc.addTable(mockTableDescriptor); + taskAppDesc.addInputStream(inputSystemDescriptor.getInputDescriptor("sideInput1", defaultSerde)); + configureJobNode(taskAppDesc); + + // create the JobGraphConfigureGenerator and generate the jobConfig for the jobNode + JobNodeConfigurationGenerator configureGenerator = new JobNodeConfigurationGenerator(); + JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, "testJobGraphJson"); + + // Verify the results + Config expectedJobConfig = getExpectedJobConfig(mockConfig, mockJobNode.getInEdges()); + validateJobConfig(expectedJobConfig, jobConfig); + Map<String, Serde> deserializedSerdes = validateAndGetDeserializedSerdes(jobConfig, 2); + validateStreamConfigures(jobConfig, deserializedSerdes); + validateTableConfigure(jobConfig, deserializedSerdes, mockTableDescriptor); + } + + @Test + public void testTaskInputsRemovedFromOriginalConfig() { + Map<String, String> configs = 
new HashMap<>(mockConfig); + configs.put(TaskConfig.INPUT_STREAMS(), "not.allowed1,not.allowed2"); + mockConfig = spy(new MapConfig(configs)); + + mockStreamAppDesc = new StreamApplicationDescriptorImpl(getBroadcastOnlyStreamApplication(defaultSerde), mockConfig); + configureJobNode(mockStreamAppDesc); + + JobNodeConfigurationGenerator configureGenerator = new JobNodeConfigurationGenerator(); + JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, "testJobGraphJson"); + Config expectedConfig = getExpectedJobConfig(mockConfig, mockJobNode.getInEdges()); + validateJobConfig(expectedConfig, jobConfig); + } + + @Test + public void testTaskInputsRetainedForLegacyTaskApplication() { + Map<String, String> originConfig = new HashMap<>(mockConfig); + originConfig.put(TaskConfig.INPUT_STREAMS(), "must.retain1,must.retain2"); + mockConfig = new MapConfig(originConfig); + TaskApplicationDescriptorImpl taskAppDesc = new TaskApplicationDescriptorImpl(getLegacyTaskApplication(), mockConfig); + configureJobNode(taskAppDesc); + + // create the JobGraphConfigureGenerator and generate the jobConfig for the jobNode + JobNodeConfigurationGenerator configureGenerator = new JobNodeConfigurationGenerator(); + JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, ""); + // jobConfig should be exactly the same as original config + Map<String, String> generatedConfig = new HashMap<>(jobConfig); + assertEquals(originConfig, generatedConfig); + } + + @Test + public void testOverrideConfigs() { + Map<String, String> configs = new HashMap<>(mockConfig); + String streamCfgToOverride = String.format("streams.%s.samza.system", intermediateInputDescriptor.getStreamId()); + String overrideCfgKey = String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId()) + streamCfgToOverride; + configs.put(overrideCfgKey, "customized-system"); + mockConfig = spy(new MapConfig(configs)); + mockStreamAppDesc = new 
StreamApplicationDescriptorImpl(getRepartitionJoinStreamApplication(), mockConfig); + configureJobNode(mockStreamAppDesc); + + JobNodeConfigurationGenerator configureGenerator = new JobNodeConfigurationGenerator(); + JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, "testJobGraphJson"); + Config expectedConfig = getExpectedJobConfig(mockConfig, mockJobNode.getInEdges()); + validateJobConfig(expectedConfig, jobConfig); + assertEquals("customized-system", jobConfig.get(streamCfgToOverride)); + } + + @Test + public void testConfigureRewriter() { + Map<String, String> configs = new HashMap<>(mockConfig); + String streamCfgToOverride = String.format("streams.%s.samza.system", intermediateInputDescriptor.getStreamId()); + String overrideCfgKey = String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId()) + streamCfgToOverride; + configs.put(overrideCfgKey, "customized-system"); + configs.put(String.format(JobConfig.CONFIG_REWRITER_CLASS(), "mock"), MockConfigRewriter.class.getName()); + configs.put(JobConfig.CONFIG_REWRITERS(), "mock"); + configs.put(String.format("job.config.rewriter.mock.%s", streamCfgToOverride), "rewritten-system"); + mockConfig = spy(new MapConfig(configs)); + mockStreamAppDesc = new StreamApplicationDescriptorImpl(getRepartitionJoinStreamApplication(), mockConfig); + configureJobNode(mockStreamAppDesc); + + JobNodeConfigurationGenerator configureGenerator = new JobNodeConfigurationGenerator(); + JobConfig jobConfig = configureGenerator.generateJobConfig(mockJobNode, "testJobGraphJson"); + Config expectedConfig = getExpectedJobConfig(mockConfig, mockJobNode.getInEdges()); + validateJobConfig(expectedConfig, jobConfig); + assertEquals("rewritten-system", jobConfig.get(streamCfgToOverride)); + } + + private void validateTableConfigure(JobConfig jobConfig, Map<String, Serde> deserializedSerdes, + TableDescriptor tableDescriptor) { + Config tableConfig = jobConfig.subset(String.format("tables.%s.", 
tableDescriptor.getTableId())); + assertEquals(MockTableProviderFactory.class.getName(), tableConfig.get("provider.factory")); + MockTableProvider mockTableProvider = + (MockTableProvider) new MockTableProviderFactory().getTableProvider(((BaseTableDescriptor) tableDescriptor).getTableSpec()); + assertEquals(mockTableProvider.configMap.get("mock.table.provider.config"), jobConfig.get("mock.table.provider.config")); + validateTableSerdeConfigure(tableDescriptor.getTableId(), jobConfig, deserializedSerdes); + } + + private Config getExpectedJobConfig(Config originConfig, Map<String, StreamEdge> inputEdges) { + Map<String, String> configMap = new HashMap<>(originConfig); + Set<String> inputs = new HashSet<>(); + Set<String> broadcasts = new HashSet<>(); + for (StreamEdge inputEdge : inputEdges.values()) { + if (inputEdge.isBroadcast()) { + broadcasts.add(inputEdge.getName() + "#0"); + } else { + inputs.add(inputEdge.getName()); + } + } + if (!inputs.isEmpty()) { + configMap.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs)); + } + if (!broadcasts.isEmpty()) { + configMap.put(TaskConfigJava.BROADCAST_INPUT_STREAMS, Joiner.on(',').join(broadcasts)); + } + return new MapConfig(configMap); + } + + private Map<String, Serde> validateAndGetDeserializedSerdes(Config jobConfig, int numSerdes) { + Config serializers = jobConfig.subset("serializers.registry.", true); + // make sure that the serializers deserialize correctly + SerializableSerde<Serde> serializableSerde = new SerializableSerde<>(); + assertEquals(numSerdes, serializers.size()); + return serializers.entrySet().stream().collect(Collectors.toMap( + e -> e.getKey().replace(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX(), ""), + e -> serializableSerde.fromBytes(Base64.getDecoder().decode(e.getValue().getBytes())) + )); + } + + private void validateJobConfig(Config expectedConfig, JobConfig jobConfig) { + assertEquals(expectedConfig.get(JobConfig.JOB_NAME()), jobConfig.getName().get()); + 
assertEquals(expectedConfig.get(JobConfig.JOB_ID()), jobConfig.getJobId()); + assertEquals("testJobGraphJson", jobConfig.get(JobNodeConfigurationGenerator.CONFIG_INTERNAL_EXECUTION_PLAN)); + assertEquals(expectedConfig.get(TaskConfig.INPUT_STREAMS()), jobConfig.get(TaskConfig.INPUT_STREAMS())); + assertEquals(expectedConfig.get(TaskConfigJava.BROADCAST_INPUT_STREAMS), jobConfig.get(TaskConfigJava.BROADCAST_INPUT_STREAMS)); + } + + private void validateStreamSerdeConfigure(String streamId, Config config, Map<String, Serde> deserializedSerdes) { + Config streamConfig = config.subset(String.format("streams.%s.samza.", streamId)); + String keySerdeName = streamConfig.get("key.serde"); + String valueSerdeName = streamConfig.get("msg.serde"); + assertTrue(String.format("Serialized serdes should contain %s key serde", streamId), deserializedSerdes.containsKey(keySerdeName)); + assertTrue(String.format("Serialized %s key serde should be a StringSerde", streamId), keySerdeName.startsWith(StringSerde.class.getSimpleName())); + assertTrue(String.format("Serialized serdes should contain %s msg serde", streamId), deserializedSerdes.containsKey(valueSerdeName)); + assertTrue(String.format("Serialized %s msg serde should be a JsonSerdeV2", streamId), valueSerdeName.startsWith(JsonSerdeV2.class.getSimpleName())); + } + + private void validateTableSerdeConfigure(String tableId, Config config, Map<String, Serde> deserializedSerdes) { + Config streamConfig = config.subset(String.format("tables.%s.", tableId)); + String keySerdeName = streamConfig.get("key.serde"); + String valueSerdeName = streamConfig.get("value.serde"); + assertTrue(String.format("Serialized serdes should contain %s key serde", tableId), deserializedSerdes.containsKey(keySerdeName)); + assertTrue(String.format("Serialized %s key serde should be a StringSerde", tableId), keySerdeName.startsWith(StringSerde.class.getSimpleName())); + assertTrue(String.format("Serialized serdes should contain %s value serde", 
tableId), deserializedSerdes.containsKey(valueSerdeName)); + assertTrue(String.format("Serialized %s msg serde should be a JsonSerdeV2", tableId), valueSerdeName.startsWith(JsonSerdeV2.class.getSimpleName())); + } + + private void validateIntermediateStreamConfigure(String streamId, String physicalName, Config config) { + Config intStreamConfig = config.subset(String.format("streams.%s.", streamId), true); + assertEquals("intermediate-system", intStreamConfig.get("samza.system")); + assertEquals(String.valueOf(Integer.MAX_VALUE), intStreamConfig.get("samza.priority")); + assertEquals("true", intStreamConfig.get("samza.delete.committed.messages")); + assertEquals(physicalName, intStreamConfig.get("samza.physical.name")); + assertEquals("true", intStreamConfig.get("samza.intermediate")); + assertEquals("oldest", intStreamConfig.get("samza.offset.default")); + } + + private void validateStreamConfigures(Config config, Map<String, Serde> deserializedSerdes) { + + if (deserializedSerdes != null) { + validateStreamSerdeConfigure(input1Descriptor.getStreamId(), config, deserializedSerdes); + validateStreamSerdeConfigure(input2Descriptor.getStreamId(), config, deserializedSerdes); + validateStreamSerdeConfigure(outputDescriptor.getStreamId(), config, deserializedSerdes); + validateStreamSerdeConfigure(intermediateInputDescriptor.getStreamId(), config, deserializedSerdes); + } + + // generated stream config for intermediate stream + String physicalName = intermediateInputDescriptor.getPhysicalName().isPresent() ? 
+ intermediateInputDescriptor.getPhysicalName().get() : null; + validateIntermediateStreamConfigure(intermediateInputDescriptor.getStreamId(), physicalName, config); + } + + private void validateJoinStoreConfigures(MapConfig mapConfig, Map<String, Serde> deserializedSerdes) { + String leftJoinStoreKeySerde = mapConfig.get("stores.jobName-jobId-join-j1-L.key.serde"); + String leftJoinStoreMsgSerde = mapConfig.get("stores.jobName-jobId-join-j1-L.msg.serde"); + assertTrue("Serialized serdes should contain left join store key serde", + deserializedSerdes.containsKey(leftJoinStoreKeySerde)); + assertTrue("Serialized left join store key serde should be a StringSerde", + leftJoinStoreKeySerde.startsWith(StringSerde.class.getSimpleName())); + assertTrue("Serialized serdes should contain left join store msg serde", + deserializedSerdes.containsKey(leftJoinStoreMsgSerde)); + assertTrue("Serialized left join store msg serde should be a TimestampedValueSerde", + leftJoinStoreMsgSerde.startsWith(TimestampedValueSerde.class.getSimpleName())); + + String rightJoinStoreKeySerde = mapConfig.get("stores.jobName-jobId-join-j1-R.key.serde"); + String rightJoinStoreMsgSerde = mapConfig.get("stores.jobName-jobId-join-j1-R.msg.serde"); + assertTrue("Serialized serdes should contain right join store key serde", + deserializedSerdes.containsKey(rightJoinStoreKeySerde)); + assertTrue("Serialized right join store key serde should be a StringSerde", + rightJoinStoreKeySerde.startsWith(StringSerde.class.getSimpleName())); + assertTrue("Serialized serdes should contain right join store msg serde", + deserializedSerdes.containsKey(rightJoinStoreMsgSerde)); + assertTrue("Serialized right join store msg serde should be a TimestampedValueSerde", + rightJoinStoreMsgSerde.startsWith(TimestampedValueSerde.class.getSimpleName())); + + Config leftJoinStoreConfig = mapConfig.subset("stores.jobName-jobId-join-j1-L.", true); + validateJoinStoreConfigure(leftJoinStoreConfig, "jobName-jobId-join-j1-L"); + 
Config rightJoinStoreConfig = mapConfig.subset("stores.jobName-jobId-join-j1-R.", true); + validateJoinStoreConfigure(rightJoinStoreConfig, "jobName-jobId-join-j1-R"); + } + + private void validateJoinStoreConfigure(Config joinStoreConfig, String changelogName) { + assertEquals("org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory", joinStoreConfig.get("factory")); + assertEquals(changelogName, joinStoreConfig.get("changelog")); + assertEquals("delete", joinStoreConfig.get("changelog.kafka.cleanup.policy")); + assertEquals("3600000", joinStoreConfig.get("changelog.kafka.retention.ms")); + assertEquals("3600000", joinStoreConfig.get("rocksdb.ttl.ms")); + } + + private static class MockTableProvider implements TableProvider { + private final Map<String, String> configMap; + + MockTableProvider(Map<String, String> configMap) { + this.configMap = configMap; + } + + @Override + public void init(SamzaContainerContext containerContext, TaskContext taskContext) { + + } + + @Override + public Table getTable() { + return null; + } + + @Override + public Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig) { + return configMap; + } + + @Override + public void close() { + + } + } + + public static class MockTableProviderFactory implements TableProviderFactory { + + @Override + public TableProvider getTableProvider(TableSpec tableSpec) { + Map<String, String> configMap = new HashMap<>(); + configMap.put("mock.table.provider.config", "mock.config.value"); + return new MockTableProvider(configMap); + } + } + + public static class MockConfigRewriter implements ConfigRewriter { + + @Override + public Config rewrite(String name, Config config) { + Map<String, String> configMap = new HashMap<>(config); + configMap.putAll(config.subset(String.format("job.config.rewriter.%s.", name))); + return new MapConfig(configMap); + } + } +} 
http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/test/java/org/apache/samza/execution/TestRemoteJobPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestRemoteJobPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestRemoteJobPlanner.java index 988fb34..85921f4 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestRemoteJobPlanner.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestRemoteJobPlanner.java @@ -69,7 +69,7 @@ public class TestRemoteJobPlanner { ApplicationConfig mockAppConfig = mock(ApplicationConfig.class); when(mockAppConfig.getAppMode()).thenReturn(ApplicationConfig.ApplicationMode.STREAM); when(plan.getApplicationConfig()).thenReturn(mockAppConfig); - doReturn(plan).when(remotePlanner).getExecutionPlan(any(), any()); + doReturn(plan).when(remotePlanner).getExecutionPlan(any()); remotePlanner.prepareJobs(); http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java b/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java index a5b15b8..57ae6d8 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java @@ -117,7 +117,6 @@ public class TestOperatorSpecGraph { OperatorSpecGraph specGraph = new OperatorSpecGraph(mockAppDesc); assertEquals(specGraph.getInputOperators(), inputOpSpecMap); assertEquals(specGraph.getOutputStreams(), outputStrmMap); - assertTrue(specGraph.getTables().isEmpty()); assertTrue(!specGraph.hasWindowOrJoins()); assertEquals(specGraph.getAllOperatorSpecs(), this.allOpSpecs); } 
http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java b/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java index 7704a5b..a34fdc3 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java +++ b/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java @@ -53,7 +53,6 @@ public class OperatorSpecTestUtils { public static void assertClonedGraph(OperatorSpecGraph originalGraph, OperatorSpecGraph clonedGraph) { assertClonedInputs(originalGraph.getInputOperators(), clonedGraph.getInputOperators()); assertClonedOutputs(originalGraph.getOutputStreams(), clonedGraph.getOutputStreams()); - assertClonedTables(originalGraph.getTables(), clonedGraph.getTables()); assertAllOperators(originalGraph.getAllOperatorSpecs(), clonedGraph.getAllOperatorSpecs()); } http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java index 19ee74f..fd0ddf8 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java @@ -25,10 +25,10 @@ import java.util.HashMap; import java.util.Map; import org.apache.samza.application.ApplicationDescriptor; import org.apache.samza.application.ApplicationDescriptorImpl; +import org.apache.samza.application.LegacyTaskApplication; import 
org.apache.samza.application.SamzaApplication; import org.apache.samza.application.ApplicationDescriptorUtil; import org.apache.samza.application.StreamApplication; -import org.apache.samza.application.TaskApplication; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; @@ -37,7 +37,6 @@ import org.apache.samza.job.ApplicationStatus; import org.apache.samza.processor.StreamProcessor; import org.apache.samza.execution.LocalJobPlanner; import org.apache.samza.task.IdentityStreamTask; -import org.apache.samza.task.StreamTaskFactory; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -73,8 +72,9 @@ public class TestLocalApplicationRunner { final Map<String, String> cfgs = new HashMap<>(); cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName()); cfgs.put(JobConfig.JOB_NAME(), "test-task-job"); + cfgs.put(JobConfig.JOB_ID(), "jobId"); config = new MapConfig(cfgs); - mockApp = (TaskApplication) appDesc -> appDesc.setTaskFactory((StreamTaskFactory) () -> new IdentityStreamTask()); + mockApp = new LegacyTaskApplication(IdentityStreamTask.class.getName()); prepareTest(); StreamProcessor sp = mock(StreamProcessor.class); @@ -186,7 +186,8 @@ public class TestLocalApplicationRunner { } private void prepareTest() { - ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc = ApplicationDescriptorUtil.getAppDescriptor(mockApp, config); + ApplicationDescriptorImpl<? 
extends ApplicationDescriptor> appDesc = + ApplicationDescriptorUtil.getAppDescriptor(mockApp, config); localPlanner = spy(new LocalJobPlanner(appDesc)); runner = spy(new LocalApplicationRunner(appDesc, localPlanner)); } http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java index ae525fb..702cbfb 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java @@ -124,7 +124,7 @@ public class TestRemoteApplicationRunner { @Override public ApplicationStatus getStatus() { - String jobId = c.getJobId().get(); + String jobId = c.getJobId(); switch (jobId) { case "newJob": return ApplicationStatus.New; http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala index 05d717a..3f5f11c 100644 --- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala +++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala @@ -35,7 +35,7 @@ class HdfsSystemFactory extends SystemFactory with Logging { def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = { val jobConfig = new JobConfig(config) val jobName = jobConfig.getName.getOrElse(throw new ConfigException("Missing job name.")) - val jobId = jobConfig.getJobId.getOrElse("1") + val jobId = 
jobConfig.getJobId val clientId = getClientId("samza-producer", jobName, jobId) val metrics = new HdfsSystemProducerMetrics(systemName, registry)
