APEXMALHAR-1904 #comment
  Add new KafkaInputOperator using kafka 0.9.x consumer API
  Features includes:
      Out-of-box One-to-one and one-to-many partition scheme support plus 
customizable partition schema
      Fault-tolerant when the input operator goes down, it redeploys on other 
node
      At-least-once semantics for operator failure (no matter which operator 
fails)
      At-least-once semantics for cold restart (no data loss even if you 
restart the application)
      Multi-cluster support, one operator can consume data from more than one 
kafka clusters
      Multi-topic support, one operator can subscribe multiple topics
      Throughput control support, you can throttle number of tuple for each 
streaming window


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/7ee2c7e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/7ee2c7e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/7ee2c7e3

Branch: refs/heads/devel-3
Commit: 7ee2c7e3697663a5fad3112c9a780a5ffd4c3b11
Parents: 8790077
Author: Siyuan Hua <[email protected]>
Authored: Wed Jan 6 11:11:46 2016 -0800
Committer: Siyuan Hua <[email protected]>
Committed: Wed Jan 6 11:11:46 2016 -0800

----------------------------------------------------------------------
 kafka/XmlJavadocCommentsExtractor.xsl           |  48 ++
 kafka/pom.xml                                   | 229 ++++++++
 .../kafka/AbstractKafkaInputOperator.java       | 527 +++++++++++++++++++
 .../malhar/kafka/AbstractKafkaPartitioner.java  | 278 ++++++++++
 .../apex/malhar/kafka/KafkaConsumerWrapper.java | 328 ++++++++++++
 .../apache/apex/malhar/kafka/KafkaMetrics.java  | 136 +++++
 .../apex/malhar/kafka/KafkaPartition.java       | 143 +++++
 .../kafka/KafkaSinglePortInputOperator.java     |  44 ++
 .../apex/malhar/kafka/OneToManyPartitioner.java |  66 +++
 .../apex/malhar/kafka/OneToOnePartitioner.java  |  56 ++
 .../apex/malhar/kafka/PartitionStrategy.java    |  38 ++
 .../malhar/kafka/KafkaInputOperatorTest.java    | 216 ++++++++
 .../malhar/kafka/KafkaOperatorTestBase.java     | 308 +++++++++++
 .../apex/malhar/kafka/KafkaTestPartitioner.java |  62 +++
 .../apex/malhar/kafka/KafkaTestProducer.java    | 169 ++++++
 pom.xml                                         |   1 +
 16 files changed, 2649 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/kafka/XmlJavadocCommentsExtractor.xsl
----------------------------------------------------------------------
diff --git a/kafka/XmlJavadocCommentsExtractor.xsl 
b/kafka/XmlJavadocCommentsExtractor.xsl
new file mode 100644
index 0000000..ec72325
--- /dev/null
+++ b/kafka/XmlJavadocCommentsExtractor.xsl
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<!--
+    Document   : XmlJavadocCommentsExtractor.xsl
+    Created on : September 16, 2014, 11:30 AM
+    Description:
+        The transformation strips off all information except for comments and 
tags from xml javadoc generated by xml-doclet.
+-->
+
+<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform"; version="1.0">
+  <xsl:output method="xml" standalone="yes"/>
+
+  <!-- copy xml by selecting only the following nodes, attrbutes and text -->
+  <xsl:template match="node()|text()|@*">
+    <xsl:copy>
+      <xsl:apply-templates 
select="root|package|class|interface|method|field|type|comment|tag|text()|@name|@qualified|@text"/>
+    </xsl:copy>
+  </xsl:template>
+
+  <!-- Strip off the following paths from the selected xml -->
+  <xsl:template match="//root/package/interface/interface
+                      |//root/package/interface/method/@qualified
+                      |//root/package/class/interface
+                      |//root/package/class/class
+                      |//root/package/class/method/@qualified
+                      |//root/package/class/field/@qualified" />
+
+  <xsl:strip-space elements="*"/>
+</xsl:stylesheet>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/kafka/pom.xml b/kafka/pom.xml
new file mode 100755
index 0000000..ff47524
--- /dev/null
+++ b/kafka/pom.xml
@@ -0,0 +1,229 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar</artifactId>
+    <version>3.3.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>malhar-kafka</artifactId>
+  <name>Apache Apex Malhar (incubating) Kafka Support</name>
+  <packaging>jar</packaging>
+
+  <build>
+    <plugins>
+      <!-- Publish tests jar -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.4</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+            <phase>package</phase>
+          </execution>
+        </executions>
+      </plugin>
+     <!-- create resource directory for xml javadoc-->
+     <plugin>
+         <groupId>org.apache.maven.plugins</groupId>
+         <artifactId>maven-antrun-plugin</artifactId>
+         <executions>
+           <execution>
+               <id>createJavadocDirectory</id>
+               <phase>generate-resources</phase>
+               <configuration>
+                   <tasks>
+                     <delete 
dir="${project.build.directory}/generated-resources/xml-javadoc"/>
+                     <mkdir 
dir="${project.build.directory}/generated-resources/xml-javadoc"/>
+                   </tasks>
+               </configuration>
+               <goals>
+                   <goal>run</goal>
+               </goals>
+           </execution>
+         </executions>
+      </plugin>
+     <!-- generate javdoc -->
+     <plugin>
+       <groupId>org.apache.maven.plugins</groupId>
+       <artifactId>maven-javadoc-plugin</artifactId>
+       <executions>
+         <!-- generate xml javadoc -->
+         <execution>
+           <id>xml-doclet</id>
+           <phase>generate-resources</phase>
+           <goals>
+             <goal>javadoc</goal>
+           </goals>
+           <configuration>
+             <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet>
+             <additionalparam>-d 
${project.build.directory}/generated-resources/xml-javadoc -filename 
${project.artifactId}-${project.version}-javadoc.xml</additionalparam>
+             <useStandardDocletOptions>false</useStandardDocletOptions>
+             <docletArtifact>
+               <groupId>com.github.markusbernhardt</groupId>
+               <artifactId>xml-doclet</artifactId>
+               <version>1.0.4</version>
+             </docletArtifact>
+           </configuration>
+         </execution>
+         <!-- generate default javadoc jar with custom tags -->
+         <execution>
+           <id>attach-sources</id>
+           <goals>
+             <goal>jar</goal>
+           </goals>
+           <configuration>
+             <skip>true</skip>
+             <tags>
+               <tag>
+                 <name>customTag1</name>
+                 <placement>a</placement>
+                 <head>Custom Tag One:</head>
+               </tag>
+               <tag>
+                 <name>customTag2</name>
+                 <placement>a</placement>
+                 <head>Custom Tag two:</head>
+               </tag>
+               <tag>
+                 <name>customTag3</name>
+                 <placement>a</placement>
+                 <head>Custom Tag three:</head>
+               </tag>
+             </tags>
+           </configuration>
+         </execution>
+       </executions>
+     </plugin>
+     <!-- Transform xml javadoc to stripped down version containing only 
class/interface comments and tags-->
+     <plugin>
+       <groupId>org.codehaus.mojo</groupId>
+       <artifactId>xml-maven-plugin</artifactId>
+       <version>1.0</version>
+       <executions>
+         <execution>
+           <id>transform-xmljavadoc</id>
+           <phase>generate-resources</phase>
+           <goals>
+             <goal>transform</goal>
+           </goals>
+         </execution>
+       </executions>
+       <configuration>
+         <transformationSets>
+           <transformationSet>
+             
<dir>${project.build.directory}/generated-resources/xml-javadoc</dir>
+             <includes>
+               
<include>${project.artifactId}-${project.version}-javadoc.xml</include>
+             </includes>
+             <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet>
+             
<outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir>
+           </transformationSet>
+         </transformationSets>
+       </configuration>
+     </plugin>
+     <!-- copy xml javadoc to class jar -->
+     <plugin>
+       <artifactId>maven-resources-plugin</artifactId>
+       <version>2.6</version>
+       <executions>
+         <execution>
+           <id>copy-resources</id>
+           <phase>process-resources</phase>
+           <goals>
+             <goal>copy-resources</goal>
+           </goals>
+           <configuration>
+             <outputDirectory>${basedir}/target/classes</outputDirectory>
+             <resources>
+               <resource>
+                 
<directory>${project.build.directory}/generated-resources/xml-javadoc</directory>
+                 <includes>
+                   
<include>${project.artifactId}-${project.version}-javadoc.xml</include>
+                 </includes>
+                 <filtering>true</filtering>
+               </resource>
+             </resources>
+           </configuration>
+         </execution>
+       </executions>
+     </plugin>
+
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.11</artifactId>
+      <version>0.9.0.0</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-simple</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>0.9.0.0</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>malhar-library</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>malhar-library</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <classifier>tests</classifier>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>apex-common</artifactId>
+      <version>${apex.core.version}</version>
+      <type>jar</type>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git 
a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
 
