This is an automated email from the ASF dual-hosted git repository.

rzo1 pushed a commit to branch STORM-3988
in repository https://gitbox.apache.org/repos/asf/storm.git

commit 78b799cb28fd1e1ac23eec0c45264aabdb93eb21
Author: Richard Zowalla <[email protected]>
AuthorDate: Thu Oct 19 08:58:09 2023 +0200

    STORM-3988 - Remove "storm-kinesis"
---
 external/storm-kinesis/README.md                   | 140 ------
 external/storm-kinesis/pom.xml                     |  73 ----
 .../kinesis/spout/CredentialsProviderChain.java    |  40 --
 .../kinesis/spout/ExponentialBackoffRetrier.java   | 169 -------
 .../kinesis/spout/FailedMessageRetryHandler.java   |  48 --
 .../apache/storm/kinesis/spout/KinesisConfig.java  | 144 ------
 .../storm/kinesis/spout/KinesisConnection.java     | 121 -----
 .../storm/kinesis/spout/KinesisConnectionInfo.java | 117 -----
 .../storm/kinesis/spout/KinesisMessageId.java      |  81 ----
 .../storm/kinesis/spout/KinesisRecordsManager.java | 486 ---------------------
 .../apache/storm/kinesis/spout/KinesisSpout.java   |  86 ----
 .../storm/kinesis/spout/RecordToTupleMapper.java   |  40 --
 .../apache/storm/kinesis/spout/ZkConnection.java   |  97 ----
 .../org/apache/storm/kinesis/spout/ZkInfo.java     | 124 ------
 .../storm/kinesis/spout/test/KinesisBoltTest.java  |  48 --
 .../kinesis/spout/test/KinesisSpoutTopology.java   |  57 ---
 .../spout/test/TestRecordToTupleMapper.java        |  58 ---
 pom.xml                                            |   1 -
 18 files changed, 1930 deletions(-)

diff --git a/external/storm-kinesis/README.md b/external/storm-kinesis/README.md
deleted file mode 100644
index 8eaf53278..000000000
--- a/external/storm-kinesis/README.md
+++ /dev/null
@@ -1,140 +0,0 @@
-# Storm Kinesis Spout
-Provides a core storm spout for consuming data from a stream in Amazon Kinesis Streams. It stores the sequence numbers that can be committed in zookeeper and
-starts consuming records after that sequence number on restart by default. Below is a code sample that creates a sample topology using the spout. Each
-object used in configuring the spout is explained below. Ideally, the number of spout tasks should be equal to the number of shards in kinesis. However, each task
-can read from more than one shard.
-
-```java
-public class KinesisSpoutTopology {
-    public static void main (String args[]) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
-        String topologyName = args[0];
-        RecordToTupleMapper recordToTupleMapper = new TestRecordToTupleMapper();
-        KinesisConnectionInfo kinesisConnectionInfo = new KinesisConnectionInfo(new CredentialsProviderChain(), new ClientConfiguration(), Regions.US_WEST_2,
-                1000);
-        ZkInfo zkInfo = new ZkInfo("localhost:2181", "/kinesisOffsets", 20000, 15000, 10000L, 3, 2000);
-        KinesisConfig kinesisConfig = new KinesisConfig(args[1], ShardIteratorType.TRIM_HORIZON,
-                recordToTupleMapper, new Date(), new ExponentialBackoffRetrier(), zkInfo, kinesisConnectionInfo, 10000L);
-        KinesisSpout kinesisSpout = new KinesisSpout(kinesisConfig);
-        TopologyBuilder topologyBuilder = new TopologyBuilder();
-        topologyBuilder.setSpout("spout", kinesisSpout, 3);
-        topologyBuilder.setBolt("bolt", new KinesisBoltTest(), 1).shuffleGrouping("spout");
-        Config topologyConfig = new Config();
-        topologyConfig.setDebug(true);
-        topologyConfig.setNumWorkers(3);
-        StormSubmitter.submitTopology(topologyName, topologyConfig, topologyBuilder.createTopology());
-    }
-}
-```
-As you can see above, the spout takes an object of KinesisConfig in its constructor. The constructor of KinesisConfig takes 8 objects, as explained below.
-
-#### `String` streamName
-name of kinesis stream to consume data from
-
-#### `ShardIteratorType` shardIteratorType
-3 types are supported - TRIM_HORIZON (beginning of shard), LATEST and AT_TIMESTAMP. This argument is ignored if state for the shards
-is found in zookeeper. Hence it only applies the first time a topology is started. If you want to use any of these in subsequent runs of the topology, you
-will need to clear the state of the zookeeper node used for storing sequence numbers.
-
-#### `RecordToTupleMapper` recordToTupleMapper
-an implementation of the `RecordToTupleMapper` interface that the spout will call to convert a kinesis record to a storm tuple. It has two methods. getOutputFields
-tells the spout the fields that will be present in the tuple emitted from the getTuple method. If getTuple returns null, the record will be acked.
-```java
-    Fields getOutputFields ();
-    List<Object> getTuple (Record record);
-```
-
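A minimal stand-alone sketch of this two-method contract may help. The class and method names below are hypothetical, and the real `getTuple` takes an AWS SDK `Record` rather than the plain arguments used here; this only mirrors the shape of the interface.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical stand-in for a RecordToTupleMapper implementation.
 * The real interface lives in storm-kinesis; this sketch only mirrors
 * its getOutputFields()/getTuple(Record) contract with plain arguments.
 */
public class SampleMapperSketch {
    // Mirrors getOutputFields(): declares the fields emitted per tuple.
    static List<String> outputFields() {
        return Arrays.asList("partitionKey", "payload");
    }

    // Mirrors getTuple(Record): decode the record's data buffer into tuple
    // values. Returning null tells the spout to ack (skip) the record.
    static List<Object> toTuple(String partitionKey, ByteBuffer data) {
        if (data == null) {
            return null; // unreadable record: ack and move on
        }
        String payload = StandardCharsets.UTF_8.decode(data).toString();
        return Arrays.asList(partitionKey, payload);
    }

    public static void main(String[] args) {
        ByteBuffer data = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(toTuple("key-1", data));
    }
}
```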
-#### `Date` timestamp
-used in conjunction with the AT_TIMESTAMP shardIteratorType argument. This will make the spout fetch records from kinesis starting at that time or later. The
-time used by kinesis is the server side time associated with the record by kinesis.
-
-#### `FailedMessageRetryHandler` failedMessageRetryHandler
-an implementation of the `FailedMessageRetryHandler` interface. By default this module provides an implementation that supports an exponential backoff retry
-mechanism for failed messages. That implementation has two constructors. The default no-args constructor configures the first retry at 100 milliseconds and
-subsequent retries at Math.pow(2, i-1) seconds, where i is the retry number in the range 2 to Long.MAX_VALUE and 2 is the base of the exponential function in seconds.
-The other constructor takes the retry interval in millis for the first retry as first argument, the base of the exponential function in seconds as second argument and
-the number of retries as third argument. The methods of this interface, and how they work in accord with the spout, are explained below.
-```java
-    boolean failed (KinesisMessageId messageId);
-    KinesisMessageId getNextFailedMessageToRetry ();
-    void failedMessageEmitted (KinesisMessageId messageId);
-    void acked (KinesisMessageId messageId);
-```
-failed method will be called on every tuple that failed in the spout. It should return true if that failed message is scheduled to be retried, false otherwise.
-
-getNextFailedMessageToRetry method will be called as the first thing every time the spout wants to emit a tuple. It should return a message that should be retried
-if any, or null otherwise. Note that it can return null in the case it does not have any message to retry as of that moment. However, it should eventually
-return every message for which it returned true when the failed method was called for that message.
-
-failedMessageEmitted will be called if the spout successfully manages to get the record from kinesis and emit it. If not, the implementation should return the same
-message when getNextFailedMessageToRetry is called again.
-
-acked will be called once the failed message was re-emitted and successfully acked by the spout. If it was failed by the spout, failed will be called again.
-
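The default schedule described above (first retry after 100 ms, then Math.pow(2, i-1) seconds for retry i >= 2, capped at Long.MAX_VALUE) can be sketched as a small helper. The class and method names here are illustrative only, not part of the module's API.

```java
/**
 * Illustrative sketch of the documented default backoff schedule.
 * BackoffSchedule and delayMillis are hypothetical names, not module API.
 */
public class BackoffSchedule {
    static final long INITIAL_DELAY_MILLIS = 100L; // first retry delay
    static final long BASE_SECONDS = 2L;           // base of the exponential

    /** Delay in milliseconds before retry attempt retryNum (1-based). */
    static long delayMillis(long retryNum) {
        if (retryNum == 1) {
            return INITIAL_DELAY_MILLIS;
        }
        // retries 2, 3, ... wait 2^(i-1) seconds; cap at Long.MAX_VALUE on overflow
        double millis = Math.pow(BASE_SECONDS, retryNum - 1) * 1000.0;
        return millis >= (double) Long.MAX_VALUE ? Long.MAX_VALUE : (long) millis;
    }

    public static void main(String[] args) {
        // retry 1 -> 100 ms, retry 2 -> 2000 ms, retry 3 -> 4000 ms, retry 4 -> 8000 ms
        for (long i = 1; i <= 4; i++) {
            System.out.println("retry " + i + " -> " + delayMillis(i) + " ms");
        }
    }
}
```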
-#### `ZkInfo` zkInfo
-an object encapsulating information for zookeeper interaction. The constructor takes zkUrl as first argument, a comma separated string of zk host and
-port pairs; zkNode as second, used as the root node for storing committed sequence numbers; session timeout as third, in milliseconds; connection timeout
-as fourth, in milliseconds; commit interval as fifth, in milliseconds, for committing sequence numbers to zookeeper; retry attempts as sixth, for zk client
-connection retry attempts; and retry interval as seventh, in milliseconds, for the time to wait before retrying to connect.
-
-#### `KinesisConnectionInfo` kinesisConnectionInfo
-an object that captures arguments for connecting to kinesis using the kinesis client. It has a constructor that takes an implementation of `AWSCredentialsProvider`
-as first argument. This module provides an implementation called `CredentialsProviderChain` that allows the spout to authenticate with kinesis using one of
-the 5 mechanisms in this order - `EnvironmentVariableCredentialsProvider`, `SystemPropertiesCredentialsProvider`, `ClasspathPropertiesFileCredentialsProvider`,
-`InstanceProfileCredentialsProvider`, `ProfileCredentialsProvider`. It takes an object of `ClientConfiguration` as second argument for configuring the kinesis
-client, `Regions` as third argument, which sets the region to connect to on the client, and recordsLimit as the fourth argument, the maximum number
-of records the kinesis client will retrieve for every GetRecords request. This limit should be chosen carefully based on the size of the record, kinesis
-throughput rate limits and per tuple latency in storm for the topology. Also, if one task will be reading from more than one shard, that will affect
-the choice of the limit argument.
-
-#### `Long` maxUncommittedRecords
-this represents the maximum number of uncommitted sequence numbers allowed per task. Once this number is reached, the spout will not fetch any new records from
-kinesis. Uncommitted sequence numbers are defined as the sum of all the messages for a task that have not been committed to zookeeper. This is different from
-the topology level max pending messages. For example, suppose this value is set to 10 and the spout emitted sequence numbers 1 to 10, with sequence number 1 pending
-and 2 to 10 acked. In that case the number of uncommitted sequence numbers is 10, since no sequence number in the range 1 to 10 can be committed to zk.
-However, storm can still call nextTuple on the spout because there is only 1 pending message.
-
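The bookkeeping in the example above can be sketched with a small counting helper. The class, method, and set arguments below are hypothetical, purely to illustrate why nothing at or after the earliest pending sequence number can be committed.

```java
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical illustration of the uncommitted-record count described above;
 * not part of the module. Nothing at or after the earliest pending sequence
 * number can be committed to zookeeper, so all of it counts as uncommitted.
 */
public class UncommittedCount {
    static long uncommitted(Set<Long> emitted, Set<Long> pending) {
        if (pending.isEmpty()) {
            return 0; // every acked sequence number can be committed
        }
        long earliestPending = new TreeSet<>(pending).first();
        return emitted.stream().filter(seq -> seq >= earliestPending).count();
    }

    public static void main(String[] args) {
        Set<Long> emitted = new TreeSet<>();
        for (long seq = 1; seq <= 10; seq++) {
            emitted.add(seq);
        }
        // 1 pending, 2..10 acked: all 10 remain uncommitted, as in the text
        System.out.println(uncommitted(emitted, Set.of(1L)));
    }
}
```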
-### Maven dependencies
-The AWS SDK version that this was tested with is 1.10.77.
-
-```xml
- <dependencies>
-        <dependency>
-            <groupId>com.amazonaws</groupId>
-            <artifactId>aws-java-sdk</artifactId>
-            <version>${aws-java-sdk.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-client</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-framework</artifactId>
-            <version>${curator.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>com.googlecode.json-simple</groupId>
-            <artifactId>json-simple</artifactId>
-        </dependency>
- </dependencies>
-```
-
-# Future Work
-Handle merging or splitting of shards in kinesis, a Trident spout implementation, and metrics.
-
-## Committer Sponsors
-
- * Sriharsha Chintalapani ([[email protected]](mailto:[email protected]))
diff --git a/external/storm-kinesis/pom.xml b/external/storm-kinesis/pom.xml
deleted file mode 100644
index 9c2805c75..000000000
--- a/external/storm-kinesis/pom.xml
+++ /dev/null
@@ -1,73 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-     http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>storm</artifactId>
-        <groupId>org.apache.storm</groupId>
-        <version>2.6.0-SNAPSHOT</version>
-        <relativePath>../../pom.xml</relativePath>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-    <packaging>jar</packaging>
-    <artifactId>storm-kinesis</artifactId>
-    <name>storm-kinesis</name>
-
-    <properties>
-        <aws-java-sdk.version>1.10.77</aws-java-sdk.version>
-    </properties>
-
-    <dependencies>
-        <dependency>
-            <groupId>com.amazonaws</groupId>
-            <artifactId>aws-java-sdk</artifactId>
-            <version>${aws-java-sdk.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-client</artifactId>
-            <version>${project.version}</version>
-            <scope>${provided.scope}</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-framework</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.zookeeper</groupId>
-            <artifactId>zookeeper</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>net.minidev</groupId>
-            <artifactId>json-smart</artifactId>
-        </dependency>
-    </dependencies>
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-checkstyle-plugin</artifactId>
-                <!--Note - the version would be inherited-->
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-pmd-plugin</artifactId>
-            </plugin>
-        </plugins>
-    </build>
-
-</project>
-
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java
deleted file mode 100644
index 5820851cf..000000000
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kinesis.spout;
-
-import com.amazonaws.auth.AWSCredentialsProviderChain;
-import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider;
-import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
-import com.amazonaws.auth.InstanceProfileCredentialsProvider;
-import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
-import com.amazonaws.auth.profile.ProfileCredentialsProvider;
-
-/**
- * Class representing chain of mechanisms that will be used in order to connect to kinesis.
- */
-public class CredentialsProviderChain extends AWSCredentialsProviderChain {
-
-    public CredentialsProviderChain() {
-        super(new EnvironmentVariableCredentialsProvider(),
-                new SystemPropertiesCredentialsProvider(),
-                new ClasspathPropertiesFileCredentialsProvider(),
-                new InstanceProfileCredentialsProvider(),
-                new ProfileCredentialsProvider());
-    }
-}
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java
deleted file mode 100644
index 2920913d2..000000000
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kinesis.spout;
-
-import java.io.Serializable;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Serializable {
-    private static final Logger LOG = LoggerFactory.getLogger(ExponentialBackoffRetrier.class);
-    // Wait interval for retrying after first failure
-    private final Long initialDelayMillis;
-    // Base for exponential function in seconds for retrying for second, third and so on failures
-    private final Long baseSeconds;
-    // Maximum number of retries
-    private final Long maxRetries;
-    // map to track number of failures for each kinesis message that failed
-    private Map<KinesisMessageId, Long> failCounts = new HashMap<>();
-    // map to track next retry time for each kinesis message that failed
-    private Map<KinesisMessageId, Long> retryTimes = new HashMap<>();
-    // sorted set of records to be retried based on retry time. earliest retryTime record comes first
-    private SortedSet<KinesisMessageId> retryMessageSet = new TreeSet<>(new RetryTimeComparator());
-
-    /**
-     * No args constructor that uses defaults of 100 ms for first retry, max retries of Long.MAX_VALUE and an
-     * exponential backoff of {@code Math.pow(2,i-1)} secs for retry {@code i} where {@code i = 2,3,...}.
-     */
-    public ExponentialBackoffRetrier() {
-        this(100L, 2L, Long.MAX_VALUE);
-    }
-
-    /**
-     * Creates a new exponential backoff retrier.
-     * @param initialDelayMillis delay in milliseconds for first retry
-     * @param baseSeconds base for exponent function in seconds
-     * @param maxRetries maximum number of retries before the record is discarded/acked
-     */
-    public ExponentialBackoffRetrier(Long initialDelayMillis, Long baseSeconds, Long maxRetries) {
-        this.initialDelayMillis = initialDelayMillis;
-        this.baseSeconds = baseSeconds;
-        this.maxRetries = maxRetries;
-        validate();
-    }
-
-    private void validate() {
-        if (initialDelayMillis < 0) {
-            throw new IllegalArgumentException("initialDelayMillis cannot be negative.");
-        }
-        if (baseSeconds < 0) {
-            throw new IllegalArgumentException("baseSeconds cannot be negative.");
-        }
-        if (maxRetries < 0) {
-            throw new IllegalArgumentException("maxRetries cannot be negative.");
-        }
-    }
-
-    @Override
-    public boolean failed(KinesisMessageId messageId) {
-        LOG.debug("Handling failed message {}", messageId);
-        // if maxRetries is 0, don't retry and return false as per interface contract
-        if (maxRetries == 0) {
-            LOG.warn("maxRetries set to 0. Hence not queueing " + messageId);
-            return false;
-        }
-        // if first failure add it to the count map
-        if (!failCounts.containsKey(messageId)) {
-            failCounts.put(messageId, 0L);
-        }
-        // increment the fail count as we started with 0
-        Long failCount = failCounts.get(messageId);
-        failCounts.put(messageId, ++failCount);
-        // if fail count is greater than maxRetries, discard or ack. e.g. for maxRetries 3, at most 4 failures are allowed
-        if (failCount > maxRetries) {
-            LOG.warn("maxRetries reached so dropping " + messageId);
-            failCounts.remove(messageId);
-            return false;
-        }
-        // if we reached this far, add it to the set of messages waiting to be retried, with next retry time based on how many times it failed
-        retryTimes.put(messageId, getRetryTime(failCount));
-        retryMessageSet.add(messageId);
-        LOG.debug("Scheduled {} for retry at {} and retry attempt {}", messageId, retryTimes.get(messageId), failCount);
-        return true;
-    }
-
-    @Override
-    public void acked(KinesisMessageId messageId) {
-        // message was acked after being retried. so clear the state for that message
-        LOG.debug("Ack received for {}. Hence cleaning state.", messageId);
-        failCounts.remove(messageId);
-    }
-
-    @Override
-    public KinesisMessageId getNextFailedMessageToRetry() {
-        KinesisMessageId result = null;
-        // return the first message to be retried from the set. It will return the message with the earliest retry time <= current time
-        if (!retryMessageSet.isEmpty()) {
-            result = retryMessageSet.first();
-            if (!(retryTimes.get(result) <= System.nanoTime())) {
-                result = null;
-            }
-        }
-        LOG.debug("Returning {} to spout for retrying.", result);
-        return result;
-    }
-
-    @Override
-    public void failedMessageEmitted(KinesisMessageId messageId) {
-        // spout notified that message returned by us for retrying was actually emitted. hence remove it from set and
-        // wait for its ack or fail
-        // but still keep it in counts map to retry again on failure or remove on ack
-        LOG.debug("Spout says {} emitted. Hence removing it from queue and wait for its ack or fail", messageId);
-        retryMessageSet.remove(messageId);
-        retryTimes.remove(messageId);
-    }
-
-    /**
-     * private helper method to get next retry time for retry attempt i (handles long overflow as well by capping it to
-     * Long.MAX_VALUE).
-     */
-    private Long getRetryTime(Long retryNum) {
-        Long retryTime = System.nanoTime();
-        Long nanoMultiplierForMillis = 1000000L;
-        // if first retry then retry time  = current time  + initial delay
-        if (retryNum == 1) {
-            retryTime += initialDelayMillis * nanoMultiplierForMillis;
-        } else {
-            // else use the exponential backoff logic and handle long overflow
-            Long maxValue = Long.MAX_VALUE;
-            double time = Math.pow(baseSeconds, retryNum - 1) * 1000 * nanoMultiplierForMillis;
-            // if delay or delay + current time are bigger than long max value
-            // second predicate of the or condition uses the fact that long addition over the limit wraps around
-            if ((time >= maxValue.doubleValue()) || ((retryTime + (long) time) < retryTime)) {
-                retryTime = maxValue;
-            } else {
-                retryTime += (long) time;
-            }
-        }
-        return retryTime;
-    }
-
-    private class RetryTimeComparator implements Serializable, Comparator<KinesisMessageId> {
-        @Override
-        public int compare(KinesisMessageId o1, KinesisMessageId o2) {
-            return retryTimes.get(o1).compareTo(retryTimes.get(o2));
-        }
-    }
-}
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java
deleted file mode 100644
index c005c5986..000000000
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kinesis.spout;
-
-import java.io.Serializable;
-
-public interface FailedMessageRetryHandler extends Serializable {
-    /**
-     * message with messageId failed in the spout.
-     * @param messageId the message id
-     * @return true if this failed message was scheduled to be retried, false otherwise
-     */
-    boolean failed(KinesisMessageId messageId);
-
-    /**
-     * message with messageId succeeded/acked in the spout.
-     * @param messageId the message id
-     */
-    void acked(KinesisMessageId messageId);
-
-    /**
-     * Get the next failed message's id to retry if any, null otherwise.
-     * @return messageId
-     */
-    KinesisMessageId getNextFailedMessageToRetry();
-
-    /**
-     * message with messageId returned by last call to getNextFailedMessageToRetry was emitted/retried by the spout.
-     * @param messageId the message id
-     */
-    void failedMessageEmitted(KinesisMessageId messageId);
-}
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConfig.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConfig.java
deleted file mode 100644
index eeb58968c..000000000
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConfig.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kinesis.spout;
-
-import com.amazonaws.services.kinesis.model.ShardIteratorType;
-
-import java.io.Serializable;
-import java.util.Date;
-
-public class KinesisConfig implements Serializable {
-    // kinesis stream name to read from
-    private final String streamName;
-    // shard iterator type based on kinesis api - beginning of time, latest, at timestamp are only supported
-    private final ShardIteratorType shardIteratorType;
-    // implementation for converting a Kinesis record to a storm tuple
-    private final RecordToTupleMapper recordToTupleMapper;
-    // timestamp to be used for shardIteratorType AT_TIMESTAMP - can be null
-    private final Date timestamp;
-    // implementation for handling the failed messages retry logic
-    private final FailedMessageRetryHandler failedMessageRetryHandler;
-    // object capturing all zk related information for storing committed sequence numbers
-    private final ZkInfo zkInfo;
-    // object representing information on parameters to use while connecting to kinesis using kinesis client
-    private final KinesisConnectionInfo kinesisConnectionInfo;
-    /**
-     * This number represents the number of messages that are still not committed to zk. it will prevent the spout from
-     * emitting further. for e.g. if 1 failed and 2,3,4,5..... all have been acked by storm, they still can't be
-     * committed to zk because 1 is still in failed set. As a result the acked queue can infinitely grow without any of
-     * them being committed to zk. topology max pending does not help since from storm's view they are acked
-     */
-    private final Long maxUncommittedRecords;
-
-    public KinesisConfig(String streamName,
-            ShardIteratorType shardIteratorType,
-            RecordToTupleMapper recordToTupleMapper,
-            Date timestamp,
-            FailedMessageRetryHandler failedMessageRetryHandler,
-            ZkInfo zkInfo,
-            KinesisConnectionInfo kinesisConnectionInfo,
-            Long maxUncommittedRecords) {
-        this.streamName = streamName;
-        this.shardIteratorType = shardIteratorType;
-        this.recordToTupleMapper = recordToTupleMapper;
-        this.timestamp = timestamp;
-        this.failedMessageRetryHandler = failedMessageRetryHandler;
-        this.zkInfo = zkInfo;
-        this.kinesisConnectionInfo = kinesisConnectionInfo;
-        this.maxUncommittedRecords = maxUncommittedRecords;
-        validate();
-    }
-
-    private void validate() {
-        if (streamName == null || streamName.length() < 1) {
-            throw new IllegalArgumentException("streamName is required and cannot be of length 0.");
-        }
-        if (shardIteratorType == null
-                || shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
-                || shardIteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) {
-            throw new IllegalArgumentException("shardIteratorType has to be one of the " + ShardIteratorType.AT_TIMESTAMP
-                    + "," + ShardIteratorType.LATEST
-                    + "," + ShardIteratorType.TRIM_HORIZON);
-        }
-        if (shardIteratorType.equals(ShardIteratorType.AT_TIMESTAMP) && timestamp == null) {
-            throw new IllegalArgumentException("timestamp must be provided if shardIteratorType is " + ShardIteratorType.AT_TIMESTAMP);
-        }
-        if (recordToTupleMapper == null) {
-            throw new IllegalArgumentException("recordToTupleMapper cannot be null");
-        }
-        if (failedMessageRetryHandler == null) {
-            throw new IllegalArgumentException("failedMessageRetryHandler cannot be null");
-        }
-        if (zkInfo == null) {
-            throw new IllegalArgumentException("zkInfo cannot be null");
-        }
-        if (kinesisConnectionInfo == null) {
-            throw new IllegalArgumentException("kinesisConnectionInfo cannot be null");
-        }
-        if (maxUncommittedRecords == null || maxUncommittedRecords < 1) {
-            throw new IllegalArgumentException("maxUncommittedRecords has to be a positive integer");
-        }
-    }
-
-    public String getStreamName() {
-        return streamName;
-    }
-
-    public ShardIteratorType getShardIteratorType() {
-        return shardIteratorType;
-    }
-
-    public RecordToTupleMapper getRecordToTupleMapper() {
-        return recordToTupleMapper;
-    }
-
-    public Date getTimestamp() {
-        return timestamp;
-    }
-
-    public FailedMessageRetryHandler getFailedMessageRetryHandler() {
-        return failedMessageRetryHandler;
-    }
-
-    public ZkInfo getZkInfo() {
-        return zkInfo;
-    }
-
-    public KinesisConnectionInfo getKinesisConnectionInfo() {
-        return kinesisConnectionInfo;
-    }
-
-    public Long getMaxUncommittedRecords() {
-        return maxUncommittedRecords;
-    }
-
-    @Override
-    public String toString() {
-        return "KinesisConfig{"
-                + "streamName='" + streamName + '\''
-                + ", shardIteratorType=" + shardIteratorType
-                + ", recordToTupleMapper=" + recordToTupleMapper
-                + ", timestamp=" + timestamp
-                + ", zkInfo=" + zkInfo
-                + ", kinesisConnectionInfo=" + kinesisConnectionInfo
-                + ", failedMessageRetryHandler =" + failedMessageRetryHandler
-                + ", maxUncommittedRecords=" + maxUncommittedRecords
-                + '}';
-    }
-}
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java
deleted file mode 100644
index b8b969aee..000000000
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kinesis.spout;
-
-import com.amazonaws.regions.Region;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
-import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
-import com.amazonaws.services.kinesis.model.DescribeStreamResult;
-import com.amazonaws.services.kinesis.model.GetRecordsRequest;
-import com.amazonaws.services.kinesis.model.GetRecordsResult;
-import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
-import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
-import com.amazonaws.services.kinesis.model.Shard;
-import com.amazonaws.services.kinesis.model.ShardIteratorType;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-class KinesisConnection {
-    private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordsManager.class);
-    private final KinesisConnectionInfo kinesisConnectionInfo;
-    private AmazonKinesisClient kinesisClient;
-
-    KinesisConnection(KinesisConnectionInfo kinesisConnectionInfo) {
-        this.kinesisConnectionInfo = kinesisConnectionInfo;
-    }
-
-    void initialize() {
-        kinesisClient = new AmazonKinesisClient(kinesisConnectionInfo.getCredentialsProvider(),
-                kinesisConnectionInfo.getClientConfiguration());
-        kinesisClient.setRegion(Region.getRegion(kinesisConnectionInfo.getRegion()));
-    }
-
-    List<Shard> getShardsForStream(String stream) {
-        DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
-        describeStreamRequest.setStreamName(stream);
-        List<Shard> shards = new ArrayList<>();
-        String exclusiveStartShardId = null;
-        do {
-            describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId);
-            DescribeStreamResult describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
-            shards.addAll(describeStreamResult.getStreamDescription().getShards());
-            if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) {
-                exclusiveStartShardId = shards.get(shards.size() - 1).getShardId();
-            } else {
-                exclusiveStartShardId = null;
-            }
-        } while (exclusiveStartShardId != null);
-        LOG.info("Number of shards for stream " + stream + " are " + shards.size());
-        return shards;
-    }
-
-    String getShardIterator(String stream,
-            String shardId,
-            ShardIteratorType shardIteratorType,
-            String sequenceNumber,
-            Date timestamp) {
-        String shardIterator = "";
-        try {
-            GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
-            getShardIteratorRequest.setStreamName(stream);
-            getShardIteratorRequest.setShardId(shardId);
-            getShardIteratorRequest.setShardIteratorType(shardIteratorType);
-            if (shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
-                    || shardIteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) {
-                getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber);
-            } else if (shardIteratorType.equals(ShardIteratorType.AT_TIMESTAMP)) {
-                getShardIteratorRequest.setTimestamp(timestamp);
-            }
-            GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
-            if (getShardIteratorResult != null) {
-                shardIterator = getShardIteratorResult.getShardIterator();
-            }
-        } catch (Exception e) {
-            LOG.warn("Exception occured while getting shardIterator for shard " + shardId
-                    + " shardIteratorType " + shardIteratorType
-                    + " sequence number " + sequenceNumber
-                    + " timestamp " + timestamp,
-                    e);
-        }
-        LOG.warn("Returning shardIterator " + shardIterator
-                + " for shardId " + shardId
-                + " shardIteratorType " + shardIteratorType
-                + " sequenceNumber " + sequenceNumber
-                + " timestamp" + timestamp);
-        return shardIterator;
-    }
-
-    GetRecordsResult fetchRecords(String shardIterator) {
-        GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
-        getRecordsRequest.setShardIterator(shardIterator);
-        getRecordsRequest.setLimit(kinesisConnectionInfo.getRecordsLimit());
-        GetRecordsResult getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
-        return getRecordsResult;
-    }
-
-    void shutdown() {
-        kinesisClient.shutdown();
-    }
-
-}
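
For reference, the removed `getShardsForStream()` pages through `DescribeStream` results by passing the last shard id seen as `ExclusiveStartShardId` until the service reports no more shards. A minimal sketch of that pagination loop, with a stand-in page source instead of the real AWS SDK types (`Page`, `PageSource`, and `listAllShards` are illustrative names, not Storm or AWS APIs):

```java
import java.util.ArrayList;
import java.util.List;

public class ShardPager {
    // Stand-in for one DescribeStream page (illustrative, not the AWS SDK).
    static class Page {
        final List<String> shardIds;
        final boolean hasMore;

        Page(List<String> shardIds, boolean hasMore) {
            this.shardIds = shardIds;
            this.hasMore = hasMore;
        }
    }

    // Stand-in for the describeStream call.
    interface PageSource {
        Page describe(String exclusiveStartShardId);
    }

    static List<String> listAllShards(PageSource source) {
        List<String> shards = new ArrayList<>();
        String exclusiveStartShardId = null;
        do {
            Page page = source.describe(exclusiveStartShardId);
            shards.addAll(page.shardIds);
            // Continue from the last shard seen only if the service says more remain.
            exclusiveStartShardId = (page.hasMore && !shards.isEmpty())
                    ? shards.get(shards.size() - 1)
                    : null;
        } while (exclusiveStartShardId != null);
        return shards;
    }

    public static void main(String[] args) {
        // Two pages of two shards each, then done.
        PageSource fake = start -> start == null
                ? new Page(List.of("shard-0", "shard-1"), true)
                : new Page(List.of("shard-2", "shard-3"), false);
        System.out.println(listAllShards(fake));
    }
}
```

The loop shape mirrors the deleted method: the cursor is derived from the accumulated result list, so an empty stream terminates immediately.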
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java
deleted file mode 100644
index 886720b1c..000000000
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kinesis.spout;
-
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.regions.Regions;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.Serializable;
-import java.util.Arrays;
-
-import org.objenesis.strategy.StdInstantiatorStrategy;
-
-public class KinesisConnectionInfo implements Serializable {
-    private final byte[] serializedKinesisCredsProvider;
-    private final byte[] serializedkinesisClientConfig;
-    private final Integer recordsLimit;
-    private final Regions region;
-
-    private transient AWSCredentialsProvider credentialsProvider;
-    private transient ClientConfiguration clientConfiguration;
-
-    /**
-     * Create a new Kinesis connection info.
-     * @param credentialsProvider implementation to provide credentials to connect to kinesis
-     * @param clientConfiguration client configuration to pass to kinesis client
-     * @param region region to connect to
-     * @param recordsLimit max records to be fetched in a getRecords request to kinesis
-     */
-    public KinesisConnectionInfo(AWSCredentialsProvider credentialsProvider,
-            ClientConfiguration clientConfiguration,
-            Regions region,
-            Integer recordsLimit) {
-        if (recordsLimit == null || recordsLimit <= 0) {
-            throw new IllegalArgumentException("recordsLimit has to be a positive integer");
-        }
-        if (region == null) {
-            throw new IllegalArgumentException("region cannot be null");
-        }
-        serializedKinesisCredsProvider = getKryoSerializedBytes(credentialsProvider);
-        serializedkinesisClientConfig = getKryoSerializedBytes(clientConfiguration);
-        this.recordsLimit = recordsLimit;
-        this.region = region;
-    }
-
-    public Integer getRecordsLimit() {
-        return recordsLimit;
-    }
-
-    public AWSCredentialsProvider getCredentialsProvider() {
-        if (credentialsProvider == null) {
-            credentialsProvider = (AWSCredentialsProvider) this.getKryoDeserializedObject(serializedKinesisCredsProvider);
-        }
-        return credentialsProvider;
-    }
-
-    public ClientConfiguration getClientConfiguration() {
-        if (clientConfiguration == null) {
-            clientConfiguration = (ClientConfiguration) this.getKryoDeserializedObject(serializedkinesisClientConfig);
-        }
-        return clientConfiguration;
-    }
-
-    public Regions getRegion() {
-        return region;
-    }
-
-    private byte[] getKryoSerializedBytes(final Object obj) {
-        final Kryo kryo = new Kryo();
-        kryo.setRegistrationRequired(false);
-        final ByteArrayOutputStream os = new ByteArrayOutputStream();
-        final Output output = new Output(os);
-        kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
-        kryo.writeClassAndObject(output, obj);
-        output.flush();
-        return os.toByteArray();
-    }
-
-    private Object getKryoDeserializedObject(final byte[] ser) {
-        final Kryo kryo = new Kryo();
-        kryo.setRegistrationRequired(false);
-        final Input input = new Input(new ByteArrayInputStream(ser));
-        kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
-        return kryo.readClassAndObject(input);
-    }
-
-    @Override
-    public String toString() {
-        return "KinesisConnectionInfo{"
-                + "serializedKinesisCredsProvider=" + Arrays.toString(serializedKinesisCredsProvider)
-                + ", serializedkinesisClientConfig=" + Arrays.toString(serializedkinesisClientConfig)
-                + ", region=" + region
-                + ", recordsLimit=" + recordsLimit
-                + '}';
-    }
-}
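
The removed `KinesisConnectionInfo` keeps the credentials provider and client configuration as Kryo-serialized byte arrays so the class itself stays `Serializable`, rebuilding them lazily into transient fields on first access. A sketch of that store-as-bytes / deserialize-on-demand pattern, using plain Java serialization instead of Kryo so the sketch is self-contained (`LazyBytes` is an illustrative name):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LazyBytes {
    // The value is held only as bytes, so the enclosing object serializes
    // regardless of what the wrapped value is.
    private final byte[] serialized;
    // Rebuilt on first access; transient so it is dropped on serialization.
    private transient Object cached;

    public LazyBytes(Serializable value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(value);
        }
        serialized = bos.toByteArray();
    }

    public Object get() throws IOException, ClassNotFoundException {
        if (cached == null) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serialized))) {
                cached = in.readObject();
            }
        }
        return cached;
    }

    public static void main(String[] args) throws Exception {
        LazyBytes lb = new LazyBytes("client-config");
        System.out.println(lb.get());
    }
}
```

The deleted class used Kryo with `StdInstantiatorStrategy` for the same purpose, which additionally handles objects that are not `Serializable` and lack no-arg constructors.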
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java
deleted file mode 100644
index 7fd179fbf..000000000
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java
+++ /dev/null
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kinesis.spout;
-
-public class KinesisMessageId {
-    private final String streamName;
-    private final String shardId;
-    private final String sequenceNumber;
-
-    KinesisMessageId(String streamName, String shardId, String sequenceNumber) {
-        this.streamName = streamName;
-        this.shardId = shardId;
-        this.sequenceNumber = sequenceNumber;
-    }
-
-    public String getStreamName() {
-        return streamName;
-    }
-
-    public String getShardId() {
-        return shardId;
-    }
-
-    public String getSequenceNumber() {
-        return sequenceNumber;
-    }
-
-    @Override
-    public String toString() {
-        return "KinesisMessageId{"
-                + "streamName='" + streamName + '\''
-                + ", shardId='" + shardId + '\''
-                + ", sequenceNumber='" + sequenceNumber + '\''
-                + '}';
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-
-        KinesisMessageId that = (KinesisMessageId) o;
-
-        if (streamName != null ? !streamName.equals(that.streamName) : that.streamName != null) {
-            return false;
-        }
-        if (shardId != null ? !shardId.equals(that.shardId) : that.shardId != null) {
-            return false;
-        }
-        return !(sequenceNumber != null ? !sequenceNumber.equals(that.sequenceNumber) : that.sequenceNumber != null);
-
-    }
-
-    @Override
-    public int hashCode() {
-        int result = streamName != null ? streamName.hashCode() : 0;
-        result = 31 * result + (shardId != null ? shardId.hashCode() : 0);
-        result = 31 * result + (sequenceNumber != null ? sequenceNumber.hashCode() : 0);
-        return result;
-    }
-}
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
deleted file mode 100644
index 33dba4aa5..000000000
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
+++ /dev/null
+++ /dev/null
@@ -1,486 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kinesis.spout;
-
-import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
-import com.amazonaws.services.kinesis.model.GetRecordsResult;
-import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
-import com.amazonaws.services.kinesis.model.Record;
-import com.amazonaws.services.kinesis.model.Shard;
-import com.amazonaws.services.kinesis.model.ShardIteratorType;
-
-import java.math.BigInteger;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-
-import org.apache.storm.spout.SpoutOutputCollector;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-class KinesisRecordsManager {
-    private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordsManager.class);
-    // object handling zk interaction
-    private transient ZkConnection zkConnection;
-    // object handling interaction with kinesis
-    private transient KinesisConnection kinesisConnection;
-    // Kinesis Spout KinesisConfig object
-    private final transient KinesisConfig kinesisConfig;
-    // Queue of records per shard fetched from kinesis and are waiting to be emitted
-    private transient Map<String, LinkedList<Record>> toEmitPerShard = new HashMap<>();
-    // Map of records  that were fetched from kinesis as a result of failure and are waiting to be emitted
-    private transient Map<KinesisMessageId, Record> failedandFetchedRecords = new HashMap<>();
-    /**
-     * Sequence numbers per shard that have been emitted. LinkedHashSet as we need to remove on ack or fail.
-     * At the same time order is needed to figure out the sequence number to commit. Logic explained in commit
-     */
-    private transient Map<String, TreeSet<BigInteger>> emittedPerShard = new HashMap<>();
-    // sorted acked sequence numbers - needed to figure out what sequence number can be committed
-    private transient Map<String, TreeSet<BigInteger>> ackedPerShard = new HashMap<>();
-    // sorted failed sequence numbers - needed to figure out what sequence number can be committed
-    private transient Map<String, TreeSet<BigInteger>> failedPerShard = new HashMap<>();
-    // shard iterator corresponding to position in shard for new messages
-    private transient Map<String, String> shardIteratorPerShard = new HashMap<>();
-    // last fetched sequence number corresponding to position in shard
-    private transient Map<String, String> fetchedSequenceNumberPerShard = new HashMap<>();
-    // shard iterator corresponding to position in shard for failed messages
-    private transient Map<KinesisMessageId, String> shardIteratorPerFailedMessage = new HashMap<>();
-    // timestamp to decide when to commit to zk again
-    private transient long lastCommitTime;
-    // boolean to track deactivated state
-    private transient boolean deactivated;
-
-    KinesisRecordsManager(KinesisConfig kinesisConfig) {
-        this.kinesisConfig = kinesisConfig;
-        this.zkConnection = new ZkConnection(kinesisConfig.getZkInfo());
-        this.kinesisConnection = new KinesisConnection(kinesisConfig.getKinesisConnectionInfo());
-    }
-
-    void initialize(int myTaskIndex, int totalTasks) {
-        deactivated = false;
-        lastCommitTime = System.currentTimeMillis();
-        kinesisConnection.initialize();
-        zkConnection.initialize();
-        List<Shard> shards = kinesisConnection.getShardsForStream(kinesisConfig.getStreamName());
-        LOG.info("myTaskIndex is " + myTaskIndex);
-        LOG.info("totalTasks is " + totalTasks);
-        int i = myTaskIndex;
-        while (i < shards.size()) {
-            LOG.info("Shard id " + shards.get(i).getShardId() + " assigned to task " + myTaskIndex);
-            toEmitPerShard.put(shards.get(i).getShardId(), new LinkedList<Record>());
-            i += totalTasks;
-        }
-        initializeFetchedSequenceNumbers();
-        refreshShardIteratorsForNewRecords();
-    }
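
The removed `initialize()` spreads shards across spout tasks round-robin: task `i` takes shards `i`, `i + totalTasks`, `i + 2 * totalTasks`, and so on. A minimal sketch of just that assignment rule (`ShardAssignment` and `shardsForTask` are illustrative names):

```java
import java.util.ArrayList;
import java.util.List;

public class ShardAssignment {
    // Task taskIndex of totalTasks takes every totalTasks-th shard,
    // starting at its own index. Tasks beyond the shard count get nothing.
    static List<String> shardsForTask(List<String> shards, int taskIndex, int totalTasks) {
        List<String> mine = new ArrayList<>();
        for (int i = taskIndex; i < shards.size(); i += totalTasks) {
            mine.add(shards.get(i));
        }
        return mine;
    }

    public static void main(String[] args) {
        List<String> shards = List.of("s0", "s1", "s2", "s3", "s4");
        System.out.println(shardsForTask(shards, 1, 2)); // task 1 of 2
    }
}
```

This is why, as the deleted `next()` warns, running more executors than there are shards leaves some tasks with no shards assigned.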
-
-    void next(SpoutOutputCollector collector) {
-        if (shouldCommit()) {
-            commit();
-        }
-        KinesisMessageId failedMessageId = kinesisConfig.getFailedMessageRetryHandler().getNextFailedMessageToRetry();
-        if (failedMessageId  != null) {
-            // if the retry service returns a message that is not in failed set then ignore it. should never happen
-            BigInteger failedSequenceNumber = new BigInteger(failedMessageId.getSequenceNumber());
-            if (failedPerShard.containsKey(failedMessageId.getShardId())
-                    && failedPerShard.get(failedMessageId.getShardId()).contains(failedSequenceNumber)) {
-                if (!failedandFetchedRecords.containsKey(failedMessageId)) {
-                    fetchFailedRecords(failedMessageId);
-                }
-                if (emitFailedRecord(collector, failedMessageId)) {
-                    failedPerShard.get(failedMessageId.getShardId()).remove(failedSequenceNumber);
-                    kinesisConfig.getFailedMessageRetryHandler().failedMessageEmitted(failedMessageId);
-                    return;
-                } else {
-                    LOG.warn("failedMessageEmitted not called on retrier for " + failedMessageId
-                            + ". This can happen a few times but should not happen infinitely");
-                }
-            } else {
-                LOG.warn("failedPerShard does not contain " + failedMessageId + ". This should never happen.");
-            }
-        }
-        LOG.debug("No failed record to emit for now. Hence will try to emit new records");
-        // if maximum uncommitted records count has reached, so dont emit any new records and return
-        if (!(getUncommittedRecordsCount() < kinesisConfig.getMaxUncommittedRecords())) {
-            LOG.warn("maximum uncommitted records count has reached. so not emitting any new records and returning");
-            return;
-        }
-        // early return as no shard is assigned - probably because number of executors > number of shards
-        if (toEmitPerShard.isEmpty()) {
-            LOG.warn("No shard is assigned to this task. Hence not emitting any tuple.");
-            return;
-        }
-
-        if (shouldFetchNewRecords()) {
-            fetchNewRecords();
-        }
-        emitNewRecord(collector);
-    }
-
-    void ack(KinesisMessageId kinesisMessageId) {
-        // for an acked message add it to acked set and remove it from emitted and failed
-        String shardId = kinesisMessageId.getShardId();
-        BigInteger sequenceNumber = new BigInteger(kinesisMessageId.getSequenceNumber());
-        LOG.debug("Ack received for shardId: {} sequenceNumber: {}", shardId, sequenceNumber);
-        // if an ack is received for a message then add it to the ackedPerShard TreeSet. TreeSet because while
-        // committing we need to figure out what is the
-        // highest sequence number that can be committed for this shard
-        if (!ackedPerShard.containsKey(shardId)) {
-            ackedPerShard.put(shardId, new TreeSet<BigInteger>());
-        }
-        ackedPerShard.get(shardId).add(sequenceNumber);
-        // if the acked message was in emittedPerShard that means we need to remove it from the emittedPerShard (which
-        // keeps track of in flight tuples)
-        if (emittedPerShard.containsKey(shardId)) {
-            TreeSet<BigInteger> emitted = emittedPerShard.get(shardId);
-            emitted.remove(sequenceNumber);
-        }
-        // an acked message should not be in failed since if it fails and gets re-emitted it moves to emittedPerShard
-        // from failedPerShard. Defensive coding.
-        // Remove it from failedPerShard anyway
-        if (failedPerShard.containsKey(shardId)) {
-            failedPerShard.get(shardId).remove(sequenceNumber);
-        }
-        // if an ack is for a message that failed once at least and was re-emitted then the record itself will be in
-        // failedAndFetchedRecords. We use that to
-        // determine if the FailedMessageRetryHandler needs to be told about it and then remove the record itself to
-        // clean up memory
-        if (failedandFetchedRecords.containsKey(kinesisMessageId)) {
-            kinesisConfig.getFailedMessageRetryHandler().acked(kinesisMessageId);
-            failedandFetchedRecords.remove(kinesisMessageId);
-        }
-        // keep committing when topology is deactivated since ack and fail keep getting called on deactivated topology
-        if (deactivated) {
-            commit();
-        }
-    }
-
-    void fail(KinesisMessageId kinesisMessageId) {
-        String shardId = kinesisMessageId.getShardId();
-        BigInteger sequenceNumber = new BigInteger(kinesisMessageId.getSequenceNumber());
-        LOG.debug("Fail received for shardId: {} sequenceNumber: {}", shardId, sequenceNumber);
-        // for a failed message add it to failed set if it will be retried, otherwise ack it; remove from emitted either way
-        if (kinesisConfig.getFailedMessageRetryHandler().failed(kinesisMessageId)) {
-            if (!failedPerShard.containsKey(shardId)) {
-                failedPerShard.put(shardId, new TreeSet<BigInteger>());
-            }
-            failedPerShard.get(shardId).add(sequenceNumber);
-            TreeSet<BigInteger> emitted = emittedPerShard.get(shardId);
-            emitted.remove(sequenceNumber);
-        } else {
-            ack(kinesisMessageId);
-        }
-        // keep committing when topology is deactivated since ack and fail keep getting called on deactivated topology
-        if (deactivated) {
-            commit();
-        }
-    }
-
-    void commit() {
-        // We have three mutually disjoint treesets per shard at any given time to keep track of what sequence number
-        // can be committed to zookeeper.
-        // emittedPerShard, ackedPerShard and failedPerShard. Any record starts by entering emittedPerShard. On ack
-        // it moves from emittedPerShard to
-        // ackedPerShard and on fail if retry service tells us to retry then it moves from emittedPerShard to
-        // failedPerShard. The failed records will move from
-        // failedPerShard to emittedPerShard when the failed record is emitted again as a retry.
-        // Logic for deciding what sequence number to commit is find the highest sequence number from ackedPerShard
-        // called X such that there is no sequence
-        // number Y in emittedPerShard or failedPerShard that satisfies X > Y. For e.g. if ackedPerShard is 1,4,5,
-        // emittedPerShard is 2,6 and
-        // failedPerShard is 3,7 then we can only commit 1 and not 4 because 2 is still pending and 3 has failed
-        for (String shardId: toEmitPerShard.keySet()) {
-            if (ackedPerShard.containsKey(shardId)) {
-                BigInteger commitSequenceNumberBound = null;
-                if (failedPerShard.containsKey(shardId) && !failedPerShard.get(shardId).isEmpty()) {
-                    commitSequenceNumberBound = failedPerShard.get(shardId).first();
-                }
-                if (emittedPerShard.containsKey(shardId) && !emittedPerShard.get(shardId).isEmpty()) {
-                    BigInteger smallestEmittedSequenceNumber = emittedPerShard.get(shardId).first();
-                    if (commitSequenceNumberBound == null
-                            || (commitSequenceNumberBound.compareTo(smallestEmittedSequenceNumber) == 1)) {
-                        commitSequenceNumberBound = smallestEmittedSequenceNumber;
-                    }
-                }
-                Iterator<BigInteger> ackedSequenceNumbers = ackedPerShard.get(shardId).iterator();
-                BigInteger ackedSequenceNumberToCommit = null;
-                while (ackedSequenceNumbers.hasNext()) {
-                    BigInteger ackedSequenceNumber = ackedSequenceNumbers.next();
-                    if (commitSequenceNumberBound == null
-                            || (commitSequenceNumberBound.compareTo(ackedSequenceNumber) == 1)) {
-                        ackedSequenceNumberToCommit = ackedSequenceNumber;
-                        ackedSequenceNumbers.remove();
-                    } else {
-                        break;
-                    }
-                }
-                if (ackedSequenceNumberToCommit != null) {
-                    Map<Object, Object> state = new HashMap<>();
-                    state.put("committedSequenceNumber", ackedSequenceNumberToCommit.toString());
-                    LOG.debug("Committing sequence number {} for shardId {}",
-                            ackedSequenceNumberToCommit.toString(),
-                            shardId);
-                    zkConnection.commitState(kinesisConfig.getStreamName(), shardId, state);
-                }
-            }
-        }
-        lastCommitTime = System.currentTimeMillis();
-    }
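
The comment in the removed `commit()` describes the checkpoint rule: commit the highest acked sequence number that is still below every emitted (in-flight) and failed number for the shard. A sketch of just that selection, assuming the three per-shard `TreeSet`s are maintained as the deleted class describes (`CommitBound` is an illustrative name; unlike the original, this version does not remove committed numbers from the acked set):

```java
import java.math.BigInteger;
import java.util.List;
import java.util.TreeSet;

public class CommitBound {
    static BigInteger committable(TreeSet<BigInteger> acked,
                                  TreeSet<BigInteger> emitted,
                                  TreeSet<BigInteger> failed) {
        // The bound is the smallest number that is still pending or failed;
        // nothing at or above it may be committed.
        BigInteger bound = failed.isEmpty() ? null : failed.first();
        if (!emitted.isEmpty() && (bound == null || bound.compareTo(emitted.first()) > 0)) {
            bound = emitted.first();
        }
        BigInteger result = null;
        for (BigInteger seq : acked) {         // TreeSet iterates in ascending order
            if (bound == null || bound.compareTo(seq) > 0) {
                result = seq;                  // still below every pending/failed number
            } else {
                break;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Example from the removed comment: acked {1,4,5}, emitted {2,6},
        // failed {3,7} -> only 1 can be committed.
        TreeSet<BigInteger> acked = new TreeSet<>(List.of(
                BigInteger.valueOf(1), BigInteger.valueOf(4), BigInteger.valueOf(5)));
        TreeSet<BigInteger> emitted = new TreeSet<>(List.of(
                BigInteger.valueOf(2), BigInteger.valueOf(6)));
        TreeSet<BigInteger> failed = new TreeSet<>(List.of(
                BigInteger.valueOf(3), BigInteger.valueOf(7)));
        System.out.println(committable(acked, emitted, failed)); // prints 1
    }
}
```

Committing only below this bound is what makes restart-from-checkpoint safe: every record at or above the committed number is either acked later or still outstanding.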
-
-    void activate() {
-        LOG.info("Activate called");
-        deactivated = false;
-        kinesisConnection.initialize();
-    }
-
-    void deactivate() {
-        LOG.info("Deactivate called");
-        deactivated = true;
-        commit();
-        kinesisConnection.shutdown();
-    }
-
-    void close() {
-        commit();
-        kinesisConnection.shutdown();
-        zkConnection.shutdown();
-    }
-
-    // fetch records from kinesis starting at sequence number for message 
passed as argument. Any other messages fetched
-    // and are in the failed queue will also
-    // be kept in memory to avoid going to kinesis again for retry
-    private void fetchFailedRecords(KinesisMessageId kinesisMessageId) {
-        // if shard iterator not present for this message, get it
-        if (!shardIteratorPerFailedMessage.containsKey(kinesisMessageId)) {
-            refreshShardIteratorForFailedRecord(kinesisMessageId);
-        }
-        String shardIterator = 
shardIteratorPerFailedMessage.get(kinesisMessageId);
-        LOG.debug("Fetching failed records for shard id :{} at sequence number 
{} using shardIterator {}",
-                kinesisMessageId.getShardId(),
-                kinesisMessageId.getSequenceNumber(),
-                shardIterator);
-        try {
-            GetRecordsResult getRecordsResult = 
kinesisConnection.fetchRecords(shardIterator);
-            if (getRecordsResult != null) {
-                List<Record> records = getRecordsResult.getRecords();
-                LOG.debug("Records size from fetchFailedRecords is {}", 
records.size());
-                // update the shard iterator to next one in case this fetch 
does not give the message.
-                shardIteratorPerFailedMessage.put(kinesisMessageId, 
getRecordsResult.getNextShardIterator());
-                if (records.size() == 0) {
-                    LOG.warn("No records returned from kinesis. Hence sleeping 
for 1 second");
-                    Thread.sleep(1000);
-                } else {
-                    // add all fetched records to the set of failed records if they are present in failed set
-                    for (Record record: records) {
-                        KinesisMessageId current = new KinesisMessageId(kinesisMessageId.getStreamName(),
-                                kinesisMessageId.getShardId(),
-                                record.getSequenceNumber());
-                        if (failedPerShard.get(kinesisMessageId.getShardId()).contains(new BigInteger(current.getSequenceNumber()))) {
-                            failedandFetchedRecords.put(current, record);
-                            shardIteratorPerFailedMessage.remove(current);
-                        }
-                    }
-                }
-            }
-        } catch (InterruptedException ie) {
-            LOG.warn("Thread interrupted while sleeping", ie);
-        } catch (ExpiredIteratorException ex) {
-            LOG.warn("shardIterator for failedRecord " + kinesisMessageId + " has expired. Refreshing shardIterator");
-            refreshShardIteratorForFailedRecord(kinesisMessageId);
-        } catch (ProvisionedThroughputExceededException pe) {
-            try {
-                LOG.warn("ProvisionedThroughputExceededException occured. Check your limits. Sleeping for 1 second.", pe);
-                Thread.sleep(1000);
-            } catch (InterruptedException e) {
-                LOG.warn("Thread interrupted exception", e);
-            }
-        }
-    }
-
-    private void fetchNewRecords() {
-        for (Map.Entry<String, LinkedList<Record>> entry : toEmitPerShard.entrySet()) {
-            String shardId = entry.getKey();
-            try {
-                String shardIterator = shardIteratorPerShard.get(shardId);
-                LOG.debug("Fetching new records for shard id :{} using shardIterator {} after sequence number {}",
-                        shardId,
-                        shardIterator,
-                        fetchedSequenceNumberPerShard.get(shardId));
-                GetRecordsResult getRecordsResult = kinesisConnection.fetchRecords(shardIterator);
-                if (getRecordsResult != null) {
-                    List<Record> records = getRecordsResult.getRecords();
-                    LOG.debug("Records size from fetchNewRecords is {}", records.size());
-                    // update the shard iterator to next one in case this fetch does not give the message.
-                    shardIteratorPerShard.put(shardId, getRecordsResult.getNextShardIterator());
-                    if (records.size() == 0) {
-                        LOG.warn("No records returned from kinesis. Hence sleeping for 1 second");
-                        Thread.sleep(1000);
-                    } else {
-                        entry.getValue().addAll(records);
-                        fetchedSequenceNumberPerShard.put(shardId, records.get(records.size() - 1).getSequenceNumber());
-                    }
-                }
-            } catch (InterruptedException ie) {
-                LOG.warn("Thread interrupted while sleeping", ie);
-            } catch (ExpiredIteratorException ex) {
-                LOG.warn("shardIterator for shardId " + shardId + " has expired. Refreshing shardIterator");
-                refreshShardIteratorForNewRecords(shardId);
-            } catch (ProvisionedThroughputExceededException pe) {
-                try {
-                    LOG.warn("ProvisionedThroughputExceededException occured. Check your limits. Sleeping for 1 second.", pe);
-                    Thread.sleep(1000);
-                } catch (InterruptedException e) {
-                    LOG.warn("Thread interrupted exception", e);
-                }
-            }
-        }
-    }
-
-    private void emitNewRecord(SpoutOutputCollector collector) {
-        for (Map.Entry<String, LinkedList<Record>> entry: toEmitPerShard.entrySet()) {
-            String shardId = entry.getKey();
-            LinkedList<Record> listOfRecords = entry.getValue();
-            Record record;
-            while ((record = listOfRecords.pollFirst()) != null) {
-                KinesisMessageId kinesisMessageId = new KinesisMessageId(kinesisConfig.getStreamName(),
-                        shardId,
-                        record.getSequenceNumber());
-                if (emitRecord(collector, record, kinesisMessageId)) {
-                    return;
-                }
-            }
-        }
-    }
-
-    private boolean emitFailedRecord(SpoutOutputCollector collector, KinesisMessageId kinesisMessageId) {
-        if (!failedandFetchedRecords.containsKey(kinesisMessageId)) {
-            return false;
-        }
-        return emitRecord(collector, failedandFetchedRecords.get(kinesisMessageId), kinesisMessageId);
-    }
-
-    private boolean emitRecord(SpoutOutputCollector collector, Record record, KinesisMessageId kinesisMessageId) {
-        boolean result = false;
-        List<Object> tuple = kinesisConfig.getRecordToTupleMapper().getTuple(record);
-        // if a record is returned put the sequence number in the emittedPerShard to tie back with ack or fail
-        if (tuple != null && tuple.size() > 0) {
-            collector.emit(tuple, kinesisMessageId);
-            if (!emittedPerShard.containsKey(kinesisMessageId.getShardId())) {
-                emittedPerShard.put(kinesisMessageId.getShardId(), new TreeSet<BigInteger>());
-            }
-            emittedPerShard.get(kinesisMessageId.getShardId()).add(new BigInteger(record.getSequenceNumber()));
-            result = true;
-        } else {
-            // ack to not process the record again on restart and move on to next message
-            LOG.warn("Record " + record + " did not return a tuple to emit. Hence acking it");
-            ack(kinesisMessageId);
-        }
-        return result;
-    }
-
-    private boolean shouldCommit() {
-        return (System.currentTimeMillis() - lastCommitTime >= kinesisConfig.getZkInfo().getCommitIntervalMs());
-    }
-
-    private void initializeFetchedSequenceNumbers() {
-        for (String shardId : toEmitPerShard.keySet()) {
-            Map<Object, Object> state = zkConnection.readState(kinesisConfig.getStreamName(), shardId);
-            // if state found for this shard in zk, then set the sequence number in fetchedSequenceNumber
-            if (state != null) {
-                Object committedSequenceNumber = state.get("committedSequenceNumber");
-                LOG.info("State read is committedSequenceNumber: " + committedSequenceNumber + " shardId:" + shardId);
-                if (committedSequenceNumber != null) {
-                    fetchedSequenceNumberPerShard.put(shardId, (String) committedSequenceNumber);
-                }
-            }
-        }
-    }
-
-    private void refreshShardIteratorsForNewRecords() {
-        for (String shardId: toEmitPerShard.keySet()) {
-            refreshShardIteratorForNewRecords(shardId);
-        }
-    }
-
-    private void refreshShardIteratorForNewRecords(String shardId) {
-        String shardIterator = null;
-        String lastFetchedSequenceNumber = fetchedSequenceNumberPerShard.get(shardId);
-        ShardIteratorType shardIteratorType = (lastFetchedSequenceNumber == null
-                ? kinesisConfig.getShardIteratorType()
-                : ShardIteratorType.AFTER_SEQUENCE_NUMBER);
-        // Set the shard iterator for last fetched sequence number to start from correct position in shard
-        shardIterator = kinesisConnection.getShardIterator(kinesisConfig.getStreamName(),
-                shardId,
-                shardIteratorType,
-                lastFetchedSequenceNumber,
-                kinesisConfig.getTimestamp());
-        if (shardIterator != null && !shardIterator.isEmpty()) {
-            LOG.warn("Refreshing shard iterator for new records for shardId " + shardId
-                    + " with shardIterator " + shardIterator);
-            shardIteratorPerShard.put(shardId, shardIterator);
-        }
-    }
-
-    private void refreshShardIteratorForFailedRecord(KinesisMessageId kinesisMessageId) {
-        String shardIterator = null;
-        // Set the shard iterator for last fetched sequence number to start from correct position in shard
-        shardIterator = kinesisConnection.getShardIterator(kinesisConfig.getStreamName(),
-                kinesisMessageId.getShardId(),
-                ShardIteratorType.AT_SEQUENCE_NUMBER,
-                kinesisMessageId.getSequenceNumber(),
-                null);
-        if (shardIterator != null && !shardIterator.isEmpty()) {
-            LOG.warn("Refreshing shard iterator for failed records for message " + kinesisMessageId
-                    + " with shardIterator " + shardIterator);
-            shardIteratorPerFailedMessage.put(kinesisMessageId, shardIterator);
-        }
-    }
-
-    private Long getUncommittedRecordsCount() {
-        Long result = 0L;
-        for (Map.Entry<String, TreeSet<BigInteger>> emitted: emittedPerShard.entrySet()) {
-            result += emitted.getValue().size();
-        }
-        for (Map.Entry<String, TreeSet<BigInteger>> acked: ackedPerShard.entrySet()) {
-            result += acked.getValue().size();
-        }
-        for (Map.Entry<String, TreeSet<BigInteger>> failed: failedPerShard.entrySet()) {
-            result += failed.getValue().size();
-        }
-        LOG.debug("Returning uncommittedRecordsCount as {}", result);
-        return result;
-    }
-
-    private boolean shouldFetchNewRecords() {
-        // check to see if any shard has already fetched records waiting to be emitted, in which case dont fetch more
-        boolean fetchRecords = true;
-        for (Map.Entry<String, LinkedList<Record>> entry: toEmitPerShard.entrySet()) {
-            if (!entry.getValue().isEmpty()) {
-                fetchRecords = false;
-                break;
-            }
-        }
-        return fetchRecords;
-    }
-
-}
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java
deleted file mode 100644
index cf9295a78..000000000
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kinesis.spout;
-
-import java.util.Map;
-
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichSpout;
-
-public class KinesisSpout extends BaseRichSpout {
-
-    private final KinesisConfig kinesisConfig;
-    private transient KinesisRecordsManager kinesisRecordsManager;
-    private transient SpoutOutputCollector collector;
-
-    public KinesisSpout(KinesisConfig kinesisConfig) {
-        this.kinesisConfig = kinesisConfig;
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(kinesisConfig.getRecordToTupleMapper().getOutputFields());
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return super.getComponentConfiguration();
-    }
-
-    @Override
-    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
-        this.collector = collector;
-        kinesisRecordsManager = new KinesisRecordsManager(kinesisConfig);
-        kinesisRecordsManager.initialize(context.getThisTaskIndex(), context.getComponentTasks(context.getThisComponentId()).size());
-    }
-
-    @Override
-    public void close() {
-        kinesisRecordsManager.close();
-    }
-
-    @Override
-    public void activate() {
-        kinesisRecordsManager.activate();
-    }
-
-    @Override
-    public void deactivate() {
-        kinesisRecordsManager.deactivate();
-    }
-
-    @Override
-    public void ack(Object msgId) {
-        kinesisRecordsManager.ack((KinesisMessageId) msgId);
-    }
-
-    @Override
-    public void fail(Object msgId) {
-        kinesisRecordsManager.fail((KinesisMessageId) msgId);
-    }
-
-    @Override
-    public void nextTuple() {
-        kinesisRecordsManager.next(collector);
-    }
-}
-
-
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java
deleted file mode 100644
index bee9e80be..000000000
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kinesis.spout;
-
-import com.amazonaws.services.kinesis.model.Record;
-
-import java.util.List;
-
-import org.apache.storm.tuple.Fields;
-
-public interface RecordToTupleMapper {
-    /**
-     * Retrieve the names of fields.
-     * @return names of fields in the emitted tuple
-     */
-    Fields getOutputFields();
-
-    /**
-     * Retrieve the tuple.
-     * @param record kinesis record
-     * @return storm tuple to be emitted for this record, null if no tuple should be emitted
-     */
-    List<Object> getTuple(Record record);
-}
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkConnection.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkConnection.java
deleted file mode 100644
index ca982538d..000000000
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkConnection.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kinesis.spout;
-
-import java.nio.charset.Charset;
-import java.util.Map;
-import net.minidev.json.JSONValue;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryNTimes;
-import org.apache.zookeeper.CreateMode;
-
-class ZkConnection {
-
-    private final ZkInfo zkInfo;
-    private CuratorFramework curatorFramework;
-
-    ZkConnection(ZkInfo zkInfo) {
-        this.zkInfo = zkInfo;
-    }
-
-    void initialize() {
-        curatorFramework = CuratorFrameworkFactory.newClient(zkInfo.getZkUrl(),
-                zkInfo.getSessionTimeoutMs(),
-                zkInfo.getConnectionTimeoutMs(),
-                new RetryNTimes(zkInfo.getRetryAttempts(), zkInfo.getRetryIntervalMs()));
-        curatorFramework.start();
-    }
-
-    void commitState(String stream, String shardId, Map<Object, Object> state) {
-        byte[] bytes = JSONValue.toJSONString(state).getBytes(Charset.forName("UTF-8"));
-        try {
-            String path = getZkPath(stream, shardId);
-            if (curatorFramework.checkExists().forPath(path) == null) {
-                curatorFramework.create()
-                        .creatingParentsIfNeeded()
-                        .withMode(CreateMode.PERSISTENT)
-                        .forPath(path, bytes);
-            } else {
-                curatorFramework.setData().forPath(path, bytes);
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    Map<Object, Object> readState(String stream, String shardId) {
-        try {
-            String path = getZkPath(stream, shardId);
-            Map<Object, Object> state = null;
-            byte[] b = null;
-            if (curatorFramework.checkExists().forPath(path) != null) {
-                b = curatorFramework.getData().forPath(path);
-            }
-            if (b != null) {
-                state = (Map<Object, Object>) JSONValue.parseWithException(new String(b, "UTF-8"));
-            }
-            return state;
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    void shutdown() {
-        curatorFramework.close();
-    }
-
-    private String getZkPath(String stream, String shardId) {
-        String path = "";
-        if (!zkInfo.getZkNode().startsWith("/")) {
-            path += "/";
-        }
-        path += zkInfo.getZkNode();
-        if (!zkInfo.getZkNode().endsWith("/")) {
-            path += "/";
-        }
-        path += (stream + "/" + shardId);
-        return path;
-    }
-}
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java
deleted file mode 100644
index 35e289065..000000000
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kinesis.spout;
-
-import java.io.Serializable;
-
-public class ZkInfo implements Serializable {
-    // comma separated list of zk connect strings to connect to zookeeper e.g. localhost:2181
-    private final String zkUrl;
-    // zk node under which to commit the sequence number of messages. e.g. /committed_sequence_numbers
-    private final String zkNode;
-    // zk session timeout in milliseconds
-    private final Integer sessionTimeoutMs;
-    // zk connection timeout in milliseconds
-    private final Integer connectionTimeoutMs;
-    // interval at which to commit offsets to zk in milliseconds
-    private final Long commitIntervalMs;
-    // number of retry attempts for zk
-    private final Integer retryAttempts;
-    // time to sleep between retries in milliseconds
-    private final Integer retryIntervalMs;
-
-    public ZkInfo(String zkUrl,
-            String zkNode,
-            Integer sessionTimeoutMs,
-            Integer connectionTimeoutMs,
-            Long commitIntervalMs,
-            Integer retryAttempts,
-            Integer retryIntervalMs) {
-        this.zkUrl = zkUrl;
-        this.zkNode = zkNode;
-        this.sessionTimeoutMs = sessionTimeoutMs;
-        this.connectionTimeoutMs = connectionTimeoutMs;
-        this.commitIntervalMs = commitIntervalMs;
-        this.retryAttempts = retryAttempts;
-        this.retryIntervalMs = retryIntervalMs;
-        validate();
-    }
-
-    public String getZkUrl() {
-        return zkUrl;
-    }
-
-    public String getZkNode() {
-        return zkNode;
-    }
-
-    public Integer getSessionTimeoutMs() {
-        return sessionTimeoutMs;
-    }
-
-    public Integer getConnectionTimeoutMs() {
-        return connectionTimeoutMs;
-    }
-
-    public Long getCommitIntervalMs() {
-        return commitIntervalMs;
-    }
-
-    public Integer getRetryAttempts() {
-        return retryAttempts;
-    }
-
-    public Integer getRetryIntervalMs() {
-        return retryIntervalMs;
-    }
-
-    private void validate() {
-
-        if (zkUrl == null || zkUrl.length() < 1) {
-            throw new IllegalArgumentException("zkUrl must be specified to connect to zookeeper");
-        }
-        if (zkNode == null || zkNode.length() < 1) {
-            throw new IllegalArgumentException("zkNode must be specified");
-        }
-        checkPositive(sessionTimeoutMs, "sessionTimeoutMs");
-        checkPositive(connectionTimeoutMs, "connectionTimeoutMs");
-        checkPositive(commitIntervalMs, "commitIntervalMs");
-        checkPositive(retryAttempts, "retryAttempts");
-        checkPositive(retryIntervalMs, "retryIntervalMs");
-    }
-
-    private void checkPositive(Integer argument, String name) {
-        if (argument == null && argument <= 0) {
-            throw new IllegalArgumentException(name + " must be positive");
-        }
-    }
-
-    private void checkPositive(Long argument, String name) {
-        if (argument == null && argument <= 0) {
-            throw new IllegalArgumentException(name + " must be positive");
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "ZkInfo{"
-                + "zkUrl='" + zkUrl + '\''
-                + ", zkNode='" + zkNode + '\''
-                + ", sessionTimeoutMs=" + sessionTimeoutMs
-                + ", connectionTimeoutMs=" + connectionTimeoutMs
-                + ", commitIntervalMs=" + commitIntervalMs
-                + ", retryAttempts=" + retryAttempts
-                + ", retryIntervalMs=" + retryIntervalMs
-                + '}';
-    }
-
-}
diff --git a/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisBoltTest.java b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisBoltTest.java
deleted file mode 100644
index 7701efdaf..000000000
--- a/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisBoltTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.kinesis.spout.test;
-
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-public class KinesisBoltTest extends BaseRichBolt {
-    protected static final Logger LOG = LoggerFactory.getLogger(KinesisBoltTest.class);
-    private transient OutputCollector collector;
-
-    @Override
-    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
-        this.collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        LOG.info("input = [" + input + "]");
-        collector.ack(input);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    }
-}
diff --git a/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisSpoutTopology.java b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisSpoutTopology.java
deleted file mode 100644
index 7028e3ec0..000000000
--- a/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisSpoutTopology.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.kinesis.spout.test;
-
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.regions.Regions;
-import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import org.apache.storm.Config;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.AlreadyAliveException;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.InvalidTopologyException;
-import org.apache.storm.kinesis.spout.CredentialsProviderChain;
-import org.apache.storm.kinesis.spout.ExponentialBackoffRetrier;
-import org.apache.storm.kinesis.spout.KinesisConfig;
-import org.apache.storm.kinesis.spout.KinesisConnectionInfo;
-import org.apache.storm.kinesis.spout.KinesisSpout;
-import org.apache.storm.kinesis.spout.RecordToTupleMapper;
-import org.apache.storm.kinesis.spout.ZkInfo;
-import org.apache.storm.topology.TopologyBuilder;
-
-import java.util.Date;
-
-public class KinesisSpoutTopology {
-    public static void main (String args[]) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
-        String topologyName = args[0];
-        RecordToTupleMapper recordToTupleMapper = new TestRecordToTupleMapper();
-        KinesisConnectionInfo kinesisConnectionInfo = new KinesisConnectionInfo(new CredentialsProviderChain(), new ClientConfiguration(), Regions.US_WEST_2,
-                1000);
-        ZkInfo zkInfo = new ZkInfo("localhost:2181", "/kinesisOffsets", 20000, 15000, 10000L, 3, 2000);
-        KinesisConfig kinesisConfig = new KinesisConfig(args[1], ShardIteratorType.TRIM_HORIZON,
-                recordToTupleMapper, new Date(), new ExponentialBackoffRetrier(), zkInfo, kinesisConnectionInfo, 10000L);
-        KinesisSpout kinesisSpout = new KinesisSpout(kinesisConfig);
-        TopologyBuilder topologyBuilder = new TopologyBuilder();
-        topologyBuilder.setSpout("spout", kinesisSpout, 3);
-        topologyBuilder.setBolt("bolt", new KinesisBoltTest(), 1).shuffleGrouping("spout");
-        Config topologyConfig = new Config();
-        topologyConfig.setDebug(true);
-        topologyConfig.setNumWorkers(3);
-        StormSubmitter.submitTopology(topologyName, topologyConfig, topologyBuilder.createTopology());
-    }
-}
diff --git a/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/TestRecordToTupleMapper.java b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/TestRecordToTupleMapper.java
deleted file mode 100644
index 2879c3e7c..000000000
--- a/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/TestRecordToTupleMapper.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.kinesis.spout.test;
-
-import com.amazonaws.services.kinesis.model.Record;
-import org.apache.storm.kinesis.spout.RecordToTupleMapper;
-import org.apache.storm.tuple.Fields;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
-import java.nio.charset.CharsetDecoder;
-import java.util.ArrayList;
-import java.util.List;
-
-public class TestRecordToTupleMapper implements RecordToTupleMapper, Serializable {
-    private static final Logger LOG = LoggerFactory.getLogger(TestRecordToTupleMapper.class);
-    @Override
-    public Fields getOutputFields() {
-        return new Fields("partitionKey", "sequenceNumber", "data");
-    }
-
-    @Override
-    public List<Object> getTuple(Record record) {
-        CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
-        List<Object> tuple = new ArrayList<>();
-        tuple.add(record.getPartitionKey());
-        tuple.add(record.getSequenceNumber());
-        try {
-            String data = decoder.decode(record.getData()).toString();
-            LOG.info("data is " + data);
-            tuple.add(data);
-        } catch (CharacterCodingException e) {
-            e.printStackTrace();
-            LOG.warn("Exception occured. Emitting tuple with empty string data", e);
-            tuple.add("");
-        }
-        LOG.info("Tuple from record is " + tuple);
-        return tuple;
-    }
-}
diff --git a/pom.xml b/pom.xml
index 160afc6ac..b460324d4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -501,7 +501,6 @@
                 <module>external/storm-kafka-migration</module>
                 <module>external/storm-opentsdb</module>
                 <module>external/storm-kafka-monitor</module>
-                <module>external/storm-kinesis</module>
                 <module>external/storm-jms</module>
                 <module>external/storm-pmml</module>
                 <module>external/storm-rocketmq</module>

