This is an automated email from the ASF dual-hosted git repository.

nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fb3945  Kafka spout bazel build (#3254)
8fb3945 is described below

commit 8fb39456f7e172fed70031755f05c0f7d6d5768b
Author: SiMing Weng <[email protected]>
AuthorDate: Thu May 16 00:00:34 2019 -0400

    Kafka spout bazel build (#3254)
    
    * convert the kafka spout sub project to a bazel build
    
    * change imports to apache package name
    
    * sort the order of dependencies
    
    * trigger unit tests for kafka spout in Travis
    
    * fix checkstyle errors
    
    * fix more checkstyle errors
---
 .travis.yml                                        |   2 -
 WORKSPACE                                          |   2 +-
 .../heron-kafka-spout-sample/pom.xml               |  42 --
 .../heron-kafka-spout-sample/src/main/java/BUILD   |  18 +
 .../sample/HeronKafkaSpoutSampleTopology.java      | 162 +++--
 .../heron-kafka-spout/pom.xml                      |  60 --
 .../heron-kafka-spout/src/main/java/BUILD          |  18 +
 .../spouts/kafka/ConsumerRecordTransformer.java    |  43 +-
 .../kafka/DefaultConsumerRecordTransformer.java    |  20 +-
 .../spouts/kafka/DefaultKafkaConsumerFactory.java  |  57 +-
 .../spouts/kafka/DefaultTopicPatternProvider.java  |  52 +-
 .../heron/spouts/kafka/KafkaConsumerFactory.java   |  37 +-
 .../heron/spouts/kafka/KafkaMetricDecorator.java   |  39 +-
 .../org/apache/heron/spouts/kafka/KafkaSpout.java  | 794 +++++++++++----------
 .../heron/spouts/kafka/TopicPatternProvider.java   |  28 +-
 .../heron-kafka-spout/src/test/java/BUILD          |  44 ++
 .../DefaultConsumerRecordTransformerTest.java      |  87 +--
 .../kafka/DefaultKafkaConsumerFactoryTest.java     |  66 +-
 .../kafka/DefaultTopicPatternProviderTest.java     |  37 +-
 .../spouts/kafka/KafkaMetricDecoratorTest.java     |  57 +-
 .../apache/heron/spouts/kafka/KafkaSpoutTest.java  | 492 +++++++------
 .../kafka/java/heron-kafka-spout-parent/pom.xml    |  76 --
 .../eco/builder/heron/HeronStreamBuilderTest.java  |   3 +-
 scripts/get_all_heron_paths.sh                     |   2 +-
 scripts/travis/build.sh                            |   4 +-
 25 files changed, 1148 insertions(+), 1094 deletions(-)
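
With the Maven modules removed, the Kafka spout is built and tested through
Bazel like the rest of the repository. A minimal sketch of the resulting
invocations, assuming the target labels declared in the BUILD files below
(the exact CI wiring lives in scripts/travis/build.sh, which this commit
also touches):

    bazel build //contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java:heron-kafka-spout-java
    bazel test //contrib/spouts/kafka/...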

diff --git a/.travis.yml b/.travis.yml
index d9aafc7..464e5b4 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -49,5 +49,3 @@ script:
   - which python2.7
   - python2.7 -V
   - scripts/travis/ci.sh
-  # run unit tests for heron kafka spout
-  - mvn -f contrib/spouts/kafka/java/heron-kafka-spout-parent/pom.xml test
diff --git a/WORKSPACE b/WORKSPACE
index e56e40f..9172be7 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -454,7 +454,7 @@ maven_jar(
 
 maven_jar(
   name = "org_apache_kafka_kafka_clients",
-  artifact = "org.apache.kafka:kafka-clients:0.8.2.1",
+  artifact = "org.apache.kafka:kafka-clients:2.2.0",
 )
 
 maven_jar(
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/pom.xml b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/pom.xml
deleted file mode 100644
index 9eb1571..0000000
--- a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/pom.xml
+++ /dev/null
@@ -1,42 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Copyright 2019
-  ~
-  ~ Licensed under the Apache License, Version 2.0 (the "License");
-  ~ you may not use this file except in compliance with the License.
-  ~ You may obtain a copy of the License at
-  ~
-  ~ http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xmlns="http://maven.apache.org/POM/4.0.0"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>heron-kafka-spout-parent</artifactId>
-        <groupId>org.apache.heron</groupId>
-        <version>1.0-SNAPSHOT</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>heron-kafka-spout-sample</artifactId>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.heron</groupId>
-            <artifactId>heron-kafka-spout</artifactId>
-            <version>1.0-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>com.twitter.heron</groupId>
-            <artifactId>heron-storm</artifactId>
-        </dependency>
-    </dependencies>
-
-</project>
\ No newline at end of file
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/src/main/java/BUILD b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/src/main/java/BUILD
new file mode 100644
index 0000000..04d69b0
--- /dev/null
+++ b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/src/main/java/BUILD
@@ -0,0 +1,18 @@
+licenses(["notice"])
+
+package(default_visibility = ["//visibility:public"])
+
+heron_kafka_spout_sample_dep = [
+    "//contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java:heron-kafka-spout-java",
+    "//heron/api/src/java:api-java-low-level",
+    "//heron/common/src/java:basics-java",
+    "//heron/simulator/src/java:simulator-java",
+    "@org_apache_kafka_kafka_clients//jar",
+    "@org_slf4j_slf4j_api//jar",
+]
+
+java_binary(
+    name = "heron-kafka-spout-java-sample",
+    srcs = glob(["org/apache/heron/spouts/kafka/**/*.java"]),
+    deps = heron_kafka_spout_sample_dep,
+)
\ No newline at end of file
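
Since the sample is declared as a java_binary, it can presumably be launched
directly through Bazel once a local broker is available (the topology below
expects a broker on localhost:9092 and a topic named "test-topic"):

    bazel run //contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/src/main/java:heron-kafka-spout-java-sample
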
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/src/main/java/org/apache/heron/spouts/kafka/sample/HeronKafkaSpoutSampleTopology.java b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/src/main/java/org/apache/heron/spouts/kafka/sample/HeronKafkaSpoutSampleTopology.java
index 9a7de40..c63a6da 100644
--- a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/src/main/java/org/apache/heron/spouts/kafka/sample/HeronKafkaSpoutSampleTopology.java
+++ b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout-sample/src/main/java/org/apache/heron/spouts/kafka/sample/HeronKafkaSpoutSampleTopology.java
@@ -1,12 +1,14 @@
-/*
- * Copyright 2019
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,80 +18,90 @@
 
 package org.apache.heron.spouts.kafka.sample;
 
-import com.twitter.heron.api.Config;
-import com.twitter.heron.api.bolt.BaseRichBolt;
-import com.twitter.heron.api.bolt.OutputCollector;
-import com.twitter.heron.api.topology.OutputFieldsDeclarer;
-import com.twitter.heron.api.topology.TopologyBuilder;
-import com.twitter.heron.api.topology.TopologyContext;
-import com.twitter.heron.api.tuple.Tuple;
-import com.twitter.heron.common.basics.ByteAmount;
-import com.twitter.heron.simulator.Simulator;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.api.bolt.BaseRichBolt;
+import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.heron.common.basics.ByteAmount;
+import org.apache.heron.simulator.Simulator;
 import org.apache.heron.spouts.kafka.DefaultKafkaConsumerFactory;
 import org.apache.heron.spouts.kafka.KafkaConsumerFactory;
 import org.apache.heron.spouts.kafka.KafkaSpout;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+public final class HeronKafkaSpoutSampleTopology {
+  private static final Logger LOG = LoggerFactory.getLogger(HeronKafkaSpoutSampleTopology.class);
+  private static final String KAFKA_SPOUT_NAME = "kafka-spout";
+  private static final String LOGGING_BOLT_NAME = "logging-bolt";
+
+  private HeronKafkaSpoutSampleTopology() {
+  }
+
+  public static void main(String[] args) {
+    Map<String, Object> kafkaConsumerConfig = new HashMap<>();
+    kafkaConsumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+    kafkaConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-kafka-spout");
+    kafkaConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+        "org.apache.kafka.common.serialization.StringDeserializer");
+    kafkaConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+        "org.apache.kafka.common.serialization.StringDeserializer");
+    LOG.info("Kafka Consumer Config: {}", kafkaConsumerConfig);
+
+    KafkaConsumerFactory<String, String> kafkaConsumerFactory =
+        new DefaultKafkaConsumerFactory<>(kafkaConsumerConfig);
+
+    TopologyBuilder topologyBuilder = new TopologyBuilder();
+    topologyBuilder.setSpout(KAFKA_SPOUT_NAME, new KafkaSpout<>(kafkaConsumerFactory,
+        Collections.singletonList("test-topic")));
+    topologyBuilder.setBolt(LOGGING_BOLT_NAME, new LoggingBolt()).shuffleGrouping(KAFKA_SPOUT_NAME);
+    Config config = new Config();
+    config.setNumStmgrs(1);
+    config.setContainerCpuRequested(1);
+    config.setContainerRamRequested(ByteAmount.fromGigabytes(1));
+    config.setContainerDiskRequested(ByteAmount.fromGigabytes(1));
+
+    config.setComponentCpu(KAFKA_SPOUT_NAME, 0.25);
+    config.setComponentRam(KAFKA_SPOUT_NAME, ByteAmount.fromMegabytes(256));
+    config.setComponentDisk(KAFKA_SPOUT_NAME, ByteAmount.fromMegabytes(512));
+
+    config.setComponentCpu(LOGGING_BOLT_NAME, 0.25);
+    config.setComponentRam(LOGGING_BOLT_NAME, ByteAmount.fromMegabytes(256));
+    config.setComponentDisk(LOGGING_BOLT_NAME, ByteAmount.fromMegabytes(256));
+
+    Simulator simulator = new Simulator();
+    simulator.submitTopology("heron-kafka-spout-sample-topology", config,
+        topologyBuilder.createTopology());
+  }
+
+  public static class LoggingBolt extends BaseRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(LoggingBolt.class);
+    private transient OutputCollector outputCollector;
+
+    @Override
+    public void prepare(Map<String, Object> heronConf, TopologyContext context,
+                        OutputCollector collector) {
+      this.outputCollector = collector;
+    }
 
-public class HeronKafkaSpoutSampleTopology {
-    private static final Logger LOG = LoggerFactory.getLogger(HeronKafkaSpoutSampleTopology.class);
-    private static final String KAFKA_SPOUT_NAME = "kafka-spout";
-    private static final String LOGGING_BOLT_NAME = "logging-bolt";
-
-    public static void main(String[] args) {
-        Map<String, Object> kafkaConsumerConfig = new HashMap<>();
-        kafkaConsumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        kafkaConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-kafka-spout");
-        kafkaConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-        kafkaConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-        LOG.info("Kafka Consumer Config: {}", kafkaConsumerConfig);
-
-        KafkaConsumerFactory<String, String> kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(kafkaConsumerConfig);
-
-        TopologyBuilder topologyBuilder = new TopologyBuilder();
-        topologyBuilder.setSpout(KAFKA_SPOUT_NAME, new KafkaSpout<>(kafkaConsumerFactory, Collections.singletonList("test-topic")));
-        topologyBuilder.setBolt(LOGGING_BOLT_NAME, new LoggingBolt()).shuffleGrouping(KAFKA_SPOUT_NAME);
-        Config config = new Config();
-        config.setNumStmgrs(1);
-        config.setContainerCpuRequested(1);
-        config.setContainerRamRequested(ByteAmount.fromGigabytes(1));
-        config.setContainerDiskRequested(ByteAmount.fromGigabytes(1));
-
-        config.setComponentCpu(KAFKA_SPOUT_NAME, 0.25);
-        config.setComponentRam(KAFKA_SPOUT_NAME, ByteAmount.fromMegabytes(256));
-        config.setComponentDisk(KAFKA_SPOUT_NAME, ByteAmount.fromMegabytes(512));
-
-        config.setComponentCpu(LOGGING_BOLT_NAME, 0.25);
-        config.setComponentRam(LOGGING_BOLT_NAME, ByteAmount.fromMegabytes(256));
-        config.setComponentDisk(LOGGING_BOLT_NAME, ByteAmount.fromMegabytes(256));
-
-        Simulator simulator = new Simulator();
-        simulator.submitTopology("heron-kafka-spout-sample-topology", config, topologyBuilder.createTopology());
+    @Override
+    public void execute(Tuple input) {
+      LOG.info("{}", input);
+      outputCollector.ack(input);
     }
 
-    public static class LoggingBolt extends BaseRichBolt {
-        private static final Logger LOG = LoggerFactory.getLogger(LoggingBolt.class);
-        private transient OutputCollector outputCollector;
-
-        @Override
-        public void prepare(Map<String, Object> heronConf, TopologyContext context, OutputCollector collector) {
-            this.outputCollector = collector;
-        }
-
-        @Override
-        public void execute(Tuple input) {
-            LOG.info("{}", input);
-            outputCollector.ack(input);
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            //do nothing
-        }
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      //do nothing
     }
+  }
 }
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/pom.xml b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/pom.xml
deleted file mode 100644
index 2c7e072..0000000
--- a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/pom.xml
+++ /dev/null
@@ -1,60 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Copyright 2019
-  ~
-  ~ Licensed under the Apache License, Version 2.0 (the "License");
-  ~ you may not use this file except in compliance with the License.
-  ~ You may obtain a copy of the License at
-  ~
-  ~ http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xmlns="http://maven.apache.org/POM/4.0.0"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>org.apache.heron</groupId>
-        <artifactId>heron-kafka-spout-parent</artifactId>
-        <version>1.0-SNAPSHOT</version>
-    </parent>
-
-    <artifactId>heron-kafka-spout</artifactId>
-
-    <dependencies>
-        <dependency>
-            <groupId>com.twitter.heron</groupId>
-            <artifactId>heron-storm</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.junit.jupiter</groupId>
-            <artifactId>junit-jupiter-api</artifactId>
-            <version>5.4.0</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.junit.jupiter</groupId>
-            <artifactId>junit-jupiter-engine</artifactId>
-            <version>5.4.0</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-junit-jupiter</artifactId>
-            <version>2.24.5</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-</project>
\ No newline at end of file
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/BUILD b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/BUILD
new file mode 100644
index 0000000..efa3736
--- /dev/null
+++ b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/BUILD
@@ -0,0 +1,18 @@
+licenses(["notice"])
+
+package(default_visibility = ["//visibility:public"])
+
+kafka_spout_deps = [
+    "//storm-compatibility/src/java:storm-compatibility-java-neverlink",
+    "//heron/api/src/java:api-java-low-level",
+    "//heron/common/src/java:basics-java",
+    "//heron/common/src/java:config-java",
+    "//third_party/java:logging",
+    "@org_apache_kafka_kafka_clients//jar",
+]
+
+java_library(
+    name = "heron-kafka-spout-java",
+    srcs = glob(["org/apache/heron/spouts/kafka/**/*.java"]),
+    deps = kafka_spout_deps,
+)
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/ConsumerRecordTransformer.java b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/ConsumerRecordTransformer.java
index 51b4281..cf00b2f 100644
--- a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/ConsumerRecordTransformer.java
+++ b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/ConsumerRecordTransformer.java
@@ -1,12 +1,14 @@
-/*
- * Copyright 2019
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,14 +18,14 @@
 
 package org.apache.heron.spouts.kafka;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
 /**
  * This is the transformer class whose responsibility is to:
  *
@@ -33,22 +35,23 @@ import java.util.Map;
  * <li>translate the incoming Kafka record into the list of values of the output tuple</li>
  * </ol>
  * <p>
- * The default behavior of the built-in transformer will output to stream "default", with 2 fields, "key" and "value" which are the key and value field of the incoming Kafka record.
+ * The default behavior of the built-in transformer will output to stream "default",
+ * with 2 fields, "key" and "value" which are the key and value field of the incoming Kafka record.
  *
  * @param <K> the type of the key of the Kafka record
  * @param <V> the type of the value of the Kafka record
  * @see KafkaSpout#setConsumerRecordTransformer(ConsumerRecordTransformer)
  */
 public interface ConsumerRecordTransformer<K, V> extends Serializable {
-    default List<String> getOutputStreams() {
-        return Collections.singletonList("default");
-    }
+  default List<String> getOutputStreams() {
+    return Collections.singletonList("default");
+  }
 
-    default List<String> getFieldNames(String streamId) {
-        return Arrays.asList("key", "value");
-    }
+  default List<String> getFieldNames(String streamId) {
+    return Arrays.asList("key", "value");
+  }
 
-    default Map<String, List<Object>> transform(ConsumerRecord<K, V> record) {
-        return Collections.singletonMap("default", Arrays.asList(record.key(), 
record.value()));
-    }
+  default Map<String, List<Object>> transform(ConsumerRecord<K, V> record) {
+    return Collections.singletonMap("default", Arrays.asList(record.key(), 
record.value()));
+  }
 }
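
A minimal sketch of a custom implementation of this interface, wired in via
KafkaSpout#setConsumerRecordTransformer; the class name and field layout here
are illustrative only, not part of this commit:

    package org.apache.heron.spouts.kafka;

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    // Hypothetical transformer that also exposes the record's topic and offset
    // on the default stream; getOutputStreams() keeps its default of "default".
    public class OffsetAwareRecordTransformer<K, V> implements ConsumerRecordTransformer<K, V> {
      private static final long serialVersionUID = 1L;

      @Override
      public List<String> getFieldNames(String streamId) {
        return Arrays.asList("topic", "offset", "value");
      }

      @Override
      public Map<String, List<Object>> transform(ConsumerRecord<K, V> record) {
        return Collections.singletonMap("default",
            Arrays.asList(record.topic(), record.offset(), record.value()));
      }
    }
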
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformer.java b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformer.java
index ac00b4e..7202769 100644
--- a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformer.java
+++ b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformer.java
@@ -1,12 +1,14 @@
-/*
- * Copyright 2019
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,5 +20,5 @@ package org.apache.heron.spouts.kafka;
 
 @SuppressWarnings("WeakerAccess")
 public class DefaultConsumerRecordTransformer<K, V> implements ConsumerRecordTransformer<K, V> {
-    private static final long serialVersionUID = -8971687732883148619L;
+  private static final long serialVersionUID = -8971687732883148619L;
 }
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactory.java b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactory.java
index bbc7ac5..1178352 100644
--- a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactory.java
+++ b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactory.java
@@ -1,12 +1,14 @@
-/*
- * Copyright 2019
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,35 +18,36 @@
 
 package org.apache.heron.spouts.kafka;
 
+import java.util.Map;
+
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 
-import java.util.Map;
-
 /**
- * a simple Kafka Consumer factory that builds a KafkaConsumer instance from a {@link Map} as the properties to configure it.
+ * a simple Kafka Consumer factory that builds a KafkaConsumer instance from a {@link Map} as
+ * the properties to configure it.
  *
  * @param <K> the type of the key of the Kafka record
  * @param <V> the type of the value of the Kafka record
  */
 public class DefaultKafkaConsumerFactory<K, V> implements KafkaConsumerFactory<K, V> {
-    private static final long serialVersionUID = -2346087278604915148L;
-    private Map<String, Object> config;
+  private static final long serialVersionUID = -2346087278604915148L;
+  private Map<String, Object> config;
 
-    /**
-     * the config map, key strings should be from {@link ConsumerConfig}
-     *
-     * @param config the configuration map
-     * @see <a href="https://kafka.apache.org/documentation/#consumerconfigs">Kafka Consumer Configs</a>
-     */
-    public DefaultKafkaConsumerFactory(Map<String, Object> config) {
-        this.config = config;
-    }
+  /**
+   * the config map, key strings should be from {@link ConsumerConfig}
+   *
+   * @param config the configuration map
+   * @see <a href="https://kafka.apache.org/documentation/#consumerconfigs">Kafka Consumer Configs</a>
+   */
+  public DefaultKafkaConsumerFactory(Map<String, Object> config) {
+    this.config = config;
+  }
 
-    @Override
-    public Consumer<K, V> create() {
-        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-        return new KafkaConsumer<>(config);
-    }
+  @Override
+  public Consumer<K, V> create() {
+    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+    return new KafkaConsumer<>(config);
+  }
 }
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultTopicPatternProvider.java b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultTopicPatternProvider.java
index b48e2ca..bf0de6e 100644
--- a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultTopicPatternProvider.java
+++ b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/DefaultTopicPatternProvider.java
@@ -1,12 +1,14 @@
-/*
- * Copyright 2019
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,24 +24,24 @@ import java.util.regex.Pattern;
  * the built-in default pattern provider to create a topic pattern out of a regex string
  */
 public class DefaultTopicPatternProvider implements TopicPatternProvider {
-    private static final long serialVersionUID = 5534026856505613199L;
-    private String regex;
+  private static final long serialVersionUID = 5534026856505613199L;
+  private String regex;
 
-    /**
-     * create a provider out of a regular expression string
-     *
-     * @param regex topic name regular expression
-     */
-    @SuppressWarnings("WeakerAccess")
-    public DefaultTopicPatternProvider(String regex) {
-        this.regex = regex;
-    }
+  /**
+   * create a provider out of a regular expression string
+   *
+   * @param regex topic name regular expression
+   */
+  @SuppressWarnings("WeakerAccess")
+  public DefaultTopicPatternProvider(String regex) {
+    this.regex = regex;
+  }
 
-    @Override
-    public Pattern create() {
-        if (regex == null) {
-            throw new IllegalArgumentException("regex can not be null");
-        }
-        return Pattern.compile(regex);
+  @Override
+  public Pattern create() {
+    if (regex == null) {
+      throw new IllegalArgumentException("regex can not be null");
     }
+    return Pattern.compile(regex);
+  }
 }
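
As a usage sketch, this provider pairs with the pattern-based KafkaSpout
constructor that appears later in this diff (the regex here is illustrative):

    // subscribe to every topic whose name starts with "events-"
    KafkaSpout<String, String> spout = new KafkaSpout<>(
        kafkaConsumerFactory, new DefaultTopicPatternProvider("events-.*"));
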
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaConsumerFactory.java b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaConsumerFactory.java
index c67aeac..6ac2d2e 100644
--- a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaConsumerFactory.java
+++ b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaConsumerFactory.java
@@ -1,12 +1,14 @@
-/*
- * Copyright 2019
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,21 +18,22 @@
 
 package org.apache.heron.spouts.kafka;
 
-import org.apache.kafka.clients.consumer.Consumer;
-
 import java.io.Serializable;
 
+import org.apache.kafka.clients.consumer.Consumer;
+
 /**
- * the factory to create the underlying KafkaConsumer instance the Kafka Spout will be using to consume data from Kafka cluster
+ * the factory to create the underlying KafkaConsumer instance the Kafka Spout
+ * will be using to consume data from Kafka cluster
  *
  * @param <K> the type of the key of the Kafka record
  * @param <V> the type of the value of the Kafka record
  */
 public interface KafkaConsumerFactory<K, V> extends Serializable {
-    /**
-     * create the underlying KafkaConsumer
-     *
-     * @return kafka consumer instance
-     */
-    Consumer<K, V> create();
+  /**
+   * create the underlying KafkaConsumer
+   *
+   * @return kafka consumer instance
+   */
+  Consumer<K, V> create();
 }
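
Besides DefaultKafkaConsumerFactory above, a custom implementation can build
the consumer however it likes, e.g. fixing the deserializers in code. A sketch
under that assumption (class and parameter names are illustrative only):

    package org.apache.heron.spouts.kafka;

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    // Hypothetical factory that hard-codes String deserialization for key and value.
    public class StringConsumerFactory implements KafkaConsumerFactory<String, String> {
      private static final long serialVersionUID = 1L;
      private final Map<String, Object> config = new HashMap<>();

      public StringConsumerFactory(String bootstrapServers, String groupId) {
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // the spout manages offsets itself, so auto-commit stays off
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
      }

      @Override
      public Consumer<String, String> create() {
        return new KafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer());
      }
    }
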
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaMetricDecorator.java b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaMetricDecorator.java
index 9363c3a..55f718f 100644
--- a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaMetricDecorator.java
+++ b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaMetricDecorator.java
@@ -1,12 +1,14 @@
-/*
- * Copyright 2019
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,23 +18,24 @@
 
 package org.apache.heron.spouts.kafka;
 
-import com.twitter.heron.api.metric.IMetric;
+import org.apache.heron.api.metric.IMetric;
 import org.apache.kafka.common.Metric;
 
 /**
- * a decorator to convert a Kafka Metric to a Heron Metric so that Kafka metrics can be exposed via Heron Metrics Manager
+ * a decorator to convert a Kafka Metric to a Heron Metric so that Kafka
+ * metrics can be exposed via Heron Metrics Manager
  *
  * @param <M> the Kafka Metric type
  */
 public class KafkaMetricDecorator<M extends Metric> implements IMetric<Object> {
-    private M metric;
+  private M metric;
 
-    KafkaMetricDecorator(M metric) {
-        this.metric = metric;
-    }
+  KafkaMetricDecorator(M metric) {
+    this.metric = metric;
+  }
 
-    @Override
-    public Object getValueAndReset() {
-        return metric.metricValue();
-    }
+  @Override
+  public Object getValueAndReset() {
+    return metric.metricValue();
+  }
 }
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaSpout.java b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaSpout.java
index 6334001..bc13d0a 100644
--- a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaSpout.java
+++ b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/KafkaSpout.java
@@ -1,12 +1,14 @@
-/*
- * Copyright 2019
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,410 +18,458 @@
 
 package org.apache.heron.spouts.kafka;
 
-import com.twitter.heron.api.Config;
-import com.twitter.heron.api.spout.BaseRichSpout;
-import com.twitter.heron.api.spout.SpoutOutputCollector;
-import com.twitter.heron.api.state.State;
-import com.twitter.heron.api.topology.IStatefulComponent;
-import com.twitter.heron.api.topology.OutputFieldsDeclarer;
-import com.twitter.heron.api.topology.TopologyContext;
-import com.twitter.heron.api.tuple.Fields;
-import com.twitter.heron.common.basics.SingletonRegistry;
-import com.twitter.heron.common.config.SystemConfig;
-import org.apache.kafka.clients.consumer.*;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.time.Duration;
-import java.util.*;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Queue;
+import java.util.Set;
+import java.util.SortedMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.stream.Collectors;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.api.spout.BaseRichSpout;
+import org.apache.heron.api.spout.SpoutOutputCollector;
+import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.IStatefulComponent;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.common.basics.SingletonRegistry;
+import org.apache.heron.common.config.SystemConfig;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+
 /**
  * Kafka spout to consume data from Kafka topic(s), each record is converted into a tuple via {@link ConsumerRecordTransformer}, and emitted into a topology
  *
  * @param <K> the type of the key field of the Kafka record
  * @param <V> the type of the value field of the Kafka record
  */
-public class KafkaSpout<K, V> extends BaseRichSpout implements IStatefulComponent<TopicPartition, Long> {
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
-    private static final long serialVersionUID = -2271355516537883361L;
-    private int metricsIntervalInSecs = 60;
-    private KafkaConsumerFactory<K, V> kafkaConsumerFactory;
-    private TopicPatternProvider topicPatternProvider;
-    private Collection<String> topicNames;
-    private ConsumerRecordTransformer<K, V> consumerRecordTransformer = new DefaultConsumerRecordTransformer<>();
-    private transient SpoutOutputCollector collector;
-    private transient TopologyContext topologyContext;
-    private transient Queue<ConsumerRecord<K, V>> buffer;
-    private transient Consumer<K, V> consumer;
-    private transient Set<TopicPartition> assignedPartitions;
-    private transient Set<MetricName> reportedMetrics;
-    private transient Map<TopicPartition, NavigableMap<Long, Long>> ackRegistry;
-    private transient Map<TopicPartition, Long> failureRegistry;
-    private Config.TopologyReliabilityMode topologyReliabilityMode = Config.TopologyReliabilityMode.ATMOST_ONCE;
-    private long previousKafkaMetricsUpdatedTimestamp = 0;
-    private State<TopicPartition, Long> state;
-
-    /**
-     * create a KafkaSpout instance that subscribes to a list of topics
-     *
-     * @param kafkaConsumerFactory kafka consumer factory
-     * @param topicNames           list of topic names
-     */
-    public KafkaSpout(KafkaConsumerFactory<K, V> kafkaConsumerFactory, Collection<String> topicNames) {
-        this.kafkaConsumerFactory = kafkaConsumerFactory;
-        this.topicNames = topicNames;
+public class KafkaSpout<K, V> extends BaseRichSpout
+    implements IStatefulComponent<TopicPartition, Long> {
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
+  private static final long serialVersionUID = -2271355516537883361L;
+  private int metricsIntervalInSecs = 60;
+  private KafkaConsumerFactory<K, V> kafkaConsumerFactory;
+  private TopicPatternProvider topicPatternProvider;
+  private Collection<String> topicNames;
+  private ConsumerRecordTransformer<K, V> consumerRecordTransformer =
+      new DefaultConsumerRecordTransformer<>();
+  private transient SpoutOutputCollector collector;
+  private transient TopologyContext topologyContext;
+  private transient Queue<ConsumerRecord<K, V>> buffer;
+  private transient Consumer<K, V> consumer;
+  private transient Set<TopicPartition> assignedPartitions;
+  private transient Set<MetricName> reportedMetrics;
+  private transient Map<TopicPartition, NavigableMap<Long, Long>> ackRegistry;
+  private transient Map<TopicPartition, Long> failureRegistry;
+  private Config.TopologyReliabilityMode topologyReliabilityMode =
+      Config.TopologyReliabilityMode.ATMOST_ONCE;
+  private long previousKafkaMetricsUpdatedTimestamp = 0;
+  private State<TopicPartition, Long> state;
+
+  /**
+   * create a KafkaSpout instance that subscribes to a list of topics
+   *
+   * @param kafkaConsumerFactory kafka consumer factory
+   * @param topicNames list of topic names
+   */
+  public KafkaSpout(KafkaConsumerFactory<K, V> kafkaConsumerFactory,
+                    Collection<String> topicNames) {
+    this.kafkaConsumerFactory = kafkaConsumerFactory;
+    this.topicNames = topicNames;
+  }
+
+  /**
+   * create a KafkaSpout instance that subscribes to all topics matching the topic pattern
+   *
+   * @param kafkaConsumerFactory kafka consumer factory
+   * @param topicPatternProvider provider of the topic matching pattern
+   */
+  @SuppressWarnings("WeakerAccess")
+  public KafkaSpout(KafkaConsumerFactory<K, V> kafkaConsumerFactory,
+                    TopicPatternProvider topicPatternProvider) {
+    this.kafkaConsumerFactory = kafkaConsumerFactory;
+    this.topicPatternProvider = topicPatternProvider;
+  }
+
+  /**
+   * return the consumer record transformer
+   *
+   * @return the Kafka record transformer instance used by this Kafka Spout
+   */
+  @SuppressWarnings("WeakerAccess")
+  public ConsumerRecordTransformer<K, V> getConsumerRecordTransformer() {
+    return consumerRecordTransformer;
+  }
+
+  /**
+   * set the Kafka record transformer
+   *
+   * @param consumerRecordTransformer kafka record transformer
+   */
+  @SuppressWarnings("WeakerAccess")
+  public void setConsumerRecordTransformer(ConsumerRecordTransformer<K, V>
+                                               consumerRecordTransformer) {
+    this.consumerRecordTransformer = consumerRecordTransformer;
+  }
+
+  @Override
+  public void initState(State<TopicPartition, Long> aState) {
+    this.state = aState;
+    LOG.info("initial state {}", aState);
+  }
+
+  @Override
+  public void preSave(String checkpointId) {
+    LOG.info("save state {}", state);
+    consumer.commitAsync(state.entrySet()
+        .stream()
+        .collect(Collectors.toMap(Map.Entry::getKey, entry ->
+            new OffsetAndMetadata(entry.getValue() + 1))), null);
+  }
+
+  @Override
+  public void open(Map<String, Object> conf, TopologyContext context,
+                   SpoutOutputCollector aCollector) {
+    this.collector = aCollector;
+    this.topologyContext = context;
+    this.topologyReliabilityMode = Config.TopologyReliabilityMode.valueOf(
+        conf.get(Config.TOPOLOGY_RELIABILITY_MODE).toString());
+    metricsIntervalInSecs = (int) ((SystemConfig) SingletonRegistry.INSTANCE
+        .getSingleton(SystemConfig.HERON_SYSTEM_CONFIG))
+        .getHeronMetricsExportInterval().getSeconds();
+    consumer = kafkaConsumerFactory.create();
+    if (topicNames != null) {
+      consumer.subscribe(topicNames, new KafkaConsumerRebalanceListener());
+    } else {
+      consumer.subscribe(topicPatternProvider.create(), new KafkaConsumerRebalanceListener());
     }
-
-    /**
-     * create a KafkaSpout instance that subscribe to all topics matching the topic pattern
-     *
-     * @param kafkaConsumerFactory kafka consumer factory
-     * @param topicPatternProvider provider of the topic matching pattern
-     */
-    @SuppressWarnings("WeakerAccess")
-    public KafkaSpout(KafkaConsumerFactory<K, V> kafkaConsumerFactory, TopicPatternProvider topicPatternProvider) {
-        this.kafkaConsumerFactory = kafkaConsumerFactory;
-        this.topicPatternProvider = topicPatternProvider;
-    }
-
-    /**
-     * @return the Kafka record transformer instance used by this Kafka Spout
-     */
-    @SuppressWarnings("WeakerAccess")
-    public ConsumerRecordTransformer<K, V> getConsumerRecordTransformer() {
-        return consumerRecordTransformer;
+    buffer = new ArrayDeque<>(500);
+    ackRegistry = new ConcurrentHashMap<>();
+    failureRegistry = new ConcurrentHashMap<>();
+    assignedPartitions = new HashSet<>();
+    reportedMetrics = new HashSet<>();
+  }
+
+  @Override
+  public void nextTuple() {
+    ConsumerRecord<K, V> record = buffer.poll();
+    if (record != null) {
+      // there are still records remaining for emission from the previous poll
+      emitConsumerRecord(record);
+    } else {
+      //all the records from previous poll have been
+      //emitted or this is very first time to poll
+      if (topologyReliabilityMode == Config.TopologyReliabilityMode.ATLEAST_ONCE) {
+        ackRegistry.forEach((key, value) -> {
+          if (value != null) {
+            //seek back to the earliest failed offset if there is any
+            rewindAndDiscardAck(key, value);
+            //commit based on the first continuous acknowledgement range
+            manualCommit(key, value);
+          }
+        });
+      }
+      poll().forEach(kvConsumerRecord -> buffer.offer(kvConsumerRecord));
     }
+  }
 
-    /**
-     * set the Kafka record transformer
-     *
-     * @param consumerRecordTransformer kafka record transformer
-     */
-    @SuppressWarnings("WeakerAccess")
-    public void setConsumerRecordTransformer(ConsumerRecordTransformer<K, V> consumerRecordTransformer) {
-        this.consumerRecordTransformer = consumerRecordTransformer;
+  @Override
+  public void activate() {
+    super.activate();
+    if (!assignedPartitions.isEmpty()) {
+      consumer.resume(assignedPartitions);
     }
+  }
 
-    @Override
-    public void initState(State<TopicPartition, Long> state) {
-        this.state = state;
-        LOG.info("initial state {}", state);
+  @Override
+  public void deactivate() {
+    super.deactivate();
+    if (!assignedPartitions.isEmpty()) {
+      consumer.pause(assignedPartitions);
     }
-
-    @Override
-    public void preSave(String checkpointId) {
-        LOG.info("save state {}", state);
-        consumer.commitAsync(state.entrySet()
-                .stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, entry -> new OffsetAndMetadata(entry.getValue() + 1))), null);
+  }
+
+  @Override
+  public void ack(Object msgId) {
+    super.ack(msgId);
+    long start = System.nanoTime();
+    ConsumerRecordMessageId consumerRecordMessageId = (ConsumerRecordMessageId) msgId;
+    TopicPartition topicPartition = consumerRecordMessageId.getTopicPartition();
+    long offset = consumerRecordMessageId.getOffset();
+    ackRegistry.putIfAbsent(topicPartition, new ConcurrentSkipListMap<>());
+    NavigableMap<Long, Long> navigableMap = ackRegistry.get(topicPartition);
+
+    Map.Entry<Long, Long> floorRange = navigableMap.floorEntry(offset);
+    Map.Entry<Long, Long> ceilingRange = navigableMap.ceilingEntry(offset);
+
+    long floorBottom = floorRange != null ? floorRange.getKey() : Long.MIN_VALUE;
+    long floorTop = floorRange != null ? floorRange.getValue() : Long.MIN_VALUE;
+    long ceilingBottom = ceilingRange != null ? ceilingRange.getKey() : Long.MAX_VALUE;
+    long ceilingTop = ceilingRange != null ? ceilingRange.getValue() : Long.MAX_VALUE;
+
+    //the ack is for a message that has already been acknowledged.
+    //This happens when a failed tuple has caused
+    //Kafka consumer to seek back to earlier position and some messages are replayed.
+    if ((offset >= floorBottom && offset <= floorTop)
+        || (offset >= ceilingBottom && offset <= ceilingTop)) {
+      return;
     }
-
-    @Override
-    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
-        this.collector = collector;
-        this.topologyContext = context;
-        this.topologyReliabilityMode = Config.TopologyReliabilityMode.valueOf(conf.get(Config.TOPOLOGY_RELIABILITY_MODE).toString());
-        metricsIntervalInSecs = (int) ((SystemConfig) SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG)).getHeronMetricsExportInterval().getSeconds();
-        consumer = kafkaConsumerFactory.create();
-        if (topicNames != null) {
-            consumer.subscribe(topicNames, new KafkaConsumerRebalanceListener());
-        } else {
-            consumer.subscribe(topicPatternProvider.create(), new KafkaConsumerRebalanceListener());
-        }
-        buffer = new ArrayDeque<>(500);
-        ackRegistry = new ConcurrentHashMap<>();
-        failureRegistry = new ConcurrentHashMap<>();
-        assignedPartitions = new HashSet<>();
-        reportedMetrics = new HashSet<>();
+    if (ceilingBottom - floorTop == 2) {
+      //the ack connects the two adjacent range
+      navigableMap.put(floorBottom, ceilingTop);
+      navigableMap.remove(ceilingBottom);
+    } else if (offset == floorTop + 1) {
+      //the acknowledged offset is the immediate neighbour
+      // of the upper bound of the floor range
+      navigableMap.put(floorBottom, offset);
+    } else if (offset == ceilingBottom - 1) {
+      //the acknowledged offset is the immediate neighbour
+      // of the lower bound of the ceiling range
+      navigableMap.remove(ceilingBottom);
+      navigableMap.put(offset, ceilingTop);
+    } else {
+      //it is a new born range
+      navigableMap.put(offset, offset);
     }
-
-    @Override
-    public void nextTuple() {
-        ConsumerRecord<K, V> record = buffer.poll();
-        if (record != null) {
-            // there are still records remaining for emission from the previous poll
-            emitConsumerRecord(record);
-        } else {
-            //all the records from previous poll have been emitted or this is very first time to poll
-            if (topologyReliabilityMode == Config.TopologyReliabilityMode.ATLEAST_ONCE) {
-                ackRegistry.forEach((key, value) -> {
-                    if (value != null) {
-                        //seek back to the earliest failed offset if there is any
-                        rewindAndDiscardAck(key, value);
-                        //commit based on the first continuous acknowledgement range
-                        manualCommit(key, value);
-                    }
-                });
+    LOG.debug("ack {} in {} ns", msgId, System.nanoTime() - start);
+    LOG.debug("{}", 
ackRegistry.get(consumerRecordMessageId.getTopicPartition()));
+  }
+
+  @Override
+  public void fail(Object msgId) {
+    super.fail(msgId);
+    ConsumerRecordMessageId consumerRecordMessageId = (ConsumerRecordMessageId) msgId;
+    TopicPartition topicPartition = consumerRecordMessageId.getTopicPartition();
+    long offset = consumerRecordMessageId.getOffset();
+    failureRegistry.put(topicPartition, Math.min(failureRegistry.getOrDefault(topicPartition,
+        Long.MAX_VALUE), offset));
+    LOG.warn("fail {}", msgId);
+  }
+
+  @Override
+  public void close() {
+    consumer.close();
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    consumerRecordTransformer.getOutputStreams()
+        .forEach(s -> declarer.declareStream(s,
+            new Fields(consumerRecordTransformer.getFieldNames(s))));
+  }
+
+  private void emitConsumerRecord(ConsumerRecord<K, V> record) {
+    consumerRecordTransformer.transform(record)
+        .forEach((s, objects) -> {
+          if (topologyReliabilityMode != Config.TopologyReliabilityMode.ATLEAST_ONCE) {
+            collector.emit(s, objects);
+            //only in effectively-once mode, we need to track the offset of the record that is just
+            //emitted into the topology
+            if (topologyReliabilityMode
+                == Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
+              state.put(new TopicPartition(record.topic(), record.partition()),
+                  record.offset());
             }
-            poll().forEach(kvConsumerRecord -> buffer.offer(kvConsumerRecord));
-        }
+          } else {
+            //build message id based on topic, partition, offset of the consumer record
+            ConsumerRecordMessageId consumerRecordMessageId =
+                new ConsumerRecordMessageId(new TopicPartition(record.topic(),
+                    record.partition()), record.offset());
+            //emit tuple with the message id
+            collector.emit(s, objects, consumerRecordMessageId);
+          }
+        });
+  }
+
+  private void rewindAndDiscardAck(TopicPartition topicPartition,
+                                   NavigableMap<Long, Long> ackRanges) {
+    if (failureRegistry.containsKey(topicPartition)) {
+      long earliestFailedOffset = failureRegistry.get(topicPartition);
+      //rewind back to the earliest failed offset
+      consumer.seek(topicPartition, earliestFailedOffset);
+      //discard the ack whose offset is greater than the earliest failed offset if there
+      //is any because we've rewound the consumer back
+      SortedMap<Long, Long> sortedMap = ackRanges.headMap(earliestFailedOffset);
+      if (!sortedMap.isEmpty()) {
+        sortedMap.put(sortedMap.lastKey(), Math.min(earliestFailedOffset,
+            sortedMap.get(sortedMap.lastKey())));
+      }
+      ackRegistry.put(topicPartition, new ConcurrentSkipListMap<>(sortedMap));
+      //failure for this partition has been dealt with
+      failureRegistry.remove(topicPartition);
     }
-
-    @Override
-    public void activate() {
-        super.activate();
-        if (!assignedPartitions.isEmpty()) {
-            consumer.resume(assignedPartitions);
-        }
+  }
+
+  private void manualCommit(TopicPartition topicPartition, NavigableMap<Long, Long> ackRanges) {
+    //the first entry in the acknowledgement registry keeps track of the lowest possible
+    //offset that can be committed
+    Map.Entry<Long, Long> firstEntry = ackRanges.firstEntry();
+    if (firstEntry != null) {
+      consumer.commitAsync(Collections.singletonMap(topicPartition,
+          new OffsetAndMetadata(firstEntry.getValue() + 1)), null);
     }
-
-    @Override
-    public void deactivate() {
-        super.deactivate();
-        if (!assignedPartitions.isEmpty()) {
-            consumer.pause(assignedPartitions);
-        }
+  }
+
+  private Iterable<ConsumerRecord<K, V>> poll() {
+    ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(200));
+    if (!records.isEmpty()) {
+      //since the Kafka Consumer metrics are built gradually based on the partitions it consumes,
+      //we need to periodically check whether there's any new metrics to register after
+      //each polling.
+      if (System.currentTimeMillis() - previousKafkaMetricsUpdatedTimestamp
+          > metricsIntervalInSecs * 1000) {
+        registerConsumerMetrics();
+        previousKafkaMetricsUpdatedTimestamp = System.currentTimeMillis();
+      }
+      if (topologyReliabilityMode == 
Config.TopologyReliabilityMode.ATMOST_ONCE) {
+        consumer.commitAsync();
+      }
+      return records;
     }
-
-    @Override
-    public void ack(Object msgId) {
-        super.ack(msgId);
-        long start = System.nanoTime();
-        ConsumerRecordMessageId consumerRecordMessageId = 
(ConsumerRecordMessageId) msgId;
-        TopicPartition topicPartition = 
consumerRecordMessageId.getTopicPartition();
-        long offset = consumerRecordMessageId.getOffset();
-        ackRegistry.putIfAbsent(topicPartition, new ConcurrentSkipListMap<>());
-        NavigableMap<Long, Long> navigableMap = 
ackRegistry.get(topicPartition);
-
-        Map.Entry<Long, Long> floorRange = navigableMap.floorEntry(offset);
-        Map.Entry<Long, Long> ceilingRange = navigableMap.ceilingEntry(offset);
-
-        long floorBottom = floorRange != null ? floorRange.getKey() : 
Long.MIN_VALUE;
-        long floorTop = floorRange != null ? floorRange.getValue() : 
Long.MIN_VALUE;
-        long ceilingBottom = ceilingRange != null ? ceilingRange.getKey() : 
Long.MAX_VALUE;
-        long ceilingTop = ceilingRange != null ? ceilingRange.getValue() : 
Long.MAX_VALUE;
-
-        /*
-          the ack is for a message that has already been acknowledged. This 
happens when a failed tuple has caused
-          Kafka consumer to seek back to earlier position and some messages 
are replayed.
-         */
-        if ((offset >= floorBottom && offset <= floorTop) || (offset >= 
ceilingBottom && offset <= ceilingTop))
-            return;
-        if (ceilingBottom - floorTop == 2) {
-            /*
-            the ack connects the two adjacent range
-             */
-            navigableMap.put(floorBottom, ceilingTop);
-            navigableMap.remove(ceilingBottom);
-        } else if (offset == floorTop + 1) {
-            /*
-            the acknowledged offset is the immediate neighbour of the upper 
bound of the floor range
-             */
-            navigableMap.put(floorBottom, offset);
-        } else if (offset == ceilingBottom - 1) {
-            /*
-            the acknowledged offset is the immediate neighbour of the lower 
bound of the ceiling range
-             */
-            navigableMap.remove(ceilingBottom);
-            navigableMap.put(offset, ceilingTop);
-        } else {
-            /*
-            it is a new born range
-             */
-            navigableMap.put(offset, offset);
-        }
-        LOG.debug("ack {} in {} ns", msgId, System.nanoTime() - start);
-        LOG.debug("{}", 
ackRegistry.get(consumerRecordMessageId.getTopicPartition()));
+    return Collections.emptyList();
+  }
+
+  private void registerConsumerMetrics() {
+    consumer.metrics().forEach((metricName, o) -> {
+      if (!reportedMetrics.contains(metricName)) {
+        reportedMetrics.add(metricName);
+        String exposedName = extractKafkaMetricName(metricName);
+        LOG.info("register Kakfa Consumer metric {}", exposedName);
+        topologyContext.registerMetric(exposedName, new 
KafkaMetricDecorator<>(o),
+            metricsIntervalInSecs);
+      }
+    });
+  }
+
+  private String extractKafkaMetricName(MetricName metricName) {
+    StringBuilder builder = new StringBuilder()
+        .append(metricName.name())
+        .append('-')
+        .append(metricName.group());
+    metricName.tags().forEach((s, s2) -> builder.append('-')
+        .append(s)
+        .append('-')
+        .append(s2));
+    LOG.info("register Kakfa Consumer metric {}", builder);
+    return builder.toString();
+  }
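
As a worked example of this flattening: a Kafka MetricName with name "name",
group "group" and the single tag name=value is exposed to Heron's metrics
system as "name-group-name-value", which is the exact mapping asserted in
KafkaSpoutTest below.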
+
+  static class ConsumerRecordMessageId {
+    private TopicPartition topicPartition;
+    private long offset;
+
+    ConsumerRecordMessageId(TopicPartition topicPartition, long offset) {
+      this.topicPartition = topicPartition;
+      this.offset = offset;
     }
 
     @Override
-    public void fail(Object msgId) {
-        super.fail(msgId);
-        ConsumerRecordMessageId consumerRecordMessageId = 
(ConsumerRecordMessageId) msgId;
-        TopicPartition topicPartition = 
consumerRecordMessageId.getTopicPartition();
-        long offset = consumerRecordMessageId.getOffset();
-        failureRegistry.put(topicPartition, 
Math.min(failureRegistry.getOrDefault(topicPartition, Long.MAX_VALUE), offset));
-        LOG.warn("fail {}", msgId);
+    public String toString() {
+      return "ConsumerRecordMessageId{"
+          + "topicPartition=" + topicPartition
+          + ", offset=" + offset
+          + '}';
     }
 
-    @Override
-    public void close() {
-        consumer.close();
+    TopicPartition getTopicPartition() {
+      return topicPartition;
     }
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        consumerRecordTransformer.getOutputStreams()
-                .forEach(s -> declarer.declareStream(s, new 
Fields(consumerRecordTransformer.getFieldNames(s))));
+    long getOffset() {
+      return offset;
     }
 
-    private void emitConsumerRecord(ConsumerRecord<K, V> record) {
-        consumerRecordTransformer.transform(record)
-                .forEach((s, objects) -> {
-                    if (topologyReliabilityMode != 
Config.TopologyReliabilityMode.ATLEAST_ONCE) {
-                        collector.emit(s, objects);
-                        //only in effective once mode, we need to track the 
offset of the record that is just emitted into the topology
-                        if (topologyReliabilityMode == 
Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
-                            state.put(new TopicPartition(record.topic(), 
record.partition()), record.offset());
-                        }
-                    } else {
-                        //build message id based on topic, partition, offset 
of the consumer record
-                        ConsumerRecordMessageId consumerRecordMessageId = new 
ConsumerRecordMessageId(new TopicPartition(record.topic(), record.partition()), 
record.offset());
-                        //emit tuple with the message id
-                        collector.emit(s, objects, consumerRecordMessageId);
-                    }
-                });
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      ConsumerRecordMessageId that = (ConsumerRecordMessageId) o;
+
+      if (offset != that.offset) {
+        return false;
+      }
+      return topicPartition.equals(that.topicPartition);
     }
 
-    private void rewindAndDiscardAck(TopicPartition topicPartition, 
NavigableMap<Long, Long> ackRanges) {
-        if (failureRegistry.containsKey(topicPartition)) {
-            long earliestFailedOffset = failureRegistry.get(topicPartition);
-            //rewind back to the earliest failed offset
-            consumer.seek(topicPartition, earliestFailedOffset);
-            //discard the ack whose offset is greater than the earliest failed 
offset if there is any because we've rewound the consumer back
-            SortedMap<Long, Long> sortedMap = 
ackRanges.headMap(earliestFailedOffset);
-            if (!sortedMap.isEmpty()) {
-                sortedMap.put(sortedMap.lastKey(), 
Math.min(earliestFailedOffset, sortedMap.get(sortedMap.lastKey())));
-            }
-            ackRegistry.put(topicPartition, new 
ConcurrentSkipListMap<>(sortedMap));
-            //failure for this partition has been dealt with
-            failureRegistry.remove(topicPartition);
-        }
+    @Override
+    public int hashCode() {
+      int result = topicPartition.hashCode();
+      result = 31 * result + (int) (offset ^ (offset >>> 32));
+      return result;
     }
+  }
 
-    private void manualCommit(TopicPartition topicPartition, 
NavigableMap<Long, Long> ackRanges) {
-        //the first entry in the acknowledgement registry keeps track of the 
lowest possible offset that can be committed
-        Map.Entry<Long, Long> firstEntry = ackRanges.firstEntry();
-        if (firstEntry != null) {
-            consumer.commitAsync(Collections.singletonMap(topicPartition, new 
OffsetAndMetadata(firstEntry.getValue() + 1)), null);
-        }
-    }
+  class KafkaConsumerRebalanceListener implements ConsumerRebalanceListener {
 
-    private Iterable<ConsumerRecord<K, V>> poll() {
-        ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(200));
-        if (!records.isEmpty()) {
-            /*
-            since the Kafka Consumer metrics are built gradually based on the 
partitions it consumes,
-            we need to periodically check whether there's any new metrics to 
register after each polling.
-             */
-            if (System.currentTimeMillis() - 
previousKafkaMetricsUpdatedTimestamp > metricsIntervalInSecs) {
-                registerConsumerMetrics();
-                previousKafkaMetricsUpdatedTimestamp = 
System.currentTimeMillis();
-            }
-            if (topologyReliabilityMode == 
Config.TopologyReliabilityMode.ATMOST_ONCE) {
-                consumer.commitAsync();
-            }
-            return records;
-        }
-        return Collections.emptyList();
-    }
-
-    private void registerConsumerMetrics() {
-        consumer.metrics().forEach((metricName, o) -> {
-            if (!reportedMetrics.contains(metricName)) {
-                reportedMetrics.add(metricName);
-                String exposedName = extractKafkaMetricName(metricName);
-                LOG.info("register Kakfa Consumer metric {}", exposedName);
-                topologyContext.registerMetric(exposedName, new 
KafkaMetricDecorator<>(o), metricsIntervalInSecs);
+    @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
+      assignedPartitions.removeAll(collection);
+      if (topologyReliabilityMode == 
Config.TopologyReliabilityMode.ATLEAST_ONCE) {
+        collection.forEach(topicPartition -> {
+          NavigableMap<Long, Long> navigableMap = 
ackRegistry.remove(topicPartition);
+          if (navigableMap != null) {
+            Map.Entry<Long, Long> entry = navigableMap.firstEntry();
+            if (entry != null) {
+              consumer.commitAsync(Collections.singletonMap(topicPartition,
+                  new OffsetAndMetadata(Math.min(
+                      failureRegistry.getOrDefault(topicPartition, 
Long.MAX_VALUE),
+                      entry.getValue()) + 1)), null);
             }
+          }
+          failureRegistry.remove(topicPartition);
         });
+      } else if (topologyReliabilityMode == 
Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
+        collection.forEach(topicPartition -> state.remove(topicPartition));
+      }
     }
 
-    private String extractKafkaMetricName(MetricName metricName) {
-        StringBuilder builder = new StringBuilder()
-                .append(metricName.name())
-                .append('-')
-                .append(metricName.group());
-        metricName.tags().forEach((s, s2) -> builder.append('-')
-                .append(s)
-                .append('-')
-                .append(s2));
-        LOG.info("register Kakfa Consumer metric {}", builder);
-        return builder.toString();
-    }
-
-    static class ConsumerRecordMessageId {
-        private TopicPartition topicPartition;
-        private long offset;
-
-        ConsumerRecordMessageId(TopicPartition topicPartition, long offset) {
-            this.topicPartition = topicPartition;
-            this.offset = offset;
-        }
-
-        @Override
-        public String toString() {
-            return "ConsumerRecordMessageId{" +
-                    "topicPartition=" + topicPartition +
-                    ", offset=" + offset +
-                    '}';
-        }
-
-        TopicPartition getTopicPartition() {
-            return topicPartition;
-        }
-
-        long getOffset() {
-            return offset;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-
-            ConsumerRecordMessageId that = (ConsumerRecordMessageId) o;
-
-            if (offset != that.offset) return false;
-            return topicPartition.equals(that.topicPartition);
-        }
-
-        @Override
-        public int hashCode() {
-            int result = topicPartition.hashCode();
-            result = 31 * result + (int) (offset ^ (offset >>> 32));
-            return result;
-        }
-    }
-
-    class KafkaConsumerRebalanceListener implements ConsumerRebalanceListener {
-
-        @Override
-        public void onPartitionsRevoked(Collection<TopicPartition> collection) 
{
-            assignedPartitions.removeAll(collection);
-            if (topologyReliabilityMode == 
Config.TopologyReliabilityMode.ATLEAST_ONCE) {
-                collection.forEach(topicPartition -> {
-                    NavigableMap<Long, Long> navigableMap = 
ackRegistry.remove(topicPartition);
-                    if (navigableMap != null) {
-                        Map.Entry<Long, Long> entry = 
navigableMap.firstEntry();
-                        if (entry != null) {
-                            
consumer.commitAsync(Collections.singletonMap(topicPartition, new 
OffsetAndMetadata(Math.min(failureRegistry.getOrDefault(topicPartition, 
Long.MAX_VALUE), entry.getValue()) + 1)), null);
-                        }
-                    }
-                    failureRegistry.remove(topicPartition);
-                });
-            } else if (topologyReliabilityMode == 
Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
-                collection.forEach(topicPartition -> 
state.remove(topicPartition));
-            }
-        }
-
-        @Override
-        public void onPartitionsAssigned(Collection<TopicPartition> 
collection) {
-            assignedPartitions.addAll(collection);
-            if (topologyReliabilityMode == 
Config.TopologyReliabilityMode.ATLEAST_ONCE) {
-                collection.forEach(topicPartition -> {
-                    try {
-                        long nextRecordPosition = 
consumer.position(topicPartition, Duration.ofSeconds(5));
-                        ackRegistry.put(topicPartition, new 
ConcurrentSkipListMap<>(Collections.singletonMap(nextRecordPosition - 1, 
nextRecordPosition - 1)));
-                    } catch (TimeoutException e) {
-                        LOG.warn("can not get the position of the next record 
to consume for partition {}", topicPartition);
-                        ackRegistry.remove(topicPartition);
-                    }
-                    failureRegistry.remove(topicPartition);
-                });
-            } else if (topologyReliabilityMode == 
Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
-                collection.forEach(topicPartition -> {
-                    if (state.containsKey(topicPartition)) {
-                        consumer.seek(topicPartition, 
state.get(topicPartition));
-                    }
-                });
-            }
-        }
+    @Override
+    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
+      assignedPartitions.addAll(collection);
+      if (topologyReliabilityMode == 
Config.TopologyReliabilityMode.ATLEAST_ONCE) {
+        collection.forEach(topicPartition -> {
+          try {
+            long nextRecordPosition = consumer.position(topicPartition,
+                Duration.ofSeconds(5));
+            ackRegistry.put(topicPartition, new ConcurrentSkipListMap<>(
+                Collections.singletonMap(nextRecordPosition - 1, 
nextRecordPosition - 1)
+            ));
+          } catch (TimeoutException e) {
+            LOG.warn("can not get the position of the next record to consume 
for partition {}",
+                topicPartition);
+            ackRegistry.remove(topicPartition);
+          }
+          failureRegistry.remove(topicPartition);
+        });
+      } else if (topologyReliabilityMode == 
Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
+        collection.forEach(topicPartition -> {
+          if (state.containsKey(topicPartition)) {
+            consumer.seek(topicPartition, state.get(topicPartition));
+          }
+        });
+      }
     }
+  }
 }
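
For context on how the rebuilt spout is consumed, a minimal topology-wiring
sketch follows; the broker address, group id, topic and component names are
illustrative, and the bundled HeronKafkaSpoutSampleTopology remains the
authoritative example:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.heron.api.topology.TopologyBuilder;
    import org.apache.heron.spouts.kafka.DefaultKafkaConsumerFactory;
    import org.apache.heron.spouts.kafka.KafkaSpout;

    public final class KafkaSpoutSketch {
      private KafkaSpoutSketch() {
      }

      public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", "localhost:9092");  // illustrative
        config.put("group.id", "sample-kafka-spout");       // illustrative
        config.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("value.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        KafkaSpout<String, byte[]> spout = new KafkaSpout<>(
            new DefaultKafkaConsumerFactory<>(config),
            Collections.singleton("my-topic"));             // illustrative

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", spout, 1);
        // ...attach bolts and submit via HeronSubmitter as usual
      }
    }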
diff --git 
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/TopicPatternProvider.java
 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/TopicPatternProvider.java
index 1448609..43d6c6a 100644
--- 
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/TopicPatternProvider.java
+++ 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java/org/apache/heron/spouts/kafka/TopicPatternProvider.java
@@ -1,12 +1,14 @@
-/*
- * Copyright 2019
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -26,8 +28,10 @@ import java.util.regex.Pattern;
  */
 public interface TopicPatternProvider extends Serializable {
 
-    /**
-     * @return a matching pattern for topics to subscribe to
-     */
-    Pattern create();
+  /**
+   * create the topic matching regex pattern
+   *
+   * @return a matching pattern for topics to subscribe to
+   */
+  Pattern create();
 }
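
Because the interface only has to produce a Pattern, a custom provider stays
small; for instance, a hypothetical prefix-based provider (not part of this
commit) could look like:

    import java.util.regex.Pattern;

    import org.apache.heron.spouts.kafka.TopicPatternProvider;

    public class PrefixTopicPatternProvider implements TopicPatternProvider {
      private static final long serialVersionUID = 1L;
      private final String prefix;

      public PrefixTopicPatternProvider(String prefix) {
        this.prefix = prefix;
      }

      @Override
      public Pattern create() {
        // subscribe to every topic whose name starts with the given prefix
        return Pattern.compile(Pattern.quote(prefix) + ".*");
      }
    }

Such a provider plugs into the same KafkaSpout constructor that
DefaultTopicPatternProvider exercises in the tests below.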
diff --git 
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/BUILD
 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/BUILD
new file mode 100644
index 0000000..1d9c400
--- /dev/null
+++ 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/BUILD
@@ -0,0 +1,44 @@
+heron_kafka_spouts_test_dep = [
+    
"//contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/main/java:heron-kafka-spout-java",
+    "//heron/api/src/java:api-java-low-level",
+    "//heron/common/src/java:basics-java",
+    "//heron/common/src/java:config-java",
+    "//third_party/java:junit4",
+    "@org_apache_kafka_kafka_clients//jar",
+    "@org_mockito_mockito_all//jar",
+]
+
+java_test(
+    name = "KafkaSpoutTest",
+    srcs = ["org/apache/heron/spouts/kafka/KafkaSpoutTest.java"],
+    test_class = "org.apache.heron.spouts.kafka.KafkaSpoutTest",
+    deps = heron_kafka_spouts_test_dep,
+)
+
+java_test(
+    name = "KafkaMetricDecoratorTest",
+    srcs = ["org/apache/heron/spouts/kafka/KafkaMetricDecoratorTest.java"],
+    test_class = "org.apache.heron.spouts.kafka.KafkaMetricDecoratorTest",
+    deps = heron_kafka_spouts_test_dep,
+)
+
+java_test(
+    name = "DefaultTopicPatternProviderTest",
+    srcs = 
["org/apache/heron/spouts/kafka/DefaultTopicPatternProviderTest.java"],
+    test_class = 
"org.apache.heron.spouts.kafka.DefaultTopicPatternProviderTest",
+    deps = heron_kafka_spouts_test_dep,
+)
+
+java_test(
+    name = "DefaultKafkaConsumerFactoryTest",
+    srcs = 
["org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactoryTest.java"],
+    test_class = 
"org.apache.heron.spouts.kafka.DefaultKafkaConsumerFactoryTest",
+    deps = heron_kafka_spouts_test_dep,
+)
+
+java_test(
+    name = "DefaultConsumerRecordTransformerTest",
+    srcs = 
["org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformerTest.java"],
+    test_class = 
"org.apache.heron.spouts.kafka.DefaultConsumerRecordTransformerTest",
+    deps = heron_kafka_spouts_test_dep,
+)
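
With these targets in place, the spout tests run under Bazel; for example,
from the repository root:

    bazel test //contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java:KafkaSpoutTest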
diff --git 
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformerTest.java
 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformerTest.java
index c9d676e..69158b6 100644
--- 
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformerTest.java
+++ 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultConsumerRecordTransformerTest.java
@@ -1,12 +1,14 @@
-/*
- * Copyright 2019
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,40 +18,45 @@
 
 package org.apache.heron.spouts.kafka;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-class DefaultConsumerRecordTransformerTest {
-    private static final String DEFAULT_STREAM = "default";
-    private ConsumerRecordTransformer<String, byte[]> 
consumerRecordTransformer;
-
-    @BeforeEach
-    void setUp() {
-        consumerRecordTransformer = new DefaultConsumerRecordTransformer<>();
-    }
-
-    @Test
-    void getOutputStreams() {
-        assertEquals(Collections.singletonList(DEFAULT_STREAM), 
consumerRecordTransformer.getOutputStreams());
-    }
-
-    @Test
-    void getFieldNames() {
-        assertEquals(Arrays.asList("key", "value"), 
consumerRecordTransformer.getFieldNames(DEFAULT_STREAM));
-    }
-
-    @Test
-    void transform() {
-        ConsumerRecord<String, byte[]> consumerRecord = new 
ConsumerRecord<>("partition", 0, 0, "key", new byte[]{0x1, 0x2, 0x3});
-        Map<String, List<Object>> expected = 
Collections.singletonMap(DEFAULT_STREAM, Arrays.asList(consumerRecord.key(), 
consumerRecord.value()));
-        assertEquals(expected, 
consumerRecordTransformer.transform(consumerRecord));
-    }
-}
\ No newline at end of file
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import static org.junit.Assert.assertEquals;
+
+public class DefaultConsumerRecordTransformerTest {
+  private static final String DEFAULT_STREAM = "default";
+  private ConsumerRecordTransformer<String, byte[]> consumerRecordTransformer;
+
+  @Before
+  public void setUp() {
+    consumerRecordTransformer = new DefaultConsumerRecordTransformer<>();
+  }
+
+  @Test
+  public void getOutputStreams() {
+    assertEquals(Collections.singletonList(DEFAULT_STREAM),
+        consumerRecordTransformer.getOutputStreams());
+  }
+
+  @Test
+  public void getFieldNames() {
+    assertEquals(Arrays.asList("key", "value"),
+        consumerRecordTransformer.getFieldNames(DEFAULT_STREAM));
+  }
+
+  @Test
+  public void transform() {
+    ConsumerRecord<String, byte[]> consumerRecord = new 
ConsumerRecord<>("partition", 0,
+        0, "key", new byte[]{0x1, 0x2, 0x3});
+    Map<String, List<Object>> expected = 
Collections.singletonMap(DEFAULT_STREAM,
+        Arrays.asList(consumerRecord.key(), consumerRecord.value()));
+    assertEquals(expected, 
consumerRecordTransformer.transform(consumerRecord));
+  }
+}
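
The transformer contract exercised above is small: declare the output streams,
the field names per stream, and a per-record mapping. A hypothetical
transformer that also forwards the source topic might look like the sketch
below (method shapes inferred from how KafkaSpout and these tests use the
interface; not part of this commit):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    import org.apache.heron.spouts.kafka.ConsumerRecordTransformer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    public class TopicAwareTransformer
        implements ConsumerRecordTransformer<String, byte[]> {
      private static final String STREAM = "default";

      @Override
      public List<String> getOutputStreams() {
        return Collections.singletonList(STREAM);
      }

      @Override
      public List<String> getFieldNames(String streamName) {
        return Arrays.asList("topic", "key", "value");
      }

      @Override
      public Map<String, List<Object>> transform(
          ConsumerRecord<String, byte[]> record) {
        // one output tuple per record, carrying the topic alongside key/value
        return Collections.singletonMap(STREAM,
            Arrays.asList(record.topic(), record.key(), record.value()));
      }
    }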
diff --git 
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactoryTest.java
 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactoryTest.java
index 205053c..1a6a187 100644
--- 
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactoryTest.java
+++ 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultKafkaConsumerFactoryTest.java
@@ -1,12 +1,14 @@
-/*
- * Copyright 2019
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,33 +18,35 @@
 
 package org.apache.heron.spouts.kafka;
 
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 
-class DefaultKafkaConsumerFactoryTest {
-    private KafkaConsumerFactory<String, byte[]> kafkaConsumerFactory;
+import static org.junit.Assert.assertTrue;
 
-    @BeforeEach
-    void setUp() {
-        Map<String, Object> config = new HashMap<>();
-        config.put("bootstrap.servers", "localhost:9092");
-        config.put("group.id", "tower-kafka-spout");
-        config.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
-        config.put("value.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(config);
-    }
+public class DefaultKafkaConsumerFactoryTest {
+  private KafkaConsumerFactory<String, byte[]> kafkaConsumerFactory;
+
+  @Before
+  public void setUp() {
+    Map<String, Object> config = new HashMap<>();
+    config.put("bootstrap.servers", "localhost:9092");
+    config.put("group.id", "tower-kafka-spout");
+    config.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
+    config.put("value.deserializer",
+        "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(config);
+  }
 
-    @Test
-    void create() {
-        try (Consumer<String, byte[]> consumer = 
kafkaConsumerFactory.create()) {
-            assertTrue(consumer instanceof KafkaConsumer);
-        }
+  @Test
+  public void create() {
+    try (Consumer<String, byte[]> consumer = kafkaConsumerFactory.create()) {
+      assertTrue(consumer instanceof KafkaConsumer);
     }
-}
\ No newline at end of file
+  }
+}
diff --git 
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultTopicPatternProviderTest.java
 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultTopicPatternProviderTest.java
index 21449e4..118426f 100644
--- 
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultTopicPatternProviderTest.java
+++ 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/DefaultTopicPatternProviderTest.java
@@ -1,12 +1,14 @@
-/*
- * Copyright 2019
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,16 +18,17 @@
 
 package org.apache.heron.spouts.kafka;
 
-import org.junit.jupiter.api.Test;
-
 import java.util.regex.Pattern;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
 
-class DefaultTopicPatternProviderTest {
+public class DefaultTopicPatternProviderTest {
 
-    @Test
-    void create() {
-        assertEquals(Pattern.compile("a").pattern(), new 
DefaultTopicPatternProvider("a").create().pattern());
-    }
-}
\ No newline at end of file
+  @Test
+  public void create() {
+    assertEquals(Pattern.compile("a").pattern(),
+        new DefaultTopicPatternProvider("a").create().pattern());
+  }
+}
diff --git 
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaMetricDecoratorTest.java
 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaMetricDecoratorTest.java
index d1bb516..35bd956 100644
--- 
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaMetricDecoratorTest.java
+++ 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaMetricDecoratorTest.java
@@ -1,12 +1,14 @@
-/*
- * Copyright 2019
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,28 +18,29 @@
 
 package org.apache.heron.spouts.kafka;
 
-import org.apache.kafka.common.Metric;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import org.apache.kafka.common.Metric;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.when;
 
-@ExtendWith(MockitoExtension.class)
-class KafkaMetricDecoratorTest {
-    @Mock
-    private Metric metric;
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaMetricDecoratorTest {
+  @Mock
+  private Metric metric;
 
-    @BeforeEach
-    void setUp() {
-        when(metric.metricValue()).thenReturn("dummy value");
-    }
+  @Before
+  public void setUp() {
+    when(metric.metricValue()).thenReturn("dummy value");
+  }
 
-    @Test
-    void getValueAndReset() {
-        assertEquals("dummy value", new 
KafkaMetricDecorator<>(metric).getValueAndReset());
-    }
-}
\ No newline at end of file
+  @Test
+  public void getValueAndReset() {
+    assertEquals("dummy value", new 
KafkaMetricDecorator<>(metric).getValueAndReset());
+  }
+}
diff --git 
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaSpoutTest.java
 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaSpoutTest.java
index 3fa026e..dce386a 100644
--- 
a/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaSpoutTest.java
+++ 
b/contrib/spouts/kafka/java/heron-kafka-spout-parent/heron-kafka-spout/src/test/java/org/apache/heron/spouts/kafka/KafkaSpoutTest.java
@@ -1,12 +1,14 @@
-/*
- * Copyright 2019
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,247 +18,299 @@
 
 package org.apache.heron.spouts.kafka;
 
-import com.twitter.heron.api.Config;
-import com.twitter.heron.api.metric.IMetric;
-import com.twitter.heron.api.spout.SpoutOutputCollector;
-import com.twitter.heron.api.topology.OutputFieldsDeclarer;
-import com.twitter.heron.api.topology.TopologyContext;
-import com.twitter.heron.api.tuple.Fields;
-import com.twitter.heron.common.basics.SingletonRegistry;
-import com.twitter.heron.common.config.SystemConfig;
-import com.twitter.heron.common.config.SystemConfigKey;
-import org.apache.kafka.clients.consumer.*;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.TopicPartition;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.regex.Pattern;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.runners.MockitoJUnitRunner;
 
-import java.time.Duration;
-import java.util.*;
-import java.util.regex.Pattern;
+import org.apache.heron.api.Config;
+import org.apache.heron.api.metric.IMetric;
+import org.apache.heron.api.spout.SpoutOutputCollector;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.common.basics.SingletonRegistry;
+import org.apache.heron.common.config.SystemConfig;
+import org.apache.heron.common.config.SystemConfigKey;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
 
-import static 
com.twitter.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE;
-import static com.twitter.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE;
-import static org.junit.jupiter.api.Assertions.*;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.*;
+import static org.apache.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE;
+import static org.apache.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE;
 
-@ExtendWith(MockitoExtension.class)
-class KafkaSpoutTest {
-    private static final Random random = new Random();
-    private static final String DUMMY_TOPIC_NAME = "topic";
-    private KafkaSpout<String, byte[]> kafkaSpout;
-    @Mock
-    private KafkaConsumerFactory<String, byte[]> kafkaConsumerFactory;
-    @Mock
-    private Consumer<String, byte[]> consumer;
-    @Mock
-    private TopologyContext topologyContext;
-    @Mock
-    private SpoutOutputCollector collector;
-    @Mock
-    private Metric metric;
-    @Captor
-    private ArgumentCaptor<Pattern> patternArgumentCaptor;
-    @Captor
-    private ArgumentCaptor<IMetric<Object>> kafkaMetricDecoratorArgumentCaptor;
-    @Mock
-    private OutputFieldsDeclarer declarer;
-    @Captor
-    private ArgumentCaptor<Fields> fieldsArgumentCaptor;
-    @Captor
-    private ArgumentCaptor<List<Object>> listArgumentCaptor;
-    @Captor
-    private ArgumentCaptor<ConsumerRebalanceListener> 
consumerRebalanceListenerArgumentCaptor;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
-    @BeforeAll
-    static void setUpAll() {
-        if 
(!SingletonRegistry.INSTANCE.containsSingleton(SystemConfig.HERON_SYSTEM_CONFIG))
 {
-            
SingletonRegistry.INSTANCE.registerSingleton(SystemConfig.HERON_SYSTEM_CONFIG, 
SystemConfig.newBuilder(true)
-                    .put(SystemConfigKey.HERON_METRICS_EXPORT_INTERVAL, 60)
-                    .build());
-        }
-    }
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
-    @BeforeEach
-    void setUp() {
-        kafkaSpout = new KafkaSpout<>(kafkaConsumerFactory, 
Collections.singleton(DUMMY_TOPIC_NAME));
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaSpoutTest {
+  private static final Random RANDOM = new Random();
+  private static final String DUMMY_TOPIC_NAME = "topic";
+  private KafkaSpout<String, byte[]> kafkaSpout;
+  @Mock
+  private KafkaConsumerFactory<String, byte[]> kafkaConsumerFactory;
+  @Mock
+  private Consumer<String, byte[]> consumer;
+  @Mock
+  private TopologyContext topologyContext;
+  @Mock
+  private SpoutOutputCollector collector;
+  @Mock
+  private Metric metric;
+  @Captor
+  private ArgumentCaptor<Pattern> patternArgumentCaptor;
+  @Captor
+  private ArgumentCaptor<IMetric<Object>> kafkaMetricDecoratorArgumentCaptor;
+  @Mock
+  private OutputFieldsDeclarer declarer;
+  @Captor
+  private ArgumentCaptor<Fields> fieldsArgumentCaptor;
+  @Captor
+  private ArgumentCaptor<List<Object>> listArgumentCaptor;
+  @Captor
+  private ArgumentCaptor<ConsumerRebalanceListener> 
consumerRebalanceListenerArgumentCaptor;
+
+  @BeforeClass
+  public static void setUpAll() {
+    if 
(!SingletonRegistry.INSTANCE.containsSingleton(SystemConfig.HERON_SYSTEM_CONFIG))
 {
+      
SingletonRegistry.INSTANCE.registerSingleton(SystemConfig.HERON_SYSTEM_CONFIG,
+          SystemConfig.newBuilder(true)
+              .put(SystemConfigKey.HERON_METRICS_EXPORT_INTERVAL, 60)
+              .build());
     }
+  }
 
-    @Test
-    void getConsumerRecordTransformer() {
-        assertTrue(kafkaSpout.getConsumerRecordTransformer() instanceof 
DefaultConsumerRecordTransformer);
+  @Before
+  public void setUp() {
+    kafkaSpout = new KafkaSpout<>(kafkaConsumerFactory,
+        Collections.singleton(DUMMY_TOPIC_NAME));
+  }
 
-    }
+  @Test
+  public void getConsumerRecordTransformer() {
+    assertTrue(kafkaSpout.getConsumerRecordTransformer()
+        instanceof DefaultConsumerRecordTransformer);
 
-    @Test
-    void setConsumerRecordTransformer() {
-        ConsumerRecordTransformer<String, byte[]> consumerRecordTransformer = 
new DefaultConsumerRecordTransformer<>();
-        kafkaSpout.setConsumerRecordTransformer(consumerRecordTransformer);
-        assertEquals(consumerRecordTransformer, 
kafkaSpout.getConsumerRecordTransformer());
-    }
+  }
 
-    @Test
-    void open() {
-        when(kafkaConsumerFactory.create()).thenReturn(consumer);
+  @Test
+  public void setConsumerRecordTransformer() {
+    ConsumerRecordTransformer<String, byte[]> consumerRecordTransformer =
+        new DefaultConsumerRecordTransformer<>();
+    kafkaSpout.setConsumerRecordTransformer(consumerRecordTransformer);
+    assertEquals(consumerRecordTransformer, 
kafkaSpout.getConsumerRecordTransformer());
+  }
 
-        
kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE, 
ATMOST_ONCE.name()), topologyContext, collector);
-        
verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)), 
any(KafkaSpout.KafkaConsumerRebalanceListener.class));
+  @Test
+  public void open() {
+    when(kafkaConsumerFactory.create()).thenReturn(consumer);
 
-        kafkaSpout = new KafkaSpout<>(kafkaConsumerFactory, new 
DefaultTopicPatternProvider("a"));
-        
kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE, 
ATMOST_ONCE.name()), topologyContext, collector);
-        verify(consumer).subscribe(patternArgumentCaptor.capture(), 
any(KafkaSpout.KafkaConsumerRebalanceListener.class));
-        assertEquals("a", patternArgumentCaptor.getValue().pattern());
-    }
+    kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
+        ATMOST_ONCE.name()), topologyContext, collector);
+    verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)),
+        any(KafkaSpout.KafkaConsumerRebalanceListener.class));
 
-    @Test
-    void nextTuple() {
-        when(kafkaConsumerFactory.create()).thenReturn(consumer);
-        ConsumerRecords<String, byte[]> consumerRecords = new 
ConsumerRecords<>(Collections.singletonMap(new TopicPartition(DUMMY_TOPIC_NAME, 
0), Collections.singletonList(new ConsumerRecord<>(DUMMY_TOPIC_NAME, 0, 0, 
"key", new byte[]{0xF}))));
-        when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
-        doReturn(Collections.singletonMap(new MetricName("name", "group", 
"description", Collections.singletonMap("name", "value")), 
metric)).when(consumer).metrics();
-        when(metric.metricValue()).thenReturn("sample value");
+    kafkaSpout = new KafkaSpout<>(kafkaConsumerFactory,
+        new DefaultTopicPatternProvider("a"));
+    kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
+        ATMOST_ONCE.name()), topologyContext, collector);
+    verify(consumer).subscribe(patternArgumentCaptor.capture(),
+        any(KafkaSpout.KafkaConsumerRebalanceListener.class));
+    assertEquals("a", patternArgumentCaptor.getValue().pattern());
+  }
 
-        
kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE, 
ATMOST_ONCE.name()), topologyContext, collector);
-        
verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)), 
consumerRebalanceListenerArgumentCaptor.capture());
-        ConsumerRebalanceListener consumerRebalanceListener = 
consumerRebalanceListenerArgumentCaptor.getValue();
-        TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME, 
0);
-        
consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition));
+  @Test
+  public void nextTuple() {
+    when(kafkaConsumerFactory.create()).thenReturn(consumer);
+    ConsumerRecords<String, byte[]> consumerRecords = new ConsumerRecords<>(
+        Collections.singletonMap(new TopicPartition(DUMMY_TOPIC_NAME, 0),
+            Collections.singletonList(new ConsumerRecord<>(DUMMY_TOPIC_NAME, 
0, 0,
+                "key", new byte[]{0xF}))));
+    when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+    doReturn(Collections.singletonMap(new MetricName("name", "group", 
"description",
+        Collections.singletonMap("name", "value")), 
metric)).when(consumer).metrics();
+    when(metric.metricValue()).thenReturn("sample value");
 
-        kafkaSpout.nextTuple();
-        verify(consumer).commitAsync();
-        verify(topologyContext).registerMetric(eq("name-group-name-value"), 
kafkaMetricDecoratorArgumentCaptor.capture(), eq(60));
-        assertEquals("sample value", 
kafkaMetricDecoratorArgumentCaptor.getValue().getValueAndReset());
+    kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
+        ATMOST_ONCE.name()), topologyContext, collector);
+    verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)),
+        consumerRebalanceListenerArgumentCaptor.capture());
+    ConsumerRebalanceListener consumerRebalanceListener =
+        consumerRebalanceListenerArgumentCaptor.getValue();
+    TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME, 0);
+    
consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition));
 
-        kafkaSpout.nextTuple();
-        verify(collector).emit(eq("default"), listArgumentCaptor.capture());
-        assertEquals("key", listArgumentCaptor.getValue().get(0));
-        assertArrayEquals(new byte[]{0xF}, (byte[]) 
listArgumentCaptor.getValue().get(1));
-    }
+    kafkaSpout.nextTuple();
+    verify(consumer).commitAsync();
+    verify(topologyContext).registerMetric(eq("name-group-name-value"),
+        kafkaMetricDecoratorArgumentCaptor.capture(), eq(60));
+    assertEquals("sample value",
+        kafkaMetricDecoratorArgumentCaptor.getValue().getValueAndReset());
 
-    @Test
-    void ack() {
-        when(kafkaConsumerFactory.create()).thenReturn(consumer);
-        TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME, 
0);
-        List<ConsumerRecord<String, byte[]>> recordList = new ArrayList<>();
-        byte[] randomBytes = new byte[1];
-        for (int i = 0; i < 5; i++) {
-            random.nextBytes(randomBytes);
-            recordList.add(new ConsumerRecord<>(DUMMY_TOPIC_NAME, 0, i, "key", 
Arrays.copyOf(randomBytes, randomBytes.length)));
-        }
-        ConsumerRecords<String, byte[]> consumerRecords = new 
ConsumerRecords<>(Collections.singletonMap(topicPartition, recordList));
-        when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+    kafkaSpout.nextTuple();
+    verify(collector).emit(eq("default"), listArgumentCaptor.capture());
+    assertEquals("key", listArgumentCaptor.getValue().get(0));
+    assertArrayEquals(new byte[]{0xF}, (byte[]) 
listArgumentCaptor.getValue().get(1));
+  }
 
-        
kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE, 
ATLEAST_ONCE.name()), topologyContext, collector);
-        //poll the topic
-        kafkaSpout.nextTuple();
-        //emit all of the five records
-        for (int i = 0; i < 5; i++) {
-            kafkaSpout.nextTuple();
-        }
-        //ack came in out of order and the third record is not acknowledged
-        kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 
4));
-        kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 
0));
-        kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 
1));
-        kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 
3));
-        //commit and poll
-        kafkaSpout.nextTuple();
-        verify(consumer).commitAsync(Collections.singletonMap(topicPartition, 
new OffsetAndMetadata(2)), null);
+  @Test
+  public void ack() {
+    when(kafkaConsumerFactory.create()).thenReturn(consumer);
+    TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME, 0);
+    List<ConsumerRecord<String, byte[]>> recordList = new ArrayList<>();
+    byte[] randomBytes = new byte[1];
+    for (int i = 0; i < 5; i++) {
+      RANDOM.nextBytes(randomBytes);
+      recordList.add(new ConsumerRecord<>(DUMMY_TOPIC_NAME, 0, i, "key",
+          Arrays.copyOf(randomBytes, randomBytes.length)));
     }
+    ConsumerRecords<String, byte[]> consumerRecords = new ConsumerRecords<>(
+        Collections.singletonMap(topicPartition, recordList));
+    when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
 
-    @Test
-    void fail() {
-        when(kafkaConsumerFactory.create()).thenReturn(consumer);
-        TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME, 
0);
-        List<ConsumerRecord<String, byte[]>> recordList = new ArrayList<>();
-        byte[] randomBytes = new byte[1];
-        for (int i = 0; i < 5; i++) {
-            random.nextBytes(randomBytes);
-            recordList.add(new ConsumerRecord<>(DUMMY_TOPIC_NAME, 0, i, "key", 
Arrays.copyOf(randomBytes, randomBytes.length)));
-        }
-        ConsumerRecords<String, byte[]> consumerRecords = new 
ConsumerRecords<>(Collections.singletonMap(topicPartition, recordList));
-        when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
-
-        
kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE, 
ATLEAST_ONCE.name()), topologyContext, collector);
-        //poll the topic
-        kafkaSpout.nextTuple();
-        //emit all of the five records
-        for (int i = 0; i < 5; i++) {
-            kafkaSpout.nextTuple();
-        }
-        //ack came in out of order, second and third record fails
-        kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 
4));
-        kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 
0));
-        kafkaSpout.fail(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 
1));
-        kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 
3));
-        kafkaSpout.fail(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 
2));
-        //commit and poll
-        kafkaSpout.nextTuple();
-        verify(consumer).seek(topicPartition, 1);
-        verify(consumer).commitAsync(Collections.singletonMap(topicPartition, 
new OffsetAndMetadata(1)), null);
+    kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
+        ATLEAST_ONCE.name()), topologyContext, collector);
+    //poll the topic
+    kafkaSpout.nextTuple();
+    //emit all five records
+    for (int i = 0; i < 5; i++) {
+      kafkaSpout.nextTuple();
     }
+    //acks arrive out of order and the third record is never acknowledged
+    kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 4));
+    kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 0));
+    kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 1));
+    kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 3));
+    //commit and poll
+    kafkaSpout.nextTuple();
+    verify(consumer).commitAsync(Collections.singletonMap(topicPartition,
+        new OffsetAndMetadata(2)), null);
+  }
 
-    @Test
-    void close() {
-        when(kafkaConsumerFactory.create()).thenReturn(consumer);
-        
kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE, 
ATMOST_ONCE.name()), topologyContext, collector);
-        kafkaSpout.close();
-        verify(consumer).close();
+  @Test
+  public void fail() {
+    when(kafkaConsumerFactory.create()).thenReturn(consumer);
+    TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME, 0);
+    List<ConsumerRecord<String, byte[]>> recordList = new ArrayList<>();
+    byte[] randomBytes = new byte[1];
+    for (int i = 0; i < 5; i++) {
+      RANDOM.nextBytes(randomBytes);
+      recordList.add(new ConsumerRecord<>(DUMMY_TOPIC_NAME, 0, i, "key",
+          Arrays.copyOf(randomBytes, randomBytes.length)));
     }
+    ConsumerRecords<String, byte[]> consumerRecords = new ConsumerRecords<>(
+        Collections.singletonMap(topicPartition, recordList));
+    when(consumer.poll(any(Duration.class))).thenReturn(consumerRecords);
 
-    @Test
-    void declareOutputFields() {
-        kafkaSpout.declareOutputFields(declarer);
-        verify(declarer).declareStream(eq("default"), 
fieldsArgumentCaptor.capture());
-        assertEquals(Arrays.asList("key", "value"), 
fieldsArgumentCaptor.getValue().toList());
+    kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
+        ATLEAST_ONCE.name()), topologyContext, collector);
+    //poll the topic
+    kafkaSpout.nextTuple();
+    //emit all five records
+    for (int i = 0; i < 5; i++) {
+      kafkaSpout.nextTuple();
     }
+    //acks arrive out of order; the second and third records fail
+    kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 4));
+    kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 0));
+    kafkaSpout.fail(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 1));
+    kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 3));
+    kafkaSpout.fail(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 2));
+    //commit and poll
+    kafkaSpout.nextTuple();
+    verify(consumer).seek(topicPartition, 1);
+    verify(consumer).commitAsync(Collections.singletonMap(topicPartition,
+        new OffsetAndMetadata(1)), null);
+  }
 
-    @Test
-    void consumerRebalanceListener() {
-        when(kafkaConsumerFactory.create()).thenReturn(consumer);
+  @Test
+  public void close() {
+    when(kafkaConsumerFactory.create()).thenReturn(consumer);
+    kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
+        ATMOST_ONCE.name()), topologyContext, collector);
+    kafkaSpout.close();
+    verify(consumer).close();
+  }
 
-        
kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE, 
ATLEAST_ONCE.name()), topologyContext, collector);
-        
verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)), 
consumerRebalanceListenerArgumentCaptor.capture());
-        ConsumerRebalanceListener consumerRebalanceListener = 
consumerRebalanceListenerArgumentCaptor.getValue();
-        TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME, 
0);
-        
consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition));
-        verify(consumer).position(topicPartition, Duration.ofSeconds(5));
+  @Test
+  public void declareOutputFields() {
+    kafkaSpout.declareOutputFields(declarer);
+    verify(declarer).declareStream(eq("default"), 
fieldsArgumentCaptor.capture());
+    assertEquals(Arrays.asList("key", "value"), 
fieldsArgumentCaptor.getValue().toList());
+  }
 
-        kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 0));
-        kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 1));
-        consumerRebalanceListener.onPartitionsRevoked(Collections.singleton(topicPartition));
-        verify(consumer).commitAsync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(2)), null);
-    }
+  @Test
+  public void consumerRebalanceListener() {
+    when(kafkaConsumerFactory.create()).thenReturn(consumer);
 
-    @Test
-    void activate() {
-        when(kafkaConsumerFactory.create()).thenReturn(consumer);
-        kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE, ATMOST_ONCE.name()), topologyContext, collector);
-        verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)), consumerRebalanceListenerArgumentCaptor.capture());
-        ConsumerRebalanceListener consumerRebalanceListener = consumerRebalanceListenerArgumentCaptor.getValue();
-        TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME, 0);
-        consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition));
-        kafkaSpout.activate();
-        verify(consumer).resume(Collections.singleton(topicPartition));
-    }
+    kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
+        ATLEAST_ONCE.name()), topologyContext, collector);
+    verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)),
+        consumerRebalanceListenerArgumentCaptor.capture());
+    ConsumerRebalanceListener consumerRebalanceListener =
+        consumerRebalanceListenerArgumentCaptor.getValue();
+    TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME, 0);
+    consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition));
+    verify(consumer).position(topicPartition, Duration.ofSeconds(5));
 
-    @Test
-    void deactivate() {
-        when(kafkaConsumerFactory.create()).thenReturn(consumer);
-        kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE, ATMOST_ONCE.name()), topologyContext, collector);
-        verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)), consumerRebalanceListenerArgumentCaptor.capture());
-        ConsumerRebalanceListener consumerRebalanceListener = consumerRebalanceListenerArgumentCaptor.getValue();
-        TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME, 0);
-        consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition));
-        kafkaSpout.deactivate();
-        verify(consumer).pause(Collections.singleton(topicPartition));
-    }
-}
\ No newline at end of file
+    kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 0));
+    kafkaSpout.ack(new KafkaSpout.ConsumerRecordMessageId(topicPartition, 1));
+    consumerRebalanceListener.onPartitionsRevoked(Collections.singleton(topicPartition));
+    verify(consumer).commitAsync(Collections.singletonMap(topicPartition,
+        new OffsetAndMetadata(2)), null);
+  }
+
+  @Test
+  public void activate() {
+    when(kafkaConsumerFactory.create()).thenReturn(consumer);
+    kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
+        ATMOST_ONCE.name()), topologyContext, collector);
+    verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)),
+        consumerRebalanceListenerArgumentCaptor.capture());
+    ConsumerRebalanceListener consumerRebalanceListener =
+        consumerRebalanceListenerArgumentCaptor.getValue();
+    TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME, 0);
+    consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition));
+    kafkaSpout.activate();
+    verify(consumer).resume(Collections.singleton(topicPartition));
+  }
+
+  @Test
+  public void deactivate() {
+    when(kafkaConsumerFactory.create()).thenReturn(consumer);
+    kafkaSpout.open(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE,
+        ATMOST_ONCE.name()), topologyContext, collector);
+    verify(consumer).subscribe(eq(Collections.singleton(DUMMY_TOPIC_NAME)),
+        consumerRebalanceListenerArgumentCaptor.capture());
+    ConsumerRebalanceListener consumerRebalanceListener =
+        consumerRebalanceListenerArgumentCaptor.getValue();
+    TopicPartition topicPartition = new TopicPartition(DUMMY_TOPIC_NAME, 0);
+    consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(topicPartition));
+    kafkaSpout.deactivate();
+    verify(consumer).pause(Collections.singleton(topicPartition));
+  }
+}
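
The fail() test above pins down the at-least-once bookkeeping in KafkaSpout:
acks can arrive out of order, and on the next poll the spout must commit only
up to the first offset that is not yet safely processed, then seek back to it
so failed records are replayed. A minimal sketch of that rule, using a
hypothetical per-partition AckRegistry rather than the spout's actual
internals:

    import java.util.TreeSet;

    class AckRegistry {
      // offsets acknowledged or failed so far, possibly out of order
      private final TreeSet<Long> acked = new TreeSet<>();
      private final TreeSet<Long> failed = new TreeSet<>();
      private long committed; // every offset below this one is fully processed

      void ack(long offset) { acked.add(offset); }
      void fail(long offset) { failed.add(offset); }

      // Advance past the contiguous run of acked offsets, stopping at the
      // first failed or still-pending offset; that offset is what gets
      // committed and seeked back to.
      long committableOffset() {
        while (acked.contains(committed) && !failed.contains(committed)) {
          committed++;
        }
        return committed;
      }
    }

With the sequence in the test (ack 4, ack 0, fail 1, ack 3, fail 2),
committableOffset() returns 1, matching the verified seek(topicPartition, 1)
and the commitAsync with OffsetAndMetadata(1).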
diff --git a/contrib/spouts/kafka/java/heron-kafka-spout-parent/pom.xml b/contrib/spouts/kafka/java/heron-kafka-spout-parent/pom.xml
deleted file mode 100644
index bd1b5e97..0000000
--- a/contrib/spouts/kafka/java/heron-kafka-spout-parent/pom.xml
+++ /dev/null
@@ -1,76 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Copyright 2019
-  ~
-  ~ Licensed under the Apache License, Version 2.0 (the "License");
-  ~ you may not use this file except in compliance with the License.
-  ~ You may obtain a copy of the License at
-  ~
-  ~ http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xmlns="http://maven.apache.org/POM/4.0.0"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <groupId>org.apache.heron</groupId>
-    <artifactId>heron-kafka-spout-parent</artifactId>
-    <version>1.0-SNAPSHOT</version>
-    <packaging>pom</packaging>
-
-    <properties>
-        <maven.compiler.source>1.8</maven.compiler.source>
-        <maven.compiler.target>1.8</maven.compiler.target>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    </properties>
-
-    <modules>
-        <module>heron-kafka-spout</module>
-        <module>heron-kafka-spout-sample</module>
-    </modules>
-
-    <dependencyManagement>
-        <dependencies>
-            <dependency>
-                <groupId>com.twitter.heron</groupId>
-                <artifactId>heron-api</artifactId>
-                <version>0.17.8</version>
-            </dependency>
-            <dependency>
-                <groupId>com.twitter.heron</groupId>
-                <artifactId>heron-storm</artifactId>
-                <version>0.17.8</version>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.kafka</groupId>
-                <artifactId>kafka-clients</artifactId>
-                <version>2.1.1</version>
-            </dependency>
-            <dependency>
-                <groupId>org.slf4j</groupId>
-                <artifactId>slf4j-jdk14</artifactId>
-                <version>1.7.26</version>
-            </dependency>
-        </dependencies>
-    </dependencyManagement>
-
-    <build>
-        <pluginManagement>
-            <plugins>
-                <plugin>
-                    <groupId>org.apache.maven.plugins</groupId>
-                    <artifactId>maven-surefire-plugin</artifactId>
-                    <version>2.22.1</version>
-                </plugin>
-            </plugins>
-        </pluginManagement>
-    </build>
-
-</project>
\ No newline at end of file
diff --git a/eco/tests/java/org/apache/heron/eco/builder/heron/HeronStreamBuilderTest.java b/eco/tests/java/org/apache/heron/eco/builder/heron/HeronStreamBuilderTest.java
index 72ea2a8..3e9d73f 100644
--- a/eco/tests/java/org/apache/heron/eco/builder/heron/HeronStreamBuilderTest.java
+++ b/eco/tests/java/org/apache/heron/eco/builder/heron/HeronStreamBuilderTest.java
@@ -307,7 +307,8 @@ public class HeronStreamBuilderTest {
     verify(mockContext).getTopologyDefinition();
     verify(mockContext).getBolt(eq(to));
     verify(mockDefinition).parallelismForBolt(eq(to));
-    verify(mockTopologyBuilder).setBolt(eq(to), eq(mockIStatefulWindowedBolt), eq(iRichBoltParallelism));
+    verify(mockTopologyBuilder).setBolt(eq(to), eq(mockIStatefulWindowedBolt),
+        eq(iRichBoltParallelism));
     verify(mockBoltDeclarer).customGrouping(eq(from), eq(streamId), eq(mockCustomStreamGrouping));
     verify(mockContext).setStreams(anyMap());
     verify(mockDefinition).getStreams();
diff --git a/scripts/get_all_heron_paths.sh b/scripts/get_all_heron_paths.sh
index 0bae1ed..42e9281 100755
--- a/scripts/get_all_heron_paths.sh
+++ b/scripts/get_all_heron_paths.sh
@@ -67,7 +67,7 @@ function get_package_of() {
 }
 
 function get_heron_java_paths() {
-  local java_paths=$(find {heron,heron/tools,tools,integration_test,storm-compatibility,eco,eco-storm-examples,eco-heron-examples} -name "*.java" | sed "s|/src/java/.*$|/src/java|"| sed "s|/java/src/.*$|/java/src|" |  sed "s|/tests/java/.*$|/tests/java|" | sort -u | fgrep -v "heron/scheduler/" | fgrep -v "heron/scheduler/" )
+  local java_paths=$(find {heron,heron/tools,tools,integration_test,storm-compatibility,eco,eco-storm-examples,eco-heron-examples,contrib} -name "*.java" | sed "s|/src/java/.*$|/src/java|"| sed "s|/java/src/.*$|/java/src|" |  sed "s|/tests/java/.*$|/tests/java|" | sort -u | fgrep -v "heron/scheduler/" | fgrep -v "heron/scheduler/" )
   if [ "$(uname -s | tr 'A-Z' 'a-z')" != "darwin" ]; then
     java_paths=$(echo "${java_paths}" | fgrep -v "/objc_tools/")
   fi
diff --git a/scripts/travis/build.sh b/scripts/travis/build.sh
index d767d3a..a02eddf 100755
--- a/scripts/travis/build.sh
+++ b/scripts/travis/build.sh
@@ -82,7 +82,7 @@ start_timer "$T"
 python ${UTILS}/save-logs.py "heron_build.txt" bazel\
   --bazelrc=tools/travis/bazel.rc build --config=$PLATFORM heron/... \
   heronpy/... examples/... storm-compatibility-examples/... \
-  eco-storm-examples/... eco-heron-examples/...
+  eco-storm-examples/... eco-heron-examples/... contrib/...
 end_timer "$T"
 
 # run heron unit tests
@@ -93,7 +93,7 @@ python ${UTILS}/save-logs.py "heron_test_non_flaky.txt" bazel\
   --test_summary=detailed --test_output=errors\
   --config=$PLATFORM --test_tag_filters=-flaky heron/... \
   heronpy/... examples/... storm-compatibility-examples/... \
-  eco-storm-examples/... eco-heron-examples/...
+  eco-storm-examples/... eco-heron-examples/... contrib/... 
 end_timer "$T"
 
 # flaky tests are often due to test port race conditions,