http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
deleted file mode 100644
index a06fdca..0000000
--- 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ /dev/null
@@ -1,679 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.internals;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
-import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
-import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
-import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
-import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
-import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
-import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
-import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
-import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
-import org.apache.flink.util.InstantiationUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A KinesisDataFetcher is responsible for fetching data from multiple Kinesis 
shards. Each parallel subtask instantiates
- * and runs a single fetcher throughout the subtask's lifetime. The fetcher 
accomplishes the following:
- * <ul>
- *     <li>1. continuously poll Kinesis to discover shards that the subtask 
should subscribe to. The subscribed subset
- *                       of shards, including future new shards, is 
non-overlapping across subtasks (no two subtasks will be
- *                       subscribed to the same shard) and determinate across 
subtask restores (the subtask will always subscribe
- *                       to the same subset of shards even after 
restoring)</li>
- *     <li>2. decide from where in each discovered shard the fetcher should 
start subscribing</li>
- *     <li>3. subscribe to shards by creating a single thread for each 
shard</li>
- * </ul>
- *
- * <p>The fetcher manages two states: 1) last seen shard ids of each 
subscribed stream (used for continuous shard discovery),
- * and 2) last processed sequence numbers of each subscribed shard. Since 
operations on the second state will be performed
- * by multiple threads, these operations should only be done using the handler 
methods provided in this class.
- */
-public class KinesisDataFetcher<T> {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(KinesisDataFetcher.class);
-
-       // 
------------------------------------------------------------------------
-       //  Consumer-wide settings
-       // 
------------------------------------------------------------------------
-
-       /** Configuration properties for the Flink Kinesis Consumer */
-       private final Properties configProps;
-
-       /** The list of Kinesis streams that the consumer is subscribing to */
-       private final List<String> streams;
-
-       /**
-        * The deserialization schema we will be using to convert Kinesis 
records to Flink objects.
-        * Note that since this might not be thread-safe, {@link 
ShardConsumer}s using this must
-        * clone a copy using {@link 
KinesisDataFetcher#getClonedDeserializationSchema()}.
-        */
-       private final KinesisDeserializationSchema<T> deserializationSchema;
-
-       // 
------------------------------------------------------------------------
-       //  Subtask-specific settings
-       // 
------------------------------------------------------------------------
-
-       /** Runtime context of the subtask that this fetcher was created in */
-       private final RuntimeContext runtimeContext;
-
-       private final int totalNumberOfConsumerSubtasks;
-
-       private final int indexOfThisConsumerSubtask;
-
-       /**
-        * This flag should be set by {@link FlinkKinesisConsumer} using
-        * {@link KinesisDataFetcher#setIsRestoringFromFailure(boolean)}
-        */
-       private boolean isRestoredFromFailure;
-
-       // 
------------------------------------------------------------------------
-       //  Executor services to run created threads
-       // 
------------------------------------------------------------------------
-
-       /** Executor service to run {@link ShardConsumer}s to consume Kinesis 
shards */
-       private final ExecutorService shardConsumersExecutor;
-
-       // 
------------------------------------------------------------------------
-       //  Managed state, accessed and updated across multiple threads
-       // 
------------------------------------------------------------------------
-
-       /** The last discovered shard ids of each subscribed stream, updated as 
the fetcher discovers new shards.
-        * Note: this state will be updated if new shards are found when {@link 
KinesisDataFetcher#discoverNewShardsToSubscribe()} is called.
-        */
-       private final Map<String, String> 
subscribedStreamsToLastDiscoveredShardIds;
-
-       /**
-        * The shards, along with their last processed sequence numbers, that 
this fetcher is subscribed to. The fetcher
-        * will add new subscribed shard states to this list as it discovers 
new shards. {@link ShardConsumer} threads update
-        * the last processed sequence number of subscribed shards as they 
fetch and process records.
-        *
-        * <p>Note that since multiple {@link ShardConsumer} threads will be 
performing operations on this list, all operations
-        * must be wrapped in synchronized blocks on the {@link 
KinesisDataFetcher#checkpointLock} lock. For this purpose,
-        * all threads must use the following thread-safe methods this class 
provides to operate on this list:
-        * <ul>
-        *     <li>{@link 
KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}</li>
-        *     <li>{@link KinesisDataFetcher#updateState(int, 
SequenceNumber)}</li>
-        *     <li>{@link KinesisDataFetcher#emitRecordAndUpdateState(T, long, 
int, SequenceNumber)}</li>
-        * </ul>
-        */
-       private final List<KinesisStreamShardState> subscribedShardsState;
-
-       private final SourceFunction.SourceContext<T> sourceContext;
-
-       /** Checkpoint lock, also used to synchronize operations on 
subscribedShardsState */
-       private final Object checkpointLock;
-
-       /** Reference to the first error thrown by any of the {@link 
ShardConsumer} threads */
-       private final AtomicReference<Throwable> error;
-
-       /** The Kinesis proxy that the fetcher will be using to discover new 
shards */
-       private final KinesisProxyInterface kinesis;
-
-       /** Thread that executed runFetcher() */
-       private Thread mainThread;
-
-       /**
-        * The current number of shards that are actively read by this fetcher.
-        *
-        * This value is updated in {@link 
KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)},
-        * and {@link KinesisDataFetcher#updateState(int, SequenceNumber)}.
-        */
-       private final AtomicInteger numberOfActiveShards = new AtomicInteger(0);
-
-       private volatile boolean running = true;
-
-       /**
-        * Creates a Kinesis Data Fetcher.
-        *
-        * @param streams the streams to subscribe to
-        * @param sourceContext context of the source function
-        * @param runtimeContext this subtask's runtime context
-        * @param configProps the consumer configuration properties
-        * @param deserializationSchema deserialization schema
-        */
-       public KinesisDataFetcher(List<String> streams,
-                                                       
SourceFunction.SourceContext<T> sourceContext,
-                                                       RuntimeContext 
runtimeContext,
-                                                       Properties configProps,
-                                                       
KinesisDeserializationSchema<T> deserializationSchema) {
-               this(streams,
-                       sourceContext,
-                       sourceContext.getCheckpointLock(),
-                       runtimeContext,
-                       configProps,
-                       deserializationSchema,
-                       new AtomicReference<Throwable>(),
-                       new LinkedList<KinesisStreamShardState>(),
-                       
createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
-                       KinesisProxy.create(configProps));
-       }
-
-       /** This constructor is exposed for testing purposes */
-       protected KinesisDataFetcher(List<String> streams,
-                                                               
SourceFunction.SourceContext<T> sourceContext,
-                                                               Object 
checkpointLock,
-                                                               RuntimeContext 
runtimeContext,
-                                                               Properties 
configProps,
-                                                               
KinesisDeserializationSchema<T> deserializationSchema,
-                                                               
AtomicReference<Throwable> error,
-                                                               
LinkedList<KinesisStreamShardState> subscribedShardsState,
-                                                               HashMap<String, 
String> subscribedStreamsToLastDiscoveredShardIds,
-                                                               
KinesisProxyInterface kinesis) {
-               this.streams = checkNotNull(streams);
-               this.configProps = checkNotNull(configProps);
-               this.sourceContext = checkNotNull(sourceContext);
-               this.checkpointLock = checkNotNull(checkpointLock);
-               this.runtimeContext = checkNotNull(runtimeContext);
-               this.totalNumberOfConsumerSubtasks = 
runtimeContext.getNumberOfParallelSubtasks();
-               this.indexOfThisConsumerSubtask = 
runtimeContext.getIndexOfThisSubtask();
-               this.deserializationSchema = 
checkNotNull(deserializationSchema);
-               this.kinesis = checkNotNull(kinesis);
-
-               this.error = checkNotNull(error);
-               this.subscribedShardsState = 
checkNotNull(subscribedShardsState);
-               this.subscribedStreamsToLastDiscoveredShardIds = 
checkNotNull(subscribedStreamsToLastDiscoveredShardIds);
-
-               this.shardConsumersExecutor =
-                       
createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks());
-       }
-
-       /**
-        * Starts the fetcher. After starting the fetcher, it can only
-        * be stopped by calling {@link KinesisDataFetcher#shutdownFetcher()}.
-        *
-        * @throws Exception the first error or exception thrown by the fetcher 
or any of the threads created by the fetcher.
-        */
-       public void runFetcher() throws Exception {
-
-               // check that we are running before proceeding
-               if (!running) {
-                       return;
-               }
-
-               this.mainThread = Thread.currentThread();
-
-               // 
------------------------------------------------------------------------
-               //  Procedures before starting the infinite while loop:
-               // 
------------------------------------------------------------------------
-
-               //  1. query for any new shards that may have been created 
while the Kinesis consumer was not running,
-               //     and register them to the subscribedShardState list.
-               if (LOG.isDebugEnabled()) {
-                       String logFormat = (!isRestoredFromFailure)
-                               ? "Subtask {} is trying to discover initial 
shards ..."
-                               : "Subtask {} is trying to discover any new 
shards that were created while the consumer wasn't " +
-                               "running due to failure ...";
-
-                       LOG.debug(logFormat, indexOfThisConsumerSubtask);
-               }
-               List<KinesisStreamShard> newShardsCreatedWhileNotRunning = 
discoverNewShardsToSubscribe();
-               for (KinesisStreamShard shard : 
newShardsCreatedWhileNotRunning) {
-                       // the starting state for new shards created while the 
consumer wasn't running depends on whether or not
-                       // we are starting fresh (not restoring from a 
checkpoint); when we are starting fresh, this simply means
-                       // all existing shards of streams we are subscribing to 
are new shards; when we are restoring from checkpoint,
-                       // any new shards due to Kinesis resharding from the 
time of the checkpoint will be considered new shards.
-                       InitialPosition initialPosition = 
InitialPosition.valueOf(configProps.getProperty(
-                               
ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION));
-
-                       SentinelSequenceNumber startingStateForNewShard = 
(isRestoredFromFailure)
-                               ? 
SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM
-                               : initialPosition.toSentinelSequenceNumber();
-
-                       if (LOG.isInfoEnabled()) {
-                               String logFormat = (!isRestoredFromFailure)
-                                       ? "Subtask {} will be seeded with 
initial shard {}, starting state set as sequence number {}"
-                                       : "Subtask {} will be seeded with new 
shard {} that was created while the consumer wasn't " +
-                                       "running due to failure, starting state 
set as sequence number {}";
-
-                               LOG.info(logFormat, indexOfThisConsumerSubtask, 
shard.toString(), startingStateForNewShard.get());
-                       }
-                       registerNewSubscribedShardState(new 
KinesisStreamShardState(shard, startingStateForNewShard.get()));
-               }
-
-               //  2. check that there is at least one shard in the subscribed 
streams to consume from (can be done by
-               //     checking if at least one value in 
subscribedStreamsToLastDiscoveredShardIds is not null)
-               boolean hasShards = false;
-               StringBuilder streamsWithNoShardsFound = new StringBuilder();
-               for (Map.Entry<String, String> streamToLastDiscoveredShardEntry 
: subscribedStreamsToLastDiscoveredShardIds.entrySet()) {
-                       if (streamToLastDiscoveredShardEntry.getValue() != 
null) {
-                               hasShards = true;
-                       } else {
-                               
streamsWithNoShardsFound.append(streamToLastDiscoveredShardEntry.getKey()).append(",
 ");
-                       }
-               }
-
-               if (streamsWithNoShardsFound.length() != 0 && 
LOG.isWarnEnabled()) {
-                       LOG.warn("Subtask {} has failed to find any shards for 
the following subscribed streams: {}",
-                               indexOfThisConsumerSubtask, 
streamsWithNoShardsFound.toString());
-               }
-
-               if (!hasShards) {
-                       throw new RuntimeException("No shards can be found for 
all subscribed streams: " + streams);
-               }
-
-               //  3. start consuming any shard state we already have in the 
subscribedShardState up to this point; the
-               //     subscribedShardState may already be seeded with values 
due to step 1., or explicitly added by the
-               //     consumer using a restored state checkpoint
-               for (int seededStateIndex = 0; seededStateIndex < 
subscribedShardsState.size(); seededStateIndex++) {
-                       KinesisStreamShardState seededShardState = 
subscribedShardsState.get(seededStateIndex);
-
-                       // only start a consuming thread if the seeded 
subscribed shard has not been completely read already
-                       if 
(!seededShardState.getLastProcessedSequenceNum().equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()))
 {
-
-                               if (LOG.isInfoEnabled()) {
-                                       LOG.info("Subtask {} will start 
consuming seeded shard {} from sequence number {} with ShardConsumer {}",
-                                               indexOfThisConsumerSubtask, 
seededShardState.getKinesisStreamShard().toString(),
-                                               
seededShardState.getLastProcessedSequenceNum(), seededStateIndex);
-                                       }
-
-                               shardConsumersExecutor.submit(
-                                       new ShardConsumer<>(
-                                               this,
-                                               seededStateIndex,
-                                               
subscribedShardsState.get(seededStateIndex).getKinesisStreamShard(),
-                                               
subscribedShardsState.get(seededStateIndex).getLastProcessedSequenceNum()));
-                       }
-               }
-
-               // 
------------------------------------------------------------------------
-
-               // finally, start the infinite shard discovery and consumer 
launching loop;
-               // we will escape from this loop only when shutdownFetcher() or 
stopWithError() is called
-
-               final long discoveryIntervalMillis = Long.valueOf(
-                       configProps.getProperty(
-                               
ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
-                               
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS)));
-
-               // FLINK-4341:
-               // For downstream operators that work on time (ex. window 
operators), we are required to emit a max value watermark
-               // for subtasks that won't continue to have shards to read from 
unless resharding happens in the future, otherwise
-               // the downstream watermarks would not advance, leading to 
unbounded accumulating state.
-               //
-               // The side-effect of this limitation is that on resharding, we 
must fail hard if the newly discovered shard
-               // is to be subscribed by a subtask that has previously emitted 
a max value watermark, otherwise the watermarks
-               // will be messed up.
-               //
-               // There are 2 cases where we need to either emit a max value 
watermark, or deliberately fail hard:
-               //  (a) if this subtask has no more shards to read from unless 
resharding happens in the future, we emit a max
-               //      value watermark. This case is encountered when 1) all 
previously read shards by this subtask were closed
-               //      due to resharding, 2) when this subtask was initially 
only subscribed to closed shards while the consumer
-               //      was told to start from TRIM_HORIZON, or 3) there were 
initially no shards for this subtask to read on startup.
-               //  (b) this subtask has discovered new shards to read from due 
to a reshard; if this subtask has already emitted
-               //      a max value watermark, we must deliberately fail hard 
to avoid messing up the watermarks. The new shards
-               //      will be subscribed by this subtask after restore as 
initial shards on startup.
-               //
-               // TODO: This is a temporary workaround until a min-watermark 
information service is available in the JobManager
-               // Please see FLINK-4341 for more detail
-
-               boolean emittedMaxValueWatermark = false;
-
-               if (this.numberOfActiveShards.get() == 0) {
-                       // FLINK-4341 workaround case (a) - please see the 
above for details on this case
-                       LOG.info("Subtask {} has no initial shards to read on 
startup; emitting max value watermark ...",
-                               indexOfThisConsumerSubtask);
-                       sourceContext.emitWatermark(new 
Watermark(Long.MAX_VALUE));
-                       emittedMaxValueWatermark = true;
-               }
-
-               while (running) {
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("Subtask {} is trying to discover new 
shards that were created due to resharding ...",
-                                       indexOfThisConsumerSubtask);
-                       }
-                       List<KinesisStreamShard> newShardsDueToResharding = 
discoverNewShardsToSubscribe();
-
-                       // -- NOTE: Potential race condition between 
newShardsDueToResharding and numberOfActiveShards --
-                       // Since numberOfActiveShards is updated by parallel 
shard consuming threads in updateState(), there exists
-                       // a race condition with the currently queried 
newShardsDueToResharding. Therefore, numberOfActiveShards
-                       // may not correctly reflect the discovery result in the 
below case determination. This may lead to incorrect
-                       // case determination on the current discovery attempt, 
but can still be correctly handled on future attempts.
-                       //
-                       // Although this can be resolved by wrapping the 
current shard discovery attempt with the below
-                       // case determination within a synchronized block on 
the checkpoint lock for atomicity, there will be
-                       // considerable throughput performance regression as 
shard discovery is a remote call to AWS. Therefore,
-                       // since the case determination is a temporary 
workaround for FLINK-4341, the race condition is tolerable as
-                       // we can still eventually handle max value watermark 
emitting / deliberately failing on successive
-                       // discovery attempts.
-
-                       if (newShardsDueToResharding.size() == 0 && 
this.numberOfActiveShards.get() == 0 && !emittedMaxValueWatermark) {
-                               // FLINK-4341 workaround case (a) - please see 
the above for details on this case
-                               LOG.info("Subtask {} has completed reading all 
shards; emitting max value watermark ...",
-                                       indexOfThisConsumerSubtask);
-                               sourceContext.emitWatermark(new 
Watermark(Long.MAX_VALUE));
-                               emittedMaxValueWatermark = true;
-                       } else if (newShardsDueToResharding.size() > 0 && 
emittedMaxValueWatermark) {
-                               // FLINK-4341 workaround case (b) - please see 
the above for details on this case
-                               //
-                               // Note that in the case where on resharding 
this subtask ceased to read all of its previous shards
-                               // but new shards are also to be subscribed by 
this subtask immediately after, emittedMaxValueWatermark
-                               // will be false; this allows the fetcher to 
continue reading the new shards without failing on such cases.
-                               // However, due to the race condition mentioned 
above, we might still fall into case (a) first, and
-                               // then (b) on the next discovery attempt. 
Although the failure is ideally unnecessary, max value
-                               // watermark emitting still remains to be 
correct.
-
-                               LOG.warn("Subtask {} has discovered {} new 
shards to subscribe, but is failing hard to avoid messing" +
-                                               " up watermarks; the new shards 
will be subscribed by this subtask after restore ...",
-                                       indexOfThisConsumerSubtask, 
newShardsDueToResharding.size());
-                               throw new RuntimeException("Deliberate failure 
to avoid messing up watermarks");
-                       }
-
-                       for (KinesisStreamShard shard : 
newShardsDueToResharding) {
-                               // since there may be delay in discovering a 
new shard, all new shards due to
-                               // resharding should be read starting from the 
earliest record possible
-                               KinesisStreamShardState newShardState =
-                                       new KinesisStreamShardState(shard, 
SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
-                               int newStateIndex = 
registerNewSubscribedShardState(newShardState);
-
-                               if (LOG.isInfoEnabled()) {
-                                       LOG.info("Subtask {} has discovered a 
new shard {} due to resharding, and will start consuming " +
-                                                       "the shard from 
sequence number {} with ShardConsumer {}",
-                                               indexOfThisConsumerSubtask, 
newShardState.getKinesisStreamShard().toString(),
-                                               
newShardState.getLastProcessedSequenceNum(), newStateIndex);
-                               }
-
-                               shardConsumersExecutor.submit(
-                                       new ShardConsumer<>(
-                                               this,
-                                               newStateIndex,
-                                               
newShardState.getKinesisStreamShard(),
-                                               
newShardState.getLastProcessedSequenceNum()));
-                       }
-
-                       // we also check if we are running here so that we 
won't start the discovery sleep
-                       // interval if the running flag was set to false during 
the middle of the while loop
-                       if (running && discoveryIntervalMillis != 0) {
-                               try {
-                                       Thread.sleep(discoveryIntervalMillis);
-                               } catch (InterruptedException iex) {
-                                       // the sleep may be interrupted by 
shutdownFetcher()
-                               }
-                       }
-               }
-
-               // make sure all resources have been terminated before leaving
-               awaitTermination();
-
-               // any error thrown in the shard consumer threads will be 
thrown to the main thread
-               Throwable throwable = this.error.get();
-               if (throwable != null) {
-                       if (throwable instanceof Exception) {
-                               throw (Exception) throwable;
-                       } else if (throwable instanceof Error) {
-                               throw (Error) throwable;
-                       } else {
-                               throw new Exception(throwable);
-                       }
-               }
-       }
-
-       /**
-        * Creates a snapshot of the current last processed sequence numbers of 
each subscribed shard.
-        *
-        * @return state snapshot
-        */
-       public HashMap<KinesisStreamShard, SequenceNumber> snapshotState() {
-               // this method assumes that the checkpoint lock is held
-               assert Thread.holdsLock(checkpointLock);
-
-               HashMap<KinesisStreamShard, SequenceNumber> stateSnapshot = new 
HashMap<>();
-               for (KinesisStreamShardState shardWithState : 
subscribedShardsState) {
-                       
stateSnapshot.put(shardWithState.getKinesisStreamShard(), 
shardWithState.getLastProcessedSequenceNum());
-               }
-               return stateSnapshot;
-       }
-
-       /**
-        * Starts shutting down the fetcher. Must be called to allow {@link 
KinesisDataFetcher#runFetcher()} to complete.
-        * Once called, the shutdown procedure will be executed and all shard 
consuming threads will be interrupted.
-        */
-       public void shutdownFetcher() {
-               running = false;
-               mainThread.interrupt(); // the main thread may be sleeping for 
the discovery interval
-
-               if (LOG.isInfoEnabled()) {
-                       LOG.info("Shutting down the shard consumer threads of 
subtask {} ...", indexOfThisConsumerSubtask);
-               }
-               shardConsumersExecutor.shutdownNow();
-       }
-
-       /** After calling {@link KinesisDataFetcher#shutdownFetcher()}, this 
can be called to await the fetcher shutdown */
-       public void awaitTermination() throws InterruptedException {
-               while(!shardConsumersExecutor.isTerminated()) {
-                       Thread.sleep(50);
-               }
-       }
-
-       /** Called by created threads to pass on errors. Only the first thrown 
error is set.
-        * Once set, the shutdown process will be executed and all shard 
consuming threads will be interrupted. */
-       protected void stopWithError(Throwable throwable) {
-               if (this.error.compareAndSet(null, throwable)) {
-                       shutdownFetcher();
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Functions that update the subscribedStreamToLastDiscoveredShardIds 
state
-       // 
------------------------------------------------------------------------
-
-       /** Updates the last discovered shard of a subscribed stream; only 
updates if the update is valid */
-       public void advanceLastDiscoveredShardOfStream(String stream, String 
shardId) {
-               String lastSeenShardIdOfStream = 
this.subscribedStreamsToLastDiscoveredShardIds.get(stream);
-
-               // the update is valid only if the given shard id is greater
-               // than the previous last seen shard id of the stream
-               if (lastSeenShardIdOfStream == null) {
-                       // if not previously set, simply put as the last seen 
shard id
-                       
this.subscribedStreamsToLastDiscoveredShardIds.put(stream, shardId);
-               } else if (KinesisStreamShard.compareShardIds(shardId, 
lastSeenShardIdOfStream) > 0) {
-                       
this.subscribedStreamsToLastDiscoveredShardIds.put(stream, shardId);
-               }
-       }
-
-       /**
-        * A utility function that does the following:
-        *
-        * 1. Find new shards for each stream that we haven't seen before
-        * 2. For each new shard, determine whether this consumer subtask 
should subscribe to them;
-        *        if yes, it is added to the returned list of shards
-        * 3. Update the subscribedStreamsToLastDiscoveredShardIds state so 
that we won't get shards
-        *    that we have already seen before the next time this function is 
called
-        */
-       private List<KinesisStreamShard> discoverNewShardsToSubscribe() throws 
InterruptedException {
-
-               List<KinesisStreamShard> newShardsToSubscribe = new 
LinkedList<>();
-
-               GetShardListResult shardListResult = 
kinesis.getShardList(subscribedStreamsToLastDiscoveredShardIds);
-               if (shardListResult.hasRetrievedShards()) {
-                       Set<String> streamsWithNewShards = 
shardListResult.getStreamsWithRetrievedShards();
-
-                       for (String stream : streamsWithNewShards) {
-                               List<KinesisStreamShard> newShardsOfStream = 
shardListResult.getRetrievedShardListOfStream(stream);
-                               for (KinesisStreamShard newShard : 
newShardsOfStream) {
-                                       if 
(isThisSubtaskShouldSubscribeTo(newShard, totalNumberOfConsumerSubtasks, 
indexOfThisConsumerSubtask)) {
-                                               
newShardsToSubscribe.add(newShard);
-                                       }
-                               }
-
-                               advanceLastDiscoveredShardOfStream(
-                                       stream, 
shardListResult.getLastSeenShardOfStream(stream).getShard().getShardId());
-                       }
-               }
-
-               return newShardsToSubscribe;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Functions to get / set information about the consumer
-       // 
------------------------------------------------------------------------
-
-       public void setIsRestoringFromFailure(boolean bool) {
-               this.isRestoredFromFailure = bool;
-       }
-
-       protected Properties getConsumerConfiguration() {
-               return configProps;
-       }
-
-       protected KinesisDeserializationSchema<T> 
getClonedDeserializationSchema() {
-               try {
-                       return InstantiationUtil.clone(deserializationSchema, 
runtimeContext.getUserCodeClassLoader());
-               } catch (IOException | ClassNotFoundException ex) {
-                       // this really shouldn't happen; simply wrap it around 
a runtime exception
-                       throw new RuntimeException(ex);
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Thread-safe operations for record emitting and shard state updating
-       //  that assure atomicity with respect to the checkpoint lock
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Atomic operation to collect a record and update state to the 
sequence number of the record.
-        * This method is called by {@link ShardConsumer}s.
-        *
-        * @param record the record to collect
-        * @param recordTimestamp timestamp to attach to the collected record
-        * @param shardStateIndex index of the shard to update in 
subscribedShardsState;
-        *                        this index should be the returned value from
-        *                        {@link 
KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, 
called
-        *                        when the shard state was registered.
-        * @param lastSequenceNumber the last sequence number value to update
-        */
-       protected void emitRecordAndUpdateState(T record, long recordTimestamp, 
int shardStateIndex, SequenceNumber lastSequenceNumber) {
-               synchronized (checkpointLock) {
-                       sourceContext.collectWithTimestamp(record, 
recordTimestamp);
-                       updateState(shardStateIndex, lastSequenceNumber);
-               }
-       }
-
-       /**
-        * Update the shard to last processed sequence number state.
-        * This method is called by {@link ShardConsumer}s.
-        *
-        * @param shardStateIndex index of the shard to update in 
subscribedShardsState;
-        *                        this index should be the returned value from
-        *                        {@link 
KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, 
called
-        *                        when the shard state was registered.
-        * @param lastSequenceNumber the last sequence number value to update
-        */
-       protected void updateState(int shardStateIndex, SequenceNumber 
lastSequenceNumber) {
-               synchronized (checkpointLock) {
-                       
subscribedShardsState.get(shardStateIndex).setLastProcessedSequenceNum(lastSequenceNumber);
-
-                       // if a shard's state is updated to be 
SENTINEL_SHARD_ENDING_SEQUENCE_NUM by its consumer thread,
-                       // we've finished reading the shard and should 
determine it to be non-active
-                       if 
(lastSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()))
 {
-                               this.numberOfActiveShards.decrementAndGet();
-                               LOG.info("Subtask {} has reached the end of 
subscribed shard: {}",
-                                       indexOfThisConsumerSubtask, 
subscribedShardsState.get(shardStateIndex).getKinesisStreamShard());
-                       }
-               }
-       }
-
-       /**
-        * Register a new subscribed shard state.
-        *
-        * @param newSubscribedShardState the new shard state that this fetcher 
is to be subscribed to
-        */
-       public int registerNewSubscribedShardState(KinesisStreamShardState 
newSubscribedShardState) {
-               synchronized (checkpointLock) {
-                       subscribedShardsState.add(newSubscribedShardState);
-
-                       // If a registered shard has initial state that is not 
SENTINEL_SHARD_ENDING_SEQUENCE_NUM (will be the case
-                       // if the consumer had already finished reading a shard 
before we failed and restored), we determine that
-                       // this subtask has a new active shard
-                       if 
(!newSubscribedShardState.getLastProcessedSequenceNum().equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()))
 {
-                               this.numberOfActiveShards.incrementAndGet();
-                       }
-
-                       return subscribedShardsState.size()-1;
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Miscellaneous utility functions
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Utility function to determine whether a shard should be subscribed 
by this consumer subtask.
-        *
-        * @param shard the shard to determine
-        * @param totalNumberOfConsumerSubtasks total number of consumer 
subtasks
-        * @param indexOfThisConsumerSubtask index of this consumer subtask
-        */
-       private static boolean 
isThisSubtaskShouldSubscribeTo(KinesisStreamShard shard,
-                                                                               
                                int totalNumberOfConsumerSubtasks,
-                                                                               
                                int indexOfThisConsumerSubtask) {
-               return (Math.abs(shard.hashCode() % 
totalNumberOfConsumerSubtasks)) == indexOfThisConsumerSubtask;
-       }
-
-       private static ExecutorService createShardConsumersThreadPool(final 
String subtaskName) {
-               return Executors.newCachedThreadPool(new ThreadFactory() {
-                       @Override
-                       public Thread newThread(Runnable runnable) {
-                               final AtomicLong threadCount = new 
AtomicLong(0);
-                               Thread thread = new Thread(runnable);
-                               thread.setName("shardConsumers-" + subtaskName 
+ "-thread-" + threadCount.getAndIncrement());
-                               thread.setDaemon(true);
-                               return thread;
-                       }
-               });
-       }
-
-       /**
-        * Utility function to create an initial map of the last discovered 
shard id of each subscribed stream, set to null;
-        * This is called in the constructor; correct values will be set later 
on by calling advanceLastDiscoveredShardOfStream()
-        *
-        * @param streams the list of subscribed streams
-        * @return the initial map for subscribedStreamsToLastDiscoveredShardIds
-        */
-       protected static HashMap<String, String> 
createInitialSubscribedStreamsToLastDiscoveredShardsState(List<String> streams) 
{
-               HashMap<String, String> initial = new HashMap<>();
-               for (String stream : streams) {
-                       initial.put(stream, null);
-               }
-               return initial;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
deleted file mode 100644
index 612a4a7..0000000
--- 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.internals;
-
-import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
-import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
-import com.amazonaws.services.kinesis.model.GetRecordsResult;
-import com.amazonaws.services.kinesis.model.Record;
-import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
-import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
-import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
-import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
-import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
-import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Properties;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Thread that does the actual data pulling from AWS Kinesis shards. Each 
thread is in charge of one Kinesis shard only.
- */
-public class ShardConsumer<T> implements Runnable {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(ShardConsumer.class);
-
-       private final KinesisDeserializationSchema<T> deserializer;
-
-       private final KinesisProxyInterface kinesis;
-
-       private final int subscribedShardStateIndex;
-
-       private final KinesisDataFetcher<T> fetcherRef;
-
-       private final KinesisStreamShard subscribedShard;
-
-       private final int maxNumberOfRecordsPerFetch;
-       private final long fetchIntervalMillis;
-
-       private SequenceNumber lastSequenceNum;
-
-       /**
-        * Creates a shard consumer.
-        *
-        * @param fetcherRef reference to the owning fetcher
-        * @param subscribedShardStateIndex the state index of the shard this 
consumer is subscribed to
-        * @param subscribedShard the shard this consumer is subscribed to
-        * @param lastSequenceNum the sequence number in the shard to start 
consuming
-        */
-       public ShardConsumer(KinesisDataFetcher<T> fetcherRef,
-                                               Integer 
subscribedShardStateIndex,
-                                               KinesisStreamShard 
subscribedShard,
-                                               SequenceNumber lastSequenceNum) 
{
-               this(fetcherRef,
-                       subscribedShardStateIndex,
-                       subscribedShard,
-                       lastSequenceNum,
-                       
KinesisProxy.create(fetcherRef.getConsumerConfiguration()));
-       }
-
-       /** This constructor is exposed for testing purposes */
-       protected ShardConsumer(KinesisDataFetcher<T> fetcherRef,
-                                                       Integer 
subscribedShardStateIndex,
-                                                       KinesisStreamShard 
subscribedShard,
-                                                       SequenceNumber 
lastSequenceNum,
-                                                       KinesisProxyInterface 
kinesis) {
-               this.fetcherRef = checkNotNull(fetcherRef);
-               this.subscribedShardStateIndex = 
checkNotNull(subscribedShardStateIndex);
-               this.subscribedShard = checkNotNull(subscribedShard);
-               this.lastSequenceNum = checkNotNull(lastSequenceNum);
-               checkArgument(
-                       
!lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()),
-                       "Should not start a ShardConsumer if the shard has 
already been completely read.");
-
-               this.deserializer = fetcherRef.getClonedDeserializationSchema();
-
-               Properties consumerConfig = 
fetcherRef.getConsumerConfiguration();
-               this.kinesis = kinesis;
-               this.maxNumberOfRecordsPerFetch = 
Integer.valueOf(consumerConfig.getProperty(
-                       ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
-                       
Integer.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX)));
-               this.fetchIntervalMillis = 
Long.valueOf(consumerConfig.getProperty(
-                       
ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
-                       
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public void run() {
-               String nextShardItr;
-
-               try {
-                       // before infinitely looping, we set the initial 
nextShardItr appropriately
-
-                       if 
(lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get()))
 {
-                               // if the shard is already closed, there will 
be no latest next record to get for this shard
-                               if (subscribedShard.isClosed()) {
-                                       nextShardItr = null;
-                               } else {
-                                       nextShardItr = 
kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), 
null);
-                               }
-                       } else if 
(lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()))
 {
-                               nextShardItr = 
kinesis.getShardIterator(subscribedShard, 
ShardIteratorType.TRIM_HORIZON.toString(), null);
-                       } else if 
(lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()))
 {
-                               nextShardItr = null;
-                       } else {
-                               // we will be starting from an actual sequence 
number (due to restore from failure).
-                               // if the last sequence number refers to an 
aggregated record, we need to clean up any dangling sub-records
-                               // from the last aggregated record; otherwise, 
we can simply start iterating from the record right after.
-
-                               if (lastSequenceNum.isAggregated()) {
-                                       String itrForLastAggregatedRecord =
-                                               
kinesis.getShardIterator(subscribedShard, 
ShardIteratorType.AT_SEQUENCE_NUMBER.toString(), 
lastSequenceNum.getSequenceNumber());
-
-                                       // get only the last aggregated record
-                                       GetRecordsResult getRecordsResult = 
getRecords(itrForLastAggregatedRecord, 1);
-
-                                       List<UserRecord> fetchedRecords = 
deaggregateRecords(
-                                               getRecordsResult.getRecords(),
-                                               
subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
-                                               
subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
-
-                                       long lastSubSequenceNum = 
lastSequenceNum.getSubSequenceNumber();
-                                       for (UserRecord record : 
fetchedRecords) {
-                                               // we have found a dangling 
sub-record if it has a larger subsequence number
-                                               // than our last sequence 
number; if so, collect the record and update state
-                                               if 
(record.getSubSequenceNumber() > lastSubSequenceNum) {
-                                                       
deserializeRecordForCollectionAndUpdateState(record);
-                                               }
-                                       }
-
-                                       // set the nextShardItr so we can 
continue iterating in the next while loop
-                                       nextShardItr = 
getRecordsResult.getNextShardIterator();
-                               } else {
-                                       // the last record was non-aggregated, 
so we can simply start from the next record
-                                       nextShardItr = 
kinesis.getShardIterator(subscribedShard, 
ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), 
lastSequenceNum.getSequenceNumber());
-                               }
-                       }
-
-                       while(isRunning()) {
-                               if (nextShardItr == null) {
-                                       
fetcherRef.updateState(subscribedShardStateIndex, 
SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
-
-                                       // we can close this consumer thread 
once we've reached the end of the subscribed shard
-                                       break;
-                               } else {
-                                       if (fetchIntervalMillis != 0) {
-                                               
Thread.sleep(fetchIntervalMillis);
-                                       }
-
-                                       GetRecordsResult getRecordsResult = 
getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
-
-                                       // each of the Kinesis records may be 
aggregated, so we must deaggregate them before proceeding
-                                       List<UserRecord> fetchedRecords = 
deaggregateRecords(
-                                               getRecordsResult.getRecords(),
-                                               
subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
-                                               
subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
-
-                                       for (UserRecord record : 
fetchedRecords) {
-                                               
deserializeRecordForCollectionAndUpdateState(record);
-                                       }
-
-                                       nextShardItr = 
getRecordsResult.getNextShardIterator();
-                               }
-                       }
-               } catch (Throwable t) {
-                       fetcherRef.stopWithError(t);
-               }
-       }
-
-       /**
-        * The loop in run() checks this before fetching next batch of records. 
Since this runnable will be executed
-        * by the ExecutorService {@link 
KinesisDataFetcher#shardConsumersExecutor}, the only way to close down this 
thread
-        * would be by calling shutdownNow() on {@link 
KinesisDataFetcher#shardConsumersExecutor} and let the executor service
-        * interrupt all currently running {@link ShardConsumer}s.
-        */
-       private boolean isRunning() {
-               return !Thread.interrupted();
-       }
-
-       /**
-        * Deserializes a record for collection, and accordingly updates the 
shard state in the fetcher. The last
-        * successfully collected sequence number in this shard consumer is 
also updated so that
-        * {@link ShardConsumer#getRecords(String, int)} may be able to use the 
correct sequence number to refresh shard
-        * iterators if necessary.
-        *
-        * Note that the server-side Kinesis timestamp is attached to the 
record when collected. When the
-        * user programs uses {@link TimeCharacteristic#EventTime}, this 
timestamp will be used by default.
-        *
-        * @param record record to deserialize and collect
-        * @throws IOException
-        */
-       private void deserializeRecordForCollectionAndUpdateState(UserRecord 
record)
-               throws IOException {
-               ByteBuffer recordData = record.getData();
-
-               byte[] dataBytes = new byte[recordData.remaining()];
-               recordData.get(dataBytes);
-
-               final long approxArrivalTimestamp = 
record.getApproximateArrivalTimestamp().getTime();
-
-               final T value = deserializer.deserialize(
-                       dataBytes,
-                       record.getPartitionKey(),
-                       record.getSequenceNumber(),
-                       approxArrivalTimestamp,
-                       subscribedShard.getStreamName(),
-                       subscribedShard.getShard().getShardId());
-
-               SequenceNumber collectedSequenceNumber = (record.isAggregated())
-                       ? new SequenceNumber(record.getSequenceNumber(), 
record.getSubSequenceNumber())
-                       : new SequenceNumber(record.getSequenceNumber());
-
-               fetcherRef.emitRecordAndUpdateState(
-                       value,
-                       approxArrivalTimestamp,
-                       subscribedShardStateIndex,
-                       collectedSequenceNumber);
-
-               lastSequenceNum = collectedSequenceNumber;
-       }
-
-       /**
-        * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while 
also handling unexpected
-        * AWS {@link ExpiredIteratorException}s to assure that we get results 
and don't just fail on
-        * such occasions. The returned shard iterator within the successful 
{@link GetRecordsResult} should
-        * be used for the next call to this method.
-        *
-        * Note: it is important that this method is not called again before 
all the records from the last result have been
-        * fully collected with {@link 
ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, 
otherwise
-        * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in 
the middle of an aggregated record, leading to
-        * incorrect shard iteration if the iterator had to be refreshed.
-        *
-        * @param shardItr shard iterator to use
-        * @param maxNumberOfRecords the maximum number of records to fetch for 
this getRecords attempt
-        * @return get records result
-        * @throws InterruptedException
-        */
-       private GetRecordsResult getRecords(String shardItr, int 
maxNumberOfRecords) throws InterruptedException {
-               GetRecordsResult getRecordsResult = null;
-               while (getRecordsResult == null) {
-                       try {
-                               getRecordsResult = kinesis.getRecords(shardItr, 
maxNumberOfRecords);
-                       } catch (ExpiredIteratorException eiEx) {
-                               LOG.warn("Encountered an unexpected expired 
iterator {} for shard {};" +
-                                       " refreshing the iterator ...", 
shardItr, subscribedShard);
-                               shardItr = 
kinesis.getShardIterator(subscribedShard, 
ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), 
lastSequenceNum.getSequenceNumber());
-
-                               // sleep for the fetch interval before the next 
getRecords attempt with the refreshed iterator
-                               if (fetchIntervalMillis != 0) {
-                                       Thread.sleep(fetchIntervalMillis);
-                               }
-                       }
-               }
-               return getRecordsResult;
-       }
-
-       @SuppressWarnings("unchecked")
-       protected static List<UserRecord> deaggregateRecords(List<Record> 
records, String startingHashKey, String endingHashKey) {
-               return UserRecord.deaggregate(records, new 
BigInteger(startingHashKey), new BigInteger(endingHashKey));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
deleted file mode 100644
index 53ed11b..0000000
--- 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.model;
-
-import com.amazonaws.services.kinesis.model.Shard;
-
-import java.io.Serializable;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A serializable representation of a AWS Kinesis Stream shard. It is 
basically a wrapper class around the information
- * provided along with {@link com.amazonaws.services.kinesis.model.Shard}, 
with some extra utility methods to
- * determine whether or not a shard is closed and whether or not the shard is 
a result of parent shard splits or merges.
- */
-public class KinesisStreamShard implements Serializable {
-
-       private static final long serialVersionUID = -6004217801761077536L;
-
-       private final String streamName;
-       private final Shard shard;
-
-       private final int cachedHash;
-
-       /**
-        * Create a new KinesisStreamShard
-        *
-        * @param streamName
-        *           the name of the Kinesis stream that this shard belongs to
-        * @param shard
-        *           the actual AWS Shard instance that will be wrapped within 
this KinesisStreamShard
-        */
-       public KinesisStreamShard(String streamName, Shard shard) {
-               this.streamName = checkNotNull(streamName);
-               this.shard = checkNotNull(shard);
-
-               // since our description of Kinesis Streams shards can be fully 
defined with the stream name and shard id,
-               // our hash doesn't need to use hash code of Amazon's 
description of Shards, which uses other info for calculation
-               int hash = 17;
-               hash = 37 * hash + streamName.hashCode();
-               hash = 37 * hash + shard.getShardId().hashCode();
-               this.cachedHash = hash;
-       }
-
-       public String getStreamName() {
-               return streamName;
-       }
-
-       public boolean isClosed() {
-               return 
(shard.getSequenceNumberRange().getEndingSequenceNumber() != null);
-       }
-
-       public Shard getShard() {
-               return shard;
-       }
-
-       @Override
-       public String toString() {
-               return "KinesisStreamShard{" +
-                       "streamName='" + streamName + "'" +
-                       ", shard='" + shard.toString() + "'}";
-       }
-
-       @Override
-       public boolean equals(Object obj) {
-               if (!(obj instanceof KinesisStreamShard)) {
-                       return false;
-               }
-
-               if (obj == this) {
-                       return true;
-               }
-
-               KinesisStreamShard other = (KinesisStreamShard) obj;
-
-               return streamName.equals(other.getStreamName()) && 
shard.equals(other.getShard());
-       }
-
-       @Override
-       public int hashCode() {
-               return cachedHash;
-       }
-
-       /**
-        * Utility function to compare two shard ids
-        *
-        * @param firstShardId first shard id to compare
-        * @param secondShardId second shard id to compare
-        * @return a value less than 0 if the first shard id is smaller than 
the second shard id,
-        *         or a value larger than 0 the first shard is larger then the 
second shard id,
-        *         or 0 if they are equal
-        */
-       public static int compareShardIds(String firstShardId, String 
secondShardId) {
-               if (!isValidShardId(firstShardId)) {
-                       throw new IllegalArgumentException("The first shard id 
has invalid format.");
-               }
-
-               if (!isValidShardId(secondShardId)) {
-                       throw new IllegalArgumentException("The second shard id 
has invalid format.");
-               }
-
-               // digit segment of the shard id starts at index 8
-               return Long.compare(Long.parseLong(firstShardId.substring(8)), 
Long.parseLong(secondShardId.substring(8)));
-       }
-
-       /**
-        * Checks if a shard id has valid format.
-        * Kinesis stream shard ids have 12-digit numbers left-padded with 0's,
-        * prefixed with "shardId-", ex. "shardId-000000000015".
-        *
-        * @param shardId the shard id to check
-        * @return whether the shard id is valid
-        */
-       public static boolean isValidShardId(String shardId) {
-               if (shardId == null) { return false; }
-               return shardId.matches("^shardId-\\d{12}");
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
deleted file mode 100644
index 00181da..0000000
--- 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.model;
-
-/**
- * A wrapper class that bundles a {@link KinesisStreamShard} with its last 
processed sequence number.
- */
-public class KinesisStreamShardState {
-
-       private KinesisStreamShard kinesisStreamShard;
-       private SequenceNumber lastProcessedSequenceNum;
-
-       public KinesisStreamShardState(KinesisStreamShard kinesisStreamShard, 
SequenceNumber lastProcessedSequenceNum) {
-               this.kinesisStreamShard = kinesisStreamShard;
-               this.lastProcessedSequenceNum = lastProcessedSequenceNum;
-       }
-
-       public KinesisStreamShard getKinesisStreamShard() {
-               return this.kinesisStreamShard;
-       }
-
-       public SequenceNumber getLastProcessedSequenceNum() {
-               return this.lastProcessedSequenceNum;
-       }
-
-       public void setLastProcessedSequenceNum(SequenceNumber update) {
-               this.lastProcessedSequenceNum = update;
-       }
-
-       @Override
-       public String toString() {
-               return "KinesisStreamShardState{" +
-                       "kinesisStreamShard='" + kinesisStreamShard.toString() 
+ "'" +
-                       ", lastProcessedSequenceNumber='" + 
lastProcessedSequenceNum.toString() + "'}";
-       }
-
-       @Override
-       public boolean equals(Object obj) {
-               if (!(obj instanceof KinesisStreamShardState)) {
-                       return false;
-               }
-
-               if (obj == this) {
-                       return true;
-               }
-
-               KinesisStreamShardState other = (KinesisStreamShardState) obj;
-
-               return kinesisStreamShard.equals(other.getKinesisStreamShard()) 
&& lastProcessedSequenceNum.equals(other.getLastProcessedSequenceNum());
-       }
-
-       @Override
-       public int hashCode() {
-               return 37 * (kinesisStreamShard.hashCode() + 
lastProcessedSequenceNum.hashCode());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
deleted file mode 100644
index 8182201..0000000
--- 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.model;
-
-import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
-import 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
-
-/**
- * Special flag values for sequence numbers in shards to indicate special 
positions.
- * The value is initially set by {@link FlinkKinesisConsumer} when {@link 
KinesisDataFetcher}s are created.
- * The KinesisDataFetchers will use this value to determine how to retrieve 
the starting shard iterator from AWS Kinesis.
- */
-public enum SentinelSequenceNumber {
-
-       /** Flag value for shard's sequence numbers to indicate that the
-        * shard should start to be read from the latest incoming records */
-       SENTINEL_LATEST_SEQUENCE_NUM( new SequenceNumber("LATEST_SEQUENCE_NUM") 
),
-
-       /** Flag value for shard's sequence numbers to indicate that the shard 
should
-        * start to be read from the earliest records that haven't expired yet 
*/
-       SENTINEL_EARLIEST_SEQUENCE_NUM( new 
SequenceNumber("EARLIEST_SEQUENCE_NUM") ),
-
-       /** Flag value to indicate that we have already read the last record of 
this shard
-        * (Note: Kinesis shards that have been closed due to a split or merge 
will have an ending data record) */
-       SENTINEL_SHARD_ENDING_SEQUENCE_NUM( new 
SequenceNumber("SHARD_ENDING_SEQUENCE_NUM") );
-
-       private SequenceNumber sentinel;
-
-       SentinelSequenceNumber(SequenceNumber sentinel) {
-               this.sentinel = sentinel;
-       }
-
-       public SequenceNumber get() {
-               return sentinel;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
deleted file mode 100644
index 021f53f..0000000
--- 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.model;
-
-import java.io.Serializable;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A serializable representation of a Kinesis record's sequence number. It has 
two fields: the main sequence number,
- * and also a subsequence number. If this {@link SequenceNumber} is referring 
to an aggregated Kinesis record, the
- * subsequence number will be a non-negative value representing the order of 
the sub-record within the aggregation.
- */
-public class SequenceNumber implements Serializable {
-
-       private static final long serialVersionUID = 876972197938972667L;
-
-       private static final String DELIMITER = "-";
-
-       private final String sequenceNumber;
-       private final long subSequenceNumber;
-
-       private final int cachedHash;
-
-       /**
-        * Create a new instance for a non-aggregated Kinesis record without a 
subsequence number.
-        * @param sequenceNumber the sequence number
-        */
-       public SequenceNumber(String sequenceNumber) {
-               this(sequenceNumber, -1);
-       }
-
-       /**
-        * Create a new instance, with the specified sequence number and 
subsequence number.
-        * To represent the sequence number for a non-aggregated Kinesis 
record, the subsequence number should be -1.
-        * Otherwise, give a non-negative sequence number to represent an 
aggregated Kinesis record.
-        *
-        * @param sequenceNumber the sequence number
-        * @param subSequenceNumber the subsequence number (-1 to represent 
non-aggregated Kinesis records)
-        */
-       public SequenceNumber(String sequenceNumber, long subSequenceNumber) {
-               this.sequenceNumber = checkNotNull(sequenceNumber);
-               this.subSequenceNumber = subSequenceNumber;
-
-               this.cachedHash = 37 * (sequenceNumber.hashCode() + 
Long.valueOf(subSequenceNumber).hashCode());
-       }
-
-       public boolean isAggregated() {
-               return subSequenceNumber >= 0;
-       }
-
-       public String getSequenceNumber() {
-               return sequenceNumber;
-       }
-
-       public long getSubSequenceNumber() {
-               return subSequenceNumber;
-       }
-
-       @Override
-       public String toString() {
-               if (isAggregated()) {
-                       return sequenceNumber + DELIMITER + subSequenceNumber;
-               } else {
-                       return sequenceNumber;
-               }
-       }
-
-       @Override
-       public boolean equals(Object obj) {
-               if (!(obj instanceof SequenceNumber)) {
-                       return false;
-               }
-
-               if (obj == this) {
-                       return true;
-               }
-
-               SequenceNumber other = (SequenceNumber) obj;
-
-               return sequenceNumber.equals(other.getSequenceNumber())
-                       && (subSequenceNumber == other.getSubSequenceNumber());
-       }
-
-       @Override
-       public int hashCode() {
-               return cachedHash;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
deleted file mode 100644
index 04b1654..0000000
--- 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.proxy;
-
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Basic model class to bundle the shards retrieved from Kinesis on a {@link 
KinesisProxyInterface#getShardList(Map)} call.
- */
-public class GetShardListResult {
-
-       private final Map<String, LinkedList<KinesisStreamShard>> 
streamsToRetrievedShardList = new HashMap<>();
-
-       public void addRetrievedShardToStream(String stream, KinesisStreamShard 
retrievedShard) {
-               if (!streamsToRetrievedShardList.containsKey(stream)) {
-                       streamsToRetrievedShardList.put(stream, new 
LinkedList<KinesisStreamShard>());
-               }
-               streamsToRetrievedShardList.get(stream).add(retrievedShard);
-       }
-
-       public void addRetrievedShardsToStream(String stream, 
List<KinesisStreamShard> retrievedShards) {
-               if (retrievedShards.size() != 0) {
-                       if (!streamsToRetrievedShardList.containsKey(stream)) {
-                               streamsToRetrievedShardList.put(stream, new 
LinkedList<KinesisStreamShard>());
-                       }
-                       
streamsToRetrievedShardList.get(stream).addAll(retrievedShards);
-               }
-       }
-
-       public List<KinesisStreamShard> getRetrievedShardListOfStream(String 
stream) {
-               if (!streamsToRetrievedShardList.containsKey(stream)) {
-                       return null;
-               } else {
-                       return streamsToRetrievedShardList.get(stream);
-               }
-       }
-
-       public KinesisStreamShard getLastSeenShardOfStream(String stream) {
-               if (!streamsToRetrievedShardList.containsKey(stream)) {
-                       return null;
-               } else {
-                       return 
streamsToRetrievedShardList.get(stream).getLast();
-               }
-       }
-
-       public boolean hasRetrievedShards() {
-               return !streamsToRetrievedShardList.isEmpty();
-       }
-
-       public Set<String> getStreamsWithRetrievedShards() {
-               return streamsToRetrievedShardList.keySet();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
deleted file mode 100644
index 9ffc8e6..0000000
--- 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ /dev/null
@@ -1,338 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.proxy;
-
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
-import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
-import com.amazonaws.services.kinesis.model.DescribeStreamResult;
-import com.amazonaws.services.kinesis.model.GetRecordsRequest;
-import com.amazonaws.services.kinesis.model.GetRecordsResult;
-import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
-import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
-import com.amazonaws.services.kinesis.model.LimitExceededException;
-import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
-import com.amazonaws.services.kinesis.model.StreamStatus;
-import com.amazonaws.services.kinesis.model.Shard;
-import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
-import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-import java.util.Map;
-import java.util.Random;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Kinesis proxy implementation - a utility class that is used as a proxy to 
make
- * calls to AWS Kinesis for several functions, such as getting a list of 
shards and
- * fetching a batch of data records starting from a specified record sequence 
number.
- *
- * NOTE:
- * In the AWS KCL library, there is a similar implementation - {@link 
com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy}.
- * This implementation differs mainly in that we can make operations to 
arbitrary Kinesis streams, which is a needed
- * functionality for the Flink Kinesis Connecter since the consumer may 
simultaneously read from multiple Kinesis streams.
- */
-public class KinesisProxy implements KinesisProxyInterface {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(KinesisProxy.class);
-
-       /** The actual Kinesis client from the AWS SDK that we will be using to 
make calls */
-       private final AmazonKinesisClient kinesisClient;
-
-       /** Random seed used to calculate backoff jitter for Kinesis operations 
*/
-       private final static Random seed = new Random();
-
-       // 
------------------------------------------------------------------------
-       //  describeStream() related performance settings
-       // 
------------------------------------------------------------------------
-
-       /** Base backoff millis for the describe stream operation */
-       private final long describeStreamBaseBackoffMillis;
-
-       /** Maximum backoff millis for the describe stream operation */
-       private final long describeStreamMaxBackoffMillis;
-
-       /** Exponential backoff power constant for the describe stream 
operation */
-       private final double describeStreamExpConstant;
-
-       // 
------------------------------------------------------------------------
-       //  getRecords() related performance settings
-       // 
------------------------------------------------------------------------
-
-       /** Base backoff millis for the get records operation */
-       private final long getRecordsBaseBackoffMillis;
-
-       /** Maximum backoff millis for the get records operation */
-       private final long getRecordsMaxBackoffMillis;
-
-       /** Exponential backoff power constant for the get records operation */
-       private final double getRecordsExpConstant;
-
-       /** Maximum attempts for the get records operation */
-       private final int getRecordsMaxAttempts;
-
-       // 
------------------------------------------------------------------------
-       //  getShardIterator() related performance settings
-       // 
------------------------------------------------------------------------
-
-       /** Base backoff millis for the get shard iterator operation */
-       private final long getShardIteratorBaseBackoffMillis;
-
-       /** Maximum backoff millis for the get shard iterator operation */
-       private final long getShardIteratorMaxBackoffMillis;
-
-       /** Exponential backoff power constant for the get shard iterator 
operation */
-       private final double getShardIteratorExpConstant;
-
-       /** Maximum attempts for the get shard iterator operation */
-       private final int getShardIteratorMaxAttempts;
-
-       /**
-        * Create a new KinesisProxy based on the supplied configuration 
properties
-        *
-        * @param configProps configuration properties containing AWS 
credential and AWS region info
-        */
-       private KinesisProxy(Properties configProps) {
-               checkNotNull(configProps);
-
-               this.kinesisClient = AWSUtil.createKinesisClient(configProps);
-
-               this.describeStreamBaseBackoffMillis = Long.valueOf(
-                       configProps.getProperty(
-                               
ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE,
-                               
Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE)));
-               this.describeStreamMaxBackoffMillis = Long.valueOf(
-                       configProps.getProperty(
-                               
ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX,
-                               
Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX)));
-               this.describeStreamExpConstant = Double.valueOf(
-                       configProps.getProperty(
-                               
ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
-                               
Double.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT)));
-
-               this.getRecordsBaseBackoffMillis = Long.valueOf(
-                       configProps.getProperty(
-                               
ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE,
-                               
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE)));
-               this.getRecordsMaxBackoffMillis = Long.valueOf(
-                       configProps.getProperty(
-                               
ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX,
-                               
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX)));
-               this.getRecordsExpConstant = Double.valueOf(
-                       configProps.getProperty(
-                               
ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
-                               
Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT)));
-               this.getRecordsMaxAttempts = Integer.valueOf(
-                       configProps.getProperty(
-                               
ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES,
-                               
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_RETRIES)));
-
-               this.getShardIteratorBaseBackoffMillis = Long.valueOf(
-                       configProps.getProperty(
-                               
ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE,
-                               
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE)));
-               this.getShardIteratorMaxBackoffMillis = Long.valueOf(
-                       configProps.getProperty(
-                               
ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX,
-                               
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX)));
-               this.getShardIteratorExpConstant = Double.valueOf(
-                       configProps.getProperty(
-                               
ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
-                               
Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT)));
-               this.getShardIteratorMaxAttempts = Integer.valueOf(
-                       configProps.getProperty(
-                               
ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES,
-                               
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_RETRIES)));
-
-       }
-
-       /**
-        * Creates a Kinesis proxy.
-        *
-        * @param configProps configuration properties
-        * @return the created kinesis proxy
-        */
-       public static KinesisProxyInterface create(Properties configProps) {
-               return new KinesisProxy(configProps);
-       }
-
-       /**
-        * {@inheritDoc}
-        */
-       @Override
-       public GetRecordsResult getRecords(String shardIterator, int 
maxRecordsToGet) throws InterruptedException {
-               final GetRecordsRequest getRecordsRequest = new 
GetRecordsRequest();
-               getRecordsRequest.setShardIterator(shardIterator);
-               getRecordsRequest.setLimit(maxRecordsToGet);
-
-               GetRecordsResult getRecordsResult = null;
-
-               int attempt = 0;
-               while (attempt <= getRecordsMaxAttempts && getRecordsResult == 
null) {
-                       try {
-                               getRecordsResult = 
kinesisClient.getRecords(getRecordsRequest);
-                       } catch (ProvisionedThroughputExceededException ex) {
-                               long backoffMillis = fullJitterBackoff(
-                                       getRecordsBaseBackoffMillis, 
getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
-                               LOG.warn("Got 
ProvisionedThroughputExceededException. Backing off for "
-                                       + backoffMillis + " millis.");
-                               Thread.sleep(backoffMillis);
-                       }
-               }
-
-               if (getRecordsResult == null) {
-                       throw new RuntimeException("Rate Exceeded for 
getRecords operation - all " + getRecordsMaxAttempts +
-                               " retry attempts returned 
ProvisionedThroughputExceededException.");
-               }
-
-               return getRecordsResult;
-       }
-
-       /**
-        * {@inheritDoc}
-        */
-       @Override
-       public GetShardListResult getShardList(Map<String, String> 
streamNamesWithLastSeenShardIds) throws InterruptedException {
-               GetShardListResult result = new GetShardListResult();
-
-               for (Map.Entry<String,String> streamNameWithLastSeenShardId : 
streamNamesWithLastSeenShardIds.entrySet()) {
-                       String stream = streamNameWithLastSeenShardId.getKey();
-                       String lastSeenShardId = 
streamNameWithLastSeenShardId.getValue();
-                       result.addRetrievedShardsToStream(stream, 
getShardsOfStream(stream, lastSeenShardId));
-               }
-               return result;
-       }
-
-       /**
-        * {@inheritDoc}
-        */
-       @Override
-       public String getShardIterator(KinesisStreamShard shard, String 
shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException 
{
-               GetShardIteratorResult getShardIteratorResult = null;
-
-               int attempt = 0;
-               while (attempt <= getShardIteratorMaxAttempts && 
getShardIteratorResult == null) {
-                       try {
-                               getShardIteratorResult =
-                                       
kinesisClient.getShardIterator(shard.getStreamName(), 
shard.getShard().getShardId(), shardIteratorType, startingSeqNum);
-                       } catch (ProvisionedThroughputExceededException ex) {
-                               long backoffMillis = fullJitterBackoff(
-                                       getShardIteratorBaseBackoffMillis, 
getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++);
-                               LOG.warn("Got 
ProvisionedThroughputExceededException. Backing off for "
-                                       + backoffMillis + " millis.");
-                               Thread.sleep(backoffMillis);
-                       }
-               }
-
-               if (getShardIteratorResult == null) {
-                       throw new RuntimeException("Rate Exceeded for 
getShardIterator operation - all " + getShardIteratorMaxAttempts +
-                               " retry attempts returned 
ProvisionedThroughputExceededException.");
-               }
-               return getShardIteratorResult.getShardIterator();
-       }
-
-       private List<KinesisStreamShard> getShardsOfStream(String streamName, 
@Nullable String lastSeenShardId) throws InterruptedException {
-               List<KinesisStreamShard> shardsOfStream = new ArrayList<>();
-
-               DescribeStreamResult describeStreamResult;
-               do {
-                       describeStreamResult = describeStream(streamName, 
lastSeenShardId);
-
-                       List<Shard> shards = 
describeStreamResult.getStreamDescription().getShards();
-                       for (Shard shard : shards) {
-                               shardsOfStream.add(new 
KinesisStreamShard(streamName, shard));
-                       }
-
-                       if (shards.size() != 0) {
-                               lastSeenShardId = shards.get(shards.size() - 
1).getShardId();
-                       }
-               } while 
(describeStreamResult.getStreamDescription().isHasMoreShards());
-
-               return shardsOfStream;
-       }
-
-       /**
-        * Get metainfo for a Kinesis stream, which contains information about 
which shards this Kinesis stream possess.
-        *
-        * This method is using a "full jitter" approach described in AWS's 
article,
-        * <a 
href="https://www.awsarchitectureblog.com/2015/03/backoff.html">"Exponential 
Backoff and Jitter"</a>.
-        * This is necessary because concurrent calls will be made by all 
parallel subtask's fetcher. This
-        * jitter backoff approach will help distribute calls across the 
fetchers over time.
-        *
-        * @param streamName the stream to describe
-        * @param startShardId which shard to start with for this describe 
operation (earlier shard's infos will not appear in result)
-        * @return the result of the describe stream operation
-        */
-       private DescribeStreamResult describeStream(String streamName, 
@Nullable String startShardId) throws InterruptedException {
-               final DescribeStreamRequest describeStreamRequest = new 
DescribeStreamRequest();
-               describeStreamRequest.setStreamName(streamName);
-               describeStreamRequest.setExclusiveStartShardId(startShardId);
-
-               DescribeStreamResult describeStreamResult = null;
-
-               // Call DescribeStream, with full-jitter backoff (if we get 
LimitExceededException).
-               int attemptCount = 0;
-               while (describeStreamResult == null) { // retry until we get a 
result
-                       try {
-                               describeStreamResult = 
kinesisClient.describeStream(describeStreamRequest);
-                       } catch (LimitExceededException le) {
-                               long backoffMillis = fullJitterBackoff(
-                                       describeStreamBaseBackoffMillis, 
describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
-                               LOG.warn("Got LimitExceededException when 
describing stream " + streamName + ". Backing off for "
-                                       + backoffMillis + " millis.");
-                               Thread.sleep(backoffMillis);
-                       } catch (ResourceNotFoundException re) {
-                               throw new RuntimeException("Error while getting 
stream details", re);
-                       }
-               }
-
-               String streamStatus = 
describeStreamResult.getStreamDescription().getStreamStatus();
-               if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || 
streamStatus.equals(StreamStatus.UPDATING.toString()))) {
-                       if (LOG.isWarnEnabled()) {
-                               LOG.warn("The status of stream " + streamName + 
" is " + streamStatus + "; result of the current " +
-                                       "describeStream operation will not 
contain any shard information.");
-                       }
-               }
-
-               // Kinesalite (mock implementation of Kinesis) does not 
correctly exclude shards before the exclusive
-               // start shard id in the returned shards list; check if we need 
to remove these erroneously returned shards
-               if (startShardId != null) {
-                       List<Shard> shards = 
describeStreamResult.getStreamDescription().getShards();
-                       Iterator<Shard> shardItr = shards.iterator();
-                       while (shardItr.hasNext()) {
-                               if 
(KinesisStreamShard.compareShardIds(shardItr.next().getShardId(), startShardId) 
<= 0) {
-                                       shardItr.remove();
-                               }
-                       }
-               }
-
-               return describeStreamResult;
-       }
-
-       private static long fullJitterBackoff(long base, long max, double 
power, int attempt) {
-               long exponentialBackoff = (long) Math.min(max, base * 
Math.pow(power, attempt));
-               return (long)(seed.nextDouble()*exponentialBackoff); // random 
jitter between 0 and the exponential backoff
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
deleted file mode 100644
index 39ddc52..0000000
--- 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.proxy;
-
-import com.amazonaws.services.kinesis.model.GetRecordsResult;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
-
-import java.util.Map;
-
-/**
- * Interface for a Kinesis proxy that operates on multiple Kinesis streams 
within the same AWS service region.
- */
-public interface KinesisProxyInterface {
-
-       /**
-        * Get a shard iterator from the specified position in a shard.
-        * The retrieved shard iterator can be used in {@link 
KinesisProxyInterface#getRecords(String, int)}}
-        * to read data from the Kinesis shard.
-        *
-        * @param shard the shard to get the iterator
-        * @param shardIteratorType the iterator type, defining how the shard 
is to be iterated
-        *                          (one of: TRIM_HORIZON, LATEST, 
AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER)
-        * @param startingSeqNum sequence number, must be null if 
shardIteratorType is TRIM_HORIZON or LATEST
-        * @return shard iterator which can be used to read data from Kinesis
-        * @throws InterruptedException this method will retry with backoff if 
AWS Kinesis complains that the
-        *                              operation has exceeded the rate limit; 
this exception will be thrown
-        *                              if the backoff is interrupted.
-        */
-       String getShardIterator(KinesisStreamShard shard, String 
shardIteratorType, String startingSeqNum) throws InterruptedException;
-
-       /**
-        * Get the next batch of data records using a specific shard iterator
-        *
-        * @param shardIterator a shard iterator that encodes info about which 
shard to read and where to start reading
-        * @param maxRecordsToGet the maximum amount of records to retrieve for 
this batch
-        * @return the batch of retrieved records, also with a shard iterator 
that can be used to get the next batch
-        * @throws InterruptedException this method will retry with backoff if 
AWS Kinesis complains that the
-        *                              operation has exceeded the rate limit; 
this exception will be thrown
-        *                              if the backoff is interrupted.
-        */
-       GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) 
throws InterruptedException;
-
-       /**
-        * Get shard list of multiple Kinesis streams, ignoring the
-        * shards of each stream before a specified last seen shard id.
-        *
-        * @param streamNamesWithLastSeenShardIds a map with stream as key, and 
last seen shard id as value
-        * @return result of the shard list query
-        * @throws InterruptedException this method will retry with backoff if 
AWS Kinesis complains that the
-        *                              operation has exceeded the rate limit; 
this exception will be thrown
-        *                              if the backoff is interrupted.
-        */
-       GetShardListResult getShardList(Map<String,String> 
streamNamesWithLastSeenShardIds) throws InterruptedException;
-}

Reply via email to