Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master 79c643b95 -> 968ff3c2f


APEXMALHAR-2084: Fixed getters and setters for kafka clusters and topics


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/968ff3c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/968ff3c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/968ff3c2

Branch: refs/heads/master
Commit: 968ff3c2f52e4f1325d4db918559d8a9574d49fc
Parents: 79c643b
Author: bhupesh <bhupeshcha...@gmail.com>
Authored: Tue May 10 17:36:14 2016 -0700
Committer: bhupesh <bhupeshcha...@gmail.com>
Committed: Wed May 11 14:49:50 2016 -0700

----------------------------------------------------------------------
 .../kafka/AbstractKafkaInputOperator.java       | 43 ++++++++++++--------
 .../malhar/kafka/AbstractKafkaPartitioner.java  | 18 ++++----
 .../apache/apex/malhar/kafka/KafkaMetrics.java  | 11 ++---
 .../apex/malhar/kafka/OneToManyPartitioner.java |  2 +-
 .../apex/malhar/kafka/OneToOnePartitioner.java  |  2 +-
 .../malhar/kafka/KafkaInputOperatorTest.java    | 18 +++++---
 6 files changed, 56 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/968ff3c2/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git 
a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
 
b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
index 3e709eb..0e31fe8 100644
--- 
a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
+++ 
b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
@@ -19,6 +19,7 @@
 package org.apache.apex.malhar.kafka;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -44,8 +45,6 @@ import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.TopicPartition;
 
 import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
 
 import com.datatorrent.api.AutoMetric;
 import com.datatorrent.api.Context;
@@ -88,10 +87,10 @@ public abstract class AbstractKafkaInputOperator implements 
InputOperator, Opera
   }
 
   @NotNull
-  private String[] clusters;
+  private List<String> clusters;
 
   @NotNull
-  private String[] topics;
+  private List<String> topics;
 
   /**
    *  offset track for checkpoint
@@ -413,33 +412,45 @@ public abstract class AbstractKafkaInputOperator 
implements InputOperator, Opera
     return initialPartitionCount;
   }
 
-  public void setClusters(String clusters)
+  /**
+   * The list of host:port pairs for establishing connection with the kafka 
cluster
+   * @param clusters
+   */
+  public void setClusters(List<String> clusters)
   {
-    this.clusters = clusters.split(";");
+    this.clusters = clusters;
   }
 
   /**
-   *  Same setting as bootstrap.servers property to KafkaConsumer
-   *  refer to http://kafka.apache.org/documentation.html#newconsumerconfigs
-   *  To support multi cluster, you can have multiple bootstrap.servers 
separated by ";"
+   * The list of host:port pairs for establishing connection with the kafka 
cluster
    */
-  public String getClusters()
+  public List<String> getClusters()
   {
-    return Joiner.on(';').join(clusters);
+    return clusters;
   }
 
-  public void setTopics(String topics)
+  /**
+   * The list of topics to be consumed by the operator
+   * Topic name can only contain ASCII alphanumerics, '.', '_' and '-'
+   * @param topics
+   */
+  public void setTopics(List<String> topics)
   {
-    this.topics = 
Iterables.toArray(Splitter.on(',').trimResults().omitEmptyStrings().split(topics),
 String.class);
+    this.topics = new ArrayList<>();
+    for(String topic: topics) {
+      if(topic != null && topic.length() > 0) {
+        topics.add(topic);
+      }
+    }
   }
 
   /**
-   * The topics the operator consumes, separate by','
+   * The list of topics the operator consumes
    * Topic name can only contain ASCII alphanumerics, '.', '_' and '-'
    */
-  public String getTopics()
+  public List<String> getTopics()
   {
-    return Joiner.on(", ").join(topics);
+    return topics;
   }
 
   public void setStrategy(String policy)

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/968ff3c2/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
----------------------------------------------------------------------
diff --git 
a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
 
