SAMZA-797: Fix parsing errors in broadcast stream config values
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6f238327 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6f238327 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6f238327 Branch: refs/heads/samza-sql Commit: 6f23832731ac08cfbe0a9fa412edd8ff0f70775f Parents: 775d791 Author: Navina Ramesh <[email protected]> Authored: Thu Oct 22 16:44:06 2015 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Thu Oct 22 16:52:13 2015 -0700 ---------------------------------------------------------------------- .../org/apache/samza/config/TaskConfigJava.java | 48 +++++++++++--------- .../apache/samza/config/TestTaskConfigJava.java | 14 +++++- 2 files changed, 39 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/6f238327/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java index 015e994..8acb6ca 100644 --- a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java +++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java @@ -34,8 +34,8 @@ import org.slf4j.LoggerFactory; public class TaskConfigJava extends MapConfig { // broadcast streams consumed by all tasks. e.g. kafka.foo#1 public static final String BROADCAST_INPUT_STREAMS = "task.broadcast.inputs"; - private static final String BROADCAST_STREAM_PATTERN = "[^#\\.]+\\.[^#\\.]+#[\\d]+"; - private static final String BROADCAST_STREAM_RANGE_PATTERN = "[^#\\.]+\\.[^#\\.]+#\\[[\\d]+\\-[\\d]+\\]+"; + private static final String BROADCAST_STREAM_PATTERN = "^[\\d]+$"; + private static final String BROADCAST_STREAM_RANGE_PATTERN = "^\\[[\\d]+\\-[\\d]+\\]$"; public static final Logger LOGGER = LoggerFactory.getLogger(TaskConfigJava.class); @@ -55,28 +55,32 @@ public class TaskConfigJava extends MapConfig { List<String> systemStreamPartitions = getList(BROADCAST_INPUT_STREAMS); for (String systemStreamPartition : systemStreamPartitions) { - if (Pattern.matches(BROADCAST_STREAM_PATTERN, systemStreamPartition)) { - - int hashPosition = systemStreamPartition.indexOf("#"); - SystemStream systemStream = Util.getSystemStreamFromNames(systemStreamPartition.substring(0, hashPosition)); - systemStreamPartitionSet.add(new SystemStreamPartition(systemStream, new Partition(Integer.valueOf(systemStreamPartition.substring(hashPosition + 1))))); - - } else if (Pattern.matches(BROADCAST_STREAM_RANGE_PATTERN, systemStreamPartition)) { - - SystemStream systemStream = Util.getSystemStreamFromNames(systemStreamPartition.substring(0, systemStreamPartition.indexOf("#"))); - - int startingPartition = Integer.valueOf(systemStreamPartition.substring(systemStreamPartition.indexOf("[") + 1, systemStreamPartition.lastIndexOf("-"))); - int endingPartition = Integer.valueOf(systemStreamPartition.substring(systemStreamPartition.lastIndexOf("-") + 1, systemStreamPartition.indexOf("]"))); - - if (startingPartition > endingPartition) { - LOGGER.warn("The starting partition in stream " + systemStream.toString() + " is bigger than the ending Partition. No partition is added"); - } - for (int i = startingPartition; i <= endingPartition; i++) { - systemStreamPartitionSet.add(new SystemStreamPartition(systemStream, new Partition(i))); - } - } else { + int hashPosition = systemStreamPartition.indexOf("#"); + if (hashPosition == -1) { throw new IllegalArgumentException("incorrect format in " + systemStreamPartition + ". Broadcast stream names should be in the form 'system.stream#partitionId' or 'system.stream#[partitionN-partitionM]'"); + } else { + String systemStreamName = systemStreamPartition.substring(0, hashPosition); + String partitionSegment = systemStreamPartition.substring(hashPosition + 1); + SystemStream systemStream = Util.getSystemStreamFromNames(systemStreamName); + + if (Pattern.matches(BROADCAST_STREAM_PATTERN, partitionSegment)) { + systemStreamPartitionSet.add(new SystemStreamPartition(systemStream, new Partition(Integer.valueOf(partitionSegment)))); + } else { + if (Pattern.matches(BROADCAST_STREAM_RANGE_PATTERN, partitionSegment)) { + int partitionStart = Integer.valueOf(partitionSegment.substring(1, partitionSegment.lastIndexOf("-"))); + int partitionEnd = Integer.valueOf(partitionSegment.substring(partitionSegment.lastIndexOf("-") + 1, partitionSegment.indexOf("]"))); + if (partitionStart > partitionEnd) { + LOGGER.warn("The starting partition in stream " + systemStream.toString() + " is bigger than the ending Partition. No partition is added"); + } + for (int i = partitionStart; i <= partitionEnd; i++) { + systemStreamPartitionSet.add(new SystemStreamPartition(systemStream, new Partition(i))); + } + } else { + throw new IllegalArgumentException("incorrect format in " + systemStreamPartition + + ". Broadcast stream names should be in the form 'system.stream#partitionId' or 'system.stream#[partitionN-partitionM]'"); + } + } } } return systemStreamPartitionSet; http://git-wip-us.apache.org/repos/asf/samza/blob/6f238327/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java b/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java index 2d6060e..878ca01 100644 --- a/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java +++ b/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java @@ -34,7 +34,7 @@ public class TestTaskConfigJava { @Test public void testGetBroadcastSystemStreamPartitions() { HashMap<String, String> map = new HashMap<String, String>(); - map.put("task.broadcast.inputs", "kafka.foo#4, kafka.boo#5, kafka.z-o-o#[12-14]"); + map.put("task.broadcast.inputs", "kafka.foo#4, kafka.boo#5, kafka.z-o-o#[12-14], kafka.foo.bar#[3-4]"); Config config = new MapConfig(map); TaskConfigJava taskConfig = new TaskConfigJava(config); @@ -46,6 +46,8 @@ public class TestTaskConfigJava { expected.add(new SystemStreamPartition("kafka", "z-o-o", new Partition(12))); expected.add(new SystemStreamPartition("kafka", "z-o-o", new Partition(13))); expected.add(new SystemStreamPartition("kafka", "z-o-o", new Partition(14))); + expected.add(new SystemStreamPartition("kafka", "foo.bar", new Partition(3))); + expected.add(new SystemStreamPartition("kafka", "foo.bar", new Partition(4))); assertEquals(expected, systemStreamPartitionSet); map.put("task.broadcast.inputs", "kafka.foo"); @@ -57,5 +59,15 @@ public class TestTaskConfigJava { catchCorrectException = true; } assertTrue(catchCorrectException); + + map.put("task.broadcast.inputs", "kafka.org.apache.events.WhitelistedIps#1-2"); + taskConfig = new TaskConfigJava(new MapConfig(map)); + boolean invalidFormatException = false; + try { + taskConfig.getBroadcastSystemStreamPartitions(); + } catch (IllegalArgumentException e) { + invalidFormatException = true; + } + assertTrue(invalidFormatException); } }
