Repository: samza Updated Branches: refs/heads/master fd7a57708 -> 8b3fe5d26
SAMZA-1439: Address late review feedback from SAMZA-1434 Author: Jacob Maes <[email protected]> Reviewers: Xinyu Liu <[email protected]> Closes #313 from jmakes/samza-1439 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8b3fe5d2 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8b3fe5d2 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8b3fe5d2 Branch: refs/heads/master Commit: 8b3fe5d2666a97ee5b3571a3788e8fefd23f55fd Parents: fd7a577 Author: Jacob Maes <[email protected]> Authored: Wed Oct 4 14:02:26 2017 -0700 Committer: Jacob Maes <[email protected]> Committed: Wed Oct 4 14:02:26 2017 -0700 ---------------------------------------------------------------------- .../org/apache/samza/system/TestStreamSpec.java | 55 ++++++++++++++++++++ .../samza/execution/ExecutionPlanner.java | 13 +++-- .../org/apache/samza/execution/StreamEdge.java | 3 +- .../samza/execution/TestExecutionPlanner.java | 42 +++++++++++---- .../system/hdfs/TestHdfsSystemConsumer.java | 10 ++-- .../samza/system/kafka/KafkaStreamSpec.java | 4 ++ .../samza/system/kafka/TestKafkaStreamSpec.java | 5 ++ 7 files changed, 112 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/8b3fe5d2/samza-api/src/test/java/org/apache/samza/system/TestStreamSpec.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/system/TestStreamSpec.java b/samza-api/src/test/java/org/apache/samza/system/TestStreamSpec.java new file mode 100644 index 0000000..8974877 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/system/TestStreamSpec.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.system; + +import org.junit.Test; + +import static org.junit.Assert.*; + + +public class TestStreamSpec { + @Test + public void testBasicConstructor() { + StreamSpec streamSpec = new StreamSpec("dummyId", "dummyPhysicalName", "dummySystemName", 1); + + assertEquals("dummyId", streamSpec.getId()); + assertEquals("dummyPhysicalName", streamSpec.getPhysicalName()); + assertEquals("dummySystemName", streamSpec.getSystemName()); + assertEquals(1, streamSpec.getPartitionCount()); + + // SystemStream should use the physical name, not the streamId. 
+ SystemStream systemStream = new SystemStream("dummySystemName", "dummyPhysicalName"); + assertEquals(systemStream, streamSpec.toSystemStream()); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidPartitionCount() { + new StreamSpec("dummyId", "dummyPhysicalName", "dummySystemName", -1); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidStreamId() { + new StreamSpec("dummy.Id", "dummyPhysicalName", "dummySystemName", 0); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidSystemName() { + new StreamSpec("dummyId", "dummyPhysicalName", "dummy.System.Name", 0); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/8b3fe5d2/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java index 998ea1e..468aab9 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java +++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java @@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory; public class ExecutionPlanner { private static final Logger log = LoggerFactory.getLogger(ExecutionPlanner.class); - private static final int MAX_INFERRED_PARTITIONS = 256; + static final int MAX_INFERRED_PARTITIONS = 256; private final Config config; private final StreamManager streamManager; @@ -255,10 +255,17 @@ public class ExecutionPlanner { if (partitions < 0) { // use the following simple algo to figure out the partitions // partition = MAX(MAX(Input topic partitions), MAX(Output topic partitions)) - // partition will be further bounded by MAX_INFERRED_PARTITIONS. This is important when running in hadoop. + // partition will be further bounded by MAX_INFERRED_PARTITIONS. 
+ // This is important when running in hadoop where an HDFS input can have lots of files (partitions). int maxInPartitions = maxPartition(jobGraph.getSources()); int maxOutPartitions = maxPartition(jobGraph.getSinks()); - partitions = Math.min(Math.max(maxInPartitions, maxOutPartitions), MAX_INFERRED_PARTITIONS); + partitions = Math.max(maxInPartitions, maxOutPartitions); + + if (partitions > MAX_INFERRED_PARTITIONS) { + partitions = MAX_INFERRED_PARTITIONS; + log.warn(String.format("Inferred intermediate stream partition count %d is greater than the max %d. Using the max.", + partitions, MAX_INFERRED_PARTITIONS)); + } } for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) { if (edge.getPartitionCount() <= 0) { http://git-wip-us.apache.org/repos/asf/samza/blob/8b3fe5d2/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java index 792fde5..62d85f1 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java +++ b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java @@ -82,8 +82,7 @@ public class StreamEdge { } SystemStream getSystemStream() { - StreamSpec spec = getStreamSpec(); - return new SystemStream(spec.getSystemName(), spec.getPhysicalName()); + return getStreamSpec().toSystemStream(); } String getFormattedSystemStream() { http://git-wip-us.apache.org/repos/asf/samza/blob/8b3fe5d2/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java index eb5ca7b..50b0a13 100644 --- 
a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java @@ -67,6 +67,7 @@ public class TestExecutionPlanner { private StreamSpec input1; private StreamSpec input2; private StreamSpec input3; + private StreamSpec input4; private StreamSpec output1; private StreamSpec output2; @@ -194,6 +195,7 @@ public class TestExecutionPlanner { input1 = new StreamSpec("input1", "input1", "system1"); input2 = new StreamSpec("input2", "input2", "system2"); input3 = new StreamSpec("input3", "input3", "system2"); + input4 = new StreamSpec("input4", "input4", "system1"); output1 = new StreamSpec("output1", "output1", "system1"); output2 = new StreamSpec("output2", "output2", "system2"); @@ -202,6 +204,7 @@ public class TestExecutionPlanner { Map<String, Integer> system1Map = new HashMap<>(); system1Map.put("input1", 64); system1Map.put("output1", 8); + system1Map.put("input4", ExecutionPlanner.MAX_INFERRED_PARTITIONS * 2); Map<String, Integer> system2Map = new HashMap<>(); system2Map.put("input2", 16); system2Map.put("input3", 32); @@ -218,6 +221,7 @@ public class TestExecutionPlanner { when(runner.getStreamSpec("input1")).thenReturn(input1); when(runner.getStreamSpec("input2")).thenReturn(input2); when(runner.getStreamSpec("input3")).thenReturn(input3); + when(runner.getStreamSpec("input4")).thenReturn(input4); when(runner.getStreamSpec("output1")).thenReturn(output1); when(runner.getStreamSpec("output2")).thenReturn(output2); @@ -316,10 +320,10 @@ public class TestExecutionPlanner { StreamGraphImpl streamGraph = createStreamGraphWithJoinAndWindow(); ExecutionPlan plan = planner.plan(streamGraph); List<JobConfig> jobConfigs = plan.getJobConfigs(); - assertEquals(jobConfigs.size(), 1); + assertEquals(1, jobConfigs.size()); // GCD of 8, 16, 1600 and 252 is 4 - assertEquals(jobConfigs.get(0).get(TaskConfig.WINDOW_MS()), "4"); + assertEquals("4", 
jobConfigs.get(0).get(TaskConfig.WINDOW_MS())); } @Test @@ -333,10 +337,10 @@ public class TestExecutionPlanner { StreamGraphImpl streamGraph = createStreamGraphWithJoinAndWindow(); ExecutionPlan plan = planner.plan(streamGraph); List<JobConfig> jobConfigs = plan.getJobConfigs(); - assertEquals(jobConfigs.size(), 1); + assertEquals(1, jobConfigs.size()); // GCD of 8, 16, 1600 and 252 is 4 - assertEquals(jobConfigs.get(0).get(TaskConfig.WINDOW_MS()), "4"); + assertEquals("4", jobConfigs.get(0).get(TaskConfig.WINDOW_MS())); } @@ -350,7 +354,7 @@ public class TestExecutionPlanner { StreamGraphImpl streamGraph = createSimpleGraph(); ExecutionPlan plan = planner.plan(streamGraph); List<JobConfig> jobConfigs = plan.getJobConfigs(); - assertEquals(jobConfigs.size(), 1); + assertEquals(1, jobConfigs.size()); assertFalse(jobConfigs.get(0).containsKey(TaskConfig.WINDOW_MS())); } @@ -365,8 +369,8 @@ public class TestExecutionPlanner { StreamGraphImpl streamGraph = createSimpleGraph(); ExecutionPlan plan = planner.plan(streamGraph); List<JobConfig> jobConfigs = plan.getJobConfigs(); - assertEquals(jobConfigs.size(), 1); - assertEquals(jobConfigs.get(0).get(TaskConfig.WINDOW_MS()), "2000"); + assertEquals(1, jobConfigs.size()); + assertEquals("2000", jobConfigs.get(0).get(TaskConfig.WINDOW_MS())); } @Test @@ -377,7 +381,7 @@ public class TestExecutionPlanner { // the partitions should be the same as input1 jobGraph.getIntermediateStreams().forEach(edge -> { - assertTrue(edge.getPartitionCount() == 64); // max of input1 and output1 + assertEquals(64, edge.getPartitionCount()); // max of input1 and output1 }); } @@ -394,9 +398,27 @@ public class TestExecutionPlanner { edge.setPartitionCount(16); edges.add(edge); - assertEquals(ExecutionPlanner.maxPartition(edges), 32); + assertEquals(32, ExecutionPlanner.maxPartition(edges)); edges = Collections.emptyList(); - assertEquals(ExecutionPlanner.maxPartition(edges), StreamEdge.PARTITIONS_UNKNOWN); + 
assertEquals(StreamEdge.PARTITIONS_UNKNOWN, ExecutionPlanner.maxPartition(edges)); + } + + @Test + public void testMaxPartitionLimit() throws Exception { + int partitionLimit = ExecutionPlanner.MAX_INFERRED_PARTITIONS; + + ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); + StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config); + + MessageStream<KV<Object, Object>> input1 = streamGraph.getInputStream("input4"); + OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1"); + input1.partitionBy(m -> m.key, m -> m.value).map(kv -> kv).sendTo(output1); + JobGraph jobGraph = (JobGraph) planner.plan(streamGraph); + + // the inferred partition count should be capped at MAX_INFERRED_PARTITIONS + jobGraph.getIntermediateStreams().forEach(edge -> { + assertEquals(partitionLimit, edge.getPartitionCount()); // input4 has 2x the limit, so the cap applies + }); } } http://git-wip-us.apache.org/repos/asf/samza/blob/8b3fe5d2/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java index 21afcb9..396b06b 100644 --- a/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java +++ b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java @@ -107,11 +107,11 @@ public class TestHdfsSystemConsumer { // verify events read from consumer int eventsReceived = 0; int totalEvents = (NUM_EVENTS + 1) * NUM_FILES; // one "End of Stream" event in the end - int remainingRetires = 100; + int remainingRetries = 100; Map<SystemStreamPartition, List<IncomingMessageEnvelope>> overallResults = new HashMap<>(); - while (eventsReceived < totalEvents && remainingRetires > 0) { - remainingRetires--; - Map<SystemStreamPartition, List<IncomingMessageEnvelope>> result = 
systemConsumer.poll(systemStreamPartitionSet, 200); + while (eventsReceived < totalEvents && remainingRetries > 0) { + remainingRetries--; + Map<SystemStreamPartition, List<IncomingMessageEnvelope>> result = systemConsumer.poll(systemStreamPartitionSet, 1000); for(SystemStreamPartition ssp : result.keySet()) { List<IncomingMessageEnvelope> messageEnvelopeList = result.get(ssp); overallResults.putIfAbsent(ssp, new ArrayList<>()); @@ -122,7 +122,7 @@ public class TestHdfsSystemConsumer { eventsReceived += messageEnvelopeList.size(); } } - Assert.assertEquals(eventsReceived, totalEvents); + Assert.assertEquals("Did not receive all the events. Retry counter = " + remainingRetries, totalEvents, eventsReceived); Assert.assertEquals(NUM_FILES, overallResults.size()); overallResults.values().forEach(messages -> { Assert.assertEquals(NUM_EVENTS + 1, messages.size()); http://git-wip-us.apache.org/repos/asf/samza/blob/8b3fe5d2/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java index fd53a45..a49c022 100644 --- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java +++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java @@ -151,6 +151,10 @@ public class KafkaStreamSpec extends StreamSpec { Properties properties) { super(id, topicName, systemName, partitionCount, false, propertiesToMap(properties)); + if (partitionCount < 1) { + throw new IllegalArgumentException("Parameter 'partitionCount' must be > 0"); + } + if (replicationFactor <= 0) { throw new IllegalArgumentException( String.format("Replication factor %d must be greater than 0.", replicationFactor)); 
http://git-wip-us.apache.org/repos/asf/samza/blob/8b3fe5d2/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java index 5612704..1758bf0 100644 --- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java +++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java @@ -57,4 +57,9 @@ public class TestKafkaStreamSpec { assertNull(kafkaConfig.get("replication.factor")); assertEquals("4", kafkaConfig.get("segment.bytes")); } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidPartitionCount() { + new KafkaStreamSpec("dummyId","dummyPhysicalName", "dummySystemName", 0); + } }
