Repository: incubator-apex-malhar

Updated Branches:
  refs/heads/release-3.3 6ad18e8ca -> 040f8f777
APEXMALHAR-1970 #resolve #comment Fix the ArrayOutOfBoundaryException and add a bunch of tests for both one_to_one and one_to_many partition


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/cd183db8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/cd183db8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/cd183db8

Branch: refs/heads/release-3.3
Commit: cd183db8f4b6427dedb1bdda869c2a6b7042f430
Parents: 6ad18e8
Author: Siyuan Hua <[email protected]>
Authored: Mon Jan 11 22:09:25 2016 -0800
Committer: Thomas Weise <[email protected]>
Committed: Sat Feb 20 00:13:02 2016 -0800

----------------------------------------------------------------------
 .../malhar/kafka/AbstractKafkaPartitioner.java  |  4 ++--
 .../apex/malhar/kafka/OneToManyPartitioner.java |  4 ++--
 .../malhar/kafka/KafkaInputOperatorTest.java    | 23 ++++++++++++--------
 3 files changed, 18 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cd183db8/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
index e01c38e..c708145 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
@@ -259,13 +259,13 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
       }
       PartitionMeta that = (PartitionMeta)o;
       return Objects.equals(cluster, that.cluster) &&
-        Objects.equals(topicPartition, that.topicPartition);
+        Objects.equals(getTopicPartition(), that.getTopicPartition());
     }

     @Override
     public int hashCode()
     {
-      return Objects.hash(cluster, topicPartition);
+      return Objects.hash(cluster, getTopicPartition());
     }

     @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cd183db8/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
index 16f4da4..bcd3073 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
@@ -54,8 +54,8 @@ public class OneToManyPartitioner extends AbstractKafkaPartitioner
       for (Map.Entry<String, List<PartitionInfo>> topicPartition : clusterMap.getValue().entrySet()) {
         for (PartitionInfo pif : topicPartition.getValue()) {
           int index = i++ % partitionCount;
-          if (eachPartitionAssignment.get(index) == null) {
-            eachPartitionAssignment.add(index, new HashSet<PartitionMeta>());
+          if (index >= eachPartitionAssignment.size()) {
+            eachPartitionAssignment.add(new HashSet<PartitionMeta>());
           }
           eachPartitionAssignment.get(index).add(new PartitionMeta(clusterMap.getKey(), topicPartition.getKey(), pif.partition()));
         }
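Editor's note on the fix above: the old code probed the assignment list with get(index) and checked for null, but ArrayList.get throws IndexOutOfBoundsException for any index at or beyond size(), so the null branch could never run. Because the round-robin index sequence is 0, 1, 2, ... up to partitionCount - 1, checking the size and appending creates each bucket exactly once at the right position. The following is a minimal, self-contained sketch of that pattern; the class and method names (RoundRobinAssignmentSketch, assign) and the string placeholders are illustrative only and are not part of the patch.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RoundRobinAssignmentSketch
{
  // Distribute totalKafkaPartitions Kafka partitions round-robin over
  // operatorPartitionCount operator instances (hypothetical stand-in for the
  // real PartitionMeta-based assignment).
  public static List<Set<String>> assign(int totalKafkaPartitions, int operatorPartitionCount)
  {
    List<Set<String>> eachPartitionAssignment = new ArrayList<>();
    int i = 0;
    for (int p = 0; p < totalKafkaPartitions; p++) {
      int index = i++ % operatorPartitionCount;
      // Size check instead of get(index) == null: get() on an empty slot would
      // throw IndexOutOfBoundsException. Indexes arrive in order 0,1,2,..., so
      // whenever index >= size() it is exactly index == size() and add() appends
      // the new bucket at the correct position.
      if (index >= eachPartitionAssignment.size()) {
        eachPartitionAssignment.add(new HashSet<String>());
      }
      eachPartitionAssignment.get(index).add("topicPartition-" + p);
    }
    return eachPartitionAssignment;
  }

  public static void main(String[] args)
  {
    // 5 Kafka partitions spread over 2 operator partitions -> buckets of size 3 and 2.
    System.out.println(assign(5, 2));
  }
}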
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cd183db8/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
index 17bc465..d055555 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
@@ -48,26 +48,30 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase

   private int totalBrokers = 0;

+  private String partition = null;
-
-  @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}")
-  public static Collection<Boolean[]> testScenario()
+  @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition: {2}")
+  public static Collection<Object[]> testScenario()
   {
-    return Arrays.asList(new Boolean[][]{{true, false}, // multi cluster with single partition
-      {true, true}, // multi cluster with multi partitions
-      {false, true}, // single cluster with multi partitions
-      {false, false}, // single cluster with single partitions
+    return Arrays.asList(new Object[][]{{true, false, "one_to_one"},// multi cluster with single partition
+      {true, false, "one_to_many"},
+      {true, true, "one_to_one"},// multi cluster with multi partitions
+      {true, true, "one_to_many"},
+      {false, true, "one_to_one"}, // single cluster with multi partitions
+      {false, true, "one_to_many"},
+      {false, false, "one_to_one"}, // single cluster with single partitions
+      {false, false, "one_to_many"}
     });
   }

-  public KafkaInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition)
+  public KafkaInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition, String partition)
   {
     // This class want to initialize several kafka brokers for multiple partitions
     this.hasMultiCluster = hasMultiCluster;
     this.hasMultiPartition = hasMultiPartition;
     int cluster = 1 + (hasMultiCluster ? 1 : 0);
     totalBrokers = (1 + (hasMultiPartition ? 1 : 0)) * cluster;
-
+    this.partition = partition;
   }

   private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
@@ -167,6 +171,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     node.setTopics(TEST_TOPIC);
     node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
     node.setClusters(getClusterConfig());
+    node.setStrategy(partition);

     // Create Test tuple collector
     CollectorModule collector = dag.addOperator("TestMessageCollector", new CollectorModule());
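Editor's note on the test change above: the scenario table switches from Boolean[] to Object[] so the partition strategy string ("one_to_one" or "one_to_many") can ride along as a third constructor argument, and each generated test instance then passes it to node.setStrategy(partition). For readers less familiar with JUnit 4's Parameterized runner, here is an illustrative skeleton of that pattern; the class name, method names, and assertion are hypothetical and this is not the real KafkaInputOperatorTest.

import java.util.Arrays;
import java.util.Collection;

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class PartitionStrategyParameterSketch
{
  // Each Object[] row becomes one test instance; {2} in the name template is
  // filled with the strategy string, mirroring the patched testScenario().
  @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition: {2}")
  public static Collection<Object[]> testScenario()
  {
    return Arrays.asList(new Object[][]{
      {true, false, "one_to_one"}, {true, false, "one_to_many"},
      {false, false, "one_to_one"}, {false, false, "one_to_many"}
    });
  }

  private final boolean hasMultiCluster;
  private final boolean hasMultiPartition;
  private final String partition;

  public PartitionStrategyParameterSketch(boolean hasMultiCluster, boolean hasMultiPartition, String partition)
  {
    this.hasMultiCluster = hasMultiCluster;
    this.hasMultiPartition = hasMultiPartition;
    this.partition = partition;
  }

  @Test
  public void strategyIsAvailablePerInstance()
  {
    // In the real test this value would be handed to the Kafka input operator
    // via node.setStrategy(partition); here we only check it is a known name.
    Assert.assertTrue("one_to_one".equals(partition) || "one_to_many".equals(partition));
  }
}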
