This is an automated email from the ASF dual-hosted git repository.

nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new f5cbfd5  Feature/create kafka spout (#3198)
f5cbfd5 is described below

commit f5cbfd524d33bb61400a6a41e1f9a3f7ca33008d
Author: SiMing Weng <[email protected]>
AuthorDate: Mon Apr 1 13:56:42 2019 -0400

    Feature/create kafka spout (#3198)
    
    * initial commit
    
    * remove dependency on heron-storm compatibility library
    
    * insert Copyright header
    
    * use constant in ConsumerConfig for client configuration
    add sample topology
    
    * change to default Kafka Broker port number
    
    * add Javadoc documentation
    
    * add documentation 1
    
    * include unit tests for heron kafka spout in Travis
    
    * add documentation 2
    
    * support EFFECTIVE_ONCE mode
    allow defining multiple output streams in ConsumerRecordTransformer
---
 .travis.yml                                        |   2 +
 .../java/heron-kafka-spout-parent/doc/README.md    |  62 +++
 .../heron-kafka-spout-sample/pom.xml               |  42 ++
 .../sample/HeronKafkaSpoutSampleTopology.java      |  95 +++++
 .../heron-kafka-spout/pom.xml                      |  60 +++
 .../spouts/kafka/ConsumerRecordTransformer.java    |  54 +++
 .../kafka/DefaultConsumerRecordTransformer.java    |  22 ++
 .../spouts/kafka/DefaultKafkaConsumerFactory.java  |  50 +++
 .../spouts/kafka/DefaultTopicPatternProvider.java  |  45 +++
 .../heron/spouts/kafka/KafkaConsumerFactory.java   |  36 ++
 .../heron/spouts/kafka/KafkaMetricDecorator.java   |  38 ++
 .../org/apache/heron/spouts/kafka/KafkaSpout.java  | 425 +++++++++++++++++++++
 .../heron/spouts/kafka/TopicPatternProvider.java   |  33 ++
 .../DefaultConsumerRecordTransformerTest.java      |  55 +++
 .../kafka/DefaultKafkaConsumerFactoryTest.java     |  48 +++
 .../kafka/DefaultTopicPatternProviderTest.java     |  31 ++
 .../spouts/kafka/KafkaMetricDecoratorTest.java     |  43 +++
 .../apache/heron/spouts/kafka/KafkaSpoutTest.java  | 262 +++++++++++++
 .../kafka/java/heron-kafka-spout-parent/pom.xml    |  76 ++++
 19 files changed, 1479 insertions(+)

diff --git a/.travis.yml b/.travis.yml
index 464e5b4..d9aafc7 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -49,3 +49,5 @@ script:
   - which python2.7
   - python2.7 -V
   - scripts/travis/ci.sh
+  # run unit tests for heron kafka spout
+  - mvn -f contrib/spouts/kafka/java/heron-kafka-spout-parent/pom.xml test
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/doc/README.md 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/doc/README.md
new file mode 100644
index 0000000..6666e5c
--- /dev/null
+++ b/contrib/spouts/kafka/java/heron-kafka-spout-parent/doc/README.md
@@ -0,0 +1,62 @@
+# Heron Kafka Spout
+
+[Kafka Spout](../heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaSpout.java) enables a Heron topology to consume data from a Kafka cluster as input into its stream processing pipeline. It is built primarily on two APIs: the Heron API and the Kafka Client API.
+
+## Configuring the underlying Kafka Consumer
+
+Each Kafka Spout instance creates its underlying [Consumer](https://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html) instance via the factory interface [KafkaConsumerFactory](../heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaConsumerFactory.java), which is passed in as one of the constructor arguments.
+
+The simplest way is to use the provided [DefaultKafkaConsumerFactory](../heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactory.java). It takes a `Map<String, Object>` as its only input, which should contain all the user-configured properties defined by [ConsumerConfig](https://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerConfig.html).
+
+_Note: `enable.auto.commit` is always set to false in `DefaultKafkaConsumerFactory` because the Kafka Spout needs to manage offset commits manually. Any custom implementation of `KafkaConsumerFactory` should adhere to the same rule._
+
+```java
+Map<String, Object> kafkaConsumerConfig = new HashMap<>();
+//connect to Kafka broker at localhost:9092
+kafkaConsumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+//group ID of the consumer group
+kafkaConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-kafka-spout");
+//key and value deserializers
+kafkaConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+kafkaConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+
+KafkaConsumerFactory<String, String> kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(kafkaConsumerConfig);
+```
+
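+For illustration, here is a minimal sketch of a custom factory (the class name `CustomKafkaConsumerFactory` is hypothetical) that adheres to the rule above by always disabling auto-commit:
+
+```java
+public class CustomKafkaConsumerFactory<K, V> implements KafkaConsumerFactory<K, V> {
+    private static final long serialVersionUID = 1L;
+    private final Map<String, Object> config;
+
+    public CustomKafkaConsumerFactory(Map<String, Object> config) {
+        this.config = config;
+    }
+
+    @Override
+    public Consumer<K, V> create() {
+        //the spout manages offset commits itself, so auto-commit must stay off
+        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        return new KafkaConsumer<>(config);
+    }
+}
+```
+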
+## Subscribe to topic(s)
+
+The Kafka Spout instance can be configured either to subscribe to a collection of topics, by specifying the topic names as a `Collection<String>`, or to take an implementation of [TopicPatternProvider](../heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/TopicPatternProvider.java) that provides a regular expression matching all the topics it should subscribe to. A [DefaultTopicPatternProvider](../heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultTopicPatternProvider.java) is provided, which compiles the pattern from a regular expression string.
+
+```java
+//subscribe to a specific named topic
+new KafkaSpout<>(kafkaConsumerFactory, Collections.singletonList("test-topic"));
+
+//subscribe to topics matching a pattern
+new KafkaSpout<>(kafkaConsumerFactory, new DefaultTopicPatternProvider("test-.*"));
+```
+
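+A custom pattern provider is equally small; for example, a hypothetical `PrefixTopicPatternProvider` that matches every topic with a fixed prefix could be sketched as:
+
+```java
+public class PrefixTopicPatternProvider implements TopicPatternProvider {
+    private static final long serialVersionUID = 1L;
+    private final String prefix;
+
+    public PrefixTopicPatternProvider(String prefix) {
+        this.prefix = prefix;
+    }
+
+    @Override
+    public Pattern create() {
+        //quote the prefix so any regex metacharacters in it are matched literally
+        return Pattern.compile(Pattern.quote(prefix) + ".*");
+    }
+}
+```
+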
+## Convert ConsumerRecord to Tuple
+
+The Spout delegates the conversion of each Kafka [ConsumerRecord](https://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerRecord.html) into output tuples to the [ConsumerRecordTransformer](../heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/ConsumerRecordTransformer.java). The provided [DefaultConsumerRecordTransformer](../heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformer.java) simply converts the key and value of the record into a two-field tuple ("key" and "value") on the "default" stream.
+
+Users can create their own implementation of the `ConsumerRecordTransformer` interface and set it on the `KafkaSpout` via the [setConsumerRecordTransformer](../heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaSpout.java) method, as sketched below.
+
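+As a sketch (the class name `TopicAwareTransformer` is hypothetical), a transformer that also emits the topic name of each record on a single custom stream could look like:
+
+```java
+public class TopicAwareTransformer<K, V> implements ConsumerRecordTransformer<K, V> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public List<String> getOutputStreams() {
+        return Collections.singletonList("records");
+    }
+
+    @Override
+    public List<String> getFieldNames(String streamId) {
+        return Arrays.asList("topic", "key", "value");
+    }
+
+    @Override
+    public Map<String, List<Object>> transform(ConsumerRecord<K, V> record) {
+        //one output tuple per record, on the single "records" stream
+        return Collections.singletonMap("records",
+                Arrays.asList(record.topic(), record.key(), record.value()));
+    }
+}
+
+KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(kafkaConsumerFactory, Collections.singletonList("test-topic"));
+kafkaSpout.setConsumerRecordTransformer(new TopicAwareTransformer<>());
+```
+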
+## Behavior in Different Topology Reliability Modes
+
+### `ATMOST_ONCE` mode
+The topology does not turn the `acking` mechanism on, so the KafkaSpout can afford to emit tuples without any message ID; it also immediately commits the currently-read offset back to the Kafka broker, and neither the `ack()` nor the `fail()` callback will be invoked. Therefore, "in-flight" tuples are simply lost if the KafkaSpout instance crashes or the topology is restarted. That is what `ATMOST_ONCE` offers.
+
+### `ATLEAST_ONCE` mode
+The `acking` mechanism is turned on topology-wide, so the KafkaSpout uses the `ack registry` to keep track of all the **continuous** acknowledgement ranges for each partition, while the `failure registry` keeps track of the **lowest** failed offset for each partition. When the Kafka Consumer needs to poll the Kafka cluster for more records (because the spout has emitted everything it got from the previous poll), the KafkaSpout reconciles as follows for each partition:
+
+1. if there is any failed tuple, seek back to the lowest failed offset
+2. discard any acknowledgement it has received whose offset is greater than the lowest failed offset
+3. clear the lowest failed offset in the `failure registry`
+4. commit the offset at the upper boundary of the first range in the `ack registry`
+
+Then it polls the Kafka cluster for the next batch of records (i.e. starting from the lowest failed offset, if any).
+
+This guarantees that each tuple emitted by the KafkaSpout is successfully processed across the whole topology at least once.
+
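+As an illustration of the reconciliation, suppose offsets 100 to 105 have been emitted for a partition, tuples 100, 101, 102, and 104 have been acknowledged, and tuple 103 has failed. The `ack registry` then holds the ranges [100, 102] and [104, 104], and the `failure registry` holds 103. At the next poll the consumer seeks back to 103, the [104, 104] range is discarded, offset 103 (one past the upper boundary of [100, 102]) is committed, and records from 103 onward are replayed.
+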
+### `EFFECTIVELY_ONCE` mode
+Supported through Heron's stateful processing: the spout implements `IStatefulComponent`, records the offset of the last emitted record for each partition in the topology `State`, commits those offsets back to Kafka whenever a checkpoint is saved (`preSave`), and seeks the consumer back to the saved offset when a partition is reassigned.
\ No newline at end of file
diff --git 
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/pom.xml
 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/pom.xml
new file mode 100644
index 0000000..9eb1571
--- /dev/null
+++ 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2019
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xmlns="http://maven.apache.org/POM/4.0.0";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <artifactId>heron-kafka-spout-parent</artifactId>
+        <groupId>org.apache.heron</groupId>
+        <version>1.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>heron-kafka-spout-sample</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.heron</groupId>
+            <artifactId>heron-kafka-spout</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>com.twitter.heron</groupId>
+            <artifactId>heron-storm</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git 
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/src/main/java/org/apache/heron/spouts/kafka/sample/HeronKafkaSpoutSampleTopology.java
 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/src/main/java/org/apache/heron/spouts/kafka/sample/HeronKafkaSpoutSampleTopology.java
new file mode 100644
index 0000000..9a7de40
--- /dev/null
+++ 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/src/main/java/org/apache/heron/spouts/kafka/sample/HeronKafkaSpoutSampleTopology.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2019
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.heron.spouts.kafka.sample;
+
+import com.twitter.heron.api.Config;
+import com.twitter.heron.api.bolt.BaseRichBolt;
+import com.twitter.heron.api.bolt.OutputCollector;
+import com.twitter.heron.api.topology.OutputFieldsDeclarer;
+import com.twitter.heron.api.topology.TopologyBuilder;
+import com.twitter.heron.api.topology.TopologyContext;
+import com.twitter.heron.api.tuple.Tuple;
+import com.twitter.heron.common.basics.ByteAmount;
+import com.twitter.heron.simulator.Simulator;
+import org.apache.heron.spouts.kafka.DefaultKafkaConsumerFactory;
+import org.apache.heron.spouts.kafka.KafkaConsumerFactory;
+import org.apache.heron.spouts.kafka.KafkaSpout;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class HeronKafkaSpoutSampleTopology {
+    private static final Logger LOG = 
LoggerFactory.getLogger(HeronKafkaSpoutSampleTopology.class);
+    private static final String KAFKA_SPOUT_NAME = "kafka-spout";
+    private static final String LOGGING_BOLT_NAME = "logging-bolt";
+
+    public static void main(String[] args) {
+        Map<String, Object> kafkaConsumerConfig = new HashMap<>();
+        kafkaConsumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9092");
+        kafkaConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, 
"sample-kafka-spout");
+        kafkaConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringDeserializer");
+        
kafkaConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringDeserializer");
+        LOG.info("Kafka Consumer Config: {}", kafkaConsumerConfig);
+
+        KafkaConsumerFactory<String, String> kafkaConsumerFactory = new 
DefaultKafkaConsumerFactory<>(kafkaConsumerConfig);
+
+        TopologyBuilder topologyBuilder = new TopologyBuilder();
+        topologyBuilder.setSpout(KAFKA_SPOUT_NAME, new 
KafkaSpout<>(kafkaConsumerFactory, Collections.singletonList("test-topic")));
+        topologyBuilder.setBolt(LOGGING_BOLT_NAME, new 
LoggingBolt()).shuffleGrouping(KAFKA_SPOUT_NAME);
+        Config config = new Config();
+        config.setNumStmgrs(1);
+        config.setContainerCpuRequested(1);
+        config.setContainerRamRequested(ByteAmount.fromGigabytes(1));
+        config.setContainerDiskRequested(ByteAmount.fromGigabytes(1));
+
+        config.setComponentCpu(KAFKA_SPOUT_NAME, 0.25);
+        config.setComponentRam(KAFKA_SPOUT_NAME, 
ByteAmount.fromMegabytes(256));
+        config.setComponentDisk(KAFKA_SPOUT_NAME, 
ByteAmount.fromMegabytes(512));
+
+        config.setComponentCpu(LOGGING_BOLT_NAME, 0.25);
+        config.setComponentRam(LOGGING_BOLT_NAME, 
ByteAmount.fromMegabytes(256));
+        config.setComponentDisk(LOGGING_BOLT_NAME, 
ByteAmount.fromMegabytes(256));
+
+        Simulator simulator = new Simulator();
+        simulator.submitTopology("heron-kafka-spout-sample-topology", config, 
topologyBuilder.createTopology());
+    }
+
+    public static class LoggingBolt extends BaseRichBolt {
+        private static final Logger LOG = 
LoggerFactory.getLogger(LoggingBolt.class);
+        private transient OutputCollector outputCollector;
+
+        @Override
+        public void prepare(Map<String, Object> heronConf, TopologyContext 
context, OutputCollector collector) {
+            this.outputCollector = collector;
+        }
+
+        @Override
+        public void execute(Tuple input) {
+            LOG.info("{}", input);
+            outputCollector.ack(input);
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            //do nothing
+        }
+    }
+}
diff --git 
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/pom.xml 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/pom.xml
new file mode 100644
index 0000000..2c7e072
--- /dev/null
+++ 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2019
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xmlns="http://maven.apache.org/POM/4.0.0";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.heron</groupId>
+        <artifactId>heron-kafka-spout-parent</artifactId>
+        <version>1.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>heron-kafka-spout</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.twitter.heron</groupId>
+            <artifactId>heron-storm</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <version>5.4.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <version>5.4.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+            <version>2.24.5</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git 
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/ConsumerRecordTransformer.java
 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/ConsumerRecordTransformer.java
new file mode 100644
index 0000000..51b4281
--- /dev/null
+++ 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/ConsumerRecordTransformer.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2019
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.heron.spouts.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is the transformer class whose responsibility is to:
+ *
+ * <ol>
+ * <li>define the id of the output streams</li>
+ * <li>declare the list of fields of the output tuple</li>
+ * <li>translate the incoming Kafka record into the list of values of the 
output tuple</li>
+ * </ol>
+ * <p>
+ * The default behavior of the built-in transformer is to output to the "default" stream, with 2 fields, "key" and "value", which are the key and value fields of the incoming Kafka record.
+ *
+ * @param <K> the type of the key of the Kafka record
+ * @param <V> the type of the value of the Kafka record
+ * @see KafkaSpout#setConsumerRecordTransformer(ConsumerRecordTransformer)
+ */
+public interface ConsumerRecordTransformer<K, V> extends Serializable {
+    default List<String> getOutputStreams() {
+        return Collections.singletonList("default");
+    }
+
+    default List<String> getFieldNames(String streamId) {
+        return Arrays.asList("key", "value");
+    }
+
+    default Map<String, List<Object>> transform(ConsumerRecord<K, V> record) {
+        return Collections.singletonMap("default", Arrays.asList(record.key(), 
record.value()));
+    }
+}
diff --git 
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformer.java
 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformer.java
new file mode 100644
index 0000000..ac00b4e
--- /dev/null
+++ 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformer.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2019
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.heron.spouts.kafka;
+
+@SuppressWarnings("WeakerAccess")
+public class DefaultConsumerRecordTransformer<K, V> implements 
ConsumerRecordTransformer<K, V> {
+    private static final long serialVersionUID = -8971687732883148619L;
+}
diff --git 
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactory.java
 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactory.java
new file mode 100644
index 0000000..bbc7ac5
--- /dev/null
+++ 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2019
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.heron.spouts.kafka;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import java.util.Map;
+
+/**
+ * a simple Kafka Consumer factory that builds a KafkaConsumer instance from a {@link Map} of properties used to configure it.
+ *
+ * @param <K> the type of the key of the Kafka record
+ * @param <V> the type of the value of the Kafka record
+ */
+public class DefaultKafkaConsumerFactory<K, V> implements 
KafkaConsumerFactory<K, V> {
+    private static final long serialVersionUID = -2346087278604915148L;
+    private Map<String, Object> config;
+
+    /**
+     * the config map, key strings should be from {@link ConsumerConfig}
+     *
+     * @param config the configuration map
+     * @see <a 
href="https://kafka.apache.org/documentation/#consumerconfigs";>Kafka Consumer 
Configs</a>
+     */
+    public DefaultKafkaConsumerFactory(Map<String, Object> config) {
+        this.config = config;
+    }
+
+    @Override
+    public Consumer<K, V> create() {
+        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        return new KafkaConsumer<>(config);
+    }
+}
diff --git 
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultTopicPatternProvider.java
 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultTopicPatternProvider.java
new file mode 100644
index 0000000..b48e2ca
--- /dev/null
+++ 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultTopicPatternProvider.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2019
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.heron.spouts.kafka;
+
+import java.util.regex.Pattern;
+
+/**
+ * the built-in default pattern provider to create a topic pattern out of a 
regex string
+ */
+public class DefaultTopicPatternProvider implements TopicPatternProvider {
+    private static final long serialVersionUID = 5534026856505613199L;
+    private String regex;
+
+    /**
+     * create a provider out of a regular expression string
+     *
+     * @param regex topic name regular expression
+     */
+    @SuppressWarnings("WeakerAccess")
+    public DefaultTopicPatternProvider(String regex) {
+        this.regex = regex;
+    }
+
+    @Override
+    public Pattern create() {
+        if (regex == null) {
+            throw new IllegalArgumentException("regex can not be null");
+        }
+        return Pattern.compile(regex);
+    }
+}
diff --git 
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaConsumerFactory.java
 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaConsumerFactory.java
new file mode 100644
index 0000000..c67aeac
--- /dev/null
+++ 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaConsumerFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2019
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.heron.spouts.kafka;
+
+import org.apache.kafka.clients.consumer.Consumer;
+
+import java.io.Serializable;
+
+/**
+ * the factory to create the underlying KafkaConsumer instance that the Kafka Spout will use to consume data from the Kafka cluster
+ *
+ * @param <K> the type of the key of the Kafka record
+ * @param <V> the type of the value of the Kafka record
+ */
+public interface KafkaConsumerFactory<K, V> extends Serializable {
+    /**
+     * create the underlying KafkaConsumer
+     *
+     * @return kafka consumer instance
+     */
+    Consumer<K, V> create();
+}
diff --git 
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaMetricDecorator.java
 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaMetricDecorator.java
new file mode 100644
index 0000000..9363c3a
--- /dev/null
+++ 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaMetricDecorator.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2019
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.heron.spouts.kafka;
+
+import com.twitter.heron.api.metric.IMetric;
+import org.apache.kafka.common.Metric;
+
+/**
+ * a decorator to convert a Kafka Metric to a Heron Metric so that Kafka 
metrics can be exposed via Heron Metrics Manager
+ *
+ * @param <M> the Kafka Metric type
+ */
+public class KafkaMetricDecorator<M extends Metric> implements IMetric<Object> 
{
+    private M metric;
+
+    KafkaMetricDecorator(M metric) {
+        this.metric = metric;
+    }
+
+    @Override
+    public Object getValueAndReset() {
+        return metric.metricValue();
+    }
+}
diff --git 
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaSpout.java
 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaSpout.java
new file mode 100644
index 0000000..6334001
--- /dev/null
+++ 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaSpout.java
@@ -0,0 +1,425 @@
+/*
+ * Copyright 2019
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.heron.spouts.kafka;
+
+import com.twitter.heron.api.Config;
+import com.twitter.heron.api.spout.BaseRichSpout;
+import com.twitter.heron.api.spout.SpoutOutputCollector;
+import com.twitter.heron.api.state.State;
+import com.twitter.heron.api.topology.IStatefulComponent;
+import com.twitter.heron.api.topology.OutputFieldsDeclarer;
+import com.twitter.heron.api.topology.TopologyContext;
+import com.twitter.heron.api.tuple.Fields;
+import com.twitter.heron.common.basics.SingletonRegistry;
+import com.twitter.heron.common.config.SystemConfig;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
+
+/**
+ * Kafka spout to consume data from Kafka topic(s); each record is converted into a tuple via {@link ConsumerRecordTransformer} and emitted into the topology
+ *
+ * @param <K> the type of the key field of the Kafka record
+ * @param <V> the type of the value field of the Kafka record
+ */
+public class KafkaSpout<K, V> extends BaseRichSpout implements 
IStatefulComponent<TopicPartition, Long> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaSpout.class);
+    private static final long serialVersionUID = -2271355516537883361L;
+    private int metricsIntervalInSecs = 60;
+    private KafkaConsumerFactory<K, V> kafkaConsumerFactory;
+    private TopicPatternProvider topicPatternProvider;
+    private Collection<String> topicNames;
+    private ConsumerRecordTransformer<K, V> consumerRecordTransformer = new 
DefaultConsumerRecordTransformer<>();
+    private transient SpoutOutputCollector collector;
+    private transient TopologyContext topologyContext;
+    private transient Queue<ConsumerRecord<K, V>> buffer;
+    private transient Consumer<K, V> consumer;
+    private transient Set<TopicPartition> assignedPartitions;
+    private transient Set<MetricName> reportedMetrics;
+    private transient Map<TopicPartition, NavigableMap<Long, Long>> 
ackRegistry;
+    private transient Map<TopicPartition, Long> failureRegistry;
+    private Config.TopologyReliabilityMode topologyReliabilityMode = 
Config.TopologyReliabilityMode.ATMOST_ONCE;
+    private long previousKafkaMetricsUpdatedTimestamp = 0;
+    private State<TopicPartition, Long> state;
+
+    /**
+     * create a KafkaSpout instance that subscribes to a list of topics
+     *
+     * @param kafkaConsumerFactory kafka consumer factory
+     * @param topicNames           list of topic names
+     */
+    public KafkaSpout(KafkaConsumerFactory<K, V> kafkaConsumerFactory, 
Collection<String> topicNames) {
+        this.kafkaConsumerFactory = kafkaConsumerFactory;
+        this.topicNames = topicNames;
+    }
+
+    /**
+     * create a KafkaSpout instance that subscribes to all topics matching the topic pattern
+     *
+     * @param kafkaConsumerFactory kafka consumer factory
+     * @param topicPatternProvider provider of the topic matching pattern
+     */
+    @SuppressWarnings("WeakerAccess")
+    public KafkaSpout(KafkaConsumerFactory<K, V> kafkaConsumerFactory, 
TopicPatternProvider topicPatternProvider) {
+        this.kafkaConsumerFactory = kafkaConsumerFactory;
+        this.topicPatternProvider = topicPatternProvider;
+    }
+
+    /**
+     * @return the Kafka record transformer instance used by this Kafka Spout
+     */
+    @SuppressWarnings("WeakerAccess")
+    public ConsumerRecordTransformer<K, V> getConsumerRecordTransformer() {
+        return consumerRecordTransformer;
+    }
+
+    /**
+     * set the Kafka record transformer
+     *
+     * @param consumerRecordTransformer kafka record transformer
+     */
+    @SuppressWarnings("WeakerAccess")
+    public void setConsumerRecordTransformer(ConsumerRecordTransformer<K, V> 
consumerRecordTransformer) {
+        this.consumerRecordTransformer = consumerRecordTransformer;
+    }
+
+    @Override
+    public void initState(State<TopicPartition, Long> state) {
+        this.state = state;
+        LOG.info("initial state {}", state);
+    }
+
+    @Override
+    public void preSave(String checkpointId) {
+        LOG.info("save state {}", state);
+        consumer.commitAsync(state.entrySet()
+                .stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, entry -> new 
OffsetAndMetadata(entry.getValue() + 1))), null);
+    }
+
+    @Override
+    public void open(Map<String, Object> conf, TopologyContext context, 
SpoutOutputCollector collector) {
+        this.collector = collector;
+        this.topologyContext = context;
+        this.topologyReliabilityMode = 
Config.TopologyReliabilityMode.valueOf(conf.get(Config.TOPOLOGY_RELIABILITY_MODE).toString());
+        metricsIntervalInSecs = (int) ((SystemConfig) 
SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG)).getHeronMetricsExportInterval().getSeconds();
+        consumer = kafkaConsumerFactory.create();
+        if (topicNames != null) {
+            consumer.subscribe(topicNames, new 
KafkaConsumerRebalanceListener());
+        } else {
+            consumer.subscribe(topicPatternProvider.create(), new 
KafkaConsumerRebalanceListener());
+        }
+        buffer = new ArrayDeque<>(500);
+        ackRegistry = new ConcurrentHashMap<>();
+        failureRegistry = new ConcurrentHashMap<>();
+        assignedPartitions = new HashSet<>();
+        reportedMetrics = new HashSet<>();
+    }
+
+    @Override
+    public void nextTuple() {
+        ConsumerRecord<K, V> record = buffer.poll();
+        if (record != null) {
+            // there are still records remaining for emission from the 
previous poll
+            emitConsumerRecord(record);
+        } else {
+            //all the records from previous poll have been emitted or this is 
very first time to poll
+            if (topologyReliabilityMode == 
Config.TopologyReliabilityMode.ATLEAST_ONCE) {
+                ackRegistry.forEach((key, value) -> {
+                    if (value != null) {
+                        //seek back to the earliest failed offset if there is 
any
+                        rewindAndDiscardAck(key, value);
+                        //commit based on the first continuous acknowledgement 
range
+                        manualCommit(key, value);
+                    }
+                });
+            }
+            poll().forEach(kvConsumerRecord -> buffer.offer(kvConsumerRecord));
+        }
+    }
+
+    @Override
+    public void activate() {
+        super.activate();
+        if (!assignedPartitions.isEmpty()) {
+            consumer.resume(assignedPartitions);
+        }
+    }
+
+    @Override
+    public void deactivate() {
+        super.deactivate();
+        if (!assignedPartitions.isEmpty()) {
+            consumer.pause(assignedPartitions);
+        }
+    }
+
+    @Override
+    public void ack(Object msgId) {
+        super.ack(msgId);
+        long start = System.nanoTime();
+        ConsumerRecordMessageId consumerRecordMessageId = 
(ConsumerRecordMessageId) msgId;
+        TopicPartition topicPartition = 
consumerRecordMessageId.getTopicPartition();
+        long offset = consumerRecordMessageId.getOffset();
+        ackRegistry.putIfAbsent(topicPartition, new ConcurrentSkipListMap<>());
+        NavigableMap<Long, Long> navigableMap = 
ackRegistry.get(topicPartition);
+
+        Map.Entry<Long, Long> floorRange = navigableMap.floorEntry(offset);
+        Map.Entry<Long, Long> ceilingRange = navigableMap.ceilingEntry(offset);
+
+        long floorBottom = floorRange != null ? floorRange.getKey() : 
Long.MIN_VALUE;
+        long floorTop = floorRange != null ? floorRange.getValue() : 
Long.MIN_VALUE;
+        long ceilingBottom = ceilingRange != null ? ceilingRange.getKey() : 
Long.MAX_VALUE;
+        long ceilingTop = ceilingRange != null ? ceilingRange.getValue() : 
Long.MAX_VALUE;
+
+        /*
+          the ack is for a message that has already been acknowledged. This 
happens when a failed tuple has caused
+          Kafka consumer to seek back to earlier position and some messages 
are replayed.
+         */
+        if ((offset >= floorBottom && offset <= floorTop) || (offset >= 
ceilingBottom && offset <= ceilingTop))
+            return;
+        if (ceilingBottom - floorTop == 2) {
+            /*
+            the ack connects the two adjacent ranges
+             */
+            navigableMap.put(floorBottom, ceilingTop);
+            navigableMap.remove(ceilingBottom);
+        } else if (offset == floorTop + 1) {
+            /*
+            the acknowledged offset is the immediate neighbour of the upper 
bound of the floor range
+             */
+            navigableMap.put(floorBottom, offset);
+        } else if (offset == ceilingBottom - 1) {
+            /*
+            the acknowledged offset is the immediate neighbour of the lower 
bound of the ceiling range
+             */
+            navigableMap.remove(ceilingBottom);
+            navigableMap.put(offset, ceilingTop);
+        } else {
+            /*
+            it is a new born range
+             */
+            navigableMap.put(offset, offset);
+        }
+        LOG.debug("ack {} in {} ns", msgId, System.nanoTime() - start);
+        LOG.debug("{}", 
ackRegistry.get(consumerRecordMessageId.getTopicPartition()));
+    }
+
+    @Override
+    public void fail(Object msgId) {
+        super.fail(msgId);
+        ConsumerRecordMessageId consumerRecordMessageId = 
(ConsumerRecordMessageId) msgId;
+        TopicPartition topicPartition = 
consumerRecordMessageId.getTopicPartition();
+        long offset = consumerRecordMessageId.getOffset();
+        failureRegistry.put(topicPartition, 
Math.min(failureRegistry.getOrDefault(topicPartition, Long.MAX_VALUE), offset));
+        LOG.warn("fail {}", msgId);
+    }
+
+    @Override
+    public void close() {
+        consumer.close();
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        consumerRecordTransformer.getOutputStreams()
+                .forEach(s -> declarer.declareStream(s, new 
Fields(consumerRecordTransformer.getFieldNames(s))));
+    }
+
+    private void emitConsumerRecord(ConsumerRecord<K, V> record) {
+        consumerRecordTransformer.transform(record)
+                .forEach((s, objects) -> {
+                    if (topologyReliabilityMode != 
Config.TopologyReliabilityMode.ATLEAST_ONCE) {
+                        collector.emit(s, objects);
+                        //only in effectively-once mode do we need to track the offset of the record just emitted into the topology
+                        if (topologyReliabilityMode == 
Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
+                            state.put(new TopicPartition(record.topic(), 
record.partition()), record.offset());
+                        }
+                    } else {
+                        //build message id based on topic, partition, offset 
of the consumer record
+                        ConsumerRecordMessageId consumerRecordMessageId = new 
ConsumerRecordMessageId(new TopicPartition(record.topic(), record.partition()), 
record.offset());
+                        //emit tuple with the message id
+                        collector.emit(s, objects, consumerRecordMessageId);
+                    }
+                });
+    }
+
+    private void rewindAndDiscardAck(TopicPartition topicPartition, 
NavigableMap<Long, Long> ackRanges) {
+        if (failureRegistry.containsKey(topicPartition)) {
+            long earliestFailedOffset = failureRegistry.get(topicPartition);
+            //rewind back to the earliest failed offset
+            consumer.seek(topicPartition, earliestFailedOffset);
+            //discard the ack whose offset is greater than the earliest failed 
offset if there is any because we've rewound the consumer back
+            SortedMap<Long, Long> sortedMap = 
ackRanges.headMap(earliestFailedOffset);
+            if (!sortedMap.isEmpty()) {
+                sortedMap.put(sortedMap.lastKey(), 
Math.min(earliestFailedOffset, sortedMap.get(sortedMap.lastKey())));
+            }
+            ackRegistry.put(topicPartition, new 
ConcurrentSkipListMap<>(sortedMap));
+            //failure for this partition has been dealt with
+            failureRegistry.remove(topicPartition);
+        }
+    }
+
+    private void manualCommit(TopicPartition topicPartition, 
NavigableMap<Long, Long> ackRanges) {
+        //the first entry in the acknowledgement registry keeps track of the 
lowest possible offset that can be committed
+        Map.Entry<Long, Long> firstEntry = ackRanges.firstEntry();
+        if (firstEntry != null) {
+            consumer.commitAsync(Collections.singletonMap(topicPartition, new 
OffsetAndMetadata(firstEntry.getValue() + 1)), null);
+        }
+    }
+
+    private Iterable<ConsumerRecord<K, V>> poll() {
+        ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(200));
+        if (!records.isEmpty()) {
+            /*
+            since the Kafka Consumer metrics are built up gradually based on the partitions it consumes,
+            we need to periodically check whether there are any new metrics to register after each poll.
+             */
+            //the timestamps are in milliseconds, so convert the interval from seconds
+            if (System.currentTimeMillis() - previousKafkaMetricsUpdatedTimestamp > metricsIntervalInSecs * 1000L) {
+                registerConsumerMetrics();
+                previousKafkaMetricsUpdatedTimestamp = 
System.currentTimeMillis();
+            }
+            if (topologyReliabilityMode == 
Config.TopologyReliabilityMode.ATMOST_ONCE) {
+                consumer.commitAsync();
+            }
+            return records;
+        }
+        return Collections.emptyList();
+    }
+
+    private void registerConsumerMetrics() {
+        consumer.metrics().forEach((metricName, o) -> {
+            if (!reportedMetrics.contains(metricName)) {
+                reportedMetrics.add(metricName);
+                String exposedName = extractKafkaMetricName(metricName);
+                LOG.info("register Kakfa Consumer metric {}", exposedName);
+                topologyContext.registerMetric(exposedName, new 
KafkaMetricDecorator<>(o), metricsIntervalInSecs);
+            }
+        });
+    }
+
+    private String extractKafkaMetricName(MetricName metricName) {
+        StringBuilder builder = new StringBuilder()
+                .append(metricName.name())
+                .append('-')
+                .append(metricName.group());
+        metricName.tags().forEach((s, s2) -> builder.append('-')
+                .append(s)
+                .append('-')
+                .append(s2));
+        LOG.info("register Kakfa Consumer metric {}", builder);
+        return builder.toString();
+    }
+
+    static class ConsumerRecordMessageId {
+        private TopicPartition topicPartition;
+        private long offset;
+
+        ConsumerRecordMessageId(TopicPartition topicPartition, long offset) {
+            this.topicPartition = topicPartition;
+            this.offset = offset;
+        }
+
+        @Override
+        public String toString() {
+            return "ConsumerRecordMessageId{" +
+                    "topicPartition=" + topicPartition +
+                    ", offset=" + offset +
+                    '}';
+        }
+
+        TopicPartition getTopicPartition() {
+            return topicPartition;
+        }
+
+        long getOffset() {
+            return offset;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            ConsumerRecordMessageId that = (ConsumerRecordMessageId) o;
+
+            if (offset != that.offset) return false;
+            return topicPartition.equals(that.topicPartition);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = topicPartition.hashCode();
+            result = 31 * result + (int) (offset ^ (offset >>> 32));
+            return result;
+        }
+    }
+
+    class KafkaConsumerRebalanceListener implements ConsumerRebalanceListener {
+
+        @Override
+        public void onPartitionsRevoked(Collection<TopicPartition> collection) 
{
+            assignedPartitions.removeAll(collection);
+            if (topologyReliabilityMode == 
Config.TopologyReliabilityMode.ATLEAST_ONCE) {
+                collection.forEach(topicPartition -> {
+                    NavigableMap<Long, Long> navigableMap = 
ackRegistry.remove(topicPartition);
+                    if (navigableMap != null) {
+                        Map.Entry<Long, Long> entry = 
navigableMap.firstEntry();
+                        if (entry != null) {
+                            
consumer.commitAsync(Collections.singletonMap(topicPartition, new 
OffsetAndMetadata(Math.min(failureRegistry.getOrDefault(topicPartition, 
Long.MAX_VALUE), entry.getValue()) + 1)), null);
+                        }
+                    }
+                    failureRegistry.remove(topicPartition);
+                });
+            } else if (topologyReliabilityMode == 
Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
+                collection.forEach(topicPartition -> 
state.remove(topicPartition));
+            }
+        }
+
+        @Override
+        public void onPartitionsAssigned(Collection<TopicPartition> 
collection) {
+            assignedPartitions.addAll(collection);
+            if (topologyReliabilityMode == 
Config.TopologyReliabilityMode.ATLEAST_ONCE) {
+                collection.forEach(topicPartition -> {
+                    try {
+                        long nextRecordPosition = 
consumer.position(topicPartition, Duration.ofSeconds(5));
+                        ackRegistry.put(topicPartition, new 
ConcurrentSkipListMap<>(Collections.singletonMap(nextRecordPosition - 1, 
nextRecordPosition - 1)));
+                    } catch (TimeoutException e) {
+                        LOG.warn("can not get the position of the next record 
to consume for partition {}", topicPartition);
+                        ackRegistry.remove(topicPartition);
+                    }
+                    failureRegistry.remove(topicPartition);
+                });
+            } else if (topologyReliabilityMode == 
Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
+                collection.forEach(topicPartition -> {
+                    if (state.containsKey(topicPartition)) {
+                        consumer.seek(topicPartition, 
state.get(topicPartition));
+                    }
+                });
+            }
+        }
+    }
+}
diff --git 
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/TopicPatternProvider.java
 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/TopicPatternProvider.java
new file mode 100644
index 0000000..1448609
--- /dev/null
+++ 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/TopicPatternProvider.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2019
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.heron.spouts.kafka;
+
+import java.io.Serializable;
+import java.util.regex.Pattern;
+
+/**
+ * convenient interface to provide a topic match pattern
+ *
+ * @see org.apache.kafka.clients.consumer.Consumer#subscribe(Pattern)
+ */
+public interface TopicPatternProvider extends Serializable {
+
+    /**
+     * @return a matching pattern for topics to subscribe to
+     */
+    Pattern create();
+}
diff --git 
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformerTest.java
 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformerTest.java
new file mode 100644
index 0000000..c9d676e
--- /dev/null
+++ 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2019
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.heron.spouts.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class DefaultConsumerRecordTransformerTest {
+    private static final String DEFAULT_STREAM = "default";
+    private ConsumerRecordTransformer<String, byte[]> 
consumerRecordTransformer;
+
+    @BeforeEach
+    void setUp() {
+        consumerRecordTransformer = new DefaultConsumerRecordTransformer<>();
+    }
+
+    @Test
+    void getOutputStreams() {
+        assertEquals(Collections.singletonList(DEFAULT_STREAM), 
consumerRecordTransformer.getOutputStreams());
+    }
+
+    @Test
+    void getFieldNames() {
+        assertEquals(Arrays.asList("key", "value"), 
consumerRecordTransformer.getFieldNames(DEFAULT_STREAM));
+    }
+
+    @Test
+    void transform() {
+        ConsumerRecord<String, byte[]> consumerRecord = new 
ConsumerRecord<>("partition", 0, 0, "key", new byte[]{0x1, 0x2, 0x3});
+        Map<String, List<Object>> expected = 
Collections.singletonMap(DEFAULT_STREAM, Arrays.asList(consumerRecord.key(), 
consumerRecord.value()));
+        assertEquals(expected, 
consumerRecordTransformer.transform(consumerRecord));
+    }
+}
\ No newline at end of file
diff --git 
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactoryTest.java
 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactoryTest.java
new file mode 100644
index 0000000..205053c
--- /dev/null
+++ 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactoryTest.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2019
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.heron.spouts.kafka;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class DefaultKafkaConsumerFactoryTest {
+    private KafkaConsumerFactory<String, byte[]> kafkaConsumerFactory;
+
+    @BeforeEach
+    void setUp() {
+        Map<String, Object> config = new HashMap<>();
+        config.put("bootstrap.servers", "localhost:9092");
+        config.put("group.id", "tower-kafka-spout");
+        config.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
+        config.put("value.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(config);
+    }
+
+    @Test
+    void create() {
+        try (Consumer<String, byte[]> consumer = 
kafkaConsumerFactory.create()) {
+            assertTrue(consumer instanceof KafkaConsumer);
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultTopicPatternProviderTest.java
 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultTopicPatternProviderTest.java
new file mode 100644
index 0000000..21449e4
--- /dev/null
+++ 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultTopicPatternProviderTest.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2019
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.heron.spouts.kafka;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.regex.Pattern;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class DefaultTopicPatternProviderTest {
+
+    @Test
+    void create() {
+        assertEquals(Pattern.compile("a").pattern(), new DefaultTopicPatternProvider("a").create().pattern());
+    }
+}
\ No newline at end of file
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaMetricDecoratorTest.java b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaMetricDecoratorTest.java
new file mode 100644
index 0000000..d1bb516
--- /dev/null
+++ b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaMetricDecoratorTest.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2019
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.heron.spouts.kafka;
+
+import org.apache.kafka.common.Metric;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class KafkaMetricDecoratorTest {
+    @Mock
+    private Metric metric;
+
+    @BeforeEach
+    void setUp() {
+        when(metric.metricValue()).thenReturn("dummy value");
+    }
+
+    @Test
+    void getValueAndReset() {
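+        // the decorator adapts a Kafka Metric to Heron's IMetric, so getValueAndReset() reports the metric's current value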
+        assertEquals("dummy value", new 
KafkaMetricDecorator<>(metric).getValueAndReset());
+    }
+}
\ No newline at end of file
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaSpoutTest.java b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaSpoutTest.java
new file mode 100644
index 0000000..3fa026e
--- /dev/null
+++ b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaSpoutTest.java
@@ -0,0 +1,262 @@
+/*
+ * Copyright 2019
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.heron.spouts.kafka;
+
+import com.twitter.heron.api.Config;
+import com.twitter.heron.api.metric.IMetric;
+import com.twitter.heron.api.spout.SpoutOutputCollector;
+import com.twitter.heron.api.topology.OutputFieldsDeclarer;
+import com.twitter.heron.api.topology.TopologyContext;
+import com.twitter.heron.api.tuple.Fields;
+import com.twitter.heron.common.basics.SingletonRegistry;
+import com.twitter.heron.common.config.SystemConfig;
+import com.twitter.heron.common.config.SystemConfigKey;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.time.Duration;
+import java.util.*;
+import java.util.regex.Pattern;
+
+import static com.twitter.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE;
+import static com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE;
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.*;
+
+@ExtendWith(MockitoExtension.class)
+class KafkaSpoutTest {
+    private static final Random random = new Random();
+    private static final String DUMMY_TOPIC_NAME = "topic";
+    private KafkaSpout<String, byte[]> kafkaSpout;
+    @Mock
+    private KafkaConsumerFactory<String, byte[]> kafkaConsumerFactory;
+    @Mock
+    private Consumer<String, byte[]> consumer;
+    @Mock
+    private TopologyContext topologyContext;
+    @Mock
+    private SpoutOutputCollector collector;
+    @Mock
+    private Metric metric;
+    @Captor
+    private ArgumentCaptor<Pattern> patternArgumentCaptor;
+    @Captor
+    private ArgumentCaptor<IMetric<Object>> kafkaMetricDecoratorArgumentCaptor;
+    @Mock
+    private OutputFieldsDeclarer declarer;
+    @Captor
+    private ArgumentCaptor<Fields> fieldsArgumentCaptor;
+    @Captor
+    private ArgumentCaptor<List<Object>> listArgumentCaptor;
+    @Captor
+    private ArgumentCaptor<ConsumerRebalanceListener> consumerRebalanceListenerArgumentCaptor;
+
+    @BeforeAll
+    static void setUpAll() {
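+        // register a minimal SystemConfig so the spout's metric registration can read the 60-second export interval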
+        if (!SingletonRegistry.INSTANCE.containsSingleton(SystemConfig.HERON_SYSTEM_CONFIG)) {
+            SingletonRegistry.INSTANCE.registerSingleton(SystemConfig.HERON_SYSTEM_CONFIG, SystemConfig.newBuilder(true)
+                    .put(SystemConfigKey.HERON_METRICS_EXPORT_INTERVAL, 60)
+                    .build());
+        }
+    }
+
+    @BeforeEach
+    void setUp() {
+        kafkaSpout = new KafkaSpout<>(kafkaConsumerFactory, Collections.singleton(DUMMY_TOPIC_NAME));
+    }
+
+    @Test
+    void getConsumerRecordTransformer() {
+        assertTrue(kafkaSpout.getConsumerRecordTransformer() instanceof DefaultConsumerRecordTransformer);
+    }
+
+    @Test
+    void setConsumerRecordTransformer() {
+        ConsumerRecordTransformer<String, byte[]> consumerRecordTransformer = new DefaultConsumerRecordTransformer<>();
+        kafkaSpout.setConsumerRecordTransformer(consumerRecordTransformer);
+        assertEquals(consumerRecordTransformer, kafkaSpout.getConsumerRecordTransformer());
+    }
+
+    @Test
+    void open() {
+        when(kafkaConsumerFactory.create()).thenReturn(consumer);
+
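+        // opening with an explicit topic set first, then re-opening with a topic pattern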
+        kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE, ATMOST_ONCE.name()), topologyContext, collector);
+        verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)), any(KafkaSpout.KafkaConsumerRebalanceListener.class));
+
+        kafkaSpout = new KafkaSpout<>(kafkaConsumerFactory, new DefaultTopicPatternProvider("a"));
+        kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE, ATMOST_ONCE.name()), topologyContext, collector);
+        verify(consumer).subscribe(patternArgumentCaptor.capture(), any(KafkaSpout.KafkaConsumerRebalanceListener.class));
+        assertEquals("a", patternArgumentCaptor.getValue().pattern());
+    }
+
+    @Test
+    void nextTuple() {
+        when(kafkaConsumerFactory.create()).thenReturn(consumer);
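+        // stub one polled record and one Kafka metric so both tuple emission and metric registration can be verified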
+        ConsumerRecords<String, byte[]> consumerRecords = new ConsumerRecords<>(Collections.singletonMap(new TopicPartition(DUMMY_TOPIC_NAME, 0), Collections.singletonList(new ConsumerRecord<>(DUMMY_TOPIC_NAME, 0, 0, "key", new byte[]{0xF}))));
+        when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+        doReturn(Collections.singletonMap(new MetricName("name", "group", "description", Collections.singletonMap("name", "value")), metric)).when(consumer).metrics();
+        when(metric.metricValue()).thenReturn("sample value");
+
+        kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE, ATMOST_ONCE.name()), topologyContext, collector);
+        verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)), consumerRebalanceListenerArgumentCaptor.capture());
+        ConsumerRebalanceListener consumerRebalanceListener = consumerRebalanceListenerArgumentCaptor.getValue();
+        TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME, 0);
+        consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition));
+
+        kafkaSpout.nextTuple();
+        verify(consumer).commitAsync();
+        verify(topologyContext).registerMetric(eq("name-group-name-value"), kafkaMetricDecoratorArgumentCaptor.capture(), eq(60));
+        assertEquals("sample value", kafkaMetricDecoratorArgumentCaptor.getValue().getValueAndReset());
+
+        kafkaSpout.nextTuple();
+        verify(collector).emit(eq("default"), listArgumentCaptor.capture());
+        assertEquals("key", listArgumentCaptor.getValue().get(0));
+        assertArrayEquals(new byte[]{0xF}, (byte[]) listArgumentCaptor.getValue().get(1));
+    }
+
+    @Test
+    void ack() {
+        when(kafkaConsumerFactory.create()).thenReturn(consumer);
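+        // build five consecutive records (offsets 0 through 4) on a single partition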
+        TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME, 0);
+        List<ConsumerRecord<String, byte[]>> recordList = new ArrayList<>();
+        byte[] randomBytes = new byte[1];
+        for (int i = 0; i < 5; i++) {
+            random.nextBytes(randomBytes);
+            recordList.add(new ConsumerRecord<>(DUMMY_TOPIC_NAME, 0, i, "key", Arrays.copyOf(randomBytes, randomBytes.length)));
+        }
+        ConsumerRecords<String, byte[]> consumerRecords = new ConsumerRecords<>(Collections.singletonMap(topicPartition, recordList));
+        when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+
+        kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE, ATLEAST_ONCE.name()), topologyContext, collector);
+        // poll the topic
+        kafkaSpout.nextTuple();
+        // emit all five records
+        for (int i = 0; i < 5; i++) {
+            kafkaSpout.nextTuple();
+        }
+        // acks arrive out of order, and the third record (offset 2) is never acknowledged
+        kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 4));
+        kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 0));
+        kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 1));
+        kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 3));
+        // commit and poll: only offsets 0 and 1 are contiguous, so the commit position stops at 2
+        kafkaSpout.nextTuple();
+        verify(consumer).commitAsync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(2)), null);
+    }
+
+    @Test
+    void fail() {
+        when(kafkaConsumerFactory.create()).thenReturn(consumer);
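+        // five consecutive records (offsets 0 through 4) on a single partition, as in the ack test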
+        TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME, 0);
+        List<ConsumerRecord<String, byte[]>> recordList = new ArrayList<>();
+        byte[] randomBytes = new byte[1];
+        for (int i = 0; i < 5; i++) {
+            random.nextBytes(randomBytes);
+            recordList.add(new ConsumerRecord<>(DUMMY_TOPIC_NAME, 0, i, "key", Arrays.copyOf(randomBytes, randomBytes.length)));
+        }
+        ConsumerRecords<String, byte[]> consumerRecords = new ConsumerRecords<>(Collections.singletonMap(topicPartition, recordList));
+        when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+
+        kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE, ATLEAST_ONCE.name()), topologyContext, collector);
+        // poll the topic
+        kafkaSpout.nextTuple();
+        // emit all five records
+        for (int i = 0; i < 5; i++) {
+            kafkaSpout.nextTuple();
+        }
+        // acks arrive out of order, and the second and third records (offsets 1 and 2) fail
+        kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 4));
+        kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 0));
+        kafkaSpout.fail(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 1));
+        kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 3));
+        kafkaSpout.fail(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 2));
+        // commit and poll: offset 1 is the earliest failure, so the spout seeks back to it and commits only up to 1
+        kafkaSpout.nextTuple();
+        verify(consumer).seek(topicPartition, 1);
+        verify(consumer).commitAsync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(1)), null);
+    }
+
+    @Test
+    void close() {
+        when(kafkaConsumerFactory.create()).thenReturn(consumer);
+        kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE, ATMOST_ONCE.name()), topologyContext, collector);
+        kafkaSpout.close();
+        verify(consumer).close();
+    }
+
+    @Test
+    void declareOutputFields() {
+        kafkaSpout.declareOutputFields(declarer);
+        verify(declarer).declareStream(eq("default"), fieldsArgumentCaptor.capture());
+        assertEquals(Arrays.asList("key", "value"), fieldsArgumentCaptor.getValue().toList());
+    }
+
+    @Test
+    void consumerRebalanceListener() {
+        when(kafkaConsumerFactory.create()).thenReturn(consumer);
+
+        kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE, ATLEAST_ONCE.name()), topologyContext, collector);
+        verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)), consumerRebalanceListenerArgumentCaptor.capture());
+        ConsumerRebalanceListener consumerRebalanceListener = consumerRebalanceListenerArgumentCaptor.getValue();
+        TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME, 0);
+        consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition));
+        verify(consumer).position(topicPartition, Duration.ofSeconds(5));
+
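+        // after two contiguous acks, revoking the partition should trigger a commit of offset 2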
+        kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 0));
+        kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 1));
+        consumerRebalanceListener.onPartitionsRevoked(Collections.singleton(topicPartition));
+        verify(consumer).commitAsync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(2)), null);
+    }
+
+    @Test
+    void activate() {
+        when(kafkaConsumerFactory.create()).thenReturn(consumer);
+        kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE, ATMOST_ONCE.name()), topologyContext, collector);
+        verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)), consumerRebalanceListenerArgumentCaptor.capture());
+        ConsumerRebalanceListener consumerRebalanceListener = consumerRebalanceListenerArgumentCaptor.getValue();
+        TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME, 0);
+        consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition));
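+        // re-activating the spout should resume all partitions assigned to it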
+        kafkaSpout.activate();
+        verify(consumer).resume(Collections.singleton(topicPartition));
+    }
+
+    @Test
+    void deactivate() {
+        when(kafkaConsumerFactory.create()).thenReturn(consumer);
+        kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE, ATMOST_ONCE.name()), topologyContext, collector);
+        verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)), consumerRebalanceListenerArgumentCaptor.capture());
+        ConsumerRebalanceListener consumerRebalanceListener = consumerRebalanceListenerArgumentCaptor.getValue();
+        TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME, 0);
+        consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition));
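+        // deactivating the spout should pause all partitions assigned to it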
+        kafkaSpout.deactivate();
+        verify(consumer).pause(Collections.singleton(topicPartition));
+    }
+}
\ No newline at end of file
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/pom.xml b/contrib/spouts/kafka/java/heron-kafka-spout-parent/pom.xml
new file mode 100644
index 0000000..bd1b5e97
--- /dev/null
+++ b/contrib/spouts/kafka/java/heron-kafka-spout-parent/pom.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2019
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.heron</groupId>
+    <artifactId>heron-kafka-spout-parent</artifactId>
+    <version>1.0-SNAPSHOT</version>
+    <packaging>pom</packaging>
+
+    <properties>
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <modules>
+        <module>heron-kafka-spout</module>
+        <module>heron-kafka-spout-sample</module>
+    </modules>
+
+    <dependencyManagement>
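+        <!-- dependency versions are pinned here so both child modules inherit the same versions -->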
+        <dependencies>
+            <dependency>
+                <groupId>com.twitter.heron</groupId>
+                <artifactId>heron-api</artifactId>
+                <version>0.17.8</version>
+            </dependency>
+            <dependency>
+                <groupId>com.twitter.heron</groupId>
+                <artifactId>heron-storm</artifactId>
+                <version>0.17.8</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.kafka</groupId>
+                <artifactId>kafka-clients</artifactId>
+                <version>2.1.1</version>
+            </dependency>
+            <dependency>
+                <groupId>org.slf4j</groupId>
+                <artifactId>slf4j-jdk14</artifactId>
+                <version>1.7.26</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <build>
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-surefire-plugin</artifactId>
+                    <version>2.22.1</version>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+    </build>
+
+</project>
\ No newline at end of file
