Repository: incubator-apex-malhar Updated Branches: refs/heads/master 79c643b95 -> 968ff3c2f
APEXMALHAR-2084: Fixed getters and setters for kafka clusters and topics Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/968ff3c2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/968ff3c2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/968ff3c2 Branch: refs/heads/master Commit: 968ff3c2f52e4f1325d4db918559d8a9574d49fc Parents: 79c643b Author: bhupesh <bhupeshcha...@gmail.com> Authored: Tue May 10 17:36:14 2016 -0700 Committer: bhupesh <bhupeshcha...@gmail.com> Committed: Wed May 11 14:49:50 2016 -0700 ---------------------------------------------------------------------- .../kafka/AbstractKafkaInputOperator.java | 43 ++++++++++++-------- .../malhar/kafka/AbstractKafkaPartitioner.java | 18 ++++---- .../apache/apex/malhar/kafka/KafkaMetrics.java | 11 ++--- .../apex/malhar/kafka/OneToManyPartitioner.java | 2 +- .../apex/malhar/kafka/OneToOnePartitioner.java | 2 +- .../malhar/kafka/KafkaInputOperatorTest.java | 16 +++++--- 6 files changed, 54 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/968ff3c2/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java index 3e709eb..0e31fe8 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java @@ -19,6 +19,7 @@ package org.apache.apex.malhar.kafka; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import 
java.util.Iterator; @@ -44,8 +45,6 @@ import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.common.TopicPartition; import com.google.common.base.Joiner; -import com.google.common.base.Splitter; -import com.google.common.collect.Iterables; import com.datatorrent.api.AutoMetric; import com.datatorrent.api.Context; @@ -88,10 +87,10 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera } @NotNull - private String[] clusters; + private List<String> clusters; @NotNull - private String[] topics; + private List<String> topics; /** * offset track for checkpoint @@ -413,33 +412,45 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera return initialPartitionCount; } - public void setClusters(String clusters) + /** + * The kafka clusters to consume from; each element is the bootstrap.servers setting (comma-separated host:port pairs) of one cluster + * @param clusters one bootstrap.servers entry per kafka cluster + */ + public void setClusters(List<String> clusters) { - this.clusters = clusters.split(";"); + this.clusters = clusters; } /** - * Same setting as bootstrap.servers property to KafkaConsumer - * refer to http://kafka.apache.org/documentation.html#newconsumerconfigs - * To support multi cluster, you can have multiple bootstrap.servers separated by ";" + * The kafka clusters to consume from; each element is the bootstrap.servers setting (comma-separated host:port pairs) of one cluster */ - public String getClusters() + public List<String> getClusters() { - return Joiner.on(';').join(clusters); + return clusters; } - public void setTopics(String topics) + /** + * The list of topics to be consumed by the operator + * Topic name can only contain ASCII alphanumerics, '.', '_' and '-' + * @param topics topic names; entries are trimmed and null or blank entries are skipped + */ + public void setTopics(List<String> topics) { - this.topics = Iterables.toArray(Splitter.on(',').trimResults().omitEmptyStrings().split(topics), String.class); + 
this.topics = new ArrayList<>(); + for (String topic : topics) { + if (topic != null && !topic.trim().isEmpty()) { + this.topics.add(topic.trim()); + } + } } /** - * The topics the
operator consumes, separate by',' + * The list of topics the operator consumes * Topic name can only contain ASCII alphanumerics, '.', '_' and '-' */ - public String getTopics() + public List<String> getTopics() { - return Joiner.on(", ").join(topics); + return topics; } public void setStrategy(String policy) http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/968ff3c2/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java index 7bb8585..ec4a7ce 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java @@ -62,9 +62,9 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa private static final String META_CONSUMER_GROUP_NAME = AbstractKafkaInputOperator.class.getName() + "META_GROUP"; - protected final String[] clusters; + protected final List<String> clusters; - protected final String[] topics; + protected final List<String> topics; protected final AbstractKafkaInputOperator prototypeOperator; @@ -73,7 +73,7 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa private final List<Set<AbstractKafkaPartitioner.PartitionMeta>> currentPartitions = new LinkedList<>(); // prevent null - public AbstractKafkaPartitioner(String[] clusters, String[] topics, AbstractKafkaInputOperator prototypeOperator) + public AbstractKafkaPartitioner(List<String> clusters, List<String> topics, AbstractKafkaInputOperator prototypeOperator) { this.clusters = clusters; this.topics = topics; @@ -93,14 +93,14 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa Map<String, Map<String, List<PartitionInfo>>> 
metadata = new HashMap<>(); - for (int i = 0; i < clusters.length; i++) { - metadata.put(clusters[i], new HashMap<String, List<PartitionInfo>>()); + for (int i = 0; i < clusters.size(); i++) { + metadata.put(clusters.get(i), new HashMap<String, List<PartitionInfo>>()); for (String topic : topics) { List<PartitionInfo> ptis = metadataRefreshClients.get(i).partitionsFor(topic); if (logger.isDebugEnabled()) { logger.debug("Partition metadata for topic {} : {}", topic, Joiner.on(';').join(ptis)); } - metadata.get(clusters[i]).put(topic, ptis); + metadata.get(clusters.get(i)).put(topic, ptis); } metadataRefreshClients.get(i).close(); } @@ -169,16 +169,16 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa */ private void initMetadataClients() { - if (metadataRefreshClients != null && metadataRefreshClients.size() == clusters.length) { + if (metadataRefreshClients != null && metadataRefreshClients.size() == clusters.size()) { // The metadata client is active return; } - if (clusters == null || clusters.length == 0) { + if (clusters == null || clusters.isEmpty()) { throw new IllegalStateException("clusters can not be null"); } - metadataRefreshClients = new ArrayList<>(clusters.length); + metadataRefreshClients = new ArrayList<>(clusters.size()); int index = 0; for (String c : clusters) { Properties prop = new Properties(); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/968ff3c2/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java index 75449a1..9dfcf82 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java @@ -21,6 +21,7 @@ package org.apache.apex.malhar.kafka; import java.io.Serializable; 
import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.hadoop.classification.InterfaceStability; @@ -48,7 +49,7 @@ public class KafkaMetrics implements Serializable this.metricsRefreshInterval = metricsRefreshInterval; } - void updateMetrics(String[] clusters, Map<String, Map<MetricName, ? extends Metric>> metricsMap) + void updateMetrics(List<String> clusters, Map<String, Map<MetricName, ? extends Metric>> metricsMap) { long current = System.currentTimeMillis(); if (current - lastMetricSampleTime < metricsRefreshInterval) { @@ -58,15 +59,15 @@ public class KafkaMetrics implements Serializable lastMetricSampleTime = current; if (stats == null) { - stats = new KafkaConsumerStats[clusters.length]; + stats = new KafkaConsumerStats[clusters.size()]; } - for (int i = 0; i < clusters.length; i++) { + for (int i = 0; i < clusters.size(); i++) { if (stats[i] == null) { stats[i] = new KafkaConsumerStats(); - stats[i].cluster = clusters[i]; + stats[i].cluster = clusters.get(i); } - Map<MetricName, ? extends Metric> cMetrics = metricsMap.get(clusters[i]); + Map<MetricName, ? 
extends Metric> cMetrics = metricsMap.get(clusters.get(i)); if (cMetrics == null || cMetrics.isEmpty()) { stats[i].bytesPerSec = 0; stats[i].msgsPerSec = 0; http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/968ff3c2/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java index 3b4d3f3..8e57415 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java @@ -37,7 +37,7 @@ import org.apache.kafka.common.PartitionInfo; public class OneToManyPartitioner extends AbstractKafkaPartitioner { - public OneToManyPartitioner(String[] clusters, String[] topics, AbstractKafkaInputOperator protoTypeOperator) + public OneToManyPartitioner(List<String> clusters, List<String> topics, AbstractKafkaInputOperator protoTypeOperator) { super(clusters, topics, protoTypeOperator); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/968ff3c2/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java index 570bdea..7b4df75 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java @@ -38,7 +38,7 @@ import com.google.common.collect.Sets; public class OneToOnePartitioner extends AbstractKafkaPartitioner { - public OneToOnePartitioner(String[] clusters, String[] topics, AbstractKafkaInputOperator prototypeOperator) + public OneToOnePartitioner(List<String> 
clusters, List<String> topics, AbstractKafkaInputOperator prototypeOperator) { super(clusters, topics, prototypeOperator); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/968ff3c2/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java index ede7f38..2410f29 100644 --- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java +++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java @@ -19,6 +19,7 @@ package org.apache.apex.malhar.kafka; import java.io.File; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -250,8 +251,10 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase KafkaSinglePortInputOperator node = dag.addOperator("Kafka input", KafkaSinglePortInputOperator.class); node.setInitialPartitionCount(1); // set topic - node.setTopics(testName); + List<String> topics = new ArrayList<>(); + topics.add(testName); + node.setTopics(topics); node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name()); node.setClusters(getClusterConfig()); node.setStrategy(partition); if(idempotent) { @@ -306,12 +309,13 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase operator.setMaxTuplesPerWindow(500); } - private String getClusterConfig() { + private List<String> getClusterConfig() { String l = "localhost:"; - return l + TEST_KAFKA_BROKER_PORT[0][0] + - (hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[0][1] : "") + - (hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1][0] : "") + - (hasMultiCluster && hasMultiPartition ? 
"," + l + TEST_KAFKA_BROKER_PORT[1][1] : ""); + String clustersDelimited = l + TEST_KAFKA_BROKER_PORT[0][0] + + (hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[0][1] : "") + + (hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1][0] : "") + + (hasMultiCluster && hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[1][1] : ""); + return Arrays.asList(clustersDelimited.split(";")); }