b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
new file mode 100644
index 0000000..4f2f704
--- /dev/null
+++ 
b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
@@ -0,0 +1,527 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+
+import com.google.common.base.Joiner;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.StatsListener;
+
+/**
+ * The abstract kafka input operator using kafka 0.9.0 new consumer API
+ * A scalable, fault-tolerant, at-least-once kafka input operator
+ * Key features includes:
+ *
+ * <ol>
+ * <li>Out-of-box One-to-one and one-to-many partition strategy support plus 
customizable partition strategy
+ *    refer to AbstractKafkaPartitioner </li>
+ * <li>Fault-tolerant when the input operator goes down, it redeploys on other 
node</li>
+ * <li>At-least-once semantics for operator failure (no matter which operator 
fails)</li>
+ * <li>At-least-once semantics for cold restart (no data loss even if you 
restart the application)</li>
+ * <li>Multi-cluster support, one operator can consume data from more than one 
kafka clusters</li>
+ * <li>Multi-topic support, one operator can subscribe multiple topics</li>
+ * <li>Throughput control support, you can throttle number of tuple for each 
streaming window</li>
+ * </ol>
+ */
+public abstract class AbstractKafkaInputOperator implements InputOperator, 
Operator.ActivationListener<Context.OperatorContext>, 
Operator.CheckpointListener, Partitioner<AbstractKafkaInputOperator>, 
StatsListener, OffsetCommitCallback
+{
+
+  private static final Logger logger = 
LoggerFactory.getLogger(AbstractKafkaInputOperator.class);
+
+  public enum InitialOffset
+  {
+    EARLIEST, // consume from beginning of the partition every time when 
application restart
+    LATEST, // consume from latest of the partition every time when 
application restart
+    APPLICATION_OR_EARLIEST, // consume from committed position from last run 
or earliest if there is no committed offset(s)
+    APPLICATION_OR_LATEST // consume from committed position from last run or 
latest if there is no committed offset(s)
+  }
+
+  @NotNull
+  private String[] clusters;
+
+  @NotNull
+  private String[] topics;
+
+  /**
+   *  offset track for checkpoint
+   */
+  private final Map<AbstractKafkaPartitioner.PartitionMeta, Long> offsetTrack 
= new HashMap<>();
+
+  private int initialPartitionCount = 1;
+
+  private long repartitionInterval = 30000L;
+
+  private long repartitionCheckInterval = 5000L;
+
+  @Min(1)
+  private int maxTuplesPerWindow = Integer.MAX_VALUE;
+
+  /**
+   * By default the operator start consuming from the committed offset or the 
latest one
+   */
+  private InitialOffset initialOffset = InitialOffset.APPLICATION_OR_LATEST;
+
+  private long metricsRefreshInterval = 5000L;
+
+  private long consumerTimeout = 5000L;
+
+  private int holdingBufferSize = 1024;
+
+  private Properties consumerProps;
+
+  /**
+   * Assignment for each operator instance
+   */
+  private Set<AbstractKafkaPartitioner.PartitionMeta> assignment;
+
+  //=======================All transient fields==========================
+
+  /**
+   * Wrapper consumer object
+   * It wraps KafkaConsumer, maintains consumer thread and store messages in a 
queue
+   */
+  private transient final KafkaConsumerWrapper consumerWrapper = new 
KafkaConsumerWrapper();
+
+  /**
+   * By default the strategy is one to one
+   * @see PartitionStrategy
+   */
+  private PartitionStrategy strategy = PartitionStrategy.ONE_TO_ONE;
+
+  /**
+   * count the emitted message in each window<br>
+   * non settable
+   */
+  private transient int emitCount = 0;
+
+  /**
+   * store offsets with window id, only keep offsets with windows that have 
not been committed
+   */
+  private transient final List<Pair<Long, 
Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> offsetHistory = new 
LinkedList<>();
+
+  /**
+   * Application name is used as group.id for kafka consumer
+   */
+  private transient String applicationName;
+
+  private transient AbstractKafkaPartitioner partitioner;
+
+  private transient long currentWindowId;
+
+  private transient long lastCheckTime = 0L;
+
+  private transient long lastRepartitionTime = 0L;
+
+  @AutoMetric
+  private transient KafkaMetrics metrics;
+
+  @Override
+  public void activate(Context.OperatorContext context)
+  {
+    consumerWrapper.start();
+  }
+
+  @Override
+  public void deactivate()
+  {
+    consumerWrapper.stop();
+  }
+
+  @Override
+  public void checkpointed(long l)
+  {
+
+  }
+
+  @Override
+  public void committed(long windowId)
+  {
+    //ask kafka consumer wrapper to store the committed offsets
+    for (Iterator<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, 
Long>>> iter = offsetHistory.iterator(); iter.hasNext(); ) {
+      Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>> item = 
iter.next();
+      if (item.getLeft() <= windowId) {
+        if (item.getLeft() == windowId) {
+          consumerWrapper.commitOffsets(item.getRight());
+        }
+        iter.remove();
+      }
+    }
+  }
+
+  @Override
+  public void emitTuples()
+  {
+    int count = consumerWrapper.messageSize();
+    if (maxTuplesPerWindow > 0) {
+      count = Math.min(count, maxTuplesPerWindow - emitCount);
+    }
+    for (int i = 0; i < count; i++) {
+      Pair<String, ConsumerRecord<byte[], byte[]>> tuple = 
consumerWrapper.pollMessage();
+      ConsumerRecord<byte[], byte[]> msg = tuple.getRight();
+      emitTuple(tuple.getLeft(), msg);
+      AbstractKafkaPartitioner.PartitionMeta pm = new 
AbstractKafkaPartitioner.PartitionMeta(tuple.getLeft(),
+          msg.topic(), msg.partition());
+      offsetTrack.put(pm, msg.offset());
+    }
+    emitCount += count;
+  }
+
+  protected abstract void emitTuple(String cluster, ConsumerRecord<byte[], 
byte[]> message);
+
+  @Override
+  public void beginWindow(long wid)
+  {
+    emitCount = 0;
+    currentWindowId = wid;
+  }
+
+  @Override
+  public void endWindow()
+  {
+    // copy current offset track to history memory
+    Map<AbstractKafkaPartitioner.PartitionMeta, Long> offsetsWithWindow = new 
HashMap<>(offsetTrack);
+    offsetHistory.add(Pair.of(currentWindowId, offsetsWithWindow));
+
+    //update metrics
+    metrics.updateMetrics(clusters, consumerWrapper.getAllConsumerMetrics());
+  }
+
+
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    applicationName = context.getValue(Context.DAGContext.APPLICATION_NAME);
+    consumerWrapper.create(this);
+    metrics = new KafkaMetrics(metricsRefreshInterval);
+  }
+
+
+  @Override
+  public void teardown()
+  {
+
+  }
+
+  private void initPartitioner()
+  {
+    if (partitioner == null) {
+      logger.info("Initialize Partitioner");
+      switch (strategy) {
+        case ONE_TO_ONE:
+          partitioner = new OneToOnePartitioner(clusters, topics, this);
+          break;
+        case ONE_TO_MANY:
+          partitioner = new OneToManyPartitioner(clusters, topics, this);
+          break;
+        case ONE_TO_MANY_HEURISTIC:
+          throw new UnsupportedOperationException("Not implemented yet");
+        default:
+          throw new RuntimeException("Invalid strategy");
+      }
+      logger.info("Actual Partitioner is {}", partitioner.getClass());
+    }
+
+  }
+
+  @Override
+  public Response processStats(BatchedOperatorStats batchedOperatorStats)
+  {
+    long t = System.currentTimeMillis();
+    if (repartitionInterval < 0 || repartitionCheckInterval < 0 ||
+        t - lastCheckTime < repartitionCheckInterval || t - 
lastRepartitionTime < repartitionInterval) {
+      // return false if it's within repartitionCheckInterval since last time 
it check the stats
+      Response response = new Response();
+      response.repartitionRequired = false;
+      return response;
+    }
+
+    try {
+      logger.debug("Process stats");
+      initPartitioner();
+      return partitioner.processStats(batchedOperatorStats);
+    } finally {
+      lastCheckTime = System.currentTimeMillis();
+    }
+  }
+
+  @Override
+  public Collection<Partition<AbstractKafkaInputOperator>> definePartitions(
+      Collection<Partition<AbstractKafkaInputOperator>> collection, 
PartitioningContext partitioningContext)
+  {
+    logger.debug("Define partitions");
+    initPartitioner();
+    return partitioner.definePartitions(collection, partitioningContext);
+  }
+
+  @Override
+  public void partitioned(Map<Integer, Partition<AbstractKafkaInputOperator>> 
map)
+  {
+    // update the last repartition time
+    lastRepartitionTime = System.currentTimeMillis();
+    initPartitioner();
+    partitioner.partitioned(map);
+  }
+
+  /**
+   *
+   * A callback from consumer after it commits the offset
+   * @param map
+   * @param e
+   */
+  public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception 
e)
+  {
+    if (logger.isDebugEnabled()) {
+      logger.debug("Commit offsets complete {} ", 
Joiner.on(';').withKeyValueSeparator("=").join(map));
+    }
+    if (e != null) {
+      logger.warn("Exceptions in committing offsets {} : {} ",
+        Joiner.on(';').withKeyValueSeparator("=").join(map), e);
+    }
+  }
+
+  public void assign(Set<AbstractKafkaPartitioner.PartitionMeta> assignment)
+  {
+    this.assignment = assignment;
+  }
+
+  public Set<AbstractKafkaPartitioner.PartitionMeta> assignment()
+  {
+    return assignment;
+  }
+
+  //---------------------------------------------setters and 
getters----------------------------------------
+  public void setInitialPartitionCount(int partitionCount)
+  {
+    this.initialPartitionCount = partitionCount;
+  }
+
+  /**
+   * initial partition count
+   * only used with PartitionStrategy.ONE_TO_MANY
+   * or customized strategy
+   */
+  public int getInitialPartitionCount()
+  {
+    return initialPartitionCount;
+  }
+
+  public void setClusters(String clusters)
+  {
+    this.clusters = clusters.split(";");
+  }
+
+  /**
+   *  Same setting as bootstrap.servers property to KafkaConsumer
+   *  refer to http://kafka.apache.org/documentation.html#newconsumerconfigs
+   *  To support multi cluster, you can have multiple bootstrap.servers 
separated by ";"
+   */
+  public String getClusters()
+  {
+    return Joiner.on(';').join(clusters);
+  }
+
+  public void setTopics(String... topics)
+  {
+    this.topics = topics;
+  }
+
+  /**
+   * The topics the operator consumes
+   */
+  public String[] getTopics()
+  {
+    return topics;
+  }
+
+  public void setStrategy(String policy)
+  {
+    this.strategy = PartitionStrategy.valueOf(policy.toUpperCase());
+  }
+
+  public String getStrategy()
+  {
+    return strategy.name();
+  }
+
+  public void setInitialOffset(String initialOffset)
+  {
+    this.initialOffset = InitialOffset.valueOf(initialOffset.toUpperCase());
+  }
+
+  /**
+   *  Initial offset, it should be one of the following
+   *  <ul>
+   *    <li>earliest</li>
+   *    <li>latest</li>
+   *    <li>application_or_earliest</li>
+   *    <li>application_or_latest</li>
+   *  </ul>
+   */
+  public String getInitialOffset()
+  {
+    return initialOffset.name();
+  }
+
+  public String getApplicationName()
+  {
+    return applicationName;
+  }
+
+  public void setConsumerProps(Properties consumerProps)
+  {
+    this.consumerProps = consumerProps;
+  }
+
+  /**
+   * Extra kafka consumer properties
+   * http://kafka.apache.org/090/documentation.html#newconsumerconfigs
+   *
+   * Please be aware that the properties below are set by the operator, don't 
override it
+   *
+   * <ul>
+   * <li>bootstrap.servers</li>
+   * <li>group.id</li>
+   * <li>auto.offset.reset</li>
+   * <li>enable.auto.commit</li>
+   * <li>partition.assignment.strategy</li>
+   * <li>key.deserializer</li>
+   * <li>value.deserializer</li>
+   * </ul>
+   *
+   */
+  public Properties getConsumerProps()
+  {
+    return consumerProps;
+  }
+
+  public void setMaxTuplesPerWindow(int maxTuplesPerWindow)
+  {
+    this.maxTuplesPerWindow = maxTuplesPerWindow;
+  }
+
+  /**
+   * maximum tuples allowed to be emitted in each window
+   */
+  public int getMaxTuplesPerWindow()
+  {
+    return maxTuplesPerWindow;
+  }
+
+  /**
+   * @see <a 
href="http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)">
+   *   org.apache.kafka.clients.consumer.KafkaConsumer.poll</a>
+   */
+  public long getConsumerTimeout()
+  {
+    return consumerTimeout;
+  }
+
+  public void setConsumerTimeout(long consumerTimeout)
+  {
+    this.consumerTimeout = consumerTimeout;
+  }
+
+  /**
+   * Number of messages kept in memory waiting for emission to downstream 
operator
+   */
+  public int getHoldingBufferSize()
+  {
+    return holdingBufferSize;
+  }
+
+  public void setHoldingBufferSize(int holdingBufferSize)
+  {
+    this.holdingBufferSize = holdingBufferSize;
+  }
+
+  /**
+   * metrics refresh interval
+   */
+  public long getMetricsRefreshInterval()
+  {
+    return metricsRefreshInterval;
+  }
+
+  public void setMetricsRefreshInterval(long metricsRefreshInterval)
+  {
+    this.metricsRefreshInterval = metricsRefreshInterval;
+  }
+
+  public void setRepartitionCheckInterval(long repartitionCheckInterval)
+  {
+    this.repartitionCheckInterval = repartitionCheckInterval;
+  }
+
+  /**
+   * Minimal interval between checking collected stats and decide whether it 
needs to repartition or not.
+   * And minimal interval between 2 offset updates
+   */
+  public long getRepartitionCheckInterval()
+  {
+    return repartitionCheckInterval;
+  }
+
+  public void setRepartitionInterval(long repartitionInterval)
+  {
+    this.repartitionInterval = repartitionInterval;
+  }
+
+  /**
+   * Minimal interval between 2 (re)partition actions
+   */
+  public long getRepartitionInterval()
+  {
+    return repartitionInterval;
+  }
+
+  /**
+   * @omitFromUI
+   * @return current checkpointed offsets
+   */
+  public Map<AbstractKafkaPartitioner.PartitionMeta, Long> getOffsetTrack()
+  {
+    return offsetTrack;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
----------------------------------------------------------------------
diff --git 
a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
 
b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
new file mode 100644
index 0000000..0fdd721
--- /dev/null
+++ 
b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.base.Joiner;
+
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.StatsListener;
+
+/**
+ * Abstract partitioner used to manage the partitions of kafka input operator.
+ * It use a number of kafka consumers(one for each cluster) to get the latest 
partition metadata for topics that
+ * the consumer subscribes and expose those to subclass which implements the 
assign method
+ *
+ * The partitioner is always stateless.
+ */
+public abstract class AbstractKafkaPartitioner implements 
Partitioner<AbstractKafkaInputOperator>, StatsListener
+{
+
+  private static final Logger logger = 
LoggerFactory.getLogger(AbstractKafkaPartitioner.class);
+
+  private static final String META_CONSUMER_GROUP_NAME = 
AbstractKafkaInputOperator.class.getName() + "META_GROUP";
+
+  protected final String[] clusters;
+
+  protected final String[] topics;
+
+  protected final AbstractKafkaInputOperator prototypeOperator;
+
+  private ArrayList<KafkaConsumer<byte[], byte[]>> metadataRefreshClients;
+
+
+  private final List<Set<AbstractKafkaPartitioner.PartitionMeta>> 
currentPartitions = new LinkedList<>(); // prevent null
+
+  public AbstractKafkaPartitioner(String[] clusters, String[] topics, 
AbstractKafkaInputOperator prototypeOperator)
+  {
+    this.clusters = clusters;
+    this.topics = topics;
+    this.prototypeOperator = prototypeOperator;
+  }
+
+  abstract List<Set<PartitionMeta>> assign(Map<String, 
Map<String,List<PartitionInfo>>> metadata);
+
+
+
+  @Override
+  public Collection<Partition<AbstractKafkaInputOperator>> 
definePartitions(Collection<Partition<AbstractKafkaInputOperator>> collection, 
PartitioningContext partitioningContext)
+  {
+
+    initMetadataClients();
+
+    Map<String, Map<String, List<PartitionInfo>>> metadata = new HashMap<>();
+
+
+    for (int i = 0; i < clusters.length; i++) {
+      metadata.put(clusters[i], new HashMap<String, List<PartitionInfo>>());
+      for (String topic : topics) {
+        List<PartitionInfo> ptis = 
metadataRefreshClients.get(i).partitionsFor(topic);
+        if (logger.isDebugEnabled()) {
+          logger.debug("Partition metadata for topic {} : {}", topic, 
Joiner.on(';').join(ptis));
+        }
+        metadata.get(clusters[i]).put(topic, ptis);
+      }
+      metadataRefreshClients.get(i).close();
+    }
+
+    metadataRefreshClients = null;
+
+    List<Set<AbstractKafkaPartitioner.PartitionMeta>> parts = assign(metadata);
+
+
+    if (currentPartitions == parts || currentPartitions.equals(parts)) {
+      logger.debug("No partition change found");
+      return collection;
+    } else {
+      logger.info("Partition change detected: ");
+      currentPartitions.clear();
+      currentPartitions.addAll(parts);
+      int i = 0;
+      List<Partition<AbstractKafkaInputOperator>> result = new LinkedList<>();
+      for (Iterator<Partition<AbstractKafkaInputOperator>> iter = 
collection.iterator(); iter.hasNext();) {
+        Partition<AbstractKafkaInputOperator> nextPartition = iter.next();
+        if (parts.remove(nextPartition.getPartitionedInstance().assignment())) 
{
+          if (logger.isInfoEnabled()) {
+            logger.info("[Existing] Partition {} with assignment {} ", i,
+                
Joiner.on(';').join(nextPartition.getPartitionedInstance().assignment()));
+          }
+          result.add(nextPartition);
+          i++;
+        }
+      }
+
+      for (Set<AbstractKafkaPartitioner.PartitionMeta> partitionAssignment : 
parts) {
+        if (logger.isInfoEnabled()) {
+          logger.info("[New] Partition {} with assignment {} ", i,
+              Joiner.on(';').join(partitionAssignment));
+        }
+        result.add(createPartition(partitionAssignment));
+        i++;
+      }
+
+      return result;
+    }
+  }
+
+  @Override
+  public void partitioned(Map<Integer, Partition<AbstractKafkaInputOperator>> 
map)
+  {
+
+  }
+
+  @Override
+  public Response processStats(BatchedOperatorStats batchedOperatorStats)
+  {
+    Response response = new Response();
+    response.repartitionRequired = true;
+    return response;
+  }
+
+  protected Partitioner.Partition<AbstractKafkaInputOperator> 
createPartition(Set<AbstractKafkaPartitioner.PartitionMeta> partitionAssignment)
+  {
+    Kryo kryo = new Kryo();
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    Output output = new Output(bos);
+    kryo.writeObject(output, prototypeOperator);
+    output.close();
+    Input lInput = new Input(bos.toByteArray());
+    Partitioner.Partition<AbstractKafkaInputOperator> p = 
(Partitioner.Partition<AbstractKafkaInputOperator>)
+        new DefaultPartition<>(kryo.readObject(lInput, 
prototypeOperator.getClass()));
+    p.getPartitionedInstance().assign(partitionAssignment);
+    return p;
+  }
+  /**
+   *
+   */
+  private void initMetadataClients()
+  {
+    if (metadataRefreshClients != null && metadataRefreshClients.size() == 
clusters.length) {
+      // The metadata client is active
+      return;
+    }
+
+    if (clusters == null || clusters.length == 0) {
+      throw new IllegalStateException("clusters can not be null");
+    }
+
+    metadataRefreshClients = new ArrayList<>(clusters.length);
+    int index = 0;
+    for (String c : clusters) {
+      Properties prop = new Properties();
+      prop.put("group.id", META_CONSUMER_GROUP_NAME);
+      prop.put("bootstrap.servers", c);
+      prop.put("key.deserializer", ByteArrayDeserializer.class.getName());
+      prop.put("value.deserializer", ByteArrayDeserializer.class.getName());
+      prop.put("enable.auto.commit", "false");
+      metadataRefreshClients.add(index++, new KafkaConsumer<byte[], 
byte[]>(prop));
+    }
+  }
+
+  /**
+   * The key object used in the assignment map for each operator
+   */
+  public static class PartitionMeta
+  {
+
+    public PartitionMeta()
+    {
+    }
+
+    public PartitionMeta(String cluster, String topic, int partitionId)
+    {
+      this.cluster = cluster;
+      this.topic = topic;
+      this.partitionId = partitionId;
+      this.topicPartition = new TopicPartition(topic, partitionId);
+    }
+
+    private String cluster;
+
+    private transient TopicPartition topicPartition;
+
+    private String topic;
+
+    private int partitionId;
+
+    public String getCluster()
+    {
+      return cluster;
+    }
+
+    public int getPartitionId()
+    {
+      return partitionId;
+    }
+
+    public String getTopic()
+    {
+      return topic;
+    }
+
+    public TopicPartition getTopicPartition()
+    {
+      if (topicPartition == null) {
+        topicPartition = new TopicPartition(topic, partitionId);
+      }
+      return topicPartition;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      PartitionMeta that = (PartitionMeta)o;
+      return Objects.equals(cluster, that.cluster) &&
+        Objects.equals(topicPartition, that.topicPartition);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(cluster, topicPartition);
+    }
+
+    @Override
+    public String toString()
+    {
+      return "PartitionMeta{" +
+        "cluster='" + cluster + '\'' +
+        ", topicPartition=" + topicPartition +
+        '}';
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
----------------------------------------------------------------------
diff --git 
a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java 
b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
new file mode 100644
index 0000000..0903570
--- /dev/null
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import com.google.common.base.Joiner;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This is the wrapper class for new Kafka consumer API
+ *
+ * It starts number of consumers(one for each cluster) in same number of 
threads.
+ * Maintains the consumer offsets
+ *
+ * It also use the consumers to commit the application processed offsets along 
with the application name
+ *
+ */
+public class KafkaConsumerWrapper implements Closeable
+{
+
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaConsumerWrapper.class);
+
+  private boolean isAlive = false;
+
+  private final Map<String, KafkaConsumer<byte[], byte[]>> consumers = new 
HashMap<>();
+
+  // The in memory buffer hold consumed messages
+  private ArrayBlockingQueue<Pair<String, ConsumerRecord<byte[], byte[]>>> 
holdingBuffer;
+
+  private AbstractKafkaInputOperator ownerOperator = null;
+
+  private ExecutorService kafkaConsumerExecutor;
+
+  private final Map<String, Map<TopicPartition, OffsetAndMetadata>> 
offsetsToCommit = new HashMap<>();
+
+  /**
+   *
+   * Only put the offset needs to be committed in the 
ConsumerThread.offsetToCommit map
+   * The consumer thread will commit the offset(s)
+   *
+   * @param offsetsInWindow
+   */
+  public void commitOffsets(Map<AbstractKafkaPartitioner.PartitionMeta, Long> 
offsetsInWindow)
+  {
+    if (offsetsInWindow == null) {
+      return;
+    }
+
+    // group offsets by cluster and topic partition
+    for (Map.Entry<AbstractKafkaPartitioner.PartitionMeta, Long> e : 
offsetsInWindow.entrySet()) {
+      String cluster = e.getKey().getCluster();
+      Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetMap = 
offsetsToCommit.get(cluster);
+      if (topicPartitionOffsetMap == null) {
+        logger.warn("committed offset map should be initialized by consumer 
thread!");
+        continue;
+      }
+      topicPartitionOffsetMap.put(e.getKey().getTopicPartition(), new 
OffsetAndMetadata(e.getValue()));
+    }
+
+  }
+
+
+  static final class ConsumerThread implements Runnable
+  {
+
+    private final KafkaConsumer<byte[], byte[]> consumer;
+
+    private final String cluster;
+
+    private final KafkaConsumerWrapper wrapper;
+
+    private Map<TopicPartition, OffsetAndMetadata> offsetToCommit = null;
+
+    public ConsumerThread(String cluster, KafkaConsumer<byte[], byte[]> 
consumer, KafkaConsumerWrapper wrapper)
+    {
+      this.cluster = cluster;
+      this.consumer = consumer;
+      this.wrapper = wrapper;
+      this.offsetToCommit = new ConcurrentHashMap<>();
+      wrapper.offsetsToCommit.put(cluster, offsetToCommit);
+    }
+
+    @Override
+    public void run()
+    {
+      try {
+
+
+        while (wrapper.isAlive) {
+          if (!this.offsetToCommit.isEmpty()) {
+            // in each fetch cycle commit the offset if needed
+            if (logger.isDebugEnabled()) {
+              logger.debug("Commit offsets {}", 
Joiner.on(';').withKeyValueSeparator("=").join(this.offsetToCommit));
+            }
+            consumer.commitAsync(offsetToCommit, wrapper.ownerOperator);
+            offsetToCommit.clear();
+          }
+          try {
+            ConsumerRecords<byte[], byte[]> records = 
consumer.poll(wrapper.ownerOperator.getConsumerTimeout());
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+              wrapper.putMessage(Pair.of(cluster, record));
+            }
+          } catch (NoOffsetForPartitionException e) {
+            // if initialOffset is set to EARLIST or LATEST
+            // and the application is run as first time
+            // then there is no existing committed offset and this error will 
be caught
+            // we need to seek to either beginning or end of the partition
+            // based on the initial offset setting
+            AbstractKafkaInputOperator.InitialOffset io =
+                
AbstractKafkaInputOperator.InitialOffset.valueOf(wrapper.ownerOperator.getInitialOffset());
+            if (io == 
AbstractKafkaInputOperator.InitialOffset.APPLICATION_OR_EARLIEST
+                || io == AbstractKafkaInputOperator.InitialOffset.EARLIEST) {
+              consumer.seekToBeginning(e.partitions().toArray(new 
TopicPartition[0]));
+            } else {
+              consumer.seekToEnd(e.partitions().toArray(new 
TopicPartition[0]));
+            }
+          } catch (InterruptedException e) {
+            throw new IllegalStateException("Consumer thread is interrupted 
unexpectedly", e);
+          }
+        }
+      } catch (WakeupException we) {
+        logger.info("The consumer is being stopped");
+      } finally {
+        consumer.close();
+      }
+    }
+  }
+
+
+  /**
+   * This method is called in setup method of Abstract Kafka Input Operator
+   */
+  public void create(AbstractKafkaInputOperator ownerOperator)
+  {
+    holdingBuffer = new 
ArrayBlockingQueue<>(ownerOperator.getHoldingBufferSize());
+    this.ownerOperator = ownerOperator;
+    logger.info("Create consumer wrapper with holding buffer size: {} ", 
ownerOperator.getHoldingBufferSize());
+    if (logger.isInfoEnabled()) {
+      logger.info("Assignments are {} ", 
Joiner.on('\n').join(ownerOperator.assignment()));
+    }
+  }
+
+
+  /**
+   * This method is called in the activate method of the operator
+   */
+  public void start()
+  {
+    isAlive = true;
+
+
+    // thread to consume the kafka data
+    // create thread pool for consumer threads
+    kafkaConsumerExecutor = Executors.newCachedThreadPool(
+      new ThreadFactoryBuilder().setNameFormat("kafka-consumer-%d").build());
+
+    // group list of PartitionMeta by cluster
+    Map<String, List<TopicPartition>> consumerAssignment = new HashMap<>();
+    Set<AbstractKafkaPartitioner.PartitionMeta> assignments = 
ownerOperator.assignment();
+    for (AbstractKafkaPartitioner.PartitionMeta partitionMeta : assignments) {
+      String cluster = partitionMeta.getCluster();
+      List<TopicPartition> cAssignment = consumerAssignment.get(cluster);
+      if (cAssignment == null) {
+        cAssignment = new LinkedList<>();
+        consumerAssignment.put(cluster, cAssignment);
+      }
+      cAssignment.add(new TopicPartition(partitionMeta.getTopic(), 
partitionMeta.getPartitionId()));
+    }
+
+    Map<AbstractKafkaPartitioner.PartitionMeta, Long> currentOffset = 
ownerOperator.getOffsetTrack();
+
+
+    //  create one thread for each cluster
+    // each thread use one KafkaConsumer to consume from 1+ partition(s) of 1+ 
topic(s)
+    for (Map.Entry<String, List<TopicPartition>> e : 
consumerAssignment.entrySet()) {
+
+      Properties prop = new Properties();
+      if (ownerOperator.getConsumerProps() != null) {
+        prop.putAll(ownerOperator.getConsumerProps());
+      }
+
+      prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, e.getKey());
+      prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
+      // never auto commit the offsets
+      prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+      prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+      prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+      AbstractKafkaInputOperator.InitialOffset initialOffset =
+          
AbstractKafkaInputOperator.InitialOffset.valueOf(ownerOperator.getInitialOffset());
+
+      if (initialOffset == 
AbstractKafkaInputOperator.InitialOffset.APPLICATION_OR_EARLIEST ||
+          initialOffset == 
AbstractKafkaInputOperator.InitialOffset.APPLICATION_OR_LATEST) {
+        // commit the offset with application name if we set initialoffset to 
application
+        prop.put(ConsumerConfig.GROUP_ID_CONFIG, 
ownerOperator.getApplicationName() + "_Consumer");
+      }
+
+      KafkaConsumer<byte[], byte[]> kc = new KafkaConsumer<>(prop);
+      kc.assign(e.getValue());
+      if (logger.isInfoEnabled()) {
+        logger.info("Create consumer with properties {} ", 
Joiner.on(";").withKeyValueSeparator("=").join(prop));
+        logger.info("Assign consumer to {}", 
Joiner.on('#').join(e.getValue()));
+      }
+      if (currentOffset != null && !currentOffset.isEmpty()) {
+        for (TopicPartition tp : e.getValue()) {
+          AbstractKafkaPartitioner.PartitionMeta partitionKey =
+              new AbstractKafkaPartitioner.PartitionMeta(e.getKey(), 
tp.topic(), tp.partition());
+          if (currentOffset.containsKey(partitionKey)) {
+            kc.seek(tp, currentOffset.get(partitionKey));
+          }
+        }
+      }
+
+      consumers.put(e.getKey(), kc);
+      kafkaConsumerExecutor.submit(new ConsumerThread(e.getKey(), kc, this));
+    }
+
+
+  }
+
+  /**
+   * The method is called in the deactivate method of the operator
+   */
+  public void stop()
+  {
+    for (KafkaConsumer<byte[], byte[]> c : consumers.values()) {
+      c.wakeup();
+    }
+    kafkaConsumerExecutor.shutdownNow();
+    isAlive = false;
+    holdingBuffer.clear();
+    IOUtils.closeQuietly(this);
+  }
+
+  /**
+   * This method is called in teardown method of the operator
+   */
+  public void teardown()
+  {
+    holdingBuffer.clear();
+  }
+
+  public boolean isAlive()
+  {
+    return isAlive;
+  }
+
+  public void setAlive(boolean isAlive)
+  {
+    this.isAlive = isAlive;
+  }
+
+  public Pair<String, ConsumerRecord<byte[], byte[]>> pollMessage()
+  {
+    return holdingBuffer.poll();
+  }
+
+  public int messageSize()
+  {
+    return holdingBuffer.size();
+  }
+
+  protected final void putMessage(Pair<String, ConsumerRecord<byte[], byte[]>> 
msg) throws InterruptedException
+  {
+    // block from receiving more message
+    holdingBuffer.put(msg);
+  }
+
+
+  @Override
+  public void close() throws IOException
+  {
+  }
+
+  public Map<String, Map<MetricName, ? extends Metric>> getAllConsumerMetrics()
+  {
+    Map<String, Map<MetricName, ? extends Metric>> val = new HashMap<>();
+    for (Map.Entry<String, KafkaConsumer<byte[], byte[]>> e : 
consumers.entrySet()) {
+      val.put(e.getKey(), e.getValue().metrics());
+    }
+    return val;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java 
b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java
new file mode 100644
index 0000000..fdb1252
--- /dev/null
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+
+import com.datatorrent.api.AutoMetric;
+
+/**
+ * Metrics class
+ */
+public class KafkaMetrics implements Serializable
+{
+  private KafkaConsumerStats[] stats;
+
+  private transient long lastMetricSampleTime = 0L;
+
+  private transient long metricsRefreshInterval;
+
+  public KafkaMetrics(long metricsRefreshInterval)
+  {
+    this.metricsRefreshInterval = metricsRefreshInterval;
+  }
+
+  void updateMetrics(String[] clusters, Map<String, Map<MetricName, ? extends 
Metric>> metricsMap)
+  {
+    long current = System.currentTimeMillis();
+    if (current - lastMetricSampleTime < metricsRefreshInterval) {
+      return;
+    }
+
+    lastMetricSampleTime = current;
+
+    if (stats == null) {
+      stats = new KafkaConsumerStats[clusters.length];
+    }
+
+    for (int i = 0; i < clusters.length; i++) {
+      if (stats[i] == null) {
+        stats[i] = new KafkaConsumerStats();
+        stats[i].cluster = clusters[i];
+      }
+      Map<MetricName, ? extends Metric> cMetrics = metricsMap.get(clusters[i]);
+      if (cMetrics == null || cMetrics.isEmpty()) {
+        stats[i].bytesPerSec = 0;
+        stats[i].msgsPerSec = 0;
+        continue;
+      }
+      if (stats[i].bytePerSecMK == null || stats[i].msgPerSecMK == null) {
+        for (MetricName mn : cMetrics.keySet()) {
+          if (mn.name().equals("bytes-consumed-rate")) {
+            stats[i].bytePerSecMK = mn;
+          } else if (mn.name().equals("records-consumed-rate")) {
+            stats[i].msgPerSecMK = mn;
+          }
+        }
+      }
+      stats[i].bytesPerSec = cMetrics.get(stats[i].bytePerSecMK).value();
+      stats[i].msgsPerSec = cMetrics.get(stats[i].msgPerSecMK).value();
+    }
+  }
+
+  public KafkaConsumerStats[] getStats()
+  {
+    return stats;
+  }
+
+  /**
+   * Counter class which gives the statistic value from the consumer
+   */
+  public static class KafkaConsumerStats implements Serializable
+  {
+    private static final long serialVersionUID = -2867402654990209006L;
+
+    public transient MetricName msgPerSecMK;
+    public transient MetricName bytePerSecMK;
+
+    public String cluster;
+    /**
+     * Metrics for each consumer
+     */
+    public double msgsPerSec;
+
+    public double bytesPerSec;
+
+    public KafkaConsumerStats()
+    {
+    }
+  }
+
+  public static class KafkaMetricsAggregator implements AutoMetric.Aggregator, 
Serializable
+  {
+
+    @Override
+    public Map<String, Object> aggregate(long l, 
Collection<AutoMetric.PhysicalMetricsContext> collection)
+    {
+      double totalBytesPerSec = 0;
+      double totalMsgsPerSec = 0;
+      Map<String, Object> total = new HashMap<>();
+      for (AutoMetric.PhysicalMetricsContext pmc : collection) {
+        KafkaMetrics km = (KafkaMetrics)pmc.getMetrics().get("metrics");
+        for (KafkaConsumerStats kcs : km.stats) {
+          totalBytesPerSec += kcs.bytesPerSec;
+          totalMsgsPerSec += kcs.msgsPerSec;
+        }
+      }
+      total.put("totalBytesPerSec", totalBytesPerSec);
+      total.put("totalMsgPerSec", totalMsgsPerSec);
+      return total;
+    }
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java
----------------------------------------------------------------------
diff --git 
a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java 
b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java
new file mode 100644
index 0000000..4a4ebf3
--- /dev/null
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+
+import java.io.Serializable;
+
+/**
+ * @since 2.1.0
+ */
+public class KafkaPartition implements Serializable
+{
+  protected static final String DEFAULT_CLUSTERID = 
"com.datatorrent.contrib.kafka.defaultcluster";
+
+  @SuppressWarnings("unused")
+  private KafkaPartition()
+  {
+  }
+
+  public KafkaPartition(String topic, int partitionId)
+  {
+    this(DEFAULT_CLUSTERID, topic, partitionId);
+  }
+
+  public KafkaPartition(String clusterId, String topic, int partitionId)
+  {
+    super();
+    this.clusterId = clusterId;
+    this.partitionId = partitionId;
+    this.topic = topic;
+  }
+
+  /**
+   *
+   */
+  private static final long serialVersionUID = 7556802229202221546L;
+
+
+  private String clusterId;
+
+  private int partitionId;
+
+  private String topic;
+
+  public String getClusterId()
+  {
+    return clusterId;
+  }
+
+  public void setClusterId(String clusterId)
+  {
+    this.clusterId = clusterId;
+  }
+
+  public int getPartitionId()
+  {
+    return partitionId;
+  }
+
+  public void setPartitionId(int partitionId)
+  {
+    this.partitionId = partitionId;
+  }
+
+  public String getTopic()
+  {
+    return topic;
+  }
+
+  public void setTopic(String topic)
+  {
+    this.topic = topic;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((clusterId == null) ? 0 : clusterId.hashCode());
+    result = prime * result + partitionId;
+    result = prime * result + ((topic == null) ? 0 : topic.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj)
+  {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    KafkaPartition other = (KafkaPartition)obj;
+    if (clusterId == null) {
+      if (other.clusterId != null) {
+        return false;
+      }
+    } else if (!clusterId.equals(other.clusterId)) {
+      return false;
+    }
+    if (partitionId != other.partitionId) {
+      return false;
+    }
+    if (topic == null) {
+      if (other.topic != null) {
+        return false;
+      }
+    } else if (!topic.equals(other.topic)) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "KafkaPartition [clusterId=" + clusterId + ", partitionId=" + 
partitionId + ", topic=" + topic + "]";
+  }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortInputOperator.java
----------------------------------------------------------------------
diff --git 
a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortInputOperator.java
 
b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortInputOperator.java
new file mode 100644
index 0000000..e563c42
--- /dev/null
+++ 
b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortInputOperator.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import com.datatorrent.api.DefaultOutputPort;
+
+/**
+ * This is just an example of single port operator emits only byte array 
messages
+ * The key and cluster information are ignored
+ * This class emit the value to the single output port
+ */
+public class KafkaSinglePortInputOperator extends AbstractKafkaInputOperator
+{
+
+  /**
+   * This output port emits tuples extracted from Kafka messages.
+   */
+  public final transient DefaultOutputPort<byte[]> outputPort = new 
DefaultOutputPort<>();
+
+  @Override
+  protected void emitTuple(String cluster, ConsumerRecord<byte[], byte[]> 
message)
+  {
+    outputPort.emit(message.value());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
----------------------------------------------------------------------
diff --git 
a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java 
b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
new file mode 100644
index 0000000..09d22eb
--- /dev/null
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.PartitionInfo;
+
+/**
+ * A one-to-many partitioner implementation that creates fix number of 
operator partitions and assign one or more
+ * Kafka partitions to each. It use round robin to assign partitions
+ */
+public class OneToManyPartitioner extends AbstractKafkaPartitioner
+{
+
+  public OneToManyPartitioner(String[] clusters, String[] topics, 
AbstractKafkaInputOperator protoTypeOperator)
+  {
+    super(clusters, topics, protoTypeOperator);
+  }
+
+  @Override
+  List<Set<PartitionMeta>> assign(Map<String, Map<String, 
List<PartitionInfo>>> metadata)
+  {
+    if (prototypeOperator.getInitialPartitionCount() <= 0) {
+      throw new IllegalArgumentException("Num of partitions should be greater 
or equal to 1");
+    }
+
+    int partitionCount = prototypeOperator.getInitialPartitionCount();
+    ArrayList<Set<PartitionMeta>> eachPartitionAssignment = new 
ArrayList<>(prototypeOperator.getInitialPartitionCount());
+    int i = 0;
+    for (Map.Entry<String, Map<String, List<PartitionInfo>>> clusterMap : 
metadata.entrySet()) {
+      for (Map.Entry<String, List<PartitionInfo>> topicPartition : 
clusterMap.getValue().entrySet()) {
+        for (PartitionInfo pif : topicPartition.getValue()) {
+          int index = i++ % partitionCount;
+          if (eachPartitionAssignment.get(index) == null) {
+            eachPartitionAssignment.add(index, new HashSet<PartitionMeta>());
+          }
+          eachPartitionAssignment.get(index).add(new 
PartitionMeta(clusterMap.getKey(), topicPartition.getKey(), pif.partition()));
+        }
+      }
+    }
+
+    return eachPartitionAssignment;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java
----------------------------------------------------------------------
diff --git 
a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java 
b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java
new file mode 100644
index 0000000..0cf7d44
--- /dev/null
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToOnePartitioner.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.PartitionInfo;
+
+import com.google.common.collect.Sets;
+
+/**
+ * An one-to-one partitioner implementation that always returns same amount of 
operator partitions as
+ * Kafka partitions for the topics that operator subscribe
+ */
+public class OneToOnePartitioner extends AbstractKafkaPartitioner
+{
+
+  public OneToOnePartitioner(String[] clusters, String[] topics, 
AbstractKafkaInputOperator prototypeOperator)
+  {
+    super(clusters, topics, prototypeOperator);
+  }
+
+  @Override
+  List<Set<PartitionMeta>> assign(Map<String, Map<String, 
List<PartitionInfo>>> metadata)
+  {
+    List<Set<PartitionMeta>> currentAssignment = new LinkedList<>();
+    for (Map.Entry<String, Map<String, List<PartitionInfo>>> clusterMap : 
metadata.entrySet()) {
+      for (Map.Entry<String, List<PartitionInfo>> topicPartition : 
clusterMap.getValue().entrySet()) {
+        for (PartitionInfo pif : topicPartition.getValue()) {
+          currentAssignment.add(Sets.newHashSet(new 
PartitionMeta(clusterMap.getKey(), topicPartition.getKey(), pif.partition())));
+        }
+      }
+    }
+    return currentAssignment;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java
----------------------------------------------------------------------
diff --git 
a/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java 
b/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java
new file mode 100644
index 0000000..cb21f3d
--- /dev/null
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/PartitionStrategy.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+public enum PartitionStrategy
+{
+  /**
+   * Each operator partition connect to only one kafka partition
+   */
+  ONE_TO_ONE,
+  /**
+   * Each operator consumes from several kafka partitions with overall input 
rate under some certain hard limit in msgs/s or bytes/s
+   * For now it <b>only</b> support <b>simple kafka consumer</b>
+   */
+  ONE_TO_MANY,
+  /**
+   * 1 to N partition based on the heuristic function
+   * <b>NOT</b> implemented yet
+   * TODO implement this later
+   */
+  ONE_TO_MANY_HEURISTIC
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java 
b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
new file mode 100644
index 0000000..17bc465
--- /dev/null
+++ 
b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * A bunch of test to verify the input operator will be automatically 
partitioned per kafka partition This test is launching its
+ * own Kafka cluster.
+ */
+@RunWith(Parameterized.class)
+public class KafkaInputOperatorTest extends KafkaOperatorTestBase
+{
+
+  private int totalBrokers = 0;
+
+
+
+  @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}")
+  public static Collection<Boolean[]> testScenario()
+  {
+    return Arrays.asList(new Boolean[][]{{true, false}, // multi cluster with 
single partition
+      {true, true}, // multi cluster with multi partitions
+      {false, true}, // single cluster with multi partitions
+      {false, false}, // single cluster with single partitions
+    });
+  }
+
+  public KafkaInputOperatorTest(boolean hasMultiCluster, boolean 
hasMultiPartition)
+  {
+    // This class want to initialize several kafka brokers for multiple 
partitions
+    this.hasMultiCluster = hasMultiCluster;
+    this.hasMultiPartition = hasMultiPartition;
+    int cluster = 1 + (hasMultiCluster ? 1 : 0);
+    totalBrokers = (1 + (hasMultiPartition ? 1 : 0)) * cluster;
+
+  }
+
+  private static final org.slf4j.Logger logger = 
LoggerFactory.getLogger(KafkaInputOperatorTest.class);
+  private static List<String> tupleCollection = new LinkedList<>();
+  private static CountDownLatch latch;
+  private static boolean hasFailure = false;
+  private static int failureTrigger = 3000;
+  private static int k = 0;
+
+  /**
+   * Test Operator to collect tuples from KafkaSingleInputStringOperator.
+   *
+   * @param
+   */
+  public static class CollectorModule extends BaseOperator
+  {
+    public final transient CollectorInputPort inputPort = new 
CollectorInputPort();
+  }
+
+  public static class CollectorInputPort extends DefaultInputPort<byte[]>
+  {
+
+    @Override
+    public void process(byte[] bt)
+    {
+      String tuple = new String(bt);
+      if (hasFailure && k++ == failureTrigger) {
+        //you can only kill yourself once
+        hasFailure = false;
+        throw new RuntimeException();
+      }
+      if (tuple.equals(KafkaOperatorTestBase.END_TUPLE)) {
+        if (latch != null) {
+          latch.countDown();
+        }
+        return;
+      }
+      tupleCollection.add(tuple);
+    }
+
+    @Override
+    public void setConnected(boolean flag)
+    {
+      if (flag) {
+        tupleCollection.clear();
+      }
+    }
+  }
+
+  /**
+   * Test AbstractKafkaSinglePortInputOperator (i.e. an input adapter for 
Kafka, aka consumer). This module receives
+   * data from an outside test generator through Kafka message bus and feed 
that data into Malhar streaming platform.
+   *
+   * [Generate message and send that to Kafka message bus] ==> [Receive that 
message through Kafka input adapter(i.e.
+   * consumer) and send using emitTuples() interface on output port]
+   *
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testPartitionableInputOperator() throws Exception
+  {
+    hasFailure = false;
+    testInputOperator(false);
+  }
+
+
+  @Test
+  public void testPartitionableInputOperatorWithFailure() throws Exception
+  {
+    hasFailure = true;
+    testInputOperator(true);
+  }
+
+  public void testInputOperator(boolean hasFailure) throws Exception
+  {
+
+    // each broker should get a END_TUPLE message
+    latch = new CountDownLatch(totalBrokers);
+
+    int totalCount = 10000;
+
+    // Start producer
+    KafkaTestProducer p = new KafkaTestProducer(TEST_TOPIC, hasMultiPartition, 
hasMultiCluster);
+    p.setSendCount(totalCount);
+    Thread t = new Thread(p);
+    t.start();
+
+    // Create DAG for testing.
+    LocalMode lma = LocalMode.newInstance();
+    DAG dag = lma.getDAG();
+
+    // Create KafkaSinglePortStringInputOperator
+    KafkaSinglePortInputOperator node = dag.addOperator("Kafka input", 
KafkaSinglePortInputOperator.class);
+    node.setInitialPartitionCount(1);
+    // set topic
+    node.setTopics(TEST_TOPIC);
+    
node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
+    node.setClusters(getClusterConfig());
+
+    // Create Test tuple collector
+    CollectorModule collector = dag.addOperator("TestMessageCollector", new 
CollectorModule());
+
+    // Connect ports
+    dag.addStream("Kafka message", node.outputPort, 
collector.inputPort).setLocality(Locality.CONTAINER_LOCAL);
+
+    // Create local cluster
+    final LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(false);
+
+    if (hasFailure) {
+      setupHasFailureTest(node, dag);
+    }
+    lc.runAsync();
+
+    // Wait 30s for consumer finish consuming all the messages
+    boolean notTimeout = latch.await(40000, TimeUnit.MILLISECONDS);
+    Assert.assertTrue("TIMEOUT: 40s Collected " + tupleCollection, notTimeout);
+
+    // Check results
+    Assert.assertEquals("Tuple count", totalCount, tupleCollection.size());
+    logger.debug(String.format("Number of emitted tuples: %d", 
tupleCollection.size()));
+
+    t.join();
+    p.close();
+    lc.shutdown();
+  }
+
+  private void setupHasFailureTest(KafkaSinglePortInputOperator operator, DAG 
dag)
+  {
+    operator.setHoldingBufferSize(5000);
+    dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
+    //dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new 
FSStorageAgent("target/ck", new Configuration()));
+    operator.setMaxTuplesPerWindow(500);
+  }
+
+  private String getClusterConfig() {
+    String l = "localhost:";
+    return l + TEST_KAFKA_BROKER_PORT[0][0] +
+      (hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[0][1] : "") +
+      (hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1][0] : "") +
+      (hasMultiCluster && hasMultiPartition ? "," + l  + 
TEST_KAFKA_BROKER_PORT[1][1] : "");
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
----------------------------------------------------------------------
diff --git 
a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java 
b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
new file mode 100644
index 0000000..7085348
--- /dev/null
+++ 
b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.util.Properties;
+
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+
+import kafka.admin.TopicCommand;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+import kafka.utils.ZkUtils;
+
+/**
+ * This is a base class setup/clean Kafka testing environment for all the 
input/output test If it's a multipartition
+ * test, this class creates 2 kafka partitions
+ */
+public class KafkaOperatorTestBase
+{
+
+  public static final String END_TUPLE = "END_TUPLE";
+  public static final int[] TEST_ZOOKEEPER_PORT;
+  public static final int[][] TEST_KAFKA_BROKER_PORT;
+  public static final String TEST_TOPIC = "testtopic";
+
+  // get available ports
+  static {
+    ServerSocket[] listeners = new ServerSocket[6];
+    int[] p = new int[6];
+
+    try {
+      for (int i = 0; i < 6; i++) {
+        listeners[i] = new ServerSocket(0);
+        p[i] = listeners[i].getLocalPort();
+      }
+
+      for (int i = 0; i < 6; i++) {
+        listeners[i].close();
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    TEST_ZOOKEEPER_PORT = new int[]{p[0], p[1]};
+    TEST_KAFKA_BROKER_PORT = new int[][] {
+      new int[] {p[2], p[3]},
+      new int[] {p[4], p[5]}
+    };
+  }
+
+  static final org.slf4j.Logger logger = 
LoggerFactory.getLogger(KafkaOperatorTestBase.class);
+  // since Kafka 0.8 use KafkaServerStatble instead of KafkaServer
+
+  // multiple brokers in multiple cluster
+  private KafkaServerStartable[][] broker = new KafkaServerStartable[2][2];
+
+  // multiple cluster
+  private ServerCnxnFactory[] zkFactory = new ServerCnxnFactory[2];
+
+  private ZooKeeperServer[] zkServer = new ZooKeeperServer[2];
+
+  public String baseDir = "target";
+
+  private final String zkBaseDir = "zookeeper-server-data";
+  private final String kafkaBaseDir = "kafka-server-data";
+  private final String[] zkdir = new String[] { "zookeeper-server-data/1", 
"zookeeper-server-data/2" };
+  private final String[][] kafkadir = new String[][] { new String[] { 
"kafka-server-data/1/1", "kafka-server-data/1/2" }, new String[] { 
"kafka-server-data/2/1", "kafka-server-data/2/2" } };
+  protected boolean hasMultiPartition = false;
+  protected boolean hasMultiCluster = false;
+
+  public void startZookeeper(final int clusterId)
+  {
+    try {
+
+      int numConnections = 10;
+      int tickTime = 2000;
+      File dir = new File(baseDir, zkdir[clusterId]);
+
+      zkServer[clusterId] = new TestZookeeperServer(dir, dir, tickTime);
+      zkFactory[clusterId] = new NIOServerCnxnFactory();
+      zkFactory[clusterId].configure(new 
InetSocketAddress(TEST_ZOOKEEPER_PORT[clusterId]), numConnections);
+
+      zkFactory[clusterId].startup(zkServer[clusterId]); // start the 
zookeeper server.
+      Thread.sleep(2000);
+      //kserver.startup();
+    } catch (Exception ex) {
+      logger.error(ex.getLocalizedMessage());
+    }
+  }
+
+  public void stopZookeeper()
+  {
+    for (ZooKeeperServer zs : zkServer) {
+      if (zs != null) {
+        zs.shutdown();
+      }
+    }
+
+    for (ServerCnxnFactory zkf : zkFactory) {
+      if (zkf != null) {
+        zkf.closeAll();
+        zkf.shutdown();
+      }
+    }
+    zkServer =  new ZooKeeperServer[2];
+    zkFactory =  new ServerCnxnFactory[2];
+  }
+
+  public void startKafkaServer(int clusterid, int brokerid)
+  {
+    Properties props = new Properties();
+    props.setProperty("broker.id", "" + brokerid);
+    props.setProperty("log.dirs", new File(baseDir, 
kafkadir[clusterid][brokerid]).toString());
+    props.setProperty("zookeeper.connect", "localhost:" + 
TEST_ZOOKEEPER_PORT[clusterid]);
+    props.setProperty("port", "" + 
TEST_KAFKA_BROKER_PORT[clusterid][brokerid]);
+    props.setProperty("default.replication.factor", "1");
+    // set this to 50000 to boost the performance so most test data are in 
memory before flush to disk
+    props.setProperty("log.flush.interval.messages", "50000");
+    if (hasMultiPartition) {
+      props.setProperty("num.partitions", "2");
+    } else {
+      props.setProperty("num.partitions", "1");
+    }
+
+    broker[clusterid][brokerid] = new KafkaServerStartable(new 
KafkaConfig(props));
+    broker[clusterid][brokerid].startup();
+
+  }
+
+  public void startKafkaServer()
+  {
+
+    FileUtils.deleteQuietly(new File(baseDir, kafkaBaseDir));
+    boolean[][] startable = new boolean[][] { new boolean[] { true, 
hasMultiPartition }, new boolean[] { hasMultiCluster, hasMultiCluster && 
hasMultiPartition } };
+    for (int i = 0; i < startable.length; i++) {
+      for (int j = 0; j < startable[i].length; j++) {
+        if (startable[i][j])
+          startKafkaServer(i, j);
+      }
+    }
+
+    // startup is asynch operation. wait 2 sec for server to startup
+
+  }
+
+  public void stopKafkaServer()
+  {
+    for (int i = 0; i < broker.length; i++) {
+      for (int j = 0; j < broker[i].length; j++) {
+        if (broker[i][j] != null) {
+          broker[i][j].shutdown();
+          broker[i][j].awaitShutdown();
+          broker[i][j] = null;
+        }
+      }
+    }
+  }
+
+  @Before
+  public void beforeTest()
+  {
+    try {
+      startZookeeper();
+      startKafkaServer();
+      createTopic(0, TEST_TOPIC);
+      if (hasMultiCluster) {
+        createTopic(1, TEST_TOPIC);
+      }
+    } catch (java.nio.channels.CancelledKeyException ex) {
+      logger.debug("LSHIL {}", ex.getLocalizedMessage());
+    }
+  }
+
+  public void startZookeeper()
+  {
+    FileUtils.deleteQuietly(new File(baseDir, zkBaseDir));
+    startZookeeper(0);
+    if (hasMultiCluster) {
+      startZookeeper(1);
+    }
+  }
+
+  public void createTopic(int clusterid, String topicName)
+  {
+    String[] args = new String[9];
+    args[0] = "--zookeeper";
+    args[1] = "localhost:" + TEST_ZOOKEEPER_PORT[clusterid];
+    args[2] = "--replication-factor";
+    args[3] = "1";
+    args[4] = "--partitions";
+    if (hasMultiPartition) {
+      args[5] = "2";
+    } else {
+      args[5] = "1";
+    }
+    args[6] = "--topic";
+    args[7] = topicName;
+    args[8] = "--create";
+
+    ZkUtils zu = ZkUtils.apply("localhost:" + TEST_ZOOKEEPER_PORT[clusterid], 
30000, 30000, false);
+    TopicCommand.createTopic(zu, new TopicCommand.TopicCommandOptions(args));
+
+    // Right now, there is no programmatic synchronized way to create the 
topic. have to wait 2 sec to make sure the
+    // topic is created
+    // So the tests will not hit any bizarre failure
+    try {
+      Thread.sleep(5000);
+      zu.close();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @After
+  public void afterTest()
+  {
+    try {
+      stopKafkaServer();
+      stopZookeeper();
+    } catch (Exception ex) {
+      logger.debug("LSHIL {}", ex.getLocalizedMessage());
+    }
+  }
+
+  public void setHasMultiPartition(boolean hasMultiPartition)
+  {
+    this.hasMultiPartition = hasMultiPartition;
+  }
+
+  public void setHasMultiCluster(boolean hasMultiCluster)
+  {
+    this.hasMultiCluster = hasMultiCluster;
+  }
+
+  public static class TestZookeeperServer extends ZooKeeperServer
+  {
+
+    public TestZookeeperServer()
+    {
+      super();
+      // TODO Auto-generated constructor stub
+    }
+
+    public TestZookeeperServer(File snapDir, File logDir, int tickTime) throws 
IOException
+    {
+      super(snapDir, logDir, tickTime);
+      // TODO Auto-generated constructor stub
+    }
+
+    public TestZookeeperServer(FileTxnSnapLog txnLogFactory, DataTreeBuilder 
treeBuilder) throws IOException
+    {
+      super(txnLogFactory, treeBuilder);
+      // TODO Auto-generated constructor stub
+    }
+
+    public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, 
DataTreeBuilder treeBuilder) throws IOException
+    {
+      super(txnLogFactory, tickTime, treeBuilder);
+      // TODO Auto-generated constructor stub
+    }
+
+    public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int 
minSessionTimeout, int maxSessionTimeout, DataTreeBuilder treeBuilder, 
ZKDatabase zkDb)
+    {
+      super(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, 
treeBuilder, zkDb);
+      // TODO Auto-generated constructor stub
+    }
+
+    @Override
+    protected void registerJMX()
+    {
+    }
+
+    @Override
+    protected void unregisterJMX()
+    {
+    }
+
+  }
+}


Reply via email to