http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
new file mode 100644
index 0000000..dbf95f9
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Suite of FlinkKinesisConsumer tests for the methods called throughout the source life cycle.
+ *
+ * <p>Covers three areas: rejection of malformed AWS / consumer / producer properties by
+ * {@link KinesisConfigUtil}, the snapshot state returned before the source is running, and how the
+ * {@link KinesisDataFetcher} is seeded on a fresh start versus a restore from a checkpoint.
+ */
+@RunWith(PowerMockRunner.class)
+// PowerMock only intercepts classes listed here: FlinkKinesisConsumer for the whenNew() fetcher
+// interception (presumably the class that instantiates the fetcher) and KinesisConfigUtil for mockStatic().
+@PrepareForTest({FlinkKinesisConsumer.class, KinesisConfigUtil.class})
+public class FlinkKinesisConsumerTest {
+
+       // JUnit 4 requires @Rule fields to be public; a private rule field is rejected by the rule validator.
+       @Rule
+       public final ExpectedException exception = ExpectedException.none();
+
+       // ----------------------------------------------------------------------
+       // KinesisConfigUtil.validateAwsConfiguration() tests
+       // ----------------------------------------------------------------------
+
+       @Test
+       public void testMissingAwsRegionInConfig() {
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage("The AWS region ('" + AWSConfigConstants.AWS_REGION + "') must be set in the config.");
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey");
+               testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+               KinesisConfigUtil.validateAwsConfiguration(testConfig);
+       }
+
+       @Test
+       public void testUnrecognizableAwsRegionInConfig() {
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage("Invalid AWS region");
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(AWSConfigConstants.AWS_REGION, "wrongRegionId");
+               testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+               testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+               KinesisConfigUtil.validateAwsConfiguration(testConfig);
+       }
+
+       @Test
+       public void testCredentialProviderTypeDefaultToBasicButNoCredentialsSetInConfig() {
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +
+                               "and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
+
+               // no credentials provider type is set, so validation should fall back to BASIC
+               Properties testConfig = new Properties();
+               testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+
+               KinesisConfigUtil.validateAwsConfiguration(testConfig);
+       }
+
+       @Test
+       public void testCredentialProviderTypeSetToBasicButNoCredentialSetInConfig() {
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +
+                               "and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+               testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+
+               KinesisConfigUtil.validateAwsConfiguration(testConfig);
+       }
+
+       @Test
+       public void testUnrecognizableCredentialProviderTypeInConfig() {
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage("Invalid AWS Credential Provider Type");
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+               testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "wrongProviderType");
+               testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+               testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+               KinesisConfigUtil.validateAwsConfiguration(testConfig);
+       }
+
+       // ----------------------------------------------------------------------
+       // KinesisConfigUtil.validateConsumerConfiguration() tests
+       // Each test supplies valid AWS settings plus exactly one malformed consumer setting.
+       // ----------------------------------------------------------------------
+
+       @Test
+       public void testUnrecognizableStreamInitPositionTypeInConfig() {
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage("Invalid initial position in stream");
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+               testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "wrongInitPosition");
+
+               KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+       }
+
+       @Test
+       public void testUnparsableLongForDescribeStreamBackoffBaseMillisInConfig() {
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage("Invalid value given for describe stream operation base backoff milliseconds");
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+               testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, "unparsableLong");
+
+               KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+       }
+
+       @Test
+       public void testUnparsableLongForDescribeStreamBackoffMaxMillisInConfig() {
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage("Invalid value given for describe stream operation max backoff milliseconds");
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+               testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, "unparsableLong");
+
+               KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+       }
+
+       @Test
+       public void testUnparsableDoubleForDescribeStreamBackoffExponentialConstantInConfig() {
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage("Invalid value given for describe stream operation backoff exponential constant");
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+               testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
+
+               KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+       }
+
+       @Test
+       public void testUnparsableIntForGetRecordsRetriesInConfig() {
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage("Invalid value given for maximum retry attempts for getRecords shard operation");
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+               testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "unparsableInt");
+
+               KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+       }
+
+       @Test
+       public void testUnparsableIntForGetRecordsMaxCountInConfig() {
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage("Invalid value given for maximum records per getRecords shard operation");
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+               testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "unparsableInt");
+
+               KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+       }
+
+       @Test
+       public void testUnparsableLongForGetRecordsBackoffBaseMillisInConfig() {
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage("Invalid value given for get records operation base backoff milliseconds");
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+               testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "unparsableLong");
+
+               KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+       }
+
+       @Test
+       public void testUnparsableLongForGetRecordsBackoffMaxMillisInConfig() {
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage("Invalid value given for get records operation max backoff milliseconds");
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+               testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "unparsableLong");
+
+               KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+       }
+
+       @Test
+       public void testUnparsableDoubleForGetRecordsBackoffExponentialConstantInConfig() {
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage("Invalid value given for get records operation backoff exponential constant");
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+               testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
+
+               KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+       }
+
+       @Test
+       public void testUnparsableLongForGetRecordsIntervalMillisInConfig() {
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage("Invalid value given for getRecords sleep interval in milliseconds");
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+               testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "unparsableLong");
+
+               KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+       }
+
+       @Test
+       public void testUnparsableIntForGetShardIteratorRetriesInConfig() {
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage("Invalid value given for maximum retry attempts for getShardIterator shard operation");
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+               testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES, "unparsableInt");
+
+               KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+       }
+
+       @Test
+       public void testUnparsableLongForGetShardIteratorBackoffBaseMillisInConfig() {
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage("Invalid value given for get shard iterator operation base backoff milliseconds");
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+               testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE, "unparsableLong");
+
+               KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+       }
+
+       @Test
+       public void testUnparsableLongForGetShardIteratorBackoffMaxMillisInConfig() {
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage("Invalid value given for get shard iterator operation max backoff milliseconds");
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+               testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX, "unparsableLong");
+
+               KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+       }
+
+       @Test
+       public void testUnparsableDoubleForGetShardIteratorBackoffExponentialConstantInConfig() {
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage("Invalid value given for get shard iterator operation backoff exponential constant");
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+               testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
+
+               KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+       }
+
+       @Test
+       public void testUnparsableLongForShardDiscoveryIntervalMillisInConfig() {
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage("Invalid value given for shard discovery sleep interval in milliseconds");
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+               testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+               testConfig.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "unparsableLong");
+
+               KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+       }
+
+       // ----------------------------------------------------------------------
+       // KinesisConfigUtil.validateProducerConfiguration() tests
+       // ----------------------------------------------------------------------
+
+       @Test
+       public void testUnparsableLongForCollectionMaxCountInConfig() {
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage("Invalid value given for maximum number of items to pack into a PutRecords request");
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1");
+               testConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+               testConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+               testConfig.setProperty(ProducerConfigConstants.COLLECTION_MAX_COUNT, "unparsableLong");
+
+               KinesisConfigUtil.validateProducerConfiguration(testConfig);
+       }
+
+       @Test
+       public void testUnparsableLongForAggregationMaxCountInConfig() {
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage("Invalid value given for maximum number of items to pack into an aggregated record");
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1");
+               testConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+               testConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+               testConfig.setProperty(ProducerConfigConstants.AGGREGATION_MAX_COUNT, "unparsableLong");
+
+               KinesisConfigUtil.validateProducerConfiguration(testConfig);
+       }
+
+       // ----------------------------------------------------------------------
+       // Tests related to state initialization
+       // ----------------------------------------------------------------------
+
+       @Test
+       public void testSnapshotStateShouldBeNullIfSourceNotOpened() throws Exception {
+               Properties config = new Properties();
+               config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+               config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+               config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+               FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config);
+
+               // arbitrary checkpoint id and timestamp
+               assertNull("snapshotState() should return null before the source has been opened",
+                       consumer.snapshotState(123, 123));
+       }
+
+       @Test
+       public void testSnapshotStateShouldBeNullIfSourceNotRun() throws Exception {
+               Properties config = new Properties();
+               config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+               config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+               config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+               FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config);
+               consumer.open(new Configuration()); // only opened, not run
+
+               // arbitrary checkpoint id and timestamp
+               assertNull("snapshotState() should return null before the source has been run",
+                       consumer.snapshotState(123, 123));
+       }
+
+       // ----------------------------------------------------------------------
+       // Tests related to fetcher initialization
+       // ----------------------------------------------------------------------
+
+       @Test
+       @SuppressWarnings("unchecked") // raw SourceContext mock
+       public void testFetcherShouldNotBeRestoringFromFailureIfNotRestoringFromCheckpoint() throws Exception {
+               // any KinesisDataFetcher constructed inside the prepared classes is replaced by this mock
+               KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
+               PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
+
+               // assume the given config is correct: the static validators are mocked out so that the empty
+               // Properties passed to the consumer below are not rejected.
+               // NOTE(review): doNothing().when(Class) is not followed by a static method call, so this stubbing
+               // looks incomplete/redundant next to mockStatic(); confirm whether it can be dropped.
+               PowerMockito.mockStatic(KinesisConfigUtil.class);
+               PowerMockito.doNothing().when(KinesisConfigUtil.class);
+
+               TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
+                       "fakeStream", new Properties(), 10, 2);
+               consumer.open(new Configuration());
+               consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
+
+               Mockito.verify(mockedFetcher).setIsRestoringFromFailure(false);
+       }
+
+       @Test
+       @SuppressWarnings("unchecked") // raw SourceContext mock
+       public void testFetcherShouldBeCorrectlySeededIfRestoringFromCheckpoint() throws Exception {
+               // any KinesisDataFetcher constructed inside the prepared classes is replaced by this mock
+               KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
+               PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
+
+               // assume the given config is correct: the static validators are mocked out so that the empty
+               // Properties passed to the consumer below are not rejected.
+               // NOTE(review): doNothing().when(Class) is not followed by a static method call, so this stubbing
+               // looks incomplete/redundant next to mockStatic(); confirm whether it can be dropped.
+               PowerMockito.mockStatic(KinesisConfigUtil.class);
+               PowerMockito.doNothing().when(KinesisConfigUtil.class);
+
+               // checkpointed state spanning two streams: three shards of fakeStream1, two of fakeStream2
+               HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = new HashMap<>();
+               fakeRestoredState.put(
+                       new KinesisStreamShard("fakeStream1",
+                               new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
+                       new SequenceNumber(UUID.randomUUID().toString()));
+               fakeRestoredState.put(
+                       new KinesisStreamShard("fakeStream1",
+                               new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
+                       new SequenceNumber(UUID.randomUUID().toString()));
+               fakeRestoredState.put(
+                       new KinesisStreamShard("fakeStream1",
+                               new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
+                       new SequenceNumber(UUID.randomUUID().toString()));
+               fakeRestoredState.put(
+                       new KinesisStreamShard("fakeStream2",
+                               new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
+                       new SequenceNumber(UUID.randomUUID().toString()));
+               fakeRestoredState.put(
+                       new KinesisStreamShard("fakeStream2",
+                               new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
+                       new SequenceNumber(UUID.randomUUID().toString()));
+
+               TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
+                       "fakeStream", new Properties(), 10, 2);
+               // handing state to the consumer before open() simulates recovery from a checkpoint
+               consumer.restoreState(fakeRestoredState);
+               consumer.open(new Configuration());
+               consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
+
+               Mockito.verify(mockedFetcher).setIsRestoringFromFailure(true);
+               // every restored shard must be marked as discovered and re-registered with its restored sequence number
+               for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
+                       Mockito.verify(mockedFetcher).advanceLastDiscoveredShardOfStream(
+                               restoredShard.getKey().getStreamName(), restoredShard.getKey().getShard().getShardId());
+                       Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
+                               new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
new file mode 100644
index 0000000..e79f9b1
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -0,0 +1,510 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory;
+import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcher;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(TestableKinesisDataFetcher.class)
+public class KinesisDataFetcherTest {
+
+       /** How long each test lets the fetcher run in a background thread before shutting it down. */
+       private static final long FETCHER_RUN_MILLIS = 1000L;
+
+       @Test(expected = RuntimeException.class)
+       public void testIfNoShardsAreFoundShouldThrowException() throws Exception {
+               List<String> fakeStreams = new LinkedList<>();
+               fakeStreams.add("fakeStream1");
+               fakeStreams.add("fakeStream2");
+
+               HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest =
+                       KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(fakeStreams);
+
+               TestableKinesisDataFetcher fetcher =
+                       new TestableKinesisDataFetcher(
+                               fakeStreams,
+                               new Properties(),
+                               10,
+                               2,
+                               new AtomicReference<Throwable>(),
+                               new LinkedList<KinesisStreamShardState>(),
+                               subscribedStreamsToLastSeenShardIdsUnderTest,
+                               FakeKinesisBehavioursFactory.noShardsFoundForRequestedStreamsBehaviour());
+
+               fetcher.setIsRestoringFromFailure(false); // not restoring
+
+               fetcher.runFetcher(); // this should throw RuntimeException
+       }
+
+       @Test
+       public void testStreamToLastSeenShardStateIsCorrectlySetWhenNotRestoringFromFailure() throws Exception {
+               List<String> fakeStreams = new LinkedList<>();
+               fakeStreams.add("fakeStream1");
+               fakeStreams.add("fakeStream2");
+               fakeStreams.add("fakeStream3");
+               fakeStreams.add("fakeStream4");
+
+               HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest =
+                       KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(fakeStreams);
+
+               Map<String, Integer> streamToShardCount = new HashMap<>();
+               Random rand = new Random();
+               for (String fakeStream : fakeStreams) {
+                       streamToShardCount.put(fakeStream, rand.nextInt(5) + 1);
+               }
+
+               TestableKinesisDataFetcher fetcher =
+                       new TestableKinesisDataFetcher(
+                               fakeStreams,
+                               new Properties(),
+                               10,
+                               2,
+                               new AtomicReference<Throwable>(),
+                               new LinkedList<KinesisStreamShardState>(),
+                               subscribedStreamsToLastSeenShardIdsUnderTest,
+                               FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));
+
+               fetcher.setIsRestoringFromFailure(false);
+
+               runFetcherForAWhileThenShutdown(fetcher);
+
+               assertStreamsInStateMatchSubscribedStreams(fakeStreams, subscribedStreamsToLastSeenShardIdsUnderTest);
+               assertLastSeenShardsMatchShardCounts(streamToShardCount, subscribedStreamsToLastSeenShardIdsUnderTest);
+       }
+
+       @Test
+       public void testStreamToLastSeenShardStateIsCorrectlySetWhenNoNewShardsSinceRestoredCheckpoint() throws Exception {
+               List<String> fakeStreams = new LinkedList<>();
+               fakeStreams.add("fakeStream1");
+               fakeStreams.add("fakeStream2");
+
+               Map<KinesisStreamShard, String> restoredStateUnderTest = new HashMap<>();
+               addRestoredShards(restoredStateUnderTest, "fakeStream1", 3); // fakeStream1 has 3 shards before restore
+               addRestoredShards(restoredStateUnderTest, "fakeStream2", 2); // fakeStream2 has 2 shards before restore
+
+               Map<String, Integer> streamToShardCount = new HashMap<>();
+               streamToShardCount.put("fakeStream1", 3); // fakeStream1 will still have 3 shards after restore
+               streamToShardCount.put("fakeStream2", 2); // fakeStream2 will still have 2 shards after restore
+
+               HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest =
+                       KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(fakeStreams);
+
+               TestableKinesisDataFetcher fetcher =
+                       new TestableKinesisDataFetcher(
+                               fakeStreams,
+                               new Properties(),
+                               10,
+                               2,
+                               new AtomicReference<Throwable>(),
+                               new LinkedList<KinesisStreamShardState>(),
+                               subscribedStreamsToLastSeenShardIdsUnderTest,
+                               FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));
+
+               restoreFetcherState(fetcher, restoredStateUnderTest);
+
+               runFetcherForAWhileThenShutdown(fetcher);
+
+               assertStreamsInStateMatchSubscribedStreams(fakeStreams, subscribedStreamsToLastSeenShardIdsUnderTest);
+               assertLastSeenShardsMatchShardCounts(streamToShardCount, subscribedStreamsToLastSeenShardIdsUnderTest);
+       }
+
+       @Test
+       public void testStreamToLastSeenShardStateIsCorrectlySetWhenNewShardsFoundSinceRestoredCheckpoint() throws Exception {
+               List<String> fakeStreams = new LinkedList<>();
+               fakeStreams.add("fakeStream1");
+               fakeStreams.add("fakeStream2");
+
+               Map<KinesisStreamShard, String> restoredStateUnderTest = new HashMap<>();
+               addRestoredShards(restoredStateUnderTest, "fakeStream1", 3); // fakeStream1 has 3 shards before restore
+               addRestoredShards(restoredStateUnderTest, "fakeStream2", 2); // fakeStream2 has 2 shards before restore
+
+               Map<String, Integer> streamToShardCount = new HashMap<>();
+               streamToShardCount.put("fakeStream1", 3 + 1); // fakeStream1 had 3 shards before & 1 new shard after restore
+               streamToShardCount.put("fakeStream2", 2 + 3); // fakeStream2 had 2 shards before & 3 new shards after restore
+
+               HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest =
+                       KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(fakeStreams);
+
+               // using a non-resharded streams kinesis behaviour to represent that Kinesis is not resharded AFTER the restore
+               TestableKinesisDataFetcher fetcher =
+                       new TestableKinesisDataFetcher(
+                               fakeStreams,
+                               new Properties(),
+                               10,
+                               2,
+                               new AtomicReference<Throwable>(),
+                               new LinkedList<KinesisStreamShardState>(),
+                               subscribedStreamsToLastSeenShardIdsUnderTest,
+                               FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));
+
+               restoreFetcherState(fetcher, restoredStateUnderTest);
+
+               runFetcherForAWhileThenShutdown(fetcher);
+
+               assertStreamsInStateMatchSubscribedStreams(fakeStreams, subscribedStreamsToLastSeenShardIdsUnderTest);
+               assertLastSeenShardsMatchShardCounts(streamToShardCount, subscribedStreamsToLastSeenShardIdsUnderTest);
+       }
+
+       @Test
+       public void testStreamToLastSeenShardStateIsCorrectlySetWhenNoNewShardsSinceRestoredCheckpointAndSomeStreamsDoNotExist() throws Exception {
+               List<String> fakeStreams = new LinkedList<>();
+               fakeStreams.add("fakeStream1");
+               fakeStreams.add("fakeStream2");
+               fakeStreams.add("fakeStream3"); // fakeStream3 will not have any shards
+               fakeStreams.add("fakeStream4"); // fakeStream4 will not have any shards
+
+               Map<KinesisStreamShard, String> restoredStateUnderTest = new HashMap<>();
+               addRestoredShards(restoredStateUnderTest, "fakeStream1", 3); // fakeStream1 has 3 shards before restore
+               addRestoredShards(restoredStateUnderTest, "fakeStream2", 2); // fakeStream2 has 2 shards before restore
+
+               Map<String, Integer> streamToShardCount = new HashMap<>();
+               streamToShardCount.put("fakeStream1", 3); // fakeStream1 has fixed 3 shards
+               streamToShardCount.put("fakeStream2", 2); // fakeStream2 has fixed 2 shards
+               streamToShardCount.put("fakeStream3", 0); // no shards can be found for fakeStream3
+               streamToShardCount.put("fakeStream4", 0); // no shards can be found for fakeStream4
+
+               HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest =
+                       KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(fakeStreams);
+
+               // using a non-resharded streams kinesis behaviour to represent that Kinesis is not resharded AFTER the restore
+               TestableKinesisDataFetcher fetcher =
+                       new TestableKinesisDataFetcher(
+                               fakeStreams,
+                               new Properties(),
+                               10,
+                               2,
+                               new AtomicReference<Throwable>(),
+                               new LinkedList<KinesisStreamShardState>(),
+                               subscribedStreamsToLastSeenShardIdsUnderTest,
+                               FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));
+
+               restoreFetcherState(fetcher, restoredStateUnderTest);
+
+               runFetcherForAWhileThenShutdown(fetcher);
+
+               assertStreamsInStateMatchSubscribedStreams(fakeStreams, subscribedStreamsToLastSeenShardIdsUnderTest);
+
+               // assert that the last seen shards in state is correctly set
+               assertEquals(
+                       KinesisShardIdGenerator.generateFromShardOrder(2),
+                       subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream1"));
+               assertEquals(
+                       KinesisShardIdGenerator.generateFromShardOrder(1),
+                       subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream2"));
+               assertNull(subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream3"));
+               assertNull(subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream4"));
+       }
+
+       @Test
+       public void testStreamToLastSeenShardStateIsCorrectlySetWhenNewShardsFoundSinceRestoredCheckpointAndSomeStreamsDoNotExist() throws Exception {
+               List<String> fakeStreams = new LinkedList<>();
+               fakeStreams.add("fakeStream1");
+               fakeStreams.add("fakeStream2");
+               fakeStreams.add("fakeStream3"); // fakeStream3 will not have any shards
+               fakeStreams.add("fakeStream4"); // fakeStream4 will not have any shards
+
+               Map<KinesisStreamShard, String> restoredStateUnderTest = new HashMap<>();
+               addRestoredShards(restoredStateUnderTest, "fakeStream1", 3); // fakeStream1 has 3 shards before restore
+               addRestoredShards(restoredStateUnderTest, "fakeStream2", 2); // fakeStream2 has 2 shards before restore
+
+               Map<String, Integer> streamToShardCount = new HashMap<>();
+               streamToShardCount.put("fakeStream1", 3 + 1); // fakeStream1 had 3 shards before & 1 new shard after restore
+               streamToShardCount.put("fakeStream2", 2 + 3); // fakeStream2 had 2 shards before & 3 new shards after restore
+               streamToShardCount.put("fakeStream3", 0); // no shards can be found for fakeStream3
+               streamToShardCount.put("fakeStream4", 0); // no shards can be found for fakeStream4
+
+               HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest =
+                       KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(fakeStreams);
+
+               // using a non-resharded streams kinesis behaviour to represent that Kinesis is not resharded AFTER the restore
+               TestableKinesisDataFetcher fetcher =
+                       new TestableKinesisDataFetcher(
+                               fakeStreams,
+                               new Properties(),
+                               10,
+                               2,
+                               new AtomicReference<Throwable>(),
+                               new LinkedList<KinesisStreamShardState>(),
+                               subscribedStreamsToLastSeenShardIdsUnderTest,
+                               FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));
+
+               restoreFetcherState(fetcher, restoredStateUnderTest);
+
+               runFetcherForAWhileThenShutdown(fetcher);
+
+               assertStreamsInStateMatchSubscribedStreams(fakeStreams, subscribedStreamsToLastSeenShardIdsUnderTest);
+
+               // assert that the last seen shards in state is correctly set
+               assertEquals(
+                       KinesisShardIdGenerator.generateFromShardOrder(3),
+                       subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream1"));
+               assertEquals(
+                       KinesisShardIdGenerator.generateFromShardOrder(4),
+                       subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream2"));
+               assertNull(subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream3"));
+               assertNull(subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream4"));
+       }
+
+       // ------------------------------------------------------------------------
+       //  Test utilities
+       // ------------------------------------------------------------------------
+
+       /**
+        * Adds {@code numShards} restored entries for {@code streamName} to {@code restoredState}.
+        *
+        * <p>Shard ids are generated from shard orders {@code 0 .. numShards - 1}; each entry gets a
+        * random UUID string as its sequence number.
+        */
+       private static void addRestoredShards(
+                       Map<KinesisStreamShard, String> restoredState, String streamName, int numShards) {
+               for (int shardOrder = 0; shardOrder < numShards; shardOrder++) {
+                       restoredState.put(
+                               new KinesisStreamShard(
+                                       streamName,
+                                       new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(shardOrder))),
+                               UUID.randomUUID().toString());
+               }
+       }
+
+       /**
+        * Feeds restored shard state into the fetcher and marks it as restoring from failure.
+        *
+        * <p>For every restored shard, this advances the stream's last discovered shard and registers
+        * a subscribed shard state built from the restored sequence number.
+        */
+       private static void restoreFetcherState(
+                       TestableKinesisDataFetcher fetcher, Map<KinesisStreamShard, String> restoredState) {
+               for (Map.Entry<KinesisStreamShard, String> restoredShard : restoredState.entrySet()) {
+                       KinesisStreamShard shard = restoredShard.getKey();
+                       fetcher.advanceLastDiscoveredShardOfStream(shard.getStreamName(), shard.getShard().getShardId());
+                       fetcher.registerNewSubscribedShardState(
+                               new KinesisStreamShardState(shard, new SequenceNumber(restoredShard.getValue())));
+               }
+
+               fetcher.setIsRestoringFromFailure(true);
+       }
+
+       /**
+        * Runs {@code fetcher.runFetcher()} in a background thread for {@link #FETCHER_RUN_MILLIS}
+        * milliseconds, then shuts the fetcher down.
+        *
+        * <p>Construction of {@link ShardConsumer} is stubbed to return a Mockito mock. Any throwable
+        * raised by {@code runFetcher()} before shutdown is rethrown as an {@link AssertionError}
+        * (with the original as its cause) instead of being silently swallowed.
+        */
+       private static void runFetcherForAWhileThenShutdown(final TestableKinesisDataFetcher fetcher) throws Exception {
+               PowerMockito.whenNew(ShardConsumer.class).withAnyArguments().thenReturn(Mockito.mock(ShardConsumer.class));
+
+               final AtomicReference<Throwable> fetcherError = new AtomicReference<>();
+               Thread runFetcherThread = new Thread(new Runnable() {
+                       @Override
+                       public void run() {
+                               try {
+                                       fetcher.runFetcher();
+                               } catch (Throwable t) {
+                                       fetcherError.set(t);
+                               }
+                       }
+               });
+               runFetcherThread.start();
+               Thread.sleep(FETCHER_RUN_MILLIS); // sleep a while before closing
+
+               // Read the error before shutting down, so that failures caused by the shutdown itself
+               // are not reported as test failures.
+               Throwable errorBeforeShutdown = fetcherError.get();
+               fetcher.shutdownFetcher();
+
+               // NOTE(review): as in the original tests, the fetcher thread is not joined; anything it
+               // throws after this point is not observed.
+               if (errorBeforeShutdown != null) {
+                       throw new AssertionError("runFetcher() failed while the test was waiting", errorBeforeShutdown);
+               }
+       }
+
+       /** Asserts that the streams tracked in the state are identical to the subscribed streams. */
+       private static void assertStreamsInStateMatchSubscribedStreams(
+                       List<String> subscribedStreams, Map<String, String> streamsToLastSeenShardIds) {
+               Set<String> streamsInState = streamsToLastSeenShardIds.keySet();
+               assertEquals(subscribedStreams.size(), streamsInState.size());
+               assertTrue(streamsInState.containsAll(subscribedStreams));
+       }
+
+       /**
+        * Asserts that each stream's last seen shard id equals the id generated for shard order
+        * {@code shardCount - 1}, where {@code shardCount} is that stream's entry in
+        * {@code streamToShardCount}.
+        */
+       private static void assertLastSeenShardsMatchShardCounts(
+                       Map<String, Integer> streamToShardCount, Map<String, String> streamsToLastSeenShardIds) {
+               for (Map.Entry<String, String> streamToLastSeenShard : streamsToLastSeenShardIds.entrySet()) {
+                       String stream = streamToLastSeenShard.getKey();
+                       assertEquals(
+                               "Unexpected last seen shard for stream " + stream,
+                               KinesisShardIdGenerator.generateFromShardOrder(streamToShardCount.get(stream) - 1),
+                               streamToLastSeenShard.getValue());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
new file mode 100644
index 0000000..96764a4
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import com.amazonaws.services.kinesis.model.HashKeyRange;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import 
org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory;
+import 
org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
+import 
org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcher;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.math.BigInteger;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link ShardConsumer}: a consumer reading a single fake shard must collect every record
+ * served by the fake Kinesis proxy and leave the shard's state at the shard-ending sentinel.
+ */
+public class ShardConsumerTest {
+
+       /** Name of the single fake stream every test in this class consumes from. */
+       private static final String FAKE_STREAM_NAME = "fakeStream";
+
+       @Test
+       public void testCorrectNumOfCollectedRecordsAndUpdatedState() {
+               // Get a total of 1000 records with 9 getRecords() calls
+               assertShardFullyConsumed(
+                       FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000, 9),
+                       1000);
+       }
+
+       @Test
+       public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithUnexpectedExpiredIterator() {
+               // Get a total of 1000 records with 9 getRecords() calls,
+               // and the 7th getRecords() call will encounter an unexpected expired shard iterator
+               assertShardFullyConsumed(
+                       FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(1000, 9, 7),
+                       1000);
+       }
+
+       /**
+        * Runs a {@link ShardConsumer} over a single fake shard against the given fake Kinesis proxy,
+        * then asserts that the fetcher collected the expected number of records and that the shard's
+        * last processed sequence number is {@link SentinelSequenceNumber#SENTINEL_SHARD_ENDING_SEQUENCE_NUM}.
+        *
+        * @param fakeKinesis the fake proxy the shard consumer reads records from
+        * @param expectedNumOfRecords the number of records the fetcher is expected to have collected
+        */
+       private static void assertShardFullyConsumed(KinesisProxyInterface fakeKinesis, int expectedNumOfRecords) {
+               LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
+               subscribedShardsStateUnderTest.add(
+                       new KinesisStreamShardState(createFakeShard(), new SequenceNumber("fakeStartingState")));
+
+               // the fetcher's own proxy is a plain mock; records come from fakeKinesis handed to the consumer
+               TestableKinesisDataFetcher fetcher =
+                       new TestableKinesisDataFetcher(
+                               Collections.singletonList(FAKE_STREAM_NAME),
+                               new Properties(),
+                               10,
+                               2,
+                               new AtomicReference<Throwable>(),
+                               subscribedShardsStateUnderTest,
+                               KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList(FAKE_STREAM_NAME)),
+                               Mockito.mock(KinesisProxyInterface.class));
+
+               new ShardConsumer<>(
+                       fetcher,
+                       0,
+                       subscribedShardsStateUnderTest.get(0).getKinesisStreamShard(),
+                       subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
+                       fakeKinesis).run();
+
+               assertTrue(
+                       "expected " + expectedNumOfRecords + " collected records, but got " + fetcher.getNumOfElementsCollected(),
+                       fetcher.getNumOfElementsCollected() == expectedNumOfRecords);
+               // the fetcher shares subscribedShardsStateUnderTest, so the state is re-read after run()
+               assertTrue(
+                       "shard state should end at the shard-ending sentinel sequence number",
+                       subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum().equals(
+                               SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()));
+       }
+
+       /** Creates a fake shard of shard order 0 whose hash key range spans 0 to 2^128 - 1. */
+       private static KinesisStreamShard createFakeShard() {
+               return new KinesisStreamShard(
+                       FAKE_STREAM_NAME,
+                       new Shard()
+                               .withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))
+                               .withHashKeyRange(
+                                       new HashKeyRange()
+                                               .withStartingHashKey("0")
+                                               .withEndingHashKey(new BigInteger(StringUtils.repeat("FF", 16), 16).toString())));
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
new file mode 100644
index 0000000..6e02a55
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis.manualtests;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.examples.ProduceIntoKinesis;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.util.Collector;
+
+import java.nio.ByteBuffer;
+import java.util.Properties;
+
+/**
+ * This is a manual test for the AWS Kinesis connector in Flink.
+ *
+ * <p>It uses:
+ * <ul>
+ *     <li>A custom KinesisSerializationSchema</li>
+ *     <li>A custom KinesisPartitioner</li>
+ * </ul>
+ *
+ * <p>The producing topology writes generated events to the default stream "test-flink"; events whose
+ * first '-'-separated part ends in "0" are routed to "flink-test-2" instead. The consuming topology
+ * reads "test-flink" back and fails the job on any record not shaped like
+ * {@code <non-negative long>-<12 characters>}.
+ *
+ * <p>Invocation:
+ * --region eu-central-1 --accessKey XXXXXXXXXXXX --secretKey XXXXXXXXXXXXXXXX
+ */
+public class ManualConsumerProducerTest {
+
+       public static void main(String[] args) throws Exception {
+               ParameterTool pt = ParameterTool.fromArgs(args);
+
+               StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+               see.setParallelism(4);
+
+               DataStream<String> simpleStringStream = see.addSource(new ProduceIntoKinesis.EventsGenerator());
+
+               Properties kinesisProducerConfig = new Properties();
+               kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_REGION, pt.getRequired("region"));
+               kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
+               kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
+
+               FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(
+                               new KinesisSerializationSchema<String>() {
+                                       @Override
+                                       public ByteBuffer serialize(String element) {
+                                               return ByteBuffer.wrap(element.getBytes());
+                                       }
+
+                                       // elements whose first '-'-separated part ends in "0" go into a different stream
+                                       @Override
+                                       public String getTargetStream(String element) {
+                                               if (element.split("-")[0].endsWith("0")) {
+                                                       return "flink-test-2";
+                                               }
+                                               return null; // send to default stream
+                                       }
+                               },
+                               kinesisProducerConfig
+               );
+
+               kinesis.setFailOnError(true);
+               kinesis.setDefaultStream("test-flink");
+               kinesis.setDefaultPartition("0");
+               kinesis.setCustomPartitioner(new KinesisPartitioner<String>() {
+                       @Override
+                       public String getPartitionId(String element) {
+                               // partition by the last character of the element
+                               int l = element.length();
+                               return element.substring(l - 1, l);
+                       }
+               });
+               simpleStringStream.addSink(kinesis);
+
+
+               // consuming topology
+               Properties consumerProps = new Properties();
+               consumerProps.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
+               consumerProps.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
+               consumerProps.setProperty(ConsumerConfigConstants.AWS_REGION, pt.getRequired("region"));
+               DataStream<String> consuming = see.addSource(new FlinkKinesisConsumer<>("test-flink", new SimpleStringSchema(), consumerProps));
+               // validate consumed records for correctness; this operator emits nothing, it only fails the job on a bad record
+               consuming.flatMap(new FlatMapFunction<String, String>() {
+                       @Override
+                       public void flatMap(String value, Collector<String> out) throws Exception {
+                               String[] parts = value.split("-");
+                               // check before indexing parts[1] so a malformed record fails with a descriptive message
+                               if (parts.length < 2) {
+                                       throw new RuntimeException("'" + value + "' does not contain a '-' separated second part");
+                               }
+                               long l;
+                               try {
+                                       l = Long.parseLong(parts[0]);
+                               } catch (NumberFormatException nfe) {
+                                       throw new RuntimeException("First part of '" + value + "' is not a valid numeric type", nfe);
+                               }
+                               if (l < 0) {
+                                       throw new RuntimeException("First part of '" + value + "' is negative");
+                               }
+                               if (parts[1].length() != 12) {
+                                       throw new RuntimeException("Second part of '" + value + "' doesn't have 12 characters");
+                               }
+                       }
+               });
+               consuming.print();
+
+               see.execute();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
new file mode 100644
index 0000000..2e452c1
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis.manualtests;
+
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread;
+import 
org.apache.flink.streaming.connectors.kinesis.testutils.KinesisEventsGeneratorProducerThread;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This test first starts a data generator, producing data into kinesis.
+ * Then, it starts a consuming topology, ensuring that all records up to a 
certain
+ * point have been seen.
+ *
+ * Invocation:
+ * --region eu-central-1 --accessKey XXXXXXXXXXXX --secretKey XXXXXXXXXXXXXXXX
+ */
+public class ManualExactlyOnceTest {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ManualExactlyOnceTest.class);
+
+       static final int TOTAL_EVENT_COUNT = 1000; // the producer writes one 
per 10 ms, so it runs for 10k ms = 10 seconds
+
+       public static void main(String[] args) throws Exception {
+               final ParameterTool pt = ParameterTool.fromArgs(args);
+               LOG.info("Starting exactly once test");
+
+               final String streamName = "flink-test-" + 
UUID.randomUUID().toString();
+               final String accessKey = pt.getRequired("accessKey");
+               final String secretKey = pt.getRequired("secretKey");
+               final String region = pt.getRequired("region");
+
+               Properties configProps = new Properties();
+               configProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
accessKey);
+               
configProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, secretKey);
+               configProps.setProperty(AWSConfigConstants.AWS_REGION, region);
+               AmazonKinesisClient client = 
AWSUtil.createKinesisClient(configProps);
+
+               // create a stream for the test:
+               client.createStream(streamName, 1);
+
+               // wait until stream has been created
+               DescribeStreamResult status = client.describeStream(streamName);
+               LOG.info("status {}" ,status);
+               
while(!status.getStreamDescription().getStreamStatus().equals("ACTIVE")) {
+                       status = client.describeStream(streamName);
+                       LOG.info("Status of stream {}", status);
+                       Thread.sleep(1000);
+               }
+
+               final Configuration flinkConfig = new Configuration();
+               
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+               
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+               
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+               
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 
s");
+
+               LocalFlinkMiniCluster flink = new 
LocalFlinkMiniCluster(flinkConfig, false);
+               flink.start();
+
+               final int flinkPort = flink.getLeaderRPCPort();
+
+               try {
+                       final AtomicReference<Throwable> producerError = new 
AtomicReference<>();
+                       Thread producerThread = 
KinesisEventsGeneratorProducerThread.create(
+                               TOTAL_EVENT_COUNT, 2,
+                               accessKey, secretKey, region, streamName,
+                               producerError, flinkPort, flinkConfig);
+                       producerThread.start();
+
+                       final AtomicReference<Throwable> consumerError = new 
AtomicReference<>();
+                       Thread consumerThread = 
ExactlyOnceValidatingConsumerThread.create(
+                               TOTAL_EVENT_COUNT, 200, 2, 500, 500,
+                               accessKey, secretKey, region, streamName,
+                               consumerError, flinkPort, flinkConfig);
+                       consumerThread.start();
+
+                       boolean deadlinePassed = false;
+                       long deadline = System.currentTimeMillis() + (1000 * 2 
* 60); // wait at most for two minutes
+                       // wait until both producer and consumer finishes, or 
an unexpected error is thrown
+                       while ((consumerThread.isAlive() || 
producerThread.isAlive()) &&
+                               (producerError.get() == null && 
consumerError.get() == null)) {
+                               Thread.sleep(1000);
+                               if (System.currentTimeMillis() >= deadline) {
+                                       LOG.warn("Deadline passed");
+                                       deadlinePassed = true;
+                                       break; // enough waiting
+                               }
+                       }
+
+                       if (producerThread.isAlive()) {
+                               producerThread.interrupt();
+                       }
+
+                       if (consumerThread.isAlive()) {
+                               consumerThread.interrupt();
+                       }
+
+                       if (producerError.get() != null) {
+                               LOG.info("+++ TEST failed! +++");
+                               throw new RuntimeException("Producer failed", 
producerError.get());
+                       }
+                       if (consumerError.get() != null) {
+                               LOG.info("+++ TEST failed! +++");
+                               throw new RuntimeException("Consumer failed", 
consumerError.get());
+                       }
+
+                       if (!deadlinePassed) {
+                               LOG.info("+++ TEST passed! +++");
+                       } else {
+                               LOG.info("+++ TEST failed! +++");
+                       }
+
+               } finally {
+                       client.deleteStream(streamName);
+                       client.shutdown();
+
+                       // stopping flink
+                       flink.stop();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
new file mode 100644
index 0000000..6abea2a
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.manualtests;
+
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
+import com.amazonaws.services.kinesis.model.PutRecordsResult;
+import com.amazonaws.services.kinesis.model.PutRecordsRequest;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread;
+import 
org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This test first starts a data generator, producing data into kinesis.
+ * Then, it starts a consuming topology, ensuring that all records up to a certain
+ * point have been seen. While the data generator and consuming topology are running,
+ * the Kinesis stream is resharded two times.
+ *
+ * Invocation:
+ * --region eu-central-1 --accessKey XXXXXXXXXXXX --secretKey XXXXXXXXXXXXXXXX
+ */
+public class ManualExactlyOnceWithStreamReshardingTest {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ManualExactlyOnceWithStreamReshardingTest.class);
+
+       static final int TOTAL_EVENT_COUNT = 20000; // a large enough record 
count so we can test resharding
+
+       public static void main(String[] args) throws Exception {
+               final ParameterTool pt = ParameterTool.fromArgs(args);
+               LOG.info("Starting exactly once with stream resharding test");
+
+               final String streamName = "flink-test-" + 
UUID.randomUUID().toString();
+               final String accessKey = pt.getRequired("accessKey");
+               final String secretKey = pt.getRequired("secretKey");
+               final String region = pt.getRequired("region");
+
+               final Properties configProps = new Properties();
+               
configProps.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, accessKey);
+               
configProps.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, 
secretKey);
+               configProps.setProperty(ConsumerConfigConstants.AWS_REGION, 
region);
+               
configProps.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
 "0");
+               final AmazonKinesisClient client = 
AWSUtil.createKinesisClient(configProps);
+
+               // the stream is first created with 1 shard
+               client.createStream(streamName, 1);
+
+               // wait until stream has been created
+               DescribeStreamResult status = client.describeStream(streamName);
+               LOG.info("status {}", status);
+               
while(!status.getStreamDescription().getStreamStatus().equals("ACTIVE")) {
+                       status = client.describeStream(streamName);
+                       LOG.info("Status of stream {}", status);
+                       Thread.sleep(1000);
+               }
+
+               final Configuration flinkConfig = new Configuration();
+               
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+               
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+               
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+               
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 
s");
+
+               LocalFlinkMiniCluster flink = new 
LocalFlinkMiniCluster(flinkConfig, false);
+               flink.start();
+
+               final int flinkPort = flink.getLeaderRPCPort();
+
+               try {
+                       // we have to use a manual generator here instead of 
the FlinkKinesisProducer
+                       // because the FlinkKinesisProducer currently has a 
problem where records will be resent to a shard
+                       // when resharding happens; this affects the consumer 
exactly-once validation test and will never pass
+                       final AtomicReference<Throwable> producerError = new 
AtomicReference<>();
+                       Runnable manualGenerate = new Runnable() {
+                               @Override
+                               public void run() {
+                                       AmazonKinesisClient client = 
AWSUtil.createKinesisClient(configProps);
+                                       int count = 0;
+                                       final int batchSize = 30;
+                                       while (true) {
+                                               try {
+                                                       Thread.sleep(10);
+
+                                                       
Set<PutRecordsRequestEntry> batch = new HashSet<>();
+                                                       for (int i=count; 
i<count+batchSize; i++) {
+                                                               if (i >= 
TOTAL_EVENT_COUNT) {
+                                                                       break;
+                                                               }
+                                                               batch.add(
+                                                                       new 
PutRecordsRequestEntry()
+                                                                               
.withData(ByteBuffer.wrap(((i) + "-" + 
RandomStringUtils.randomAlphabetic(12)).getBytes()))
+                                                                               
.withPartitionKey(UUID.randomUUID().toString()));
+                                                       }
+                                                       count += batchSize;
+
+                                                       PutRecordsResult result 
= client.putRecords(new 
PutRecordsRequest().withStreamName(streamName).withRecords(batch));
+
+                                                       // the putRecords() 
operation may have failing records; to keep this test simple
+                                                       // instead of retrying 
on failed records, we simply pass on a runtime exception
+                                                       // and let this test 
fail
+                                                       if 
(result.getFailedRecordCount() > 0) {
+                                                               
producerError.set(new RuntimeException("The producer has failed records in one 
of the put batch attempts."));
+                                                               break;
+                                                       }
+
+                                                       if (count >= 
TOTAL_EVENT_COUNT) {
+                                                               break;
+                                                       }
+                                               } catch (Exception e) {
+                                                       producerError.set(e);
+                                               }
+                                       }
+                               }
+                       };
+                       Thread producerThread = new Thread(manualGenerate);
+                       producerThread.start();
+
+                       final AtomicReference<Throwable> consumerError = new 
AtomicReference<>();
+                       Thread consumerThread = 
ExactlyOnceValidatingConsumerThread.create(
+                               TOTAL_EVENT_COUNT, 10000, 2, 500, 500,
+                               accessKey, secretKey, region, streamName,
+                               consumerError, flinkPort, flinkConfig);
+                       consumerThread.start();
+
+                       // reshard the Kinesis stream while the producer / and 
consumers are running
+                       Runnable splitShard = new Runnable() {
+                               @Override
+                               public void run() {
+                                       try {
+                                               // first, split shard in the 
middle of the hash range
+                                               Thread.sleep(5000);
+                                               LOG.info("Splitting shard ...");
+                                               client.splitShard(
+                                                       streamName,
+                                                       
KinesisShardIdGenerator.generateFromShardOrder(0),
+                                                       
"170141183460469231731687303715884105727");
+
+                                               // wait until the split shard 
operation finishes updating ...
+                                               DescribeStreamResult status;
+                                               Random rand = new Random();
+                                               do {
+                                                       status = null;
+                                                       while (status == null) {
+                                                               // retry until 
we get status
+                                                               try {
+                                                                       status 
= client.describeStream(streamName);
+                                                               } catch 
(LimitExceededException lee) {
+                                                                       
LOG.warn("LimitExceededException while describing stream ... retrying ...");
+                                                                       
Thread.sleep(rand.nextInt(1200));
+                                                               }
+                                                       }
+                                               } while 
(!status.getStreamDescription().getStreamStatus().equals("ACTIVE"));
+
+                                               // then merge again
+                                               Thread.sleep(7000);
+                                               LOG.info("Merging shards ...");
+                                               client.mergeShards(
+                                                       streamName,
+                                                       
KinesisShardIdGenerator.generateFromShardOrder(1),
+                                                       
KinesisShardIdGenerator.generateFromShardOrder(2));
+                                       } catch (InterruptedException iex) {
+                                               //
+                                       }
+                               }
+                       };
+                       Thread splitShardThread = new Thread(splitShard);
+                       splitShardThread.start();
+
+                       boolean deadlinePassed = false;
+                       long deadline = System.currentTimeMillis() + (1000 * 5 
* 60); // wait at most for five minutes
+                       // wait until both producer and consumer finishes, or 
an unexpected error is thrown
+                       while ((consumerThread.isAlive() || 
producerThread.isAlive()) &&
+                               (producerError.get() == null && 
consumerError.get() == null)) {
+                               Thread.sleep(1000);
+                               if (System.currentTimeMillis() >= deadline) {
+                                       LOG.warn("Deadline passed");
+                                       deadlinePassed = true;
+                                       break; // enough waiting
+                               }
+                       }
+
+                       if (producerThread.isAlive()) {
+                               producerThread.interrupt();
+                       }
+
+                       if (consumerThread.isAlive()) {
+                               consumerThread.interrupt();
+                       }
+
+                       if (producerError.get() != null) {
+                               LOG.info("+++ TEST failed! +++");
+                               throw new RuntimeException("Producer failed", 
producerError.get());
+
+                       }
+
+                       if (consumerError.get() != null) {
+                               LOG.info("+++ TEST failed! +++");
+                               throw new RuntimeException("Consumer failed", 
consumerError.get());
+                       }
+
+                       if (!deadlinePassed) {
+                               LOG.info("+++ TEST passed! +++");
+                       } else {
+                               LOG.info("+++ TEST failed! +++");
+                       }
+
+               } finally {
+                       client.deleteStream(streamName);
+                       client.shutdown();
+
+                       // stopping flink
+                       flink.stop();
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
new file mode 100644
index 0000000..35e9ef6
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis.manualtests;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.examples.ProduceIntoKinesis;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
+
+import java.nio.ByteBuffer;
+import java.util.Properties;
+
+/**
+ * This is a manual test for the AWS Kinesis connector in Flink.
+ *
+ * It uses:
+ *  - A custom KinesisSerializationSchema
+ *  - A custom KinesisPartitioner
+ *
+ *  The streams "test-flink" and "flink-test-2" must exist.
+ *
+ * Invocation:
+ * --region eu-central-1 --accessKey XXXXXXXXXXXX --secretKey XXXXXXXXXXXXXXXX
+ */
+public class ManualProducerTest {
+
+       public static void main(String[] args) throws Exception {
+               ParameterTool pt = ParameterTool.fromArgs(args);
+
+               StreamExecutionEnvironment see = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               see.setParallelism(4);
+
+               DataStream<String> simpleStringStream = see.addSource(new 
ProduceIntoKinesis.EventsGenerator());
+
+               Properties kinesisProducerConfig = new Properties();
+               
kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_REGION, 
pt.getRequired("region"));
+               
kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, 
pt.getRequired("accessKey"));
+               
kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY,
 pt.getRequired("secretKey"));
+
+               FlinkKinesisProducer<String> kinesis = new 
FlinkKinesisProducer<>(
+                               new KinesisSerializationSchema<String>() {
+                                       @Override
+                                       public ByteBuffer serialize(String 
element) {
+                                               return 
ByteBuffer.wrap(element.getBytes());
+                                       }
+
+                                       // every 10th element goes into a 
different stream
+                                       @Override
+                                       public String getTargetStream(String 
element) {
+                                               
if(element.split("-")[0].endsWith("0")) {
+                                                       return "flink-test-2";
+                                               }
+                                               return null; // send to default 
stream
+                                       }
+                               },
+                               kinesisProducerConfig
+               );
+
+               kinesis.setFailOnError(true);
+               kinesis.setDefaultStream("test-flink");
+               kinesis.setDefaultPartition("0");
+               kinesis.setCustomPartitioner(new KinesisPartitioner<String>() {
+                       @Override
+                       public String getPartitionId(String element) {
+                               int l = element.length();
+                               return element.substring(l - 1, l);
+                       }
+               });
+               simpleStringStream.addSink(kinesis);
+
+               see.execute();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java
new file mode 100644
index 0000000..157964c
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.testutils;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.BitSet;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+
+/**
+ * A thread that runs a topology with the FlinkKinesisConsumer as source, 
followed by two flat map
+ * functions, one that performs artificial failures and another that validates 
exactly-once guarantee
+ */
+public class ExactlyOnceValidatingConsumerThread {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ExactlyOnceValidatingConsumerThread.class);
+
+       public static Thread create(final int totalEventCount,
+                                                               final int 
failAtRecordCount,
+                                                               final int 
parallelism,
+                                                               final int 
checkpointInterval,
+                                                               final long 
restartDelay,
+                                                               final String 
awsAccessKey,
+                                                               final String 
awsSecretKey,
+                                                               final String 
awsRegion,
+                                                               final String 
kinesisStreamName,
+                                                               final 
AtomicReference<Throwable> errorHandler,
+                                                               final int 
flinkPort,
+                                                               final 
Configuration flinkConfig) {
+               Runnable exactlyOnceValidationConsumer = new Runnable() {
+                       @Override
+                       public void run() {
+                               try {
+                                       StreamExecutionEnvironment see = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort, 
flinkConfig);
+                                       see.setParallelism(parallelism);
+                                       
see.enableCheckpointing(checkpointInterval);
+                                       // we restart two times
+                                       
see.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, restartDelay));
+
+                                       // consuming topology
+                                       Properties consumerProps = new 
Properties();
+                                       
consumerProps.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, 
awsAccessKey);
+                                       
consumerProps.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, 
awsSecretKey);
+                                       
consumerProps.setProperty(ConsumerConfigConstants.AWS_REGION, awsRegion);
+                                       // start reading from beginning
+                                       
consumerProps.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.name());
+                                       DataStream<String> consuming = 
see.addSource(new FlinkKinesisConsumer<>(kinesisStreamName, new 
SimpleStringSchema(), consumerProps));
+                                       consuming
+                                               .flatMap(new 
ArtificialFailOnceFlatMapper(failAtRecordCount))
+                                               // validate consumed records 
for correctness (use only 1 instance to validate all consumed records)
+                                               .flatMap(new 
ExactlyOnceValidatingMapper(totalEventCount)).setParallelism(1);
+
+                                       LOG.info("Starting consuming topology");
+                                       tryExecute(see, "Consuming topo");
+                                       LOG.info("Consuming topo finished");
+                               } catch (Exception e) {
+                                       LOG.warn("Error while running consuming 
topology", e);
+                                       errorHandler.set(e);
+                               }
+                       }
+               };
+
+               return new Thread(exactlyOnceValidationConsumer);
+       }
+
+       private static class ExactlyOnceValidatingMapper implements 
FlatMapFunction<String,String>, Checkpointed<BitSet> {
+
+               private static final Logger LOG = 
LoggerFactory.getLogger(ExactlyOnceValidatingMapper.class);
+
+               private final int totalEventCount;
+               private BitSet validator;
+
+               public ExactlyOnceValidatingMapper(int totalEventCount) {
+                       this.totalEventCount = totalEventCount;
+                       this.validator = new BitSet(totalEventCount);
+               }
+
+               @Override
+               public void flatMap(String value, Collector<String> out) throws 
Exception {
+                       LOG.info("Consumed {}", value);
+
+                       int id = Integer.parseInt(value.split("-")[0]);
+                       if(validator.get(id)) {
+                               throw new RuntimeException("Saw id " + id +" 
twice!");
+                       }
+                       validator.set(id);
+                       if(id > totalEventCount-1) {
+                               throw new RuntimeException("Out of bounds ID 
observed");
+                       }
+
+                       if(validator.nextClearBit(0) == totalEventCount) {
+                               throw new SuccessException();
+                       }
+               }
+
+               @Override
+               public BitSet snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
+                       return validator;
+               }
+
+               @Override
+               public void restoreState(BitSet state) throws Exception {
+                       this.validator = state;
+               }
+       }
+
+       private static class ArtificialFailOnceFlatMapper extends 
RichFlatMapFunction<String,String> {
+               int count = 0;
+
+               private final int failAtRecordCount;
+
+               public ArtificialFailOnceFlatMapper(int failAtRecordCount) {
+                       this.failAtRecordCount = failAtRecordCount;
+               }
+
+               @Override
+               public void flatMap(String value, Collector<String> out) throws 
Exception {
+                       if (count++ >= failAtRecordCount && 
getRuntimeContext().getAttemptNumber() == 0) {
+                               throw new RuntimeException("Artificial failure. 
Restart please.");
+                       }
+                       out.collect(value);
+               }
+       }
+}

Reply via email to