APEXMALHAR-1983 #resolve #comment split string in setter
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/0627199c Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/0627199c Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/0627199c Branch: refs/heads/master Commit: 0627199c8d3d299dfaf51e21d31859ff6322cde6 Parents: 9e77ef7 Author: Siyuan Hua <[email protected]> Authored: Fri Jan 29 21:22:30 2016 -0800 Committer: Siyuan Hua <[email protected]> Committed: Fri Jan 29 21:22:30 2016 -0800 ---------------------------------------------------------------------- .../malhar/kafka/AbstractKafkaInputOperator.java | 15 ++++++++++----- .../apex/malhar/kafka/AbstractKafkaPartitioner.java | 4 +++- .../apex/malhar/kafka/KafkaConsumerWrapper.java | 2 ++ .../org/apache/apex/malhar/kafka/KafkaMetrics.java | 2 ++ .../org/apache/apex/malhar/kafka/KafkaPartition.java | 3 +++ .../apex/malhar/kafka/OneToManyPartitioner.java | 2 ++ .../apex/malhar/kafka/OneToOnePartitioner.java | 2 ++ .../apache/apex/malhar/kafka/PartitionStrategy.java | 3 +++ 8 files changed, 27 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0627199c/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java index 512f058..89104a3 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java @@ -34,12 +34,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.commons.lang3.tuple.Pair; +import 
org.apache.hadoop.classification.InterfaceStability; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.common.TopicPartition; import com.google.common.base.Joiner; +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; import com.datatorrent.api.AutoMetric; import com.datatorrent.api.Context; @@ -66,6 +69,7 @@ import com.datatorrent.api.StatsListener; * * @since 3.3.0 */ +@InterfaceStability.Evolving public abstract class AbstractKafkaInputOperator implements InputOperator, Operator.ActivationListener<Context.OperatorContext>, Operator.CheckpointListener, Partitioner<AbstractKafkaInputOperator>, StatsListener, OffsetCommitCallback { @@ -366,17 +370,18 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera return Joiner.on(';').join(clusters); } - public void setTopics(String... topics) + public void setTopics(String topics) { - this.topics = topics; + this.topics = Iterables.toArray(Splitter.on(',').trimResults().omitEmptyStrings().split(topics), String.class); } /** - * The topics the operator consumes + * The topics the operator consumes, separate by',' + * Topic name can only contain ASCII alphanumerics, '.', '_' and '-' */ - public String[] getTopics() + public String getTopics() { - return topics; + return Joiner.on(", ").join(topics); } public void setStrategy(String policy) http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0627199c/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java index 53bbd2a..01907e4 100644 --- 
a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java @@ -33,6 +33,7 @@ import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; @@ -56,6 +57,7 @@ import com.datatorrent.api.StatsListener; * * @since 3.3.0 */ +@InterfaceStability.Evolving public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKafkaInputOperator>, StatsListener { @@ -274,7 +276,7 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa { return "PartitionMeta{" + "cluster='" + cluster + '\'' + - ", topicPartition=" + topicPartition + + ", topicPartition=" + getTopicPartition() + '}'; } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0627199c/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java index cac2ad2..7a1211a 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java @@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -63,6 +64,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; * * @since 3.3.0 */ +@InterfaceStability.Evolving
public class KafkaConsumerWrapper implements Closeable { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0627199c/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java index 12a375d..75449a1 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; @@ -33,6 +34,7 @@ import com.datatorrent.api.AutoMetric; * * @since 3.3.0 */ +@InterfaceStability.Evolving public class KafkaMetrics implements Serializable { private KafkaConsumerStats[] stats; http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0627199c/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java index 4a4ebf3..1646ffe 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java @@ -21,9 +21,12 @@ package org.apache.apex.malhar.kafka; import java.io.Serializable; +import org.apache.hadoop.classification.InterfaceStability; + /** * @since 2.1.0 */ +@InterfaceStability.Evolving public class KafkaPartition implements Serializable { protected static final String DEFAULT_CLUSTERID = "com.datatorrent.contrib.kafka.defaultcluster"; 
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0627199c/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java index bcd3073..3b4d3f3 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.kafka.common.PartitionInfo; /** @@ -32,6 +33,7 @@ import org.apache.kafka.common.PartitionInfo; * * @since 3.3.0 */ +@InterfaceStability.Evolving public class OneToManyPartitioner extends AbstractKafkaPartitioner { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0627199c/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java index b787932..570bdea 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.kafka.common.PartitionInfo; import com.google.common.collect.Sets; @@ -33,6 +34,7 @@ import com.google.common.collect.Sets; * * @since 3.3.0 */ +@InterfaceStability.Evolving public class OneToOnePartitioner extends AbstractKafkaPartitioner { 
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0627199c/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java index aaa35ec..7c142c5 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java @@ -18,9 +18,12 @@ */ package org.apache.apex.malhar.kafka; +import org.apache.hadoop.classification.InterfaceStability; + /** * @since 3.3.0 */ +@InterfaceStability.Evolving public enum PartitionStrategy { /**
