APEXMALHAR-2459 1) Refactor the existing Kafka input operator. 2) Add support for KafkaInputOperator using the 0.10 consumer API


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/b42d8e74
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/b42d8e74
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/b42d8e74

Branch: refs/heads/master
Commit: b42d8e741cf2c8241749351b31e8fd7fef546916
Parents: 4df6858
Author: chaitanya <chai...@apache.org>
Authored: Wed Apr 5 15:31:47 2017 +0530
Committer: chaitanya <chai...@apache.org>
Committed: Sun Jul 2 23:14:06 2017 +0530

----------------------------------------------------------------------
 kafka/XmlJavadocCommentsExtractor.xsl           |  48 --
 .../XmlJavadocCommentsExtractor.xsl             |  48 ++
 kafka/kafka-common/pom.xml                      | 223 +++++++
 .../malhar/kafka/AbstractKafkaConsumer.java     | 127 ++++
 .../kafka/AbstractKafkaInputOperator.java       | 635 +++++++++++++++++++
 .../kafka/AbstractKafkaOutputOperator.java      | 126 ++++
 .../malhar/kafka/AbstractKafkaPartitioner.java  | 335 ++++++++++
 .../apex/malhar/kafka/KafkaConsumerWrapper.java | 387 +++++++++++
 .../apache/apex/malhar/kafka/KafkaMetrics.java  | 140 ++++
 .../apex/malhar/kafka/KafkaPartition.java       | 142 +++++
 .../kafka/KafkaSinglePortOutputOperator.java    |  46 ++
 .../apex/malhar/kafka/OneToManyPartitioner.java |  72 +++
 .../apex/malhar/kafka/OneToOnePartitioner.java  |  61 ++
 .../apex/malhar/kafka/PartitionStrategy.java    |  45 ++
 .../AbstractKafkaConsumerPropertiesTest.java    |  84 +++
 .../src/test/resources/log4j.properties         |  49 ++
 kafka/kafka010/XmlJavadocCommentsExtractor.xsl  |  48 ++
 kafka/kafka010/pom.xml                          |  98 +++
 .../apex/malhar/kafka/KafkaConsumer010.java     | 200 ++++++
 .../kafka/KafkaSinglePortInputOperator.java     |  59 ++
 .../kafka/KafkaConsumerPropertiesTest.java      |  35 +
 .../malhar/kafka/KafkaInputOperatorTest.java    | 397 ++++++++++++
 .../malhar/kafka/KafkaOperatorTestBase.java     | 285 +++++++++
 .../apex/malhar/kafka/KafkaTestPartitioner.java |  64 ++
 .../apex/malhar/kafka/KafkaTestProducer.java    | 181 ++++++
 .../src/test/resources/log4j.properties         |  49 ++
 kafka/kafka09/XmlJavadocCommentsExtractor.xsl   |  48 ++
 kafka/kafka09/pom.xml                           |  86 +++
 .../apex/malhar/kafka/KafkaConsumer09.java      | 200 ++++++
 ...afkaSinglePortExactlyOnceOutputOperator.java | 413 ++++++++++++
 .../kafka/KafkaSinglePortInputOperator.java     |  60 ++
 .../apache/apex/malhar/kafka/EmbeddedKafka.java | 166 +++++
 .../kafka/KafkaConsumerPropertiesTest.java      |  35 +
 .../apache/apex/malhar/kafka/KafkaHelper.java   |  66 ++
 .../malhar/kafka/KafkaInputOperatorTest.java    | 397 ++++++++++++
 .../malhar/kafka/KafkaOperatorTestBase.java     | 285 +++++++++
 .../malhar/kafka/KafkaOutputOperatorTest.java   | 425 +++++++++++++
 .../apex/malhar/kafka/KafkaTestPartitioner.java |  64 ++
 .../apex/malhar/kafka/KafkaTestProducer.java    | 181 ++++++
 .../kafka09/src/test/resources/log4j.properties |  49 ++
 kafka/pom.xml                                   | 306 ++++-----
 .../kafka/AbstractKafkaInputOperator.java       | 622 ------------------
 .../kafka/AbstractKafkaOutputOperator.java      | 126 ----
 .../malhar/kafka/AbstractKafkaPartitioner.java  | 335 ----------
 .../apex/malhar/kafka/KafkaConsumerWrapper.java | 389 ------------
 .../apache/apex/malhar/kafka/KafkaMetrics.java  | 140 ----
 .../apex/malhar/kafka/KafkaPartition.java       | 142 -----
 ...afkaSinglePortExactlyOnceOutputOperator.java | 413 ------------
 .../kafka/KafkaSinglePortInputOperator.java     |  46 --
 .../kafka/KafkaSinglePortOutputOperator.java    |  46 --
 .../apex/malhar/kafka/OneToManyPartitioner.java |  72 ---
 .../apex/malhar/kafka/OneToOnePartitioner.java  |  61 --
 .../apex/malhar/kafka/PartitionStrategy.java    |  45 --
 .../apache/apex/malhar/kafka/EmbeddedKafka.java | 166 -----
 .../kafka/KafkaConsumerPropertiesTest.java      |  81 ---
 .../apache/apex/malhar/kafka/KafkaHelper.java   |  66 --
 .../malhar/kafka/KafkaInputOperatorTest.java    | 399 ------------
 .../malhar/kafka/KafkaOperatorTestBase.java     | 292 ---------
 .../malhar/kafka/KafkaOutputOperatorTest.java   | 429 -------------
 .../apex/malhar/kafka/KafkaTestPartitioner.java |  64 --
 .../apex/malhar/kafka/KafkaTestProducer.java    | 179 ------
 kafka/src/test/resources/log4j.properties       |  50 --
 sql/pom.xml                                     |   6 -
 63 files changed, 6549 insertions(+), 4385 deletions(-)
----------------------------------------------------------------------
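
For illustration, a minimal application sketch (not part of this commit) showing how the
refactored input operator might be wired into a DAG. The application class, stream names and
broker/topic values below are assumptions; the wiring assumes the new KafkaSinglePortInputOperator
still exposes its records on outputPort (as the prior operator did) and that the Malhar
ConsoleOutputOperator is available:

  import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
  import org.apache.hadoop.conf.Configuration;

  import com.datatorrent.api.DAG;
  import com.datatorrent.api.StreamingApplication;
  import com.datatorrent.lib.io.ConsoleOutputOperator;

  // Hypothetical application; names and values are illustrative only.
  public class KafkaReadApplication implements StreamingApplication
  {
    @Override
    public void populateDAG(DAG dag, Configuration conf)
    {
      KafkaSinglePortInputOperator in = dag.addOperator("kafkaInput", new KafkaSinglePortInputOperator());
      in.setClusters("localhost:9092");               // bootstrap.servers; use ';' to list multiple clusters
      in.setTopics("test-topic");                     // use ',' to list multiple topics
      in.setInitialOffset("APPLICATION_OR_EARLIEST"); // resume from committed offsets, else start from earliest
      in.setStrategy("ONE_TO_ONE");                   // one operator partition per Kafka partition

      ConsoleOutputOperator out = dag.addOperator("console", new ConsoleOutputOperator());
      dag.addStream("kafkaMessages", in.outputPort, out.input);
    }
  }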


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/XmlJavadocCommentsExtractor.xsl
----------------------------------------------------------------------
diff --git a/kafka/XmlJavadocCommentsExtractor.xsl 
b/kafka/XmlJavadocCommentsExtractor.xsl
deleted file mode 100644
index ec72325..0000000
--- a/kafka/XmlJavadocCommentsExtractor.xsl
+++ /dev/null
@@ -1,48 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<!--
-    Document   : XmlJavadocCommentsExtractor.xsl
-    Created on : September 16, 2014, 11:30 AM
-    Description:
-        The transformation strips off all information except for comments and 
tags from xml javadoc generated by xml-doclet.
--->
-
-<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0">
-  <xsl:output method="xml" standalone="yes"/>
-
-  <!-- copy xml by selecting only the following nodes, attrbutes and text -->
-  <xsl:template match="node()|text()|@*">
-    <xsl:copy>
-      <xsl:apply-templates 
select="root|package|class|interface|method|field|type|comment|tag|text()|@name|@qualified|@text"/>
-    </xsl:copy>
-  </xsl:template>
-
-  <!-- Strip off the following paths from the selected xml -->
-  <xsl:template match="//root/package/interface/interface
-                      |//root/package/interface/method/@qualified
-                      |//root/package/class/interface
-                      |//root/package/class/class
-                      |//root/package/class/method/@qualified
-                      |//root/package/class/field/@qualified" />
-
-  <xsl:strip-space elements="*"/>
-</xsl:stylesheet>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka-common/XmlJavadocCommentsExtractor.xsl
----------------------------------------------------------------------
diff --git a/kafka/kafka-common/XmlJavadocCommentsExtractor.xsl 
b/kafka/kafka-common/XmlJavadocCommentsExtractor.xsl
new file mode 100644
index 0000000..ec72325
--- /dev/null
+++ b/kafka/kafka-common/XmlJavadocCommentsExtractor.xsl
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<!--
+    Document   : XmlJavadocCommentsExtractor.xsl
+    Created on : September 16, 2014, 11:30 AM
+    Description:
+        The transformation strips off all information except for comments and 
tags from xml javadoc generated by xml-doclet.
+-->
+
+<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0">
+  <xsl:output method="xml" standalone="yes"/>
+
+  <!-- copy xml by selecting only the following nodes, attributes and text -->
+  <xsl:template match="node()|text()|@*">
+    <xsl:copy>
+      <xsl:apply-templates 
select="root|package|class|interface|method|field|type|comment|tag|text()|@name|@qualified|@text"/>
+    </xsl:copy>
+  </xsl:template>
+
+  <!-- Strip off the following paths from the selected xml -->
+  <xsl:template match="//root/package/interface/interface
+                      |//root/package/interface/method/@qualified
+                      |//root/package/class/interface
+                      |//root/package/class/class
+                      |//root/package/class/method/@qualified
+                      |//root/package/class/field/@qualified" />
+
+  <xsl:strip-space elements="*"/>
+</xsl:stylesheet>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka-common/pom.xml
----------------------------------------------------------------------
diff --git a/kafka/kafka-common/pom.xml b/kafka/kafka-common/pom.xml
new file mode 100755
index 0000000..71a8a09
--- /dev/null
+++ b/kafka/kafka-common/pom.xml
@@ -0,0 +1,223 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar-kafka-connectors</artifactId>
+    <version>3.8.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>malhar-kafka-common</artifactId>
+  <packaging>jar</packaging>
+  <name>Apache Apex Malhar Common Utilities for Kafka Support</name>
+
+  <properties>
+    <checkstyle.console>false</checkstyle.console>
+  </properties>
+
+  <build>
+    <plugins>
+      <!-- Publish tests jar -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.4</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+            <phase>package</phase>
+          </execution>
+        </executions>
+      </plugin>
+      <!-- create resource directory for xml javadoc-->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>createJavadocDirectory</id>
+            <phase>generate-resources</phase>
+            <configuration>
+              <tasks>
+                <delete 
dir="${project.build.directory}/generated-resources/xml-javadoc"/>
+                <mkdir 
dir="${project.build.directory}/generated-resources/xml-javadoc"/>
+              </tasks>
+            </configuration>
+            <goals>
+              <goal>run</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <!-- generate javadoc -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-javadoc-plugin</artifactId>
+        <executions>
+          <!-- generate xml javadoc -->
+          <execution>
+            <id>xml-doclet</id>
+            <phase>generate-resources</phase>
+            <goals>
+              <goal>javadoc</goal>
+            </goals>
+            <configuration>
+              <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet>
+              <additionalparam>-d 
${project.build.directory}/generated-resources/xml-javadoc -filename 
${project.artifactId}-${project.version}-javadoc.xml</additionalparam>
+              <useStandardDocletOptions>false</useStandardDocletOptions>
+              <docletArtifact>
+                <groupId>com.github.markusbernhardt</groupId>
+                <artifactId>xml-doclet</artifactId>
+                <version>1.0.4</version>
+              </docletArtifact>
+            </configuration>
+          </execution>
+          <!-- generate default javadoc jar with custom tags -->
+          <execution>
+            <id>attach-sources</id>
+            <goals>
+              <goal>jar</goal>
+            </goals>
+            <configuration>
+              <skip>true</skip>
+              <tags>
+                <tag>
+                  <name>customTag1</name>
+                  <placement>a</placement>
+                  <head>Custom Tag One:</head>
+                </tag>
+                <tag>
+                  <name>customTag2</name>
+                  <placement>a</placement>
+                  <head>Custom Tag two:</head>
+                </tag>
+                <tag>
+                  <name>customTag3</name>
+                  <placement>a</placement>
+                  <head>Custom Tag three:</head>
+                </tag>
+              </tags>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <!-- Transform xml javadoc to stripped down version containing only 
class/interface comments and tags-->
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>xml-maven-plugin</artifactId>
+        <version>1.0</version>
+        <executions>
+          <execution>
+            <id>transform-xmljavadoc</id>
+            <phase>generate-resources</phase>
+            <goals>
+              <goal>transform</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <transformationSets>
+            <transformationSet>
+              
<dir>${project.build.directory}/generated-resources/xml-javadoc</dir>
+              <includes>
+                
<include>${project.artifactId}-${project.version}-javadoc.xml</include>
+              </includes>
+              <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet>
+              
<outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir>
+            </transformationSet>
+          </transformationSets>
+        </configuration>
+      </plugin>
+      <!-- copy xml javadoc to class jar -->
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+        <version>2.6</version>
+        <executions>
+          <execution>
+            <id>copy-resources</id>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${basedir}/target/classes</outputDirectory>
+              <resources>
+                <resource>
+                  
<directory>${project.build.directory}/generated-resources/xml-javadoc</directory>
+                  <includes>
+                    
<include>${project.artifactId}-${project.version}-javadoc.xml</include>
+                  </includes>
+                  <filtering>true</filtering>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <argLine>-Xmx2048m</argLine>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.11</artifactId>
+      <version>0.9.0.1</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-simple</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.11</artifactId>
+      <version>0.9.0.1</version>
+      <classifier>test</classifier>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaConsumer.java
 
b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaConsumer.java
new file mode 100644
index 0000000..ebb46e5
--- /dev/null
+++ 
b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaConsumer.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Interface for Kafka Consumer. It wraps around the KafkaConsumer.
+ */
+public interface AbstractKafkaConsumer
+{
+  /**
+   * Checks whether the consumer contains the specified partition or not
+   * @param topicPartition topic partition
+   * @return true if consumer contains the given partition, otherwise false
+   */
+  boolean isConsumerContainsPartition(TopicPartition topicPartition);
+
+  /**
+   * Seek to the specified offset for the given partition
+   * @param topicPartition topic partition
+   * @param offset given offset
+   */
+  void seekToOffset(TopicPartition topicPartition, long offset);
+
+  /**
+   * Fetch data for the topics or partitions specified using assign API.
+   * @param timeOut time in milliseconds, spent waiting in poll if data is not 
available in buffer.
+   * @return records
+   */
+  ConsumerRecords<byte[], byte[]> pollRecords(long timeOut);
+
+  /**
+   * Commit the specified offsets for the specified list of topics and 
partitions to Kafka.
+   * @param offsets given offsets
+   * @param callback Callback to invoke when the commit completes
+   */
+  void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, 
OffsetCommitCallback callback);
+
+  /**
+   * Assign the specified list of partitions to the consumer
+   * @param partitions list of partitions
+   */
+  void assignPartitions(List<TopicPartition> partitions);
+
+  /**
+   * Seek to the first offset for the specified list of partitions
+   * @param partitions list of partitions
+   */
+  void seekToBeginning(TopicPartition... partitions);
+
+  /**
+   * Seek to the last offset for the specified list of partitions
+   * @param partitions list of partitions
+   */
+  void seekToEnd(TopicPartition... partitions);
+
+  /**
+   * Wrapper for waking up the consumer
+   */
+  void wakeup();
+
+  /**
+   * Return the metrics kept by the consumer
+   * @return metrics
+   */
+  Map<MetricName, ? extends Metric> metrics();
+
+  /**
+   * Wrapper for closing the consumer
+   */
+  void close();
+
+  /**
+   * Resume all the partitions
+   */
+  void resumeAllPartitions();
+
+  /**
+   * Return the list of partitions assigned to this consumer
+   * @return list of partitions
+   */
+  Collection<TopicPartition> getPartitions();
+
+  /**
+   * Resume the specified partition
+   * @param tp partition
+   */
+  void resumePartition(TopicPartition tp);
+
+  /**
+   * Pause the specified partition
+   * @param tp partition
+   */
+  void pausePartition(TopicPartition tp);
+
+  /**
+   * Return the offset of the next record that will be fetched
+   * @param tp partition
+   */
+  long positionPartition(TopicPartition tp);
+}
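
A small usage sketch of the wrapper interface above (illustrative only; the helper class and
method names are hypothetical and assumed to live in org.apache.apex.malhar.kafka): code written
against AbstractKafkaConsumer can restore offsets and poll without caring whether a 0.9 or 0.10
consumer sits underneath.

  import java.util.Map;

  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.common.TopicPartition;

  public class ConsumerWrapperUsage
  {
    // Seek each tracked partition to its stored offset, then poll once.
    public static ConsumerRecords<byte[], byte[]> seekAndPoll(AbstractKafkaConsumer consumer,
        Map<TopicPartition, Long> startOffsets, long timeoutMillis)
    {
      for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
        if (consumer.isConsumerContainsPartition(entry.getKey())) {
          consumer.seekToOffset(entry.getKey(), entry.getValue());
        }
      }
      // The concrete implementation (e.g. KafkaConsumer09 or KafkaConsumer010 in this commit)
      // decides how the underlying poll() is invoked.
      return consumer.pollRecords(timeoutMillis);
    }
  }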

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git 
a/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
 
b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
new file mode 100644
index 0000000..40747eb
--- /dev/null
+++ 
b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
@@ -0,0 +1,635 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * The abstract Kafka input operator, built on the new Kafka consumer API.
+ * A scalable, fault-tolerant, at-least-once Kafka input operator.
+ * Key features include:
+ *
+ * <ol>
+ * <li>Out-of-the-box one-to-one and one-to-many partition strategies, plus customizable
+ * partition strategies; refer to AbstractKafkaPartitioner</li>
+ * <li>Fault tolerance: when the input operator goes down, it is redeployed on another node</li>
+ * <li>At-least-once semantics for operator failure (no matter which operator fails)</li>
+ * <li>At-least-once semantics for cold restart (no data loss even if you restart the application)</li>
+ * <li>Multi-cluster support: one operator can consume data from more than one Kafka cluster</li>
+ * <li>Multi-topic support: one operator can subscribe to multiple topics</li>
+ * <li>Throughput control: the number of tuples emitted per streaming window can be throttled</li>
+ * </ol>
+ *
+ * @since 3.3.0
+ */
+@InterfaceStability.Evolving
+public abstract class AbstractKafkaInputOperator implements InputOperator,
+    Operator.ActivationListener<Context.OperatorContext>, 
Operator.CheckpointNotificationListener,
+    Partitioner<AbstractKafkaInputOperator>, StatsListener, 
OffsetCommitCallback
+{
+
+  private static final Logger logger = 
LoggerFactory.getLogger(AbstractKafkaInputOperator.class);
+
+  static {
+    // We create new consumers periodically to pull metadata (Kafka consumer 
keeps metadata in cache)
+    // Skip log4j log for ConsumerConfig class to avoid too much noise in 
application
+    LogManager.getLogger(ConsumerConfig.class).setLevel(Level.WARN);
+  }
+
+  public enum InitialOffset
+  {
+    EARLIEST, // consume from the beginning of the partition every time the application restarts
+    LATEST, // consume from the latest offset of the partition every time the application restarts
+    // consume from the committed position of the last run, or from the earliest offset if there is no committed offset
+    APPLICATION_OR_EARLIEST,
+    APPLICATION_OR_LATEST // consume from the committed position of the last run, or from the latest offset if there is no committed offset
+  }
+
+  @NotNull
+  private String[] clusters;
+
+  @NotNull
+  private String[] topics;
+
+  /**
+   * offset track for checkpoint
+   */
+  private final Map<AbstractKafkaPartitioner.PartitionMeta, Long> offsetTrack 
= new HashMap<>();
+
+  private final transient Map<AbstractKafkaPartitioner.PartitionMeta, Long> 
windowStartOffset = new HashMap<>();
+
+  private transient int operatorId;
+
+  private int initialPartitionCount = 1;
+
+  private long repartitionInterval = 30000L;
+
+  private long repartitionCheckInterval = 5000L;
+
+  @Min(1)
+  private int maxTuplesPerWindow = Integer.MAX_VALUE;
+
+  /**
+   * By default the operator starts consuming from the committed offset or the latest one
+   */
+  private InitialOffset initialOffset = InitialOffset.APPLICATION_OR_LATEST;
+
+  private long metricsRefreshInterval = 5000L;
+
+  private long consumerTimeout = 5000L;
+
+  private int holdingBufferSize = 1024;
+
+  private Properties consumerProps = new Properties();
+
+  /**
+   * Assignment for each operator instance
+   */
+  private Set<AbstractKafkaPartitioner.PartitionMeta> assignment;
+
+  //=======================All transient fields==========================
+
+  /**
+   * Wrapper consumer object
+   * It wraps KafkaConsumer, maintains the consumer thread and stores messages in a queue
+   */
+  private final transient KafkaConsumerWrapper consumerWrapper = 
createConsumerWrapper();
+
+  /**
+   * By default the strategy is one to one
+   *
+   * @see PartitionStrategy
+   */
+  private PartitionStrategy strategy = PartitionStrategy.ONE_TO_ONE;
+
+  /**
+   * count of messages emitted in each window<br>
+   * not settable
+   */
+  private transient int emitCount = 0;
+
+  /**
+   * store offsets with window id; only keep offsets for windows that have not been committed
+   */
+  private final transient List<Pair<Long, 
Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> offsetHistory =
+      new LinkedList<>();
+
+  /**
+   * Application name is used as group.id for kafka consumer
+   */
+  private transient String applicationName;
+
+  private transient AbstractKafkaPartitioner partitioner;
+
+  private transient long currentWindowId;
+
+  private transient long lastCheckTime = 0L;
+
+  private transient long lastRepartitionTime = 0L;
+
+  @AutoMetric
+  private transient KafkaMetrics metrics;
+
+  private WindowDataManager windowDataManager = new 
WindowDataManager.NoopWindowDataManager();
+
+  /**
+   * Creates the wrapper consumer object.
+   * It maintains the consumer thread and stores messages in a queue
+   * @return KafkaConsumerWrapper
+   */
+  public KafkaConsumerWrapper createConsumerWrapper()
+  {
+    return new KafkaConsumerWrapper();
+  }
+
+  // Creates the consumer object and it wraps KafkaConsumer.
+  public abstract AbstractKafkaConsumer createConsumer(Properties prop);
+
+  @Override
+  public void activate(Context.OperatorContext context)
+  {
+    consumerWrapper.start(isIdempotent());
+  }
+
+  @Override
+  public void deactivate()
+  {
+    consumerWrapper.stop();
+  }
+
+  @Override
+  public void checkpointed(long l)
+  {
+
+  }
+
+  @Override
+  public void beforeCheckpoint(long windowId)
+  {
+
+  }
+
+  @Override
+  public void committed(long windowId)
+  {
+    if (initialOffset == InitialOffset.LATEST || initialOffset == 
InitialOffset.EARLIEST) {
+      return;
+    }
+    //ask kafka consumer wrapper to store the committed offsets
+    for (Iterator<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, 
Long>>> iter =
+        offsetHistory.iterator(); iter.hasNext(); ) {
+      Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>> item = 
iter.next();
+      if (item.getLeft() <= windowId) {
+        if (item.getLeft() == windowId) {
+          consumerWrapper.commitOffsets(item.getRight());
+        }
+        iter.remove();
+      }
+    }
+    if (isIdempotent()) {
+      try {
+        windowDataManager.committed(windowId);
+      } catch (IOException e) {
+        DTThrowable.rethrow(e);
+      }
+    }
+  }
+
+  @Override
+  public void emitTuples()
+  {
+    int count = consumerWrapper.messageSize();
+    if (maxTuplesPerWindow > 0) {
+      count = Math.min(count, maxTuplesPerWindow - emitCount);
+    }
+    for (int i = 0; i < count; i++) {
+      Pair<String, ConsumerRecord<byte[], byte[]>> tuple = 
consumerWrapper.pollMessage();
+      ConsumerRecord<byte[], byte[]> msg = tuple.getRight();
+      emitTuple(tuple.getLeft(), msg);
+      AbstractKafkaPartitioner.PartitionMeta pm = new 
AbstractKafkaPartitioner.PartitionMeta(tuple.getLeft(),
+          msg.topic(), msg.partition());
+      offsetTrack.put(pm, msg.offset() + 1);
+      if (isIdempotent() && !windowStartOffset.containsKey(pm)) {
+        windowStartOffset.put(pm, msg.offset());
+      }
+    }
+    emitCount += count;
+  }
+
+  protected abstract void emitTuple(String cluster, ConsumerRecord<byte[], 
byte[]> message);
+
+  @Override
+  public void beginWindow(long wid)
+  {
+    emitCount = 0;
+    currentWindowId = wid;
+    windowStartOffset.clear();
+    if (isIdempotent() && wid <= 
windowDataManager.getLargestCompletedWindow()) {
+      replay(wid);
+    } else {
+      consumerWrapper.afterReplay();
+    }
+  }
+
+  private void replay(long windowId)
+  {
+    try {
+      @SuppressWarnings("unchecked")
+      Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>> windowData 
=
+          (Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, 
Long>>)windowDataManager.retrieve(windowId);
+      consumerWrapper.emitImmediately(windowData);
+    } catch (IOException e) {
+      DTThrowable.rethrow(e);
+    }
+  }
+
+  @Override
+  public void endWindow()
+  {
+    // copy current offset track to history memory
+    Map<AbstractKafkaPartitioner.PartitionMeta, Long> offsetsWithWindow = new 
HashMap<>(offsetTrack);
+    offsetHistory.add(Pair.of(currentWindowId, offsetsWithWindow));
+
+    //update metrics
+    metrics.updateMetrics(clusters, consumerWrapper.getAllConsumerMetrics());
+
+    //update the windowDataManager
+    if (isIdempotent()) {
+      try {
+        Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>> 
windowData = new HashMap<>();
+        for (Map.Entry<AbstractKafkaPartitioner.PartitionMeta, Long> e : 
windowStartOffset.entrySet()) {
+          windowData.put(e.getKey(), new MutablePair<>(e.getValue(), 
offsetTrack.get(e.getKey()) - e.getValue()));
+        }
+        windowDataManager.save(windowData, currentWindowId);
+      } catch (IOException e) {
+        DTThrowable.rethrow(e);
+      }
+    }
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    applicationName = context.getValue(Context.DAGContext.APPLICATION_NAME);
+    consumerWrapper.create(this);
+    metrics = new KafkaMetrics(metricsRefreshInterval);
+    windowDataManager.setup(context);
+    operatorId = context.getId();
+  }
+
+  @Override
+  public void teardown()
+  {
+    windowDataManager.teardown();
+  }
+
+  private void initPartitioner()
+  {
+    if (partitioner == null) {
+      logger.info("Initialize Partitioner");
+      switch (strategy) {
+        case ONE_TO_ONE:
+          partitioner = new OneToOnePartitioner(clusters, topics, this);
+          break;
+        case ONE_TO_MANY:
+          partitioner = new OneToManyPartitioner(clusters, topics, this);
+          break;
+        case ONE_TO_MANY_HEURISTIC:
+          throw new UnsupportedOperationException("Not implemented yet");
+        default:
+          throw new RuntimeException("Invalid strategy");
+      }
+      logger.info("Actual Partitioner is {}", partitioner.getClass());
+    }
+
+  }
+
+  @Override
+  public Response processStats(BatchedOperatorStats batchedOperatorStats)
+  {
+    long t = System.currentTimeMillis();
+    if (repartitionInterval < 0 || repartitionCheckInterval < 0 ||
+        t - lastCheckTime < repartitionCheckInterval || t - 
lastRepartitionTime < repartitionInterval) {
+      // return false if it's within repartitionCheckInterval since the last time the stats were checked
+      Response response = new Response();
+      response.repartitionRequired = false;
+      return response;
+    }
+
+    try {
+      logger.debug("Process stats");
+      initPartitioner();
+      return partitioner.processStats(batchedOperatorStats);
+    } finally {
+      lastCheckTime = System.currentTimeMillis();
+    }
+  }
+
+  @Override
+  public Collection<Partition<AbstractKafkaInputOperator>> definePartitions(
+      Collection<Partition<AbstractKafkaInputOperator>> collection, 
PartitioningContext partitioningContext)
+  {
+    logger.debug("Define partitions");
+    initPartitioner();
+    return partitioner.definePartitions(collection, partitioningContext);
+  }
+
+  @Override
+  public void partitioned(Map<Integer, Partition<AbstractKafkaInputOperator>> 
map)
+  {
+    // update the last repartition time
+    lastRepartitionTime = System.currentTimeMillis();
+    initPartitioner();
+    partitioner.partitioned(map);
+  }
+
+  /**
+   * A callback from consumer after it commits the offset
+   *
+   * @param map
+   * @param e
+   */
+  public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception 
e)
+  {
+    if (logger.isDebugEnabled()) {
+      logger.debug("Commit offsets complete {} ", 
Joiner.on(';').withKeyValueSeparator("=").join(map));
+    }
+    if (e != null) {
+      logger.warn("Exceptions in committing offsets {} : {} ",
+          Joiner.on(';').withKeyValueSeparator("=").join(map), e);
+    }
+  }
+
+  public void assign(Set<AbstractKafkaPartitioner.PartitionMeta> assignment)
+  {
+    this.assignment = assignment;
+  }
+
+  public Set<AbstractKafkaPartitioner.PartitionMeta> assignment()
+  {
+    return assignment;
+  }
+
+  private boolean isIdempotent()
+  {
+    return windowDataManager != null && !(windowDataManager instanceof 
WindowDataManager.NoopWindowDataManager);
+  }
+
+  //---------------------------------------------setters and 
getters----------------------------------------
+  public void setInitialPartitionCount(int partitionCount)
+  {
+    this.initialPartitionCount = partitionCount;
+  }
+
+  /**
+   * initial partition count
+   * only used with PartitionStrategy.ONE_TO_MANY
+   * or customized strategy
+   */
+  public int getInitialPartitionCount()
+  {
+    return initialPartitionCount;
+  }
+
+  public void setClusters(String clusters)
+  {
+    this.clusters = clusters.split(";");
+  }
+
+  /**
+   * Same setting as the bootstrap.servers property of KafkaConsumer;
+   * refer to http://kafka.apache.org/documentation.html#newconsumerconfigs
+   * To support multiple clusters, you can have multiple bootstrap.servers values separated by ";"
+   */
+  public String getClusters()
+  {
+    return Joiner.on(';').join(clusters);
+  }
+
+  public void setTopics(String topics)
+  {
+    this.topics = 
Iterables.toArray(Splitter.on(',').trimResults().omitEmptyStrings().split(topics),
 String.class);
+  }
+
+  /**
+   * The topics the operator consumes, separated by ','
+   * Topic names can only contain ASCII alphanumerics, '.', '_' and '-'
+   */
+  public String getTopics()
+  {
+    return Joiner.on(", ").join(topics);
+  }
+
+  public void setStrategy(String policy)
+  {
+    this.strategy = PartitionStrategy.valueOf(policy.toUpperCase());
+  }
+
+  public String getStrategy()
+  {
+    return strategy.name();
+  }
+
+  public void setInitialOffset(String initialOffset)
+  {
+    this.initialOffset = InitialOffset.valueOf(initialOffset.toUpperCase());
+  }
+
+  /**
+   * Initial offset, it should be one of the following
+   * <ul>
+   * <li>earliest</li>
+   * <li>latest</li>
+   * <li>application_or_earliest</li>
+   * <li>application_or_latest</li>
+   * </ul>
+   */
+  public String getInitialOffset()
+  {
+    return initialOffset.name();
+  }
+
+  public String getApplicationName()
+  {
+    return applicationName;
+  }
+
+  public void setConsumerProps(Properties consumerProps)
+  {
+    this.consumerProps = consumerProps;
+  }
+
+  /**
+   * Extra kafka consumer properties
+   * http://kafka.apache.org/090/documentation.html#newconsumerconfigs
+   *
+   * Please be aware that the properties below are set by the operator; do not override them
+   *
+   * <ul>
+   * <li>bootstrap.servers</li>
+   * <li>group.id</li>
+   * <li>auto.offset.reset</li>
+   * <li>enable.auto.commit</li>
+   * <li>partition.assignment.strategy</li>
+   * <li>key.deserializer</li>
+   * <li>value.deserializer</li>
+   * </ul>
+   */
+  public Properties getConsumerProps()
+  {
+    return consumerProps;
+  }
+
+  public void setMaxTuplesPerWindow(int maxTuplesPerWindow)
+  {
+    this.maxTuplesPerWindow = maxTuplesPerWindow;
+  }
+
+  /**
+   * maximum tuples allowed to be emitted in each window
+   */
+  public int getMaxTuplesPerWindow()
+  {
+    return maxTuplesPerWindow;
+  }
+
+  /**
+   * @see <a 
href="http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)">
+   * org.apache.kafka.clients.consumer.KafkaConsumer.poll</a>
+   */
+  public long getConsumerTimeout()
+  {
+    return consumerTimeout;
+  }
+
+  public void setConsumerTimeout(long consumerTimeout)
+  {
+    this.consumerTimeout = consumerTimeout;
+  }
+
+  /**
+   * Number of messages kept in memory waiting for emission to downstream 
operator
+   */
+  public int getHoldingBufferSize()
+  {
+    return holdingBufferSize;
+  }
+
+  public void setHoldingBufferSize(int holdingBufferSize)
+  {
+    this.holdingBufferSize = holdingBufferSize;
+  }
+
+  /**
+   * metrics refresh interval
+   */
+  public long getMetricsRefreshInterval()
+  {
+    return metricsRefreshInterval;
+  }
+
+  public void setMetricsRefreshInterval(long metricsRefreshInterval)
+  {
+    this.metricsRefreshInterval = metricsRefreshInterval;
+  }
+
+  public void setRepartitionCheckInterval(long repartitionCheckInterval)
+  {
+    this.repartitionCheckInterval = repartitionCheckInterval;
+  }
+
+  /**
+   * Minimum interval between checking the collected stats and deciding whether a repartition is needed.
+   * Also the minimum interval between two offset updates
+   */
+  public long getRepartitionCheckInterval()
+  {
+    return repartitionCheckInterval;
+  }
+
+  public void setRepartitionInterval(long repartitionInterval)
+  {
+    this.repartitionInterval = repartitionInterval;
+  }
+
+  /**
+   * Minimum interval between two (re)partition actions
+   */
+  public long getRepartitionInterval()
+  {
+    return repartitionInterval;
+  }
+
+  public void setWindowDataManager(WindowDataManager windowDataManager)
+  {
+    this.windowDataManager = windowDataManager;
+  }
+
+  public WindowDataManager getWindowDataManager()
+  {
+    return windowDataManager;
+  }
+
+  /**
+   * @return current checkpointed offsets
+   * @omitFromUI
+   */
+  public Map<AbstractKafkaPartitioner.PartitionMeta, Long> getOffsetTrack()
+  {
+    return offsetTrack;
+  }
+}
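
A configuration sketch for the operator above (illustrative; all values are placeholders, and
FSWindowDataManager is assumed to be the Malhar filesystem-backed WindowDataManager). It exercises
the multi-cluster, multi-topic, throughput-control and idempotency settings described in the class
javadoc:

  import org.apache.apex.malhar.lib.wal.FSWindowDataManager;

  public class InputOperatorConfigSketch
  {
    public static void configure(AbstractKafkaInputOperator in)
    {
      in.setClusters("node1:9092;node2:9092");      // two clusters, separated by ';'
      in.setTopics("transactions,audit");           // two topics, separated by ','
      in.setInitialOffset("APPLICATION_OR_EARLIEST");
      in.setStrategy("ONE_TO_MANY");                // share all Kafka partitions among...
      in.setInitialPartitionCount(2);               // ...this many operator partitions
      in.setMaxTuplesPerWindow(10000);              // throttle emission per streaming window
      // A non-noop WindowDataManager enables the idempotent replay path in beginWindow()
      in.setWindowDataManager(new FSWindowDataManager());
    }
  }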

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java
----------------------------------------------------------------------
diff --git 
a/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java
 
b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java
new file mode 100644
index 0000000..0e16fe1
--- /dev/null
+++ 
b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import java.util.Properties;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+
+/**
+ * This is the base implementation of a Kafka output operator (0.9.0), which writes data to the Kafka message bus.
+ *
+ * @displayName Abstract Kafka Output
+ * @category Messaging
+ * @tags output operator
+ *
+ *
+ * @since 3.5.0
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public abstract class AbstractKafkaOutputOperator<K, V> implements Operator
+{
+  private transient Producer<K, V> producer;
+  @NotNull
+  private String topic;
+  private Properties properties = new Properties();
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    producer = new KafkaProducer<K, V>(properties);
+  }
+
+  /**
+   * Implement Component Interface.
+   */
+  @Override
+  public void teardown()
+  {
+    producer.close();
+  }
+
+  /**
+   * Implement Operator Interface.
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+  }
+
+  /**
+   * Implement Operator Interface.
+   */
+  @Override
+  public void endWindow()
+  {
+  }
+
+  public Properties getProperties()
+  {
+    return properties;
+  }
+
+  /**
+   * Set the Kafka producer properties.
+   *
+   * @param properties Producer properties
+   */
+  public void setProperties(Properties properties)
+  {
+    this.properties.putAll(properties);
+  }
+
+  /**
+   * Set the Kafka producer property.
+   *
+   * @param key Producer Property name
+   * @param val Producer Property value
+   */
+  public void setProperty(Object key, Object val)
+  {
+    properties.put(key, val);
+  }
+
+  public String getTopic()
+  {
+    return topic;
+  }
+
+  /**
+   * Set the Kafka topic
+   * @param  topic  Kafka topic to which the data is sent
+   */
+  public void setTopic(String topic)
+  {
+    this.topic = topic;
+  }
+
+  protected Producer<K, V> getProducer()
+  {
+    return producer;
+  }
+}
+
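
A minimal concrete subclass sketch (the commit's own KafkaSinglePortOutputOperator plays this
role; the class name, port name and String key/value types below are illustrative). Tuples
arriving on the input port are handed to the producer created in setup(); matching
key.serializer/value.serializer entries would still need to be supplied via setProperties():

  import org.apache.kafka.clients.producer.ProducerRecord;

  import com.datatorrent.api.DefaultInputPort;

  public class StringKafkaOutputOperator extends AbstractKafkaOutputOperator<String, String>
  {
    public final transient DefaultInputPort<String> inputPort = new DefaultInputPort<String>()
    {
      @Override
      public void process(String tuple)
      {
        // Send each tuple to the topic configured via setTopic()
        getProducer().send(new ProducerRecord<String, String>(getTopic(), tuple));
      }
    };
  }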

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
----------------------------------------------------------------------
diff --git 
a/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
 
b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
new file mode 100644
index 0000000..791972f
--- /dev/null
+++ 
b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import com.google.common.base.Joiner;
+
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.lib.util.KryoCloneUtils;
+
+/**
+ * Abstract partitioner used to manage the partitions of the Kafka input operator.
+ * It uses a number of Kafka consumers (one for each cluster) to get the latest partition metadata for the topics
+ * the consumer subscribes to, and exposes those to the subclass, which implements the assign method.
+ *
+ * The partitioner is always stateless.
+ *
+ * @since 3.3.0
+ */
+@InterfaceStability.Evolving
+public abstract class AbstractKafkaPartitioner implements 
Partitioner<AbstractKafkaInputOperator>, StatsListener
+{
+
+  private static final Logger logger = 
LoggerFactory.getLogger(AbstractKafkaPartitioner.class);
+
+  private static final String META_CONSUMER_GROUP_NAME = 
AbstractKafkaInputOperator.class.getName() + "META_GROUP";
+
+  protected final String[] clusters;
+
+  protected final String[] topics;
+
+  protected final AbstractKafkaInputOperator prototypeOperator;
+
+  private ArrayList<KafkaConsumer<byte[], byte[]>> metadataRefreshClients;
+
+  // prevent null
+  private final List<Set<AbstractKafkaPartitioner.PartitionMeta>> 
currentPartitions = new LinkedList<>();
+
+  public AbstractKafkaPartitioner(String[] clusters, String[] topics, 
AbstractKafkaInputOperator prototypeOperator)
+  {
+    this.clusters = clusters;
+    this.topics = topics;
+    this.prototypeOperator = prototypeOperator;
+  }
+
+  abstract List<Set<PartitionMeta>> assign(Map<String, Map<String, 
List<PartitionInfo>>> metadata);
+
+  @Override
+  public Collection<Partition<AbstractKafkaInputOperator>> definePartitions(
+      Collection<Partition<AbstractKafkaInputOperator>> collection, 
PartitioningContext partitioningContext)
+  {
+    initMetadataClients();
+
+    Map<String, Map<String, List<PartitionInfo>>> metadata = new HashMap<>();
+
+    try {
+      for (int i = 0; i < clusters.length; i++) {
+        metadata.put(clusters[i], new HashMap<String, List<PartitionInfo>>());
+        for (String topic : topics) {
+          //try several times if partitionsFor(topic) returns null or throws an exception.
+          //partitionsFor(topic) will return null if the topic is invalid or hasn't completed
+          int tryTime = 10;
+          while (tryTime-- > 0) {
+            try {
+              List<PartitionInfo> ptis = 
metadataRefreshClients.get(i).partitionsFor(topic);
+              if (ptis != null) {
+                if (logger.isDebugEnabled()) {
+                  logger.debug("Partition metadata for topic {} : {}", topic, 
Joiner.on(';').join(ptis));
+                }
+                metadata.get(clusters[i]).put(topic, ptis);
+                break;
+              }
+
+              logger.warn("Partition metadata for topic {} is null. 
retrying...", topic);
+
+            } catch (Exception e) {
+              logger.warn("Got Exception when trying get partition info for 
topic {}.", topic, e);
+            }
+
+            try {
+              Thread.sleep(100);
+            } catch (Exception e1) {
+              //ignore
+            }
+          } //end while
+
+          if (tryTime == 0) {
+            throw new RuntimeException(
+                "Get partition info for topic completely failed. Please check 
the log file. topic name: " + topic);
+          }
+        }
+      }
+    } finally {
+      closeClients();
+    }
+
+    List<Set<AbstractKafkaPartitioner.PartitionMeta>> parts = null;
+    try {
+      parts = assign(metadata);
+    } catch (Exception e) {
+      logger.error("assign() exception.", e);
+      e.printStackTrace();
+    }
+
+    if (currentPartitions == parts || currentPartitions.equals(parts)) {
+      logger.debug("No partition change found");
+      return collection;
+    } else {
+      logger.info("Partition change detected: ");
+      currentPartitions.clear();
+      currentPartitions.addAll(parts);
+      int i = 0;
+      List<Partition<AbstractKafkaInputOperator>> result = new LinkedList<>();
+      for (Iterator<Partition<AbstractKafkaInputOperator>> iter = 
collection.iterator(); iter.hasNext(); ) {
+        Partition<AbstractKafkaInputOperator> nextPartition = iter.next();
+        if (parts.remove(nextPartition.getPartitionedInstance().assignment())) 
{
+          if (logger.isInfoEnabled()) {
+            logger.info("[Existing] Partition {} with assignment {} ", i,
+                
Joiner.on(';').join(nextPartition.getPartitionedInstance().assignment()));
+          }
+          result.add(nextPartition);
+          i++;
+        }
+      }
+
+      for (Set<AbstractKafkaPartitioner.PartitionMeta> partitionAssignment : 
parts) {
+        if (logger.isInfoEnabled()) {
+          logger.info("[New] Partition {} with assignment {} ", i,
+              Joiner.on(';').join(partitionAssignment));
+        }
+        result.add(createPartition(partitionAssignment));
+        i++;
+      }
+
+      return result;
+    }
+  }
+
+  protected void closeClients()
+  {
+    for (KafkaConsumer<byte[], byte[]> consume : metadataRefreshClients) {
+      consume.close();
+    }
+    metadataRefreshClients = null;
+  }
+
+  @Override
+  public void partitioned(Map<Integer, Partition<AbstractKafkaInputOperator>> 
map)
+  {
+
+  }
+
+  @Override
+  public Response processStats(BatchedOperatorStats batchedOperatorStats)
+  {
+    Response response = new Response();
+    response.repartitionRequired = true;
+    return response;
+  }
+
+  protected Partitioner.Partition<AbstractKafkaInputOperator> createPartition(
+      Set<AbstractKafkaPartitioner.PartitionMeta> partitionAssignment)
+  {
+    Partitioner.Partition<AbstractKafkaInputOperator> p =
+        new 
DefaultPartition<AbstractKafkaInputOperator>(KryoCloneUtils.cloneObject(prototypeOperator));
+    p.getPartitionedInstance().assign(partitionAssignment);
+    return p;
+  }
+
+  /**
+   *
+   */
+  private void initMetadataClients()
+  {
+    if (metadataRefreshClients != null && metadataRefreshClients.size() == 
clusters.length) {
+      // The metadata client is active
+      return;
+    }
+
+    if (clusters == null || clusters.length == 0) {
+      throw new IllegalStateException("clusters can not be null");
+    }
+
+    metadataRefreshClients = new ArrayList<>(clusters.length);
+    int index = 0;
+    for (String c : clusters) {
+      Properties prop = prototypeOperator.getConsumerProps();
+      prop.put("group.id", META_CONSUMER_GROUP_NAME);
+      prop.put("bootstrap.servers", c);
+      prop.put("key.deserializer", ByteArrayDeserializer.class.getName());
+      prop.put("value.deserializer", ByteArrayDeserializer.class.getName());
+      prop.put("enable.auto.commit", "false");
+      if (logger.isInfoEnabled()) {
+        logger.info("Consumer Properties :  {} ", getPropertyAsString(prop));
+      }
+      metadataRefreshClients.add(index++, new KafkaConsumer<byte[], 
byte[]>(prop));
+    }
+  }
+
+  /**
+   * Converts the property list (key and element pairs) to String format
+   * This format is used to print to a Stream for debugging.
+   * @param prop
+   * @return String
+   */
+  private String getPropertyAsString(Properties prop)
+  {
+    StringWriter writer = new StringWriter();
+    try {
+      prop.store(writer, "");
+    } catch (IOException e) {
+      logger.error("Cannot retrieve consumer properties for Logging : {}", 
e.getMessage());
+    }
+    return writer.getBuffer().toString();
+  }
+
+  /**
+   * The key object used in the assignment map for each operator
+   */
+  public static class PartitionMeta
+  {
+
+    public PartitionMeta()
+    {
+    }
+
+    public PartitionMeta(String cluster, String topic, int partitionId)
+    {
+      this.cluster = cluster;
+      this.topic = topic;
+      this.partitionId = partitionId;
+      this.topicPartition = new TopicPartition(topic, partitionId);
+    }
+
+    private String cluster;
+
+    private transient TopicPartition topicPartition;
+
+    private String topic;
+
+    private int partitionId;
+
+    public String getCluster()
+    {
+      return cluster;
+    }
+
+    public int getPartitionId()
+    {
+      return partitionId;
+    }
+
+    public String getTopic()
+    {
+      return topic;
+    }
+
+    public TopicPartition getTopicPartition()
+    {
+      if (topicPartition == null) {
+        topicPartition = new TopicPartition(topic, partitionId);
+      }
+      return topicPartition;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      PartitionMeta that = (PartitionMeta)o;
+      return Objects.equals(partitionId, that.partitionId) &&
+        Objects.equals(cluster, that.cluster) &&
+        Objects.equals(topic, that.topic);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(cluster, topic, partitionId);
+    }
+
+    @Override
+    public String toString()
+    {
+      return "PartitionMeta{" +
+        "cluster='" + cluster + '\'' +
+        ", topicPartition=" + getTopicPartition() +
+        '}';
+    }
+  }
+}
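
The PartitionMeta class above identifies a Kafka partition by cluster, topic and partition id; its
value-based equals()/hashCode() is what allows it to serve as the key of the per-operator offset map.
A minimal usage sketch (illustrative only; the broker address "localhost:9092" and the topic "myTopic"
are placeholders):

    Map<AbstractKafkaPartitioner.PartitionMeta, Long> offsetTrack = new HashMap<>();
    offsetTrack.put(new AbstractKafkaPartitioner.PartitionMeta("localhost:9092", "myTopic", 0), 100L);
    // a key constructed later with the same cluster, topic and partition id resolves to the same entry,
    // because equality is value-based rather than identity-based
    Long offset = offsetTrack.get(new AbstractKafkaPartitioner.PartitionMeta("localhost:9092", "myTopic", 0));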

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
----------------------------------------------------------------------
diff --git a/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
new file mode 100644
index 0000000..efd0750
--- /dev/null
+++ b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import com.google.common.base.Joiner;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * This is the wrapper class for the new Kafka consumer API.
+ *
+ * It starts one consumer per cluster, each in its own thread, and maintains the consumer offsets.
+ *
+ * It also uses the consumers to commit the offsets processed by the application, under a consumer
+ * group tagged with the application name.
+ *
+ * @since 3.3.0
+ */
+@InterfaceStability.Evolving
+public class KafkaConsumerWrapper implements Closeable
+{
+
+  private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);
+
+  private AtomicBoolean isAlive = new AtomicBoolean(false);
+
+  private final Map<String, AbstractKafkaConsumer> consumers = new HashMap<>();
+
+  // The in-memory buffer holding consumed messages
+  private ArrayBlockingQueue<Pair<String, ConsumerRecord<byte[], byte[]>>> holdingBuffer;
+
+  private AbstractKafkaInputOperator ownerOperator = null;
+
+  private ExecutorService kafkaConsumerExecutor;
+
+  private final Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetsToCommit = new HashMap<>();
+
+  private boolean waitForReplay = false;
+
+  /**
+   * Only puts the offsets that need to be committed into the ConsumerThread.offsetToCommit map;
+   * the consumer thread will commit the offset(s).
+   *
+   * @param offsetsInWindow offsets processed in the window, keyed by partition
+   */
+  public void commitOffsets(Map<AbstractKafkaPartitioner.PartitionMeta, Long> offsetsInWindow)
+  {
+    if (offsetsInWindow == null) {
+      return;
+    }
+
+    // group offsets by cluster and topic partition
+    for (Map.Entry<AbstractKafkaPartitioner.PartitionMeta, Long> e : offsetsInWindow.entrySet()) {
+      String cluster = e.getKey().getCluster();
+      Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetMap = offsetsToCommit.get(cluster);
+      if (topicPartitionOffsetMap == null) {
+        logger.warn("committed offset map should be initialized by consumer thread!");
+        continue;
+      }
+      topicPartitionOffsetMap.put(e.getKey().getTopicPartition(), new OffsetAndMetadata(e.getValue()));
+    }
+
+  }
+
+  public void emitImmediately(Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>> windowData)
+  {
+    for (Map.Entry<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>> windowEntry : windowData.entrySet()) {
+      AbstractKafkaPartitioner.PartitionMeta meta = windowEntry.getKey();
+      Pair<Long, Long> replayOffsetSize = windowEntry.getValue();
+      AbstractKafkaConsumer kc = consumers.get(meta.getCluster());
+      if (kc == null || !kc.isConsumerContainsPartition(windowEntry.getKey().getTopicPartition())) {
+        throw new RuntimeException("Couldn't find consumer to replay the message, PartitionMeta: " + meta);
+      }
+      // pause the other partitions
+      for (TopicPartition tp : kc.getPartitions()) {
+        if (meta.getTopicPartition().equals(tp)) {
+          kc.resumePartition(tp);
+        } else {
+          try {
+            kc.positionPartition(tp);
+          } catch (NoOffsetForPartitionException e) {
+            // the poll() method of a consumer throws an exception
+            // if any of the subscribed partitions has no initialized position
+            handleNoOffsetForPartitionException(e, kc);
+          }
+          kc.pausePartition(tp);
+        }
+      }
+      // set the offset to window start offset
+      kc.seekToOffset(meta.getTopicPartition(), replayOffsetSize.getLeft());
+      long windowCount = replayOffsetSize.getRight();
+      while (windowCount > 0) {
+        try {
+          ConsumerRecords<byte[], byte[]> records = kc.pollRecords(ownerOperator.getConsumerTimeout());
+          for (Iterator<ConsumerRecord<byte[], byte[]>> cri = records.iterator(); cri.hasNext() && windowCount > 0; ) {
+            ownerOperator.emitTuple(meta.getCluster(), cri.next());
+            windowCount--;
+          }
+        } catch (NoOffsetForPartitionException e) {
+          throw new RuntimeException("Couldn't replay the offset", e);
+        }
+      }
+      // set the offset after window
+      kc.seekToOffset(meta.getTopicPartition(), replayOffsetSize.getLeft() + replayOffsetSize.getRight());
+    }
+
+    // resume all topics
+    for (AbstractKafkaConsumer kc : consumers.values()) {
+      kc.resumeAllPartitions();
+    }
+
+  }
+
+  public void afterReplay()
+  {
+    waitForReplay = false;
+  }
+
+  static final class ConsumerThread implements Runnable
+  {
+
+    private final AbstractKafkaConsumer consumer;
+
+    private final String cluster;
+
+    private final KafkaConsumerWrapper wrapper;
+
+    private Map<TopicPartition, OffsetAndMetadata> offsetToCommit = null;
+
+    public ConsumerThread(String cluster, AbstractKafkaConsumer consumer, KafkaConsumerWrapper wrapper)
+    {
+      this.cluster = cluster;
+      this.consumer = consumer;
+      this.wrapper = wrapper;
+      this.offsetToCommit = new ConcurrentHashMap<>();
+      wrapper.offsetsToCommit.put(cluster, offsetToCommit);
+    }
+
+    @Override
+    public void run()
+    {
+      try {
+
+        while (wrapper.isAlive.get()) {
+          if (wrapper.waitForReplay) {
+            Thread.sleep(100);
+            continue;
+          }
+          if (!this.offsetToCommit.isEmpty()) {
+            // in each fetch cycle commit the offset if needed
+            if (logger.isDebugEnabled()) {
+              logger.debug("Commit offsets {}", 
Joiner.on(';').withKeyValueSeparator("=").join(this.offsetToCommit));
+            }
+            consumer.commitAsync(offsetToCommit, wrapper.ownerOperator);
+            offsetToCommit.clear();
+          }
+          try {
+            ConsumerRecords<byte[], byte[]> records = consumer.pollRecords(wrapper.ownerOperator.getConsumerTimeout());
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+              wrapper.putMessage(Pair.of(cluster, record));
+            }
+          } catch (NoOffsetForPartitionException e) {
+            wrapper.handleNoOffsetForPartitionException(e, consumer);
+          } catch (InterruptedException e) {
+            throw new IllegalStateException("Consumer thread is interrupted 
unexpectedly", e);
+          }
+        }
+      } catch (WakeupException we) {
+        logger.info("The consumer is being stopped");
+      } catch (InterruptedException e) {
+        DTThrowable.rethrow(e);
+      } finally {
+        consumer.close();
+      }
+    }
+  }
+
+  protected void handleNoOffsetForPartitionException(NoOffsetForPartitionException e,
+      AbstractKafkaConsumer consumer)
+  {
+    // If initialOffset is set to EARLIEST or LATEST and the application runs for the first time,
+    // there is no existing committed offset and this exception will be caught.
+    // We need to seek to either the beginning or the end of the partition,
+    // based on the initial offset setting.
+    AbstractKafkaInputOperator.InitialOffset io =
+        AbstractKafkaInputOperator.InitialOffset.valueOf(ownerOperator.getInitialOffset());
+    if (io == AbstractKafkaInputOperator.InitialOffset.APPLICATION_OR_EARLIEST
+        || io == AbstractKafkaInputOperator.InitialOffset.EARLIEST) {
+      consumer.seekToBeginning(e.partitions().toArray(new TopicPartition[0]));
+    } else {
+      consumer.seekToEnd(e.partitions().toArray(new TopicPartition[0]));
+    }
+
+  }
+
+  /**
+   * This method is called in the setup method of the Kafka input operator
+   */
+  public void create(AbstractKafkaInputOperator ownerOperator)
+  {
+    holdingBuffer = new ArrayBlockingQueue<>(ownerOperator.getHoldingBufferSize());
+    this.ownerOperator = ownerOperator;
+    logger.info("Create consumer wrapper with holding buffer size: {} ", ownerOperator.getHoldingBufferSize());
+    if (logger.isInfoEnabled()) {
+      logger.info("Assignments are {} ", Joiner.on('\n').join(ownerOperator.assignment()));
+    }
+  }
+
+  /**
+   * This method is called in the activate method of the operator
+   */
+  public void start(boolean waitForReplay)
+  {
+    this.waitForReplay = waitForReplay;
+    isAlive.set(true);
+
+    // thread to consume the kafka data
+    // create thread pool for consumer threads
+    kafkaConsumerExecutor = Executors.newCachedThreadPool(
+      new ThreadFactoryBuilder().setNameFormat("kafka-consumer-%d").build());
+
+    // group list of PartitionMeta by cluster
+    Map<String, List<TopicPartition>> consumerAssignment = new HashMap<>();
+    Set<AbstractKafkaPartitioner.PartitionMeta> assignments = ownerOperator.assignment();
+    for (AbstractKafkaPartitioner.PartitionMeta partitionMeta : assignments) {
+      String cluster = partitionMeta.getCluster();
+      List<TopicPartition> cAssignment = consumerAssignment.get(cluster);
+      if (cAssignment == null) {
+        cAssignment = new LinkedList<>();
+        consumerAssignment.put(cluster, cAssignment);
+      }
+      cAssignment.add(new TopicPartition(partitionMeta.getTopic(), partitionMeta.getPartitionId()));
+    }
+
+    Map<AbstractKafkaPartitioner.PartitionMeta, Long> currentOffset = ownerOperator.getOffsetTrack();
+
+    // create one thread for each cluster;
+    // each thread uses one KafkaConsumer to consume from one or more partition(s) of one or more topic(s)
+    for (Map.Entry<String, List<TopicPartition>> e : consumerAssignment.entrySet()) {
+
+      Properties prop = new Properties();
+      if (ownerOperator.getConsumerProps() != null) {
+        prop.putAll(ownerOperator.getConsumerProps());
+      }
+
+      prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, e.getKey());
+      prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
+      // never auto commit the offsets
+      prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+      prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+      prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+      AbstractKafkaInputOperator.InitialOffset initialOffset =
+          AbstractKafkaInputOperator.InitialOffset.valueOf(ownerOperator.getInitialOffset());
+
+      if (initialOffset == AbstractKafkaInputOperator.InitialOffset.APPLICATION_OR_EARLIEST ||
+          initialOffset == AbstractKafkaInputOperator.InitialOffset.APPLICATION_OR_LATEST) {
+        // commit the offsets under the application name if initialOffset is set to application
+        prop.put(ConsumerConfig.GROUP_ID_CONFIG, ownerOperator.getApplicationName() + "_Consumer");
+      }
+
+      AbstractKafkaConsumer kc = ownerOperator.createConsumer(prop);
+      kc.assignPartitions(e.getValue());
+      if (logger.isInfoEnabled()) {
+        logger.info("Create consumer with properties {} ", 
Joiner.on(";").withKeyValueSeparator("=").join(prop));
+        logger.info("Assign consumer to {}", 
Joiner.on('#').join(e.getValue()));
+      }
+      if (currentOffset != null && !currentOffset.isEmpty()) {
+        for (TopicPartition tp : e.getValue()) {
+          AbstractKafkaPartitioner.PartitionMeta partitionKey =
+              new AbstractKafkaPartitioner.PartitionMeta(e.getKey(), 
tp.topic(), tp.partition());
+          if (currentOffset.containsKey(partitionKey)) {
+            kc.seekToOffset(tp, currentOffset.get(partitionKey));
+          }
+        }
+      }
+
+      consumers.put(e.getKey(), kc);
+      kafkaConsumerExecutor.submit(new ConsumerThread(e.getKey(), kc, this));
+    }
+
+  }
+
+  /**
+   * The method is called in the deactivate method of the operator
+   */
+  public void stop()
+  {
+    isAlive.set(false);
+    for (AbstractKafkaConsumer c : consumers.values()) {
+      c.wakeup();
+    }
+    kafkaConsumerExecutor.shutdownNow();
+    holdingBuffer.clear();
+    IOUtils.closeQuietly(this);
+  }
+
+  /**
+   * This method is called in teardown method of the operator
+   */
+  public void teardown()
+  {
+    holdingBuffer.clear();
+  }
+
+  public Pair<String, ConsumerRecord<byte[], byte[]>> pollMessage()
+  {
+    return holdingBuffer.poll();
+  }
+
+  public int messageSize()
+  {
+    return holdingBuffer.size();
+  }
+
+  protected final void putMessage(Pair<String, ConsumerRecord<byte[], byte[]>> msg) throws InterruptedException
+  {
+    // blocks the consumer thread when the buffer is full, preventing more messages from being received
+    holdingBuffer.put(msg);
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+  }
+
+  public Map<String, Map<MetricName, ? extends Metric>> getAllConsumerMetrics()
+  {
+    Map<String, Map<MetricName, ? extends Metric>> val = new HashMap<>();
+    for (Map.Entry<String, AbstractKafkaConsumer> e : consumers.entrySet()) {
+      val.put(e.getKey(), e.getValue().metrics());
+    }
+    return val;
+  }
+}
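
The wrapper is driven by the owning input operator: create() from setup, start() from activate, and
stop()/teardown() from deactivate/teardown, per the comments above; pollMessage() and commitOffsets()
are presumably called from the operator's tuple-emission and window-commit paths. A minimal sketch of
the call sequence ("operator" and "offsetsInWindow" are placeholders):

    KafkaConsumerWrapper wrapper = new KafkaConsumerWrapper();
    wrapper.create(operator);                  // setup(): size the holding buffer, remember the owner
    wrapper.start(false);                      // activate(): one consumer thread per cluster
    Pair<String, ConsumerRecord<byte[], byte[]>> msg = wrapper.pollMessage();  // emitTuples(): drain buffer
    wrapper.commitOffsets(offsetsInWindow);    // consumer threads commit these offsets asynchronously
    wrapper.stop();                            // deactivate(): wake up consumers, shut the pool down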

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java
----------------------------------------------------------------------
diff --git a/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java
new file mode 100644
index 0000000..75449a1
--- /dev/null
+++ b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaMetrics.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+
+import com.datatorrent.api.AutoMetric;
+
+/**
+ * Metrics class
+ *
+ * @since 3.3.0
+ */
+@InterfaceStability.Evolving
+public class KafkaMetrics implements Serializable
+{
+  private KafkaConsumerStats[] stats;
+
+  private transient long lastMetricSampleTime = 0L;
+
+  private transient long metricsRefreshInterval;
+
+  public KafkaMetrics(long metricsRefreshInterval)
+  {
+    this.metricsRefreshInterval = metricsRefreshInterval;
+  }
+
+  void updateMetrics(String[] clusters, Map<String, Map<MetricName, ? extends Metric>> metricsMap)
+  {
+    long current = System.currentTimeMillis();
+    if (current - lastMetricSampleTime < metricsRefreshInterval) {
+      return;
+    }
+
+    lastMetricSampleTime = current;
+
+    if (stats == null) {
+      stats = new KafkaConsumerStats[clusters.length];
+    }
+
+    for (int i = 0; i < clusters.length; i++) {
+      if (stats[i] == null) {
+        stats[i] = new KafkaConsumerStats();
+        stats[i].cluster = clusters[i];
+      }
+      Map<MetricName, ? extends Metric> cMetrics = metricsMap.get(clusters[i]);
+      if (cMetrics == null || cMetrics.isEmpty()) {
+        stats[i].bytesPerSec = 0;
+        stats[i].msgsPerSec = 0;
+        continue;
+      }
+      if (stats[i].bytePerSecMK == null || stats[i].msgPerSecMK == null) {
+        for (MetricName mn : cMetrics.keySet()) {
+          if (mn.name().equals("bytes-consumed-rate")) {
+            stats[i].bytePerSecMK = mn;
+          } else if (mn.name().equals("records-consumed-rate")) {
+            stats[i].msgPerSecMK = mn;
+          }
+        }
+      }
+      stats[i].bytesPerSec = cMetrics.get(stats[i].bytePerSecMK).value();
+      stats[i].msgsPerSec = cMetrics.get(stats[i].msgPerSecMK).value();
+    }
+  }
+
+  public KafkaConsumerStats[] getStats()
+  {
+    return stats;
+  }
+
+  /**
+   * Counter class which holds the statistics reported by the consumer
+   */
+  public static class KafkaConsumerStats implements Serializable
+  {
+    private static final long serialVersionUID = -2867402654990209006L;
+
+    public transient MetricName msgPerSecMK;
+    public transient MetricName bytePerSecMK;
+
+    public String cluster;
+    /**
+     * Metrics for each consumer
+     */
+    public double msgsPerSec;
+
+    public double bytesPerSec;
+
+    public KafkaConsumerStats()
+    {
+    }
+  }
+
+  public static class KafkaMetricsAggregator implements AutoMetric.Aggregator, Serializable
+  {
+
+    @Override
+    public Map<String, Object> aggregate(long l, Collection<AutoMetric.PhysicalMetricsContext> collection)
+    {
+      double totalBytesPerSec = 0;
+      double totalMsgsPerSec = 0;
+      Map<String, Object> total = new HashMap<>();
+      for (AutoMetric.PhysicalMetricsContext pmc : collection) {
+        KafkaMetrics km = (KafkaMetrics)pmc.getMetrics().get("metrics");
+        for (KafkaConsumerStats kcs : km.stats) {
+          totalBytesPerSec += kcs.bytesPerSec;
+          totalMsgsPerSec += kcs.msgsPerSec;
+        }
+      }
+      total.put("totalBytesPerSec", totalBytesPerSec);
+      total.put("totalMsgPerSec", totalMsgsPerSec);
+      return total;
+    }
+  }
+}
+
+
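
updateMetrics() samples each consumer's "bytes-consumed-rate" and "records-consumed-rate" metrics at
most once per metricsRefreshInterval. A hedged sketch of how a caller in the same package (the method
is package-private) might drive it, assuming it holds the cluster list and the consumer wrapper shown
above; the 15000 ms interval is illustrative:

    KafkaMetrics metrics = new KafkaMetrics(15000);
    // typically invoked once per streaming window; returns immediately while the interval has not elapsed
    metrics.updateMetrics(clusters, wrapper.getAllConsumerMetrics());
    for (KafkaMetrics.KafkaConsumerStats s : metrics.getStats()) {
      logger.info("{}: {} msgs/s, {} bytes/s", s.cluster, s.msgsPerSec, s.bytesPerSec);
    }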

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java
----------------------------------------------------------------------
diff --git a/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java
new file mode 100644
index 0000000..a07fe33
--- /dev/null
+++ b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaPartition.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * @since 2.1.0
+ */
+@InterfaceStability.Evolving
+public class KafkaPartition implements Serializable
+{
+  protected static final String DEFAULT_CLUSTERID = "com.datatorrent.contrib.kafka.defaultcluster";
+
+  @SuppressWarnings("unused")
+  private KafkaPartition()
+  {
+  }
+
+  public KafkaPartition(String topic, int partitionId)
+  {
+    this(DEFAULT_CLUSTERID, topic, partitionId);
+  }
+
+  public KafkaPartition(String clusterId, String topic, int partitionId)
+  {
+    super();
+    this.clusterId = clusterId;
+    this.partitionId = partitionId;
+    this.topic = topic;
+  }
+
+  /**
+   *
+   */
+  private static final long serialVersionUID = 7556802229202221546L;
+
+  private String clusterId;
+
+  private int partitionId;
+
+  private String topic;
+
+  public String getClusterId()
+  {
+    return clusterId;
+  }
+
+  public void setClusterId(String clusterId)
+  {
+    this.clusterId = clusterId;
+  }
+
+  public int getPartitionId()
+  {
+    return partitionId;
+  }
+
+  public void setPartitionId(int partitionId)
+  {
+    this.partitionId = partitionId;
+  }
+
+  public String getTopic()
+  {
+    return topic;
+  }
+
+  public void setTopic(String topic)
+  {
+    this.topic = topic;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((clusterId == null) ? 0 : clusterId.hashCode());
+    result = prime * result + partitionId;
+    result = prime * result + ((topic == null) ? 0 : topic.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj)
+  {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    KafkaPartition other = (KafkaPartition)obj;
+    if (clusterId == null) {
+      if (other.clusterId != null) {
+        return false;
+      }
+    } else if (!clusterId.equals(other.clusterId)) {
+      return false;
+    }
+    if (partitionId != other.partitionId) {
+      return false;
+    }
+    if (topic == null) {
+      if (other.topic != null) {
+        return false;
+      }
+    } else if (!topic.equals(other.topic)) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "KafkaPartition [clusterId=" + clusterId + ", partitionId=" + 
partitionId + ", topic=" + topic + "]";
+  }
+
+}
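
KafkaPartition is a plain serializable value class; the two-argument constructor falls back to
DEFAULT_CLUSTERID and equality is value-based. A small illustrative sketch (the topic name is a
placeholder):

    KafkaPartition a = new KafkaPartition("myTopic", 0);   // uses DEFAULT_CLUSTERID internally
    KafkaPartition b = new KafkaPartition("com.datatorrent.contrib.kafka.defaultcluster", "myTopic", 0);
    // equals()/hashCode() compare clusterId, topic and partitionId, so both act as the same map key
    assert a.equals(b) && a.hashCode() == b.hashCode();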

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java
new file mode 100644
index 0000000..c47cf3d
--- /dev/null
+++ b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import com.datatorrent.api.DefaultInputPort;
+
+/**
+ * Kafka output operator with single input port (inputPort).
+ * It supports at-least-once processing guarantees.
+ *
+ * @since 3.5.0
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class KafkaSinglePortOutputOperator<K, V> extends AbstractKafkaOutputOperator
+{
+  /**
+   * This input port receives tuples that will be written out to Kafka.
+   */
+  public final transient DefaultInputPort<V> inputPort = new DefaultInputPort<V>()
+  {
+    @Override
+    public void process(V tuple)
+    {
+      getProducer().send(new ProducerRecord<K, V>(getTopic(), tuple));
+    }
+  };
+}
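
Every tuple arriving on inputPort is sent to the configured topic via the producer held by the abstract
base class. A hedged wiring sketch for an Apex application; the upstream operator "lines" with an output
port "output" is a placeholder, and the topic setter is assumed to exist on AbstractKafkaOutputOperator:

    // inside populateDAG(DAG dag, Configuration conf)
    KafkaSinglePortOutputOperator<String, String> kafkaOut =
        dag.addOperator("kafkaOut", new KafkaSinglePortOutputOperator<String, String>());
    kafkaOut.setTopic("myTopic");   // assumed setter matching the getTopic() used in process()
    dag.addStream("toKafka", lines.output, kafkaOut.inputPort);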

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
new file mode 100644
index 0000000..eb0cc40
--- /dev/null
+++ b/kafka/kafka-common/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.kafka.common.PartitionInfo;
+
+/**
+ * A one-to-many partitioner implementation that creates a fixed number of operator partitions and assigns one or
+ * more Kafka partitions to each. It uses round robin to assign partitions.
+ *
+ * @since 3.3.0
+ */
+@InterfaceStability.Evolving
+public class OneToManyPartitioner extends AbstractKafkaPartitioner
+{
+
+  public OneToManyPartitioner(String[] clusters, String[] topics, AbstractKafkaInputOperator protoTypeOperator)
+  {
+    super(clusters, topics, protoTypeOperator);
+  }
+
+  @Override
+  List<Set<PartitionMeta>> assign(Map<String, Map<String, List<PartitionInfo>>> metadata)
+  {
+    if (prototypeOperator.getInitialPartitionCount() <= 0) {
+      throw new IllegalArgumentException("Num of partitions should be greater 
or equal to 1");
+    }
+
+    int partitionCount = prototypeOperator.getInitialPartitionCount();
+    ArrayList<Set<PartitionMeta>> eachPartitionAssignment =
+        new ArrayList<>(prototypeOperator.getInitialPartitionCount());
+    int i = 0;
+    for (Map.Entry<String, Map<String, List<PartitionInfo>>> clusterMap : metadata.entrySet()) {
+      for (Map.Entry<String, List<PartitionInfo>> topicPartition : clusterMap.getValue().entrySet()) {
+        for (PartitionInfo pif : topicPartition.getValue()) {
+          int index = i++ % partitionCount;
+          if (index >= eachPartitionAssignment.size()) {
+            eachPartitionAssignment.add(new HashSet<PartitionMeta>());
+          }
+          eachPartitionAssignment.get(index).add(new PartitionMeta(clusterMap.getKey(),
+              topicPartition.getKey(), pif.partition()));
+        }
+      }
+    }
+
+    return eachPartitionAssignment;
+  }
+
+}
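
The round-robin index (i % partitionCount) spreads all discovered Kafka partitions evenly across the
requested operator partitions. For example, with an initial partition count of 2 and a single topic of
five Kafka partitions p0..p4, the first operator partition gets {p0, p2, p4} and the second {p1, p3}.
A standalone sketch of that distribution (illustrative only):

    int partitionCount = 2;            // stands in for prototypeOperator.getInitialPartitionCount()
    for (int p = 0; p < 5; p++) {      // five Kafka partitions of one topic
      System.out.println("kafka partition p" + p + " -> operator partition " + (p % partitionCount));
    }
    // prints: p0 -> 0, p1 -> 1, p2 -> 0, p3 -> 1, p4 -> 0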
