Repository: incubator-apex-malhar Updated Branches: refs/heads/devel-3 8b37ee372 -> 9ba99b0ca
APEXMALHAR-1970 #resolve #comment Fix the ArrayOutOfBoundaryException and add a bunch of tests for both one_to_one and one_to_many partition Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/b431eb34 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/b431eb34 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/b431eb34 Branch: refs/heads/devel-3 Commit: b431eb34f497f113b30158ae2033be3318de28bc Parents: 04c9f52 Author: Siyuan Hua <[email protected]> Authored: Mon Jan 11 22:09:25 2016 -0800 Committer: Siyuan Hua <[email protected]> Committed: Mon Jan 11 22:09:25 2016 -0800 ---------------------------------------------------------------------- .../malhar/kafka/AbstractKafkaPartitioner.java | 4 ++-- .../apex/malhar/kafka/OneToManyPartitioner.java | 4 ++-- .../malhar/kafka/KafkaInputOperatorTest.java | 23 ++++++++++++-------- 3 files changed, 18 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b431eb34/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java index 0fdd721..2159e4f 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java @@ -257,13 +257,13 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa } PartitionMeta that = (PartitionMeta)o; return Objects.equals(cluster, that.cluster) && - Objects.equals(topicPartition, that.topicPartition); + Objects.equals(getTopicPartition(), that.getTopicPartition()); } @Override public int hashCode() { - return Objects.hash(cluster, topicPartition); + return Objects.hash(cluster, getTopicPartition()); } @Override http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b431eb34/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java index 09d22eb..736727e 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java @@ -52,8 +52,8 @@ public class OneToManyPartitioner extends AbstractKafkaPartitioner for (Map.Entry<String, List<PartitionInfo>> topicPartition : clusterMap.getValue().entrySet()) { for (PartitionInfo pif : topicPartition.getValue()) { int index = i++ % partitionCount; - if (eachPartitionAssignment.get(index) == null) { - eachPartitionAssignment.add(index, new HashSet<PartitionMeta>()); + if (index >= eachPartitionAssignment.size()) { + eachPartitionAssignment.add(new HashSet<PartitionMeta>()); } eachPartitionAssignment.get(index).add(new PartitionMeta(clusterMap.getKey(), topicPartition.getKey(), pif.partition())); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b431eb34/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java index 17bc465..d055555 100644 --- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java +++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java @@ -48,26 +48,30 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase private int totalBrokers = 0; + private String partition = null; - - @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}") - public static Collection<Boolean[]> testScenario() + @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition: {2}") + public static Collection<Object[]> testScenario() { - return Arrays.asList(new Boolean[][]{{true, false}, // multi cluster with single partition - {true, true}, // multi cluster with multi partitions - {false, true}, // single cluster with multi partitions - {false, false}, // single cluster with single partitions + return Arrays.asList(new Object[][]{{true, false, "one_to_one"},// multi cluster with single partition + {true, false, "one_to_many"}, + {true, true, "one_to_one"},// multi cluster with multi partitions + {true, true, "one_to_many"}, + {false, true, "one_to_one"}, // single cluster with multi partitions + {false, true, "one_to_many"}, + {false, false, "one_to_one"}, // single cluster with single partitions + {false, false, "one_to_many"} }); } - public KafkaInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition) + public KafkaInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition, String partition) { // This class want to initialize several kafka brokers for multiple partitions this.hasMultiCluster = hasMultiCluster; this.hasMultiPartition = hasMultiPartition; int cluster = 1 + (hasMultiCluster ? 1 : 0); totalBrokers = (1 + (hasMultiPartition ? 1 : 0)) * cluster; - + this.partition = partition; } private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class); @@ -167,6 +171,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase node.setTopics(TEST_TOPIC); node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name()); node.setClusters(getClusterConfig()); + node.setStrategy(partition); // Create Test tuple collector CollectorModule collector = dag.addOperator("TestMessageCollector", new CollectorModule());