b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
index 7bb8585..ec4a7ce 100644
--- 
a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
+++ 
b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
@@ -62,9 +62,9 @@ public abstract class AbstractKafkaPartitioner implements 
Partitioner<AbstractKa
 
   private static final String META_CONSUMER_GROUP_NAME = 
AbstractKafkaInputOperator.class.getName() + "META_GROUP";
 
-  protected final String[] clusters;
+  protected final List<String> clusters;
 
-  protected final String[] topics;
+  protected final List<String> topics;
 
   protected final AbstractKafkaInputOperator prototypeOperator;
 
@@ -73,7 +73,7 @@ public abstract class AbstractKafkaPartitioner implements 
Partitioner<AbstractKa
 
   private final List<Set<AbstractKafkaPartitioner.PartitionMeta>> 
currentPartitions = new LinkedList<>(); // prevent null
 
-  public AbstractKafkaPartitioner(String[] clusters, String[] topics, 
AbstractKafkaInputOperator prototypeOperator)
+  public AbstractKafkaPartitioner(List<String> clusters, List<String> topics, 
AbstractKafkaInputOperator prototypeOperator)
   {
     this.clusters = clusters;
     this.topics = topics;
@@ -93,14 +93,14 @@ public abstract class AbstractKafkaPartitioner implements 
Partitioner<AbstractKa
     Map<String, Map<String, List<PartitionInfo>>> metadata = new HashMap<>();
 
 
-    for (int i = 0; i < clusters.length; i++) {
-      metadata.put(clusters[i], new HashMap<String, List<PartitionInfo>>());
+    for (int i = 0; i < clusters.size(); i++) {
+      metadata.put(clusters.get(i), new HashMap<String, 
List<PartitionInfo>>());
       for (String topic : topics) {
         List<PartitionInfo> ptis = 
metadataRefreshClients.get(i).partitionsFor(topic);
         if (logger.isDebugEnabled()) {
           logger.debug("Partition metadata for topic {} : {}", topic, 
Joiner.on(';').join(ptis));
         }
-        metadata.get(clusters[i]).put(topic, ptis);
+        metadata.get(clusters.get(i)).put(topic, ptis);
       }
       metadataRefreshClients.get(i).close();
     }
@@ -169,16 +169,16 @@ public abstract class AbstractKafkaPartitioner implements 
Partitioner<AbstractKa
    */
   private void initMetadataClients()
   {
-    if (metadataRefreshClients != null && metadataRefreshClients.size() == 
clusters.length) {
+    if (metadataRefreshClients != null && metadataRefreshClients.size() == 
clusters.size()) {
       // The metadata client is active
       return;
     }
 
-    if (clusters == null || clusters.length == 0) {
+    if (clusters == null || clusters.size() == 0) {
       throw new IllegalStateException("clusters can not be null");
     }
 
-    metadataRefreshClients = new ArrayList<>(clusters.length);
+    metadataRefreshClients = new ArrayList<>(clusters.size());
     int index = 0;
     for (String c : clusters) {
       Properties prop = new Properties();

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/968ff3c2/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java 
b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java
index 75449a1..9dfcf82 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java
@@ -21,6 +21,7 @@ package org.apache.apex.malhar.kafka;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceStability;
@@ -48,7 +49,7 @@ public class KafkaMetrics implements Serializable
     this.metricsRefreshInterval = metricsRefreshInterval;
   }
 
-  void updateMetrics(String[] clusters, Map<String, Map<MetricName, ? extends 
Metric>> metricsMap)
+  void updateMetrics(List<String> clusters, Map<String, Map<MetricName, ? 
extends Metric>> metricsMap)
   {
     long current = System.currentTimeMillis();
     if (current - lastMetricSampleTime < metricsRefreshInterval) {
@@ -58,15 +59,15 @@ public class KafkaMetrics implements Serializable
     lastMetricSampleTime = current;
 
     if (stats == null) {
-      stats = new KafkaConsumerStats[clusters.length];
+      stats = new KafkaConsumerStats[clusters.size()];
     }
 
-    for (int i = 0; i < clusters.length; i++) {
+    for (int i = 0; i < clusters.size(); i++) {
       if (stats[i] == null) {
         stats[i] = new KafkaConsumerStats();
-        stats[i].cluster = clusters[i];
+        stats[i].cluster = clusters.get(i);
       }
-      Map<MetricName, ? extends Metric> cMetrics = metricsMap.get(clusters[i]);
+      Map<MetricName, ? extends Metric> cMetrics = 
metricsMap.get(clusters.get(i));
       if (cMetrics == null || cMetrics.isEmpty()) {
         stats[i].bytesPerSec = 0;
         stats[i].msgsPerSec = 0;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/968ff3c2/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
----------------------------------------------------------------------
diff --git 
a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java 
b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
index 3b4d3f3..8e57415 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
@@ -37,7 +37,7 @@ import org.apache.kafka.common.PartitionInfo;
 public class OneToManyPartitioner extends AbstractKafkaPartitioner
 {
 
-  public OneToManyPartitioner(String[] clusters, String[] topics, 
AbstractKafkaInputOperator protoTypeOperator)
+  public OneToManyPartitioner(List<String> clusters, List<String> topics, 
AbstractKafkaInputOperator protoTypeOperator)
   {
     super(clusters, topics, protoTypeOperator);
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/968ff3c2/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java
----------------------------------------------------------------------
diff --git 
a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java 
b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java
index 570bdea..7b4df75 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java
@@ -38,7 +38,7 @@ import com.google.common.collect.Sets;
 public class OneToOnePartitioner extends AbstractKafkaPartitioner
 {
 
-  public OneToOnePartitioner(String[] clusters, String[] topics, 
AbstractKafkaInputOperator prototypeOperator)
+  public OneToOnePartitioner(List<String> clusters, List<String> topics, 
AbstractKafkaInputOperator prototypeOperator)
   {
     super(clusters, topics, prototypeOperator);
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/968ff3c2/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java 
b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
index ede7f38..2410f29 100644
--- 
a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
+++ 
b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
@@ -19,6 +19,7 @@
 package org.apache.apex.malhar.kafka;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -250,8 +251,12 @@ public class KafkaInputOperatorTest extends 
KafkaOperatorTestBase
     KafkaSinglePortInputOperator node = dag.addOperator("Kafka input", 
KafkaSinglePortInputOperator.class);
     node.setInitialPartitionCount(1);
     // set topic
-    node.setTopics(testName);
+    List<String> topics = new ArrayList<>();
+    topics.add(testName);
+    node.setTopics(topics);
     
node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
+    List<String> clusters = new ArrayList<>();
+    topics.add(testName);
     node.setClusters(getClusterConfig());
     node.setStrategy(partition);
     if(idempotent) {
@@ -306,12 +311,13 @@ public class KafkaInputOperatorTest extends 
KafkaOperatorTestBase
     operator.setMaxTuplesPerWindow(500);
   }
 
-  private String getClusterConfig() {
+  private List<String> getClusterConfig() {
     String l = "localhost:";
-    return l + TEST_KAFKA_BROKER_PORT[0][0] +
-      (hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[0][1] : "") +
-      (hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1][0] : "") +
-      (hasMultiCluster && hasMultiPartition ? "," + l  + 
TEST_KAFKA_BROKER_PORT[1][1] : "");
+    String clustersDelimited = l + TEST_KAFKA_BROKER_PORT[0][0] +
+        (hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[0][1] : "") +
+        (hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1][0] : "") +
+        (hasMultiCluster && hasMultiPartition ? "," + l  + 
TEST_KAFKA_BROKER_PORT[1][1] : "");
+    return Arrays.asList(clustersDelimited.split(";"));
   }
 
 

Reply via email to