http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/pom.xml 
b/flink-connectors/flink-connector-kinesis/pom.xml
new file mode 100644
index 0000000..9e0c7e8
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/pom.xml
@@ -0,0 +1,164 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.2-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-connector-kinesis_2.10</artifactId>
+       <name>flink-connector-kinesis</name>
+       <properties>
+               <aws.sdk.version>1.10.71</aws.sdk.version>
+               <aws.kinesis-kcl.version>1.6.2</aws.kinesis-kcl.version>
+               <aws.kinesis-kpl.version>0.10.2</aws.kinesis-kpl.version>
+       </properties>
+
+       <packaging>jar</packaging>
+
+       <dependencies>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.google.guava</groupId>
+                       <artifactId>guava</artifactId>
+                       <version>${guava.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-tests_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <!-- Note:
+                       The below dependencies are licenced under the Amazon 
Software License.
+                       Flink includes the "flink-connector-kinesis" only as an 
optional dependency for that reason.
+               -->
+               <dependency>
+                       <groupId>com.amazonaws</groupId>
+                       <artifactId>aws-java-sdk-kinesis</artifactId>
+                       <version>${aws.sdk.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.amazonaws</groupId>
+                       <artifactId>amazon-kinesis-producer</artifactId>
+                       <version>${aws.kinesis-kpl.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.amazonaws</groupId>
+                       <artifactId>amazon-kinesis-client</artifactId>
+                       <version>${aws.kinesis-kcl.version}</version>
+                       <!--
+                               We're excluding the below from the KCL since 
we'll only be using the
+                               
com.amazonaws.services.kinesis.clientlibrary.types.UserRecord class, which will 
not need these dependencies.
+                       -->
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>com.amazonaws</groupId>
+                                       
<artifactId>aws-java-sdk-dynamodb</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>com.amazonaws</groupId>
+                                       
<artifactId>aws-java-sdk-cloudwatch</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-jar-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>test-jar</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <id>shade-flink</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration>
+                                                       
<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+                                                       <artifactSet 
combine.children="append">
+                                                               <includes>
+                                                                       
<include>com.amazonaws:*</include>
+                                                                       
<include>com.google.protobuf:*</include>
+                                                               </includes>
+                                                       </artifactSet>
+                                                       <relocations 
combine.children="override">
+                                                               <!-- DO NOT 
RELOCATE GUAVA IN THIS PACKAGE -->
+                                                               <relocation>
+                                                                       
<pattern>org.objectweb.asm</pattern>
+                                                                       
<shadedPattern>org.apache.flink.shaded.org.objectweb.asm</shadedPattern>
+                                                               </relocation>
+                                                               <relocation>
+                                                                       
<pattern>com.google.protobuf</pattern>
+                                                                       
<shadedPattern>org.apache.flink.kinesis.shaded.com.google.protobuf</shadedPattern>
+                                                               </relocation>
+                                                               <relocation>
+                                                                       
<pattern>com.amazonaws</pattern>
+                                                                       
<shadedPattern>org.apache.flink.kinesis.shaded.com.amazonaws</shadedPattern>
+                                                               </relocation>
+                                                       </relocations>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
new file mode 100644
index 0000000..a62dc10
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kinesis Consumer is an exactly-once parallel streaming data 
source that subscribes to multiple AWS Kinesis
+ * streams within the same AWS service region, and can handle resharding of 
streams. Each subtask of the consumer is
+ * responsible for fetching data records from multiple Kinesis shards. The 
number of shards fetched by each subtask will
+ * change as shards are closed and created by Kinesis.
+ *
+ * <p>To leverage Flink's checkpointing mechanics for exactly-once streaming 
processing guarantees, the Flink Kinesis
+ * consumer is implemented with the AWS Java SDK, instead of the officially 
recommended AWS Kinesis Client Library, for
+ * low-level control on the management of stream state. The Flink Kinesis 
Connector also supports setting the initial
+ * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST.</p>
+ *
+ * @param <T> the type of data emitted
+ */
+public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
+       implements CheckpointedAsynchronously<HashMap<KinesisStreamShard, 
SequenceNumber>>, ResultTypeQueryable<T> {
+
+       private static final long serialVersionUID = 4724006128720664870L;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKinesisConsumer.class);
+
+       // 
------------------------------------------------------------------------
+       //  Consumer properties
+       // 
------------------------------------------------------------------------
+
+       /** The names of the Kinesis streams that we will be consuming from */
+       private final List<String> streams;
+
+       /** Properties to parametrize settings such as AWS service region, 
initial position in stream,
+        * shard list retrieval behaviours, etc */
+       private final Properties configProps;
+
+       /** User supplied deserialization schema to convert Kinesis byte messages to Flink objects */
+       private final KinesisDeserializationSchema<T> deserializer;
+
+       // 
------------------------------------------------------------------------
+       //  Runtime state
+       // 
------------------------------------------------------------------------
+
+       /** Per-task fetcher for Kinesis data records, where each fetcher pulls 
data from one or more Kinesis shards */
+       private transient KinesisDataFetcher<T> fetcher;
+
+       /** The sequence numbers in the last state snapshot of this subtask */
+       private transient HashMap<KinesisStreamShard, SequenceNumber> 
lastStateSnapshot;
+
+       /** The sequence numbers to restore to upon restore from failure */
+       private transient HashMap<KinesisStreamShard, SequenceNumber> 
sequenceNumsToRestore;
+
+       private volatile boolean running = true;
+
+
+       // 
------------------------------------------------------------------------
+       //  Constructors
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a new Flink Kinesis Consumer that reads a single stream using a plain
+        * {@link DeserializationSchema}.
+        *
+        * <p>The schema is wrapped in a {@link KinesisDeserializationSchemaWrapper} and passed on to the
+        * constructor that accepts a {@link KinesisDeserializationSchema}. The AWS credentials, the AWS region
+        * of the stream and the initial position to start streaming from are taken from the given
+        * {@link Properties} instance.</p>
+        *
+        * @param stream
+        *           The single AWS Kinesis stream to read from.
+        * @param deserializer
+        *           The deserializer used to convert raw bytes of Kinesis records to Java objects (without key).
+        * @param configProps
+        *           The properties used to configure AWS credentials, AWS region, and initial starting position.
+        */
+       public FlinkKinesisConsumer(String stream, DeserializationSchema<T> deserializer, Properties configProps) {
+               this(stream, new KinesisDeserializationSchemaWrapper<T>(deserializer), configProps);
+       }
+
+       /**
+        * Creates a new Flink Kinesis Consumer that reads a single stream using a
+        * {@link KinesisDeserializationSchema}.
+        *
+        * <p>The stream name is wrapped in a singleton list and passed on to the multi-stream constructor.
+        * The AWS credentials, the AWS region of the stream and the initial position to start streaming from
+        * are taken from the given {@link Properties} instance.</p>
+        *
+        * @param stream
+        *           The single AWS Kinesis stream to read from.
+        * @param deserializer
+        *           The keyed deserializer used to convert raw bytes of Kinesis records to Java objects.
+        * @param configProps
+        *           The properties used to configure AWS credentials, AWS region, and initial starting position.
+        */
+       public FlinkKinesisConsumer(String stream, KinesisDeserializationSchema<T> deserializer, Properties configProps) {
+               this(Collections.<String>singletonList(stream), deserializer, configProps);
+       }
+
+       /**
+        * Creates a new Flink Kinesis Consumer.
+        *
+        * <p>The AWS credentials to be used, AWS region of the Kinesis streams, initial position to start streaming
+        * from are configured with a {@link Properties} instance.</p>
+        *
+        * <p>All arguments are validated eagerly: {@code streams} must be non-null, non-empty and must not contain
+        * the empty string; {@code configProps} must be non-null and is checked with
+        * {@code KinesisConfigUtil.validateConsumerConfiguration}; {@code deserializer} must be non-null.</p>
+        *
+        * @param streams
+        *           The AWS Kinesis streams to read from.
+        * @param deserializer
+        *           The keyed deserializer used to convert raw bytes of Kinesis records to Java objects.
+        * @param configProps
+        *           The properties used to configure AWS credentials, AWS region, and initial starting position.
+        */
+       public FlinkKinesisConsumer(List<String> streams, KinesisDeserializationSchema<T> deserializer, Properties configProps) {
+               checkNotNull(streams, "streams can not be null");
+               checkArgument(streams.size() != 0, "must be consuming at least 1 stream");
+               checkArgument(!streams.contains(""), "stream names cannot be empty Strings");
+               this.streams = streams;
+
+               this.configProps = checkNotNull(configProps, "configProps can not be null");
+
+               // check the configuration properties for any conflicting settings
+               KinesisConfigUtil.validateConsumerConfiguration(this.configProps);
+
+               this.deserializer = checkNotNull(deserializer, "deserializer can not be null");
+
+               if (LOG.isInfoEnabled()) {
+                       // join the stream names with ", " between entries only, so that the logged
+                       // list does not end with a dangling separator
+                       StringBuilder sb = new StringBuilder();
+                       for (String stream : streams) {
+                               if (sb.length() > 0) {
+                                       sb.append(", ");
+                               }
+                               sb.append(stream);
+                       }
+                       LOG.info("Flink Kinesis Consumer is going to read the following streams: {}", sb.toString());
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Source life cycle
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Initializes {@code lastStateSnapshot} for this subtask.
+        *
+        * <p>If {@link #restoreState(HashMap)} handed over sequence numbers, they become the initial snapshot;
+        * otherwise the subtask starts with an empty map.</p>
+        *
+        * @param parameters the configuration forwarded to {@code super.open}
+        * @throws Exception if {@code super.open} fails
+        */
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+
+               // nothing to restore: start fresh with empty sequence numbers
+               if (sequenceNumsToRestore == null) {
+                       lastStateSnapshot = new HashMap<>();
+                       return;
+               }
+
+               // restore to the last known sequence numbers from the latest complete snapshot
+               if (LOG.isInfoEnabled()) {
+                       LOG.info("Subtask {} is restoring sequence numbers {} from previous checkpointed state",
+                               getRuntimeContext().getIndexOfThisSubtask(), sequenceNumsToRestore.toString());
+               }
+               lastStateSnapshot = sequenceNumsToRestore;
+       }
+
+       /**
+        * Creates this subtask's {@link KinesisDataFetcher}, seeds it with any restored shard state, and runs
+        * it until it terminates.
+        *
+        * <p>The fetcher is kept in a local variable for the whole method. {@link #cancel()} sets the
+        * {@code fetcher} field to {@code null}, so dereferencing the field again after
+        * {@code runFetcher()} returns could throw a {@link NullPointerException} while shutting down.</p>
+        *
+        * @param sourceContext the source context handed to the fetcher; closed after the fetcher has terminated
+        * @throws Exception any exception raised while creating, seeding or running the fetcher
+        */
+       @Override
+       public void run(SourceContext<T> sourceContext) throws Exception {
+
+               // all subtasks will run a fetcher, regardless of whether or not the subtask will initially have
+               // shards to subscribe to; fetchers will continuously poll for changes in the shard list, so all subtasks
+               // can potentially have new shards to subscribe to later on
+               final KinesisDataFetcher<T> dataFetcher = new KinesisDataFetcher<>(
+                       streams, sourceContext, getRuntimeContext(), configProps, deserializer);
+
+               // publish the fetcher so that cancel() and snapshotState() can reach it; from here on this
+               // method only uses the local reference, because cancel() may null the field at any time
+               this.fetcher = dataFetcher;
+
+               boolean isRestoringFromFailure = (sequenceNumsToRestore != null);
+               dataFetcher.setIsRestoringFromFailure(isRestoringFromFailure);
+
+               // if we are restoring from a checkpoint, we iterate over the restored
+               // state and accordingly seed the fetcher with subscribed shards states
+               if (isRestoringFromFailure) {
+                       for (Map.Entry<KinesisStreamShard, SequenceNumber> restored : lastStateSnapshot.entrySet()) {
+                               dataFetcher.advanceLastDiscoveredShardOfStream(
+                                       restored.getKey().getStreamName(), restored.getKey().getShard().getShardId());
+
+                               if (LOG.isInfoEnabled()) {
+                                       LOG.info("Subtask {} is seeding the fetcher with restored shard {}," +
+                                                       " starting state set to the restored sequence number {}",
+                                               getRuntimeContext().getIndexOfThisSubtask(), restored.getKey().toString(), restored.getValue());
+                               }
+                               dataFetcher.registerNewSubscribedShardState(
+                                       new KinesisStreamShardState(restored.getKey(), restored.getValue()));
+                       }
+               }
+
+               // check that we are running before starting the fetcher
+               if (!running) {
+                       return;
+               }
+
+               // start the fetcher loop. The fetcher will stop running only when cancel() or
+               // close() is called, or an error is thrown by threads created by the fetcher
+               dataFetcher.runFetcher();
+
+               // check that the fetcher has terminated before fully closing
+               dataFetcher.awaitTermination();
+               sourceContext.close();
+       }
+
+       @Override
+       public void cancel() {
+               running = false;
+
+               KinesisDataFetcher fetcher = this.fetcher;
+               this.fetcher = null;
+
+               // this method might be called before the subtask actually 
starts running,
+               // so we must check if the fetcher is actually created
+               if (fetcher != null) {
+                       try {
+                               // interrupt the fetcher of any work
+                               fetcher.shutdownFetcher();
+                               fetcher.awaitTermination();
+                       } catch (Exception e) {
+                               LOG.warn("Error while closing Kinesis data 
fetcher", e);
+                       }
+               }
+       }
+
+       /**
+        * Closes this source by first running the same shutdown steps as {@link #cancel()} and then
+        * delegating to {@code super.close()}.
+        *
+        * @throws Exception if {@code super.close()} fails
+        */
+       @Override
+       public void close() throws Exception {
+               // stop the fetcher (if any) before the parent releases its resources
+               this.cancel();
+               super.close();
+       }
+
+       /**
+        * Reports the type of the records emitted by this source.
+        *
+        * @return the type information obtained from the configured deserializer
+        */
+       @Override
+       public TypeInformation<T> getProducedType() {
+               final TypeInformation<T> producedType = deserializer.getProducedType();
+               return producedType;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  State Snapshot & Restore
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Snapshots the sequence numbers currently held by the fetcher.
+        *
+        * <p>Returns {@code null} without taking a snapshot when the source has not been opened yet, when no
+        * fetcher is available, or when the source is no longer running.</p>
+        *
+        * <p>The {@code fetcher} field is read exactly once into a local variable. {@link #cancel()} sets the
+        * field to {@code null}, so checking the field and then dereferencing it again could throw a
+        * {@link NullPointerException}.</p>
+        *
+        * @param checkpointId the id of the checkpoint, used for logging only
+        * @param checkpointTimestamp the timestamp of the checkpoint, used for logging only
+        * @return the state returned by the fetcher's {@code snapshotState()}, or {@code null} if no snapshot
+        *         was taken
+        * @throws Exception if the fetcher fails to snapshot its state
+        */
+       @Override
+       public HashMap<KinesisStreamShard, SequenceNumber> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+               if (lastStateSnapshot == null) {
+                       LOG.debug("snapshotState() requested on not yet opened source; returning null.");
+                       return null;
+               }
+
+               // single read of the field; cancel() may null it between a check and a later use
+               final KinesisDataFetcher<T> currentFetcher = this.fetcher;
+               if (currentFetcher == null) {
+                       LOG.debug("snapshotState() requested on not yet running source; returning null.");
+                       return null;
+               }
+
+               if (!running) {
+                       LOG.debug("snapshotState() called on closed source; returning null.");
+                       return null;
+               }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Snapshotting state ...");
+               }
+
+               lastStateSnapshot = currentFetcher.snapshotState();
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
+                               lastStateSnapshot.toString(), checkpointId, checkpointTimestamp);
+               }
+
+               return lastStateSnapshot;
+       }
+
+       /**
+        * Stores the restored sequence numbers so that {@code open()} and {@code run()} can pick them up.
+        *
+        * @param restoredState the per-shard sequence numbers to restore to
+        * @throws Exception declared by the interface; this implementation only assigns a field
+        */
+       @Override
+       public void restoreState(HashMap<KinesisStreamShard, SequenceNumber> restoredState) throws Exception {
+               this.sequenceNumsToRestore = restoredState;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
new file mode 100644
index 0000000..579bd6b
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis;
+
+import com.amazonaws.services.kinesis.producer.Attempt;
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
+import com.amazonaws.services.kinesis.producer.UserRecordResult;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.PropertiesUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The FlinkKinesisProducer writes the elements of a Flink DataStream into Kinesis.
+ *
+ * @param <OUT> Data type to produce into Kinesis Streams
+ */
+public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKinesisProducer.class);
+
+       /** Properties to parametrize settings such as AWS service region, 
access key etc. */
+       private final Properties configProps;
+
+       /* Flag controlling the error behavior of the producer */
+       private boolean failOnError = false;
+
+       /* Name of the default stream to produce to. Can be overwritten by the 
serialization schema */
+       private String defaultStream;
+
+       /* Default partition id. Can be overwritten by the serialization schema 
*/
+       private String defaultPartition;
+
+       /* Schema for turning the OUT type into a byte array. */
+       private final KinesisSerializationSchema<OUT> schema;
+
+       /* Optional custom partitioner */
+       private KinesisPartitioner<OUT> customPartitioner = null;
+
+
+       // --------------------------- Runtime fields 
---------------------------
+
+
+       /* Our Kinesis instance for each parallel Flink sink */
+       private transient KinesisProducer producer;
+
+       /* Callback handling failures */
+       private transient FutureCallback<UserRecordResult> callback;
+
+       /* Field for async exception */
+       private transient volatile Throwable thrownException;
+
+
+       // --------------------------- Initialization and configuration  
---------------------------
+
+
+       /**
+        * Create a new FlinkKinesisProducer.
+        * This is a constructor supporting Flink's {@link SerializationSchema}.
+        *
+        * <p>The given schema is wrapped in an anonymous {@link KinesisSerializationSchema} that wraps the
+        * serialized bytes in a {@link ByteBuffer} and whose {@code getTargetStream} always returns
+        * {@code null}. The wrapper is then passed to the constructor taking a
+        * {@link KinesisSerializationSchema}.</p>
+        *
+        * @param schema Serialization schema for the data type
+        * @param configProps The properties used to configure AWS credentials and AWS region
+        */
+       public FlinkKinesisProducer(final SerializationSchema<OUT> schema, Properties configProps) {
+
+               // create a simple wrapper for the serialization schema
+               this(new KinesisSerializationSchema<OUT>() {
+                       @Override
+                       public ByteBuffer serialize(OUT element) {
+                               // wrap into ByteBuffer
+                               return ByteBuffer.wrap(schema.serialize(element));
+                       }
+                       // no per-element target stream; presumably this makes the producer fall back to the
+                       // configured default stream and hash key -- NOTE(review): confirm against invoke()
+                       @Override
+                       public String getTargetStream(OUT element) {
+                               return null;
+                       }
+               }, configProps);
+       }
+
+       /**
+        * Create a new FlinkKinesisProducer.
+        * This is a constructor supporting {@see KinesisSerializationSchema}.
+        *
+        * @param schema Kinesis serialization schema for the data type
+        * @param configProps The properties used to configure AWS credentials 
and AWS region
+        */
+       public FlinkKinesisProducer(KinesisSerializationSchema<OUT> schema, 
Properties configProps) {
+               this.configProps = checkNotNull(configProps, "configProps can 
not be null");
+
+               // check the configuration properties for any conflicting 
settings
+               
KinesisConfigUtil.validateProducerConfiguration(this.configProps);
+
+               
ClosureCleaner.ensureSerializable(Objects.requireNonNull(schema));
+               this.schema = schema;
+       }
+
+       /**
+        * Configures how the producer reacts to errors.
+        *
+        * <p>With {@code true}, the producer will immediately fail with an exception on any error. With
+        * {@code false} (the initial value of the flag), errors are logged and the producer goes on.</p>
+        *
+        * @param failOnError Error behavior flag
+        */
+       public void setFailOnError(boolean failOnError) {
+               this.failOnError = failOnError;
+       }
+
+       /**
+        * Set a default stream name. It is used for every record for which the
+        * serialization schema's getTargetStream() returns null.
+        *
+        * @param defaultStream Name of the default Kinesis stream
+        */
+       public void setDefaultStream(String defaultStream) {
+               this.defaultStream = defaultStream;
+       }
+
+       /**
+        * Set the default partition id. It is used for every record as long as no
+        * custom partitioner has been set.
+        *
+        * @param defaultPartition Name of the default partition
+        */
+       public void setDefaultPartition(String defaultPartition) {
+               this.defaultPartition = defaultPartition;
+       }
+
+       /**
+        * Set a custom partitioner. When present, it supplies the partition id and the
+        * explicit hash key of every record, taking the place of the default partition.
+        *
+        * @param partitioner the partitioner to use; must not be null
+        */
+       public void setCustomPartitioner(KinesisPartitioner<OUT> partitioner) {
+               // null check first, then the serializability check; the field is only assigned if both pass
+               ClosureCleaner.ensureSerializable(Objects.requireNonNull(partitioner));
+               this.customPartitioner = partitioner;
+       }
+
+
+       // --------------------------- Lifecycle methods 
---------------------------
+
+
+       /**
+        * Creates the KinesisProducer from the configured properties, installs the callback that
+        * handles asynchronous send results, and initializes the custom partitioner if one was set.
+        *
+        * @param parameters passed through to the superclass open()
+        * @throws Exception declared by the overridden method
+        */
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+
+               KinesisProducerConfiguration producerConfig = new 
KinesisProducerConfiguration();
+
+               // region and credentials are always taken from configProps
+               
producerConfig.setRegion(configProps.getProperty(ProducerConfigConstants.AWS_REGION));
+               
producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
+               // optional tuning keys: the producer config is only touched when the key is present;
+               // its current value is handed to PropertiesUtil.getLong together with the key
+               if 
(configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) {
+                       
producerConfig.setCollectionMaxCount(PropertiesUtil.getLong(configProps,
+                                       
ProducerConfigConstants.COLLECTION_MAX_COUNT, 
producerConfig.getCollectionMaxCount(), LOG));
+               }
+               if 
(configProps.containsKey(ProducerConfigConstants.AGGREGATION_MAX_COUNT)) {
+                       
producerConfig.setAggregationMaxCount(PropertiesUtil.getLong(configProps,
+                                       
ProducerConfigConstants.AGGREGATION_MAX_COUNT, 
producerConfig.getAggregationMaxCount(), LOG));
+               }
+
+               producer = new KinesisProducer(producerConfig);
+               // single callback instance that invoke() attaches to the future of every record
+               callback = new FutureCallback<UserRecordResult>() {
+                       @Override
+                       public void onSuccess(UserRecordResult result) {
+                               // the future completed, but the result can still report an unsuccessful send
+                               if (!result.isSuccessful()) {
+                                       if(failOnError) {
+                                               // remembered here; invoke() checks this field on its next call
+                                               thrownException = new 
RuntimeException("Record was not sent successful");
+                                       } else {
+                                               LOG.warn("Record was not sent 
successful");
+                                       }
+                               }
+                       }
+
+                       @Override
+                       public void onFailure(Throwable t) {
+                               if (failOnError) {
+                                       // remembered here; invoke() checks this field on its next call
+                                       thrownException = t;
+                               } else {
+                                       LOG.warn("An exception occurred while 
processing a record", t);
+                               }
+                       }
+               };
+
+               // hand the partitioner this subtask's index and the total number of parallel subtasks
+               if (this.customPartitioner != null) {
+                       
this.customPartitioner.initialize(getRuntimeContext().getIndexOfThisSubtask(), 
getRuntimeContext().getNumberOfParallelSubtasks());
+               }
+
+               LOG.info("Started Kinesis producer instance for region '{}'", 
producerConfig.getRegion());
+       }
+
+       /**
+        * Serializes a record and queues it on the Kinesis producer for asynchronous sending.
+        *
+        * <p>Before queueing, an error recorded by the asynchronous callback is surfaced: it is
+        * rethrown (wrapped in a RuntimeException) if failOnError is set, otherwise it is logged
+        * and cleared. The target stream is the schema's per-record stream if it returns one,
+        * else the default stream. The partition id and explicit hash key come from the custom
+        * partitioner if one is set, else the default partition is used without an explicit hash key.
+        *
+        * @param value the record to send
+        * @throws Exception if the producer has already been closed, if a previously recorded send
+        *                   error is propagated, or if no target stream is known and failOnError is set
+        */
+       @Override
+       public void invoke(OUT value) throws Exception {
+               if (this.producer == null) {
+                       throw new RuntimeException("Kinesis producer has been closed");
+               }
+               if (thrownException != null) {
+                       // collect the per-attempt error messages the producer library attached to the failure
+                       StringBuilder errorMessages = new StringBuilder();
+                       if (thrownException instanceof UserRecordFailedException) {
+                               List<Attempt> attempts = ((UserRecordFailedException) thrownException).getResult().getAttempts();
+                               for (Attempt attempt: attempts) {
+                                       if (attempt.getErrorMessage() != null) {
+                                               errorMessages.append(attempt.getErrorMessage()).append("\n");
+                                       }
+                               }
+                       }
+                       if (failOnError) {
+                               throw new RuntimeException("An exception was thrown while processing a record: " + errorMessages, thrownException);
+                       } else {
+                               // the messages fill the {} placeholder; the exception goes last so that
+                               // the logger treats it as the throwable and prints its stack trace
+                               LOG.warn("An exception was thrown while processing a record: {}", errorMessages, thrownException);
+                               thrownException = null; // reset
+                       }
+               }
+
+               String stream = defaultStream;
+               String partition = defaultPartition;
+
+               ByteBuffer serialized = schema.serialize(value);
+
+               // maybe set custom stream
+               String customStream = schema.getTargetStream(value);
+               if (customStream != null) {
+                       stream = customStream;
+               }
+
+               String explicitHashkey = null;
+               // maybe set custom partition
+               if (customPartitioner != null) {
+                       partition = customPartitioner.getPartitionId(value);
+                       explicitHashkey = customPartitioner.getExplicitHashKey(value);
+               }
+
+               // neither the schema nor setDefaultStream() provided a stream for this record
+               if (stream == null) {
+                       if (failOnError) {
+                               throw new RuntimeException("No target stream set");
+                       } else {
+                               LOG.warn("No target stream set. Skipping record");
+                               return;
+                       }
+               }
+
+               // asynchronous send; the outcome is reported to the callback created in open()
+               ListenableFuture<UserRecordResult> cb = producer.addUserRecord(stream, partition, explicitHashkey, serialized);
+               Futures.addCallback(cb, callback);
+       }
+
+       /**
+        * Flushes the records that are still outstanding and then destroys the Kinesis producer.
+        *
+        * <p>Flushing blocks, polling the outstanding record count every 500 ms. If the thread is
+        * interrupted while waiting, flushing stops, the producer is destroyed right away, and the
+        * thread's interrupt status is restored so that callers can still observe the interruption.
+        *
+        * @throws Exception declared by the overridden method
+        */
+       @Override
+       public void close() throws Exception {
+               LOG.info("Closing producer");
+               super.close();
+               // clear the field first so that invoke() reports the producer as closed from now on
+               KinesisProducer kp = this.producer;
+               this.producer = null;
+               if (kp != null) {
+                       LOG.info("Flushing outstanding {} records", kp.getOutstandingRecordsCount());
+                       boolean interrupted = false;
+                       try {
+                               // try to flush all outstanding records
+                               while (kp.getOutstandingRecordsCount() > 0) {
+                                       kp.flush();
+                                       try {
+                                               Thread.sleep(500);
+                                       } catch (InterruptedException e) {
+                                               LOG.warn("Flushing was interrupted.");
+                                               // stop the blocking flushing and destroy producer immediately
+                                               interrupted = true;
+                                               break;
+                                       }
+                               }
+                               LOG.info("Flushing done. Destroying producer instance.");
+                               kp.destroy();
+                       } finally {
+                               // catching InterruptedException cleared the flag; set it again, but only
+                               // after destroy() so the shutdown itself is not affected by it
+                               if (interrupted) {
+                                       Thread.currentThread().interrupt();
+                               }
+                       }
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
new file mode 100644
index 0000000..bd23abe
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis;
+
+
+import java.io.Serializable;
+
+/**
+ * Base class for user-defined partitioners of the FlinkKinesisProducer. For every record the
+ * producer asks the partitioner for a partition id and, optionally, an explicit hash key.
+ *
+ * @param <T> type of the elements to partition
+ */
+public abstract class KinesisPartitioner<T> implements Serializable {
+
+       /**
+        * Return a partition id based on the input
+        * @param element Element to partition
+        * @return A string representing the partition id
+        */
+       public abstract String getPartitionId(T element);
+
+       /**
+        * Optional method for setting an explicit hash key.
+        * The default implementation returns null, i.e. no explicit hash key.
+        * @param element Element to get the hash key for
+        * @return the hash key for the element
+        */
+       public String getExplicitHashKey(T element) {
+               return null;
+       }
+
+       /**
+        * Optional initializer. The FlinkKinesisProducer calls it from open() with the
+        * index of its subtask and the number of parallel subtasks.
+        *
+        * @param indexOfThisSubtask Index of this partitioner instance
+        * @param numberOfParallelSubtasks Total number of parallel instances
+        */
+       public void initialize(int indexOfThisSubtask, int 
numberOfParallelSubtasks) {
+               // no-op by default; subclasses may override
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
new file mode 100644
index 0000000..01d4f00
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.config;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+
+/**
+ * Configuration keys for AWS service usage. The consumer and producer specific
+ * constant classes extend this class, so these keys are reachable through them as well.
+ */
+public class AWSConfigConstants {
+
+       /**
+        * Possible configuration values for the type of credential provider to use when accessing AWS Kinesis.
+        * Internally, a corresponding implementation of {@link AWSCredentialsProvider} will be used.
+        */
+       public enum CredentialProvider {
+
+               /** Look for the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY to create AWS credentials */
+               ENV_VAR,
+
+               /** Look for Java system properties aws.accessKeyId and aws.secretKey to create AWS credentials */
+               SYS_PROP,
+
+               /** Use an AWS credentials profile file to create the AWS credentials */
+               PROFILE,
+
+               /** Simply create AWS credentials by supplying the AWS access key ID and AWS secret key in the configuration properties */
+               BASIC,
+
+               // NOTE(review): the last step used to read "PROFILE in the AWS instance metadata";
+               // confirm the exact chain order against the credentials provider that is created for AUTO
+               /** A credentials provider chain will be used that searches for credentials in this order: ENV_VAR, SYS_PROP, PROFILE, AWS instance metadata */
+               AUTO,
+       }
+
+       /** The AWS region of the Kinesis streams to access ("us-east-1" is used if not set) */
+       public static final String AWS_REGION = "aws.region";
+
+       /** The AWS access key ID to use when setting credentials provider type to BASIC */
+       public static final String AWS_ACCESS_KEY_ID = 
"aws.credentials.provider.basic.accesskeyid";
+
+       /** The AWS secret key to use when setting credentials provider type to BASIC */
+       public static final String AWS_SECRET_ACCESS_KEY = 
"aws.credentials.provider.basic.secretkey";
+
+       /** The credential provider type to use when AWS credentials are required (BASIC is used if not set) */
+       public static final String AWS_CREDENTIALS_PROVIDER = 
"aws.credentials.provider";
+
+       /** Optional configuration for profile path if credential provider type is set to be PROFILE */
+       public static final String AWS_PROFILE_PATH = 
"aws.credentials.provider.profile.path";
+
+       /** Optional configuration for profile name if credential provider type is set to be PROFILE */
+       public static final String AWS_PROFILE_NAME = 
"aws.credentials.provider.profile.name";
+
+       /** The AWS endpoint for Kinesis (derived from the AWS region setting if not set) */
+       public static final String AWS_ENDPOINT = "aws.endpoint";
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
new file mode 100644
index 0000000..76c20ed
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.config;
+
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer;
+import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+
+/**
+ * Optional consumer specific configuration keys and default values for {@link FlinkKinesisConsumer}
+ */
+public class ConsumerConfigConstants extends AWSConfigConstants {
+
+       /**
+        * The initial position to start reading shards from. This will affect the {@link ShardIteratorType} used
+        * when the consumer tasks retrieve the first shard iterator for each Kinesis shard.
+        */
+       public enum InitialPosition {
+
+               /** Start reading from the earliest possible record in the stream (excluding expired data records) */
+               
TRIM_HORIZON(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM),
+
+               /** Start reading from the latest incoming record */
+               LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM);
+
+               /** The sentinel sequence number this initial position is paired with */
+               private SentinelSequenceNumber sentinelSequenceNumber;
+
+               InitialPosition(SentinelSequenceNumber sentinelSequenceNumber) {
+                       this.sentinelSequenceNumber = sentinelSequenceNumber;
+               }
+
+               /** Returns the sentinel sequence number this initial position is paired with */
+               public SentinelSequenceNumber toSentinelSequenceNumber() {
+                       return this.sentinelSequenceNumber;
+               }
+       }
+
+       /** The initial position to start reading Kinesis streams from (LATEST is used if not set) */
+       public static final String STREAM_INITIAL_POSITION = 
"flink.stream.initpos";
+
+       /** The base backoff time between each describeStream attempt */
+       public static final String STREAM_DESCRIBE_BACKOFF_BASE = 
"flink.stream.describe.backoff.base";
+
+       /** The maximum backoff time between each describeStream attempt */
+       public static final String STREAM_DESCRIBE_BACKOFF_MAX = 
"flink.stream.describe.backoff.max";
+
+       /** The power constant for exponential backoff between each describeStream attempt */
+       public static final String STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT 
= "flink.stream.describe.backoff.expconst";
+
+       /** The maximum number of records to try to get each time we fetch records from an AWS Kinesis shard */
+       public static final String SHARD_GETRECORDS_MAX = 
"flink.shard.getrecords.maxrecordcount";
+
+       /** The maximum number of getRecords attempts if we get a ProvisionedThroughputExceededException */
+       public static final String SHARD_GETRECORDS_RETRIES = 
"flink.shard.getrecords.maxretries";
+
+       /** The base backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException */
+       public static final String SHARD_GETRECORDS_BACKOFF_BASE = 
"flink.shard.getrecords.backoff.base";
+
+       /** The maximum backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException */
+       public static final String SHARD_GETRECORDS_BACKOFF_MAX = 
"flink.shard.getrecords.backoff.max";
+
+       /** The power constant for exponential backoff between each getRecords attempt */
+       public static final String 
SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = 
"flink.shard.getrecords.backoff.expconst";
+
+       /** The interval between each getRecords request to an AWS Kinesis shard in milliseconds */
+       public static final String SHARD_GETRECORDS_INTERVAL_MILLIS = 
"flink.shard.getrecords.intervalmillis";
+
+       /** The maximum number of getShardIterator attempts if we get a ProvisionedThroughputExceededException */
+       public static final String SHARD_GETITERATOR_RETRIES = 
"flink.shard.getiterator.maxretries";
+
+       /** The base backoff time between getShardIterator attempts if we get a ProvisionedThroughputExceededException */
+       public static final String SHARD_GETITERATOR_BACKOFF_BASE = 
"flink.shard.getiterator.backoff.base";
+
+       /** The maximum backoff time between getShardIterator attempts if we get a ProvisionedThroughputExceededException */
+       public static final String SHARD_GETITERATOR_BACKOFF_MAX = 
"flink.shard.getiterator.backoff.max";
+
+       /** The power constant for exponential backoff between each getShardIterator attempt */
+       public static final String 
SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT = 
"flink.shard.getiterator.backoff.expconst";
+
+       /** The interval between each attempt to discover new shards (in milliseconds, going by the key name) */
+       public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = 
"flink.shard.discovery.intervalmillis";
+
+       // ------------------------------------------------------------------------
+       //  Default values for consumer configuration
+       // ------------------------------------------------------------------------
+       // NOTE(review): the backoff base/max defaults below are presumably milliseconds,
+       // like the *_INTERVAL_MILLIS values; confirm against the code that reads them
+
+       public static final String DEFAULT_STREAM_INITIAL_POSITION = 
InitialPosition.LATEST.toString();
+
+       public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE = 1000L;
+
+       public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX = 5000L;
+
+       public static final double 
DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
+
+       public static final int DEFAULT_SHARD_GETRECORDS_MAX = 100;
+
+       public static final int DEFAULT_SHARD_GETRECORDS_RETRIES = 3;
+
+       public static final long DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE = 300L;
+
+       public static final long DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX = 1000L;
+
+       public static final double 
DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
+
+       public static final long DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS = 0L;
+
+       public static final int DEFAULT_SHARD_GETITERATOR_RETRIES = 3;
+
+       public static final long DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE = 300L;
+
+       public static final long DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX = 1000L;
+
+       public static final double 
DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
+
+       public static final long DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS = 
10000L;
+
+       /**
+        * To keep shard iterators from expiring in {@link ShardConsumer}s, the configured getRecords
+        * interval cannot exceed 5 minutes (300000 ms), which is the expiry time of a retrieved iterator.
+        */
+       public static final long MAX_SHARD_GETRECORDS_INTERVAL_MILLIS = 300000L;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
new file mode 100644
index 0000000..1edddfc
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.config;
+
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+
+/**
+ * Optional producer specific configuration keys for {@link FlinkKinesisProducer}
+ */
+public class ProducerConfigConstants extends AWSConfigConstants {
+
+       /** Maximum number of items to pack into a PutRecords request. */
+       public static final String COLLECTION_MAX_COUNT = 
"aws.producer.collectionMaxCount";
+
+       /** Maximum number of items to pack into an aggregated record. */
+       public static final String AGGREGATION_MAX_COUNT = 
"aws.producer.aggregationMaxCount";
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
new file mode 100644
index 0000000..55668c6
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis.examples;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import java.util.Properties;
+
+/**
+ * Example job that reads the Kinesis stream "flink-test" with a {@link FlinkKinesisConsumer}
+ * and prints every record. Required arguments: --region, --accesskey, --secretkey.
+ */
+public class ConsumeFromKinesis {
+
+       public static void main(String[] args) throws Exception {
+               final ParameterTool params = ParameterTool.fromArgs(args);
+
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1);
+
+               // region and access keys for the consumer come from the required command line arguments
+               final Properties consumerProps = new Properties();
+               consumerProps.setProperty(ConsumerConfigConstants.AWS_REGION, params.getRequired("region"));
+               consumerProps.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, params.getRequired("accesskey"));
+               consumerProps.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, params.getRequired("secretkey"));
+
+               // records are deserialized as plain strings
+               final FlinkKinesisConsumer<String> consumer =
+                       new FlinkKinesisConsumer<>("flink-test", new SimpleStringSchema(), consumerProps);
+
+               final DataStream<String> records = env.addSource(consumer);
+               records.print();
+
+               env.execute();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
new file mode 100644
index 0000000..d178137
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis.examples;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import java.util.Properties;
+
+/**
+ * This is an example on how to produce data into Kinesis.
+ * Required arguments: --region, --accessKey, --secretKey.
+ */
+public class ProduceIntoKinesis {
+
+       public static void main(String[] args) throws Exception {
+               ParameterTool pt = ParameterTool.fromArgs(args);
+
+               StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+               see.setParallelism(1);
+
+               DataStream<String> simpleStringStream = see.addSource(new EventsGenerator());
+
+               Properties kinesisProducerConfig = new Properties();
+               kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_REGION, pt.getRequired("region"));
+               kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
+               kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
+
+               FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(
+                               new SimpleStringSchema(), kinesisProducerConfig);
+
+               // fail the job on send errors; all records go to one stream and one partition id
+               kinesis.setFailOnError(true);
+               kinesis.setDefaultStream("flink-test");
+               kinesis.setDefaultPartition("0");
+
+               simpleStringStream.addSink(kinesis);
+
+               see.execute();
+       }
+
+       /**
+        * Source that emits "&lt;sequence number&gt;-&lt;12 random letters&gt;" about every
+        * 10 ms until it is cancelled.
+        */
+       public static class EventsGenerator implements SourceFunction<String> {
+               // cancel() is invoked from a different thread than run(); volatile makes
+               // the update of the flag visible to the emitting loop
+               private volatile boolean running = true;
+
+               @Override
+               public void run(SourceContext<String> ctx) throws Exception {
+                       long seq = 0;
+                       while(running) {
+                               Thread.sleep(10);
+                               ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12));
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       running = false;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
new file mode 100644
index 0000000..a06fdca
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.util.InstantiationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A KinesisDataFetcher is responsible for fetching data from multiple Kinesis 
shards. Each parallel subtask instantiates
+ * and runs a single fetcher throughout the subtask's lifetime. The fetcher 
accomplishes the following:
+ * <ul>
+ *     <li>1. continuously poll Kinesis to discover shards that the subtask 
should subscribe to. The subscribed subset
+ *                       of shards, including future new shards, is 
non-overlapping across subtasks (no two subtasks will be
+ *                       subscribed to the same shard) and determinate across 
subtask restores (the subtask will always subscribe
+ *                       to the same subset of shards even after 
restoring)</li>
+ *     <li>2. decide where in each discovered shard the fetcher should start reading from</li>
+ *     <li>3. subscribe to shards by creating a single thread for each 
shard</li>
+ * </ul>
+ *
+ * <p>The fetcher manages two states: 1) last seen shard ids of each 
subscribed stream (used for continuous shard discovery),
+ * and 2) last processed sequence numbers of each subscribed shard. Since 
operations on the second state will be performed
+ * by multiple threads, these operations should only be done using the handler 
methods provided in this class.
+ */
+public class KinesisDataFetcher<T> {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(KinesisDataFetcher.class);
+
+       // 
------------------------------------------------------------------------
+       //  Consumer-wide settings
+       // 
------------------------------------------------------------------------
+
+       /** Configuration properties for the Flink Kinesis Consumer */
+       private final Properties configProps;
+
+       /** The list of Kinesis streams that the consumer is subscribing to */
+       private final List<String> streams;
+
+       /**
+        * The deserialization schema we will be using to convert Kinesis 
records to Flink objects.
+        * Note that since this might not be thread-safe, {@link 
ShardConsumer}s using this must
+        * clone a copy using {@link 
KinesisDataFetcher#getClonedDeserializationSchema()}.
+        */
+       private final KinesisDeserializationSchema<T> deserializationSchema;
+
+       // 
------------------------------------------------------------------------
+       //  Subtask-specific settings
+       // 
------------------------------------------------------------------------
+
+       /** Runtime context of the subtask that this fetcher was created in */
+       private final RuntimeContext runtimeContext;
+
+       private final int totalNumberOfConsumerSubtasks;
+
+       private final int indexOfThisConsumerSubtask;
+
+       /**
+        * This flag should be set by {@link FlinkKinesisConsumer} using
+        * {@link KinesisDataFetcher#setIsRestoringFromFailure(boolean)}
+        */
+       private boolean isRestoredFromFailure;
+
+       // 
------------------------------------------------------------------------
+       //  Executor services to run created threads
+       // 
------------------------------------------------------------------------
+
+       /** Executor service to run {@link ShardConsumer}s to consume Kinesis 
shards */
+       private final ExecutorService shardConsumersExecutor;
+
+       // 
------------------------------------------------------------------------
+       //  Managed state, accessed and updated across multiple threads
+       // 
------------------------------------------------------------------------
+
+       /** The last discovered shard ids of each subscribed stream, updated as the fetcher discovers new shards.
+        * Note: this state will be updated if new shards are found when {@link KinesisDataFetcher#discoverNewShardsToSubscribe()} is called.
+        */
+       private final Map<String, String> 
subscribedStreamsToLastDiscoveredShardIds;
+
+       /**
+        * The shards, along with their last processed sequence numbers, that 
this fetcher is subscribed to. The fetcher
+        * will add new subscribed shard states to this list as it discovers 
new shards. {@link ShardConsumer} threads update
+        * the last processed sequence number of subscribed shards as they 
fetch and process records.
+        *
+        * <p>Note that since multiple {@link ShardConsumer} threads will be 
performing operations on this list, all operations
+        * must be wrapped in synchronized blocks on the {@link 
KinesisDataFetcher#checkpointLock} lock. For this purpose,
+        * all threads must use the following thread-safe methods this class 
provides to operate on this list:
+        * <ul>
+        *     <li>{@link 
KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}</li>
+        *     <li>{@link KinesisDataFetcher#updateState(int, 
SequenceNumber)}</li>
+        *     <li>{@link KinesisDataFetcher#emitRecordAndUpdateState(T, long, 
int, SequenceNumber)}</li>
+        * </ul>
+        */
+       private final List<KinesisStreamShardState> subscribedShardsState;
+
+       private final SourceFunction.SourceContext<T> sourceContext;
+
+       /** Checkpoint lock, also used to synchronize operations on 
subscribedShardsState */
+       private final Object checkpointLock;
+
+       /** Reference to the first error thrown by any of the {@link 
ShardConsumer} threads */
+       private final AtomicReference<Throwable> error;
+
+       /** The Kinesis proxy that the fetcher will be using to discover new 
shards */
+       private final KinesisProxyInterface kinesis;
+
+       /**
+        * Thread that executed runFetcher(). The field is assigned by that thread and read by whichever
+        * thread calls shutdownFetcher() (including shard consumer threads, via stopWithError()), so it
+        * is volatile to make the assignment visible across threads. It is null until runFetcher() starts.
+        */
+       private volatile Thread mainThread;
+
+       /**
+        * The current number of shards that are actively read by this fetcher.
+        *
+        * This value is updated in {@link 
KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)},
+        * and {@link KinesisDataFetcher#updateState(int, SequenceNumber)}.
+        */
+       private final AtomicInteger numberOfActiveShards = new AtomicInteger(0);
+
+       private volatile boolean running = true;
+
+       /**
+        * Creates a Kinesis Data Fetcher.
+        *
+        * <p>Delegates to the protected constructor, supplying the source context's checkpoint lock,
+        * an empty error reference, an empty list of subscribed shard states, the initial
+        * last-discovered-shard map built from {@code streams}, and a Kinesis proxy created from
+        * {@code configProps}.
+        *
+        * @param streams the streams to subscribe to
+        * @param sourceContext context of the source function
+        * @param runtimeContext this subtask's runtime context
+        * @param configProps the consumer configuration properties
+        * @param deserializationSchema deserialization schema
+        */
+       public KinesisDataFetcher(List<String> streams,
+                                                       
SourceFunction.SourceContext<T> sourceContext,
+                                                       RuntimeContext 
runtimeContext,
+                                                       Properties configProps,
+                                                       
KinesisDeserializationSchema<T> deserializationSchema) {
+               this(streams,
+                       sourceContext,
+                       // the source context's checkpoint lock doubles as the lock guarding shard state
+                       sourceContext.getCheckpointLock(),
+                       runtimeContext,
+                       configProps,
+                       deserializationSchema,
+                       // no error has been recorded yet
+                       new AtomicReference<Throwable>(),
+                       // no shard is subscribed yet
+                       new LinkedList<KinesisStreamShardState>(),
+                       
createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
+                       KinesisProxy.create(configProps));
+       }
+
+       /** This constructor is exposed for testing purposes */
+       protected KinesisDataFetcher(List<String> streams,
+                                                               
SourceFunction.SourceContext<T> sourceContext,
+                                                               Object 
checkpointLock,
+                                                               RuntimeContext 
runtimeContext,
+                                                               Properties 
configProps,
+                                                               
KinesisDeserializationSchema<T> deserializationSchema,
+                                                               
AtomicReference<Throwable> error,
+                                                               
LinkedList<KinesisStreamShardState> subscribedShardsState,
+                                                               HashMap<String, 
String> subscribedStreamsToLastDiscoveredShardIds,
+                                                               
KinesisProxyInterface kinesis) {
+               this.streams = checkNotNull(streams);
+               this.configProps = checkNotNull(configProps);
+               this.sourceContext = checkNotNull(sourceContext);
+               this.checkpointLock = checkNotNull(checkpointLock);
+               this.runtimeContext = checkNotNull(runtimeContext);
+               this.totalNumberOfConsumerSubtasks = 
runtimeContext.getNumberOfParallelSubtasks();
+               this.indexOfThisConsumerSubtask = 
runtimeContext.getIndexOfThisSubtask();
+               this.deserializationSchema = 
checkNotNull(deserializationSchema);
+               this.kinesis = checkNotNull(kinesis);
+
+               this.error = checkNotNull(error);
+               this.subscribedShardsState = 
checkNotNull(subscribedShardsState);
+               this.subscribedStreamsToLastDiscoveredShardIds = 
checkNotNull(subscribedStreamsToLastDiscoveredShardIds);
+
+               this.shardConsumersExecutor =
+                       
createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks());
+       }
+
+       /**
+        * Starts the fetcher. After starting the fetcher, it can only
+        * be stopped by calling {@link KinesisDataFetcher#shutdownFetcher()}.
+        *
+        * @throws Exception the first error or exception thrown by the fetcher 
or any of the threads created by the fetcher.
+        */
+       public void runFetcher() throws Exception {
+
+               // check that we are running before proceeding
+               if (!running) {
+                       return;
+               }
+
+               this.mainThread = Thread.currentThread();
+
+               // 
------------------------------------------------------------------------
+               //  Procedures before starting the infinite while loop:
+               // 
------------------------------------------------------------------------
+
+               //  1. query for any new shards that may have been created 
while the Kinesis consumer was not running,
+               //     and register them to the subscribedShardState list.
+               if (LOG.isDebugEnabled()) {
+                       String logFormat = (!isRestoredFromFailure)
+                               ? "Subtask {} is trying to discover initial 
shards ..."
+                               : "Subtask {} is trying to discover any new 
shards that were created while the consumer wasn't " +
+                               "running due to failure ...";
+
+                       LOG.debug(logFormat, indexOfThisConsumerSubtask);
+               }
+               List<KinesisStreamShard> newShardsCreatedWhileNotRunning = 
discoverNewShardsToSubscribe();
+               for (KinesisStreamShard shard : 
newShardsCreatedWhileNotRunning) {
+                       // the starting state for new shards created while the 
consumer wasn't running depends on whether or not
+                       // we are starting fresh (not restoring from a 
checkpoint); when we are starting fresh, this simply means
+                       // all existing shards of streams we are subscribing to 
are new shards; when we are restoring from checkpoint,
+                       // any new shards due to Kinesis resharding from the 
time of the checkpoint will be considered new shards.
+                       InitialPosition initialPosition = 
InitialPosition.valueOf(configProps.getProperty(
+                               
ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION));
+
+                       SentinelSequenceNumber startingStateForNewShard = 
(isRestoredFromFailure)
+                               ? 
SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM
+                               : initialPosition.toSentinelSequenceNumber();
+
+                       if (LOG.isInfoEnabled()) {
+                               String logFormat = (!isRestoredFromFailure)
+                                       ? "Subtask {} will be seeded with 
initial shard {}, starting state set as sequence number {}"
+                                       : "Subtask {} will be seeded with new 
shard {} that was created while the consumer wasn't " +
+                                       "running due to failure, starting state 
set as sequence number {}";
+
+                               LOG.info(logFormat, indexOfThisConsumerSubtask, 
shard.toString(), startingStateForNewShard.get());
+                       }
+                       registerNewSubscribedShardState(new 
KinesisStreamShardState(shard, startingStateForNewShard.get()));
+               }
+
+               //  2. check that there is at least one shard in the subscribed 
streams to consume from (can be done by
+               //     checking if at least one value in 
subscribedStreamsToLastDiscoveredShardIds is not null)
+               boolean hasShards = false;
+               StringBuilder streamsWithNoShardsFound = new StringBuilder();
+               for (Map.Entry<String, String> streamToLastDiscoveredShardEntry 
: subscribedStreamsToLastDiscoveredShardIds.entrySet()) {
+                       if (streamToLastDiscoveredShardEntry.getValue() != 
null) {
+                               hasShards = true;
+                       } else {
+                               
streamsWithNoShardsFound.append(streamToLastDiscoveredShardEntry.getKey()).append(",
 ");
+                       }
+               }
+
+               if (streamsWithNoShardsFound.length() != 0 && 
LOG.isWarnEnabled()) {
+                       LOG.warn("Subtask {} has failed to find any shards for 
the following subscribed streams: {}",
+                               indexOfThisConsumerSubtask, 
streamsWithNoShardsFound.toString());
+               }
+
+               if (!hasShards) {
+                       throw new RuntimeException("No shards can be found for 
all subscribed streams: " + streams);
+               }
+
+               //  3. start consuming any shard state we already have in the 
subscribedShardState up to this point; the
+               //     subscribedShardState may already be seeded with values 
due to step 1., or explicitly added by the
+               //     consumer using a restored state checkpoint
+               for (int seededStateIndex = 0; seededStateIndex < 
subscribedShardsState.size(); seededStateIndex++) {
+                       KinesisStreamShardState seededShardState = 
subscribedShardsState.get(seededStateIndex);
+
+                       // only start a consuming thread if the seeded 
subscribed shard has not been completely read already
+                       if 
(!seededShardState.getLastProcessedSequenceNum().equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()))
 {
+
+                               if (LOG.isInfoEnabled()) {
+                                       LOG.info("Subtask {} will start 
consuming seeded shard {} from sequence number {} with ShardConsumer {}",
+                                               indexOfThisConsumerSubtask, 
seededShardState.getKinesisStreamShard().toString(),
+                                               
seededShardState.getLastProcessedSequenceNum(), seededStateIndex);
+                                       }
+
+                               shardConsumersExecutor.submit(
+                                       new ShardConsumer<>(
+                                               this,
+                                               seededStateIndex,
+                                               
subscribedShardsState.get(seededStateIndex).getKinesisStreamShard(),
+                                               
subscribedShardsState.get(seededStateIndex).getLastProcessedSequenceNum()));
+                       }
+               }
+
+               // 
------------------------------------------------------------------------
+
+               // finally, start the infinite shard discovery and consumer 
launching loop;
+               // we will escape from this loop only when shutdownFetcher() or 
stopWithError() is called
+
+               final long discoveryIntervalMillis = Long.valueOf(
+                       configProps.getProperty(
+                               
ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
+                               
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS)));
+
+               // FLINK-4341:
+               // For downstream operators that work on time (ex. window 
operators), we are required to emit a max value watermark
+               // for subtasks that won't continue to have shards to read from 
unless resharding happens in the future, otherwise
+               // the downstream watermarks would not advance, leading to 
unbounded accumulating state.
+               //
+               // The side-effect of this limitation is that on resharding, we 
must fail hard if the newly discovered shard
+               // is to be subscribed by a subtask that has previously emitted 
a max value watermark, otherwise the watermarks
+               // will be messed up.
+               //
+               // There are 2 cases were we need to either emit a max value 
watermark, or deliberately fail hard:
+               //  (a) if this subtask has no more shards to read from unless 
resharding happens in the future, we emit a max
+               //      value watermark. This case is encountered when 1) all 
previously read shards by this subtask were closed
+               //      due to resharding, 2) when this subtask was initially 
only subscribed to closed shards while the consumer
+               //      was told to start from TRIM_HORIZON, or 3) there was 
initially no shards for this subtask to read on startup.
+               //  (b) this subtask has discovered new shards to read from due 
to a reshard; if this subtask has already emitted
+               //      a max value watermark, we must deliberately fail hard 
to avoid messing up the watermarks. The new shards
+               //      will be subscribed by this subtask after restore as 
initial shards on startup.
+               //
+               // TODO: This is a temporary workaround until a min-watermark 
information service is available in the JobManager
+               // Please see FLINK-4341 for more detail
+
+               boolean emittedMaxValueWatermark = false;
+
+               if (this.numberOfActiveShards.get() == 0) {
+                       // FLINK-4341 workaround case (a) - please see the 
above for details on this case
+                       LOG.info("Subtask {} has no initial shards to read on 
startup; emitting max value watermark ...",
+                               indexOfThisConsumerSubtask);
+                       sourceContext.emitWatermark(new 
Watermark(Long.MAX_VALUE));
+                       emittedMaxValueWatermark = true;
+               }
+
+               while (running) {
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Subtask {} is trying to discover new 
shards that were created due to resharding ...",
+                                       indexOfThisConsumerSubtask);
+                       }
+                       List<KinesisStreamShard> newShardsDueToResharding = 
discoverNewShardsToSubscribe();
+
+                       // -- NOTE: Potential race condition between 
newShardsDueToResharding and numberOfActiveShards --
+                       // Since numberOfActiveShards is updated by parallel 
shard consuming threads in updateState(), there exists
+                       // a race condition with the currently queried 
newShardsDueToResharding. Therefore, numberOfActiveShards
+                       // may not correctly reflect the discover result in the 
below case determination. This may lead to incorrect
+                       // case determination on the current discovery attempt, 
but can still be correctly handled on future attempts.
+                       //
+                       // Although this can be resolved by wrapping the 
current shard discovery attempt with the below
+                       // case determination within a synchronized block on 
the checkpoint lock for atomicity, there will be
+                       // considerable throughput performance regression as 
shard discovery is a remote call to AWS. Therefore,
+                       // since the case determination is a temporary 
workaround for FLINK-4341, the race condition is tolerable as
+                       // we can still eventually handle max value watermark 
emitting / deliberately failing on successive
+                       // discovery attempts.
+
+                       if (newShardsDueToResharding.size() == 0 && 
this.numberOfActiveShards.get() == 0 && !emittedMaxValueWatermark) {
+                               // FLINK-4341 workaround case (a) - please see 
the above for details on this case
+                               LOG.info("Subtask {} has completed reading all 
shards; emitting max value watermark ...",
+                                       indexOfThisConsumerSubtask);
+                               sourceContext.emitWatermark(new 
Watermark(Long.MAX_VALUE));
+                               emittedMaxValueWatermark = true;
+                       } else if (newShardsDueToResharding.size() > 0 && 
emittedMaxValueWatermark) {
+                               // FLINK-4341 workaround case (b) - please see 
the above for details on this case
+                               //
+                               // Note that in the case where on resharding 
this subtask ceased to read all of it's previous shards
+                               // but new shards is also to be subscribed by 
this subtask immediately after, emittedMaxValueWatermark
+                               // will be false; this allows the fetcher to 
continue reading the new shards without failing on such cases.
+                               // However, due to the race condition mentioned 
above, we might still fall into case (a) first, and
+                               // then (b) on the next discovery attempt. 
Although the failure is ideally unnecessary, max value
+                               // watermark emitting still remains to be 
correct.
+
+                               LOG.warn("Subtask {} has discovered {} new 
shards to subscribe, but is failing hard to avoid messing" +
+                                               " up watermarks; the new shards 
will be subscribed by this subtask after restore ...",
+                                       indexOfThisConsumerSubtask, 
newShardsDueToResharding.size());
+                               throw new RuntimeException("Deliberate failure 
to avoid messing up watermarks");
+                       }
+
+                       for (KinesisStreamShard shard : 
newShardsDueToResharding) {
+                               // since there may be delay in discovering a 
new shard, all new shards due to
+                               // resharding should be read starting from the 
earliest record possible
+                               KinesisStreamShardState newShardState =
+                                       new KinesisStreamShardState(shard, 
SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
+                               int newStateIndex = 
registerNewSubscribedShardState(newShardState);
+
+                               if (LOG.isInfoEnabled()) {
+                                       LOG.info("Subtask {} has discovered a 
new shard {} due to resharding, and will start consuming " +
+                                                       "the shard from 
sequence number {} with ShardConsumer {}",
+                                               indexOfThisConsumerSubtask, 
newShardState.getKinesisStreamShard().toString(),
+                                               
newShardState.getLastProcessedSequenceNum(), newStateIndex);
+                               }
+
+                               shardConsumersExecutor.submit(
+                                       new ShardConsumer<>(
+                                               this,
+                                               newStateIndex,
+                                               
newShardState.getKinesisStreamShard(),
+                                               
newShardState.getLastProcessedSequenceNum()));
+                       }
+
+                       // we also check if we are running here so that we 
won't start the discovery sleep
+                       // interval if the running flag was set to false during 
the middle of the while loop
+                       if (running && discoveryIntervalMillis != 0) {
+                               try {
+                                       Thread.sleep(discoveryIntervalMillis);
+                               } catch (InterruptedException iex) {
+                                       // the sleep may be interrupted by 
shutdownFetcher()
+                               }
+                       }
+               }
+
+               // make sure all resources have been terminated before leaving
+               awaitTermination();
+
+               // any error thrown in the shard consumer threads will be 
thrown to the main thread
+               Throwable throwable = this.error.get();
+               if (throwable != null) {
+                       if (throwable instanceof Exception) {
+                               throw (Exception) throwable;
+                       } else if (throwable instanceof Error) {
+                               throw (Error) throwable;
+                       } else {
+                               throw new Exception(throwable);
+                       }
+               }
+       }
+
+       /**
+        * Copies the last processed sequence number of every subscribed shard into a new map.
+        * The caller must hold the checkpoint lock while calling this method.
+        *
+        * @return a new map from each subscribed shard to its last processed sequence number
+        */
+       public HashMap<KinesisStreamShard, SequenceNumber> snapshotState() {
+               // the subscribed shards state may only be read while holding the checkpoint lock
+               assert Thread.holdsLock(checkpointLock);
+
+               final HashMap<KinesisStreamShard, SequenceNumber> snapshot = new HashMap<>();
+               for (KinesisStreamShardState shardState : subscribedShardsState) {
+                       final KinesisStreamShard shard = shardState.getKinesisStreamShard();
+                       final SequenceNumber lastProcessed = shardState.getLastProcessedSequenceNum();
+                       snapshot.put(shard, lastProcessed);
+               }
+               return snapshot;
+       }
+
+       /**
+        * Starts shutting down the fetcher. Must be called to allow {@link KinesisDataFetcher#runFetcher()} to complete.
+        * Once called, the shutdown procedure will be executed and all shard consuming threads will be interrupted.
+        *
+        * <p>It is safe to call this method before {@link KinesisDataFetcher#runFetcher()} has been started; in that
+        * case there is no main thread to interrupt, and a later call to runFetcher() returns immediately because the
+        * running flag has already been cleared.
+        */
+       public void shutdownFetcher() {
+               running = false;
+
+               // mainThread is only assigned once runFetcher() has started, so it may still be null here;
+               // read the field once so the null check and the interrupt act on the same reference
+               final Thread fetcherThread = mainThread;
+               if (fetcherThread != null) {
+                       fetcherThread.interrupt(); // the main thread may be sleeping for the discovery interval
+               }
+
+               if (LOG.isInfoEnabled()) {
+                       LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask);
+               }
+               shardConsumersExecutor.shutdownNow();
+       }
+
+       /**
+        * After calling {@link KinesisDataFetcher#shutdownFetcher()}, this can be called to await the fetcher shutdown.
+        * Blocks until every task of the shard consumer executor has completed.
+        *
+        * @throws InterruptedException if the calling thread is interrupted while waiting
+        */
+       public void awaitTermination() throws InterruptedException {
+               // let the executor block and wake us as soon as it terminates, rather than polling
+               // isTerminated() with a fixed sleep; the timeout only bounds each individual wait
+               while (!shardConsumersExecutor.awaitTermination(50, java.util.concurrent.TimeUnit.MILLISECONDS)) {
+                       // not terminated yet, keep waiting
+               }
+       }
+
+       /** Called by created threads to pass on errors. Only the first thrown 
error is set.
+        * Once set, the shutdown process will be executed and all shard 
consuming threads will be interrupted. */
+       protected void stopWithError(Throwable throwable) {
+               if (this.error.compareAndSet(null, throwable)) {
+                       shutdownFetcher();
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Functions that update the subscribedStreamToLastDiscoveredShardIds 
state
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Updates the last discovered shard of a subscribed stream; only updates if the update is valid.
+        * The update is valid when no shard id has been recorded for the stream yet, or when the given
+        * shard id compares greater than the one currently recorded.
+        *
+        * @param stream the stream whose last discovered shard should be advanced
+        * @param shardId the candidate shard id
+        */
+       public void advanceLastDiscoveredShardOfStream(String stream, String shardId) {
+               final String previousShardId = this.subscribedStreamsToLastDiscoveredShardIds.get(stream);
+
+               // short-circuit: the ids are only compared when a previous id exists
+               final boolean shouldAdvance =
+                       previousShardId == null || KinesisStreamShard.compareShardIds(shardId, previousShardId) > 0;
+
+               if (shouldAdvance) {
+                       this.subscribedStreamsToLastDiscoveredShardIds.put(stream, shardId);
+               }
+       }
+
+       /**
+        * A utility function that does the following:
+        *
+        * 1. Find new shards for each stream that we haven't seen before
+        * 2. For each new shard, determine whether this consumer subtask 
should subscribe to them;
+        *        if yes, it is added to the returned list of shards
+        * 3. Update the subscribedStreamsToLastDiscoveredShardIds state so 
that we won't get shards
+        *    that we have already seen before the next time this function is 
called
+        */
+       private List<KinesisStreamShard> discoverNewShardsToSubscribe() throws 
InterruptedException {
+
+               List<KinesisStreamShard> newShardsToSubscribe = new 
LinkedList<>();
+
+               GetShardListResult shardListResult = 
kinesis.getShardList(subscribedStreamsToLastDiscoveredShardIds);
+               if (shardListResult.hasRetrievedShards()) {
+                       Set<String> streamsWithNewShards = 
shardListResult.getStreamsWithRetrievedShards();
+
+                       for (String stream : streamsWithNewShards) {
+                               List<KinesisStreamShard> newShardsOfStream = 
shardListResult.getRetrievedShardListOfStream(stream);
+                               for (KinesisStreamShard newShard : 
newShardsOfStream) {
+                                       if 
(isThisSubtaskShouldSubscribeTo(newShard, totalNumberOfConsumerSubtasks, 
indexOfThisConsumerSubtask)) {
+                                               
newShardsToSubscribe.add(newShard);
+                                       }
+                               }
+
+                               advanceLastDiscoveredShardOfStream(
+                                       stream, 
shardListResult.getLastSeenShardOfStream(stream).getShard().getShardId());
+                       }
+               }
+
+               return newShardsToSubscribe;
+       }
+
+       // ------------------------------------------------------------------------
+       //  Functions to get / set information about the consumer
+       // ------------------------------------------------------------------------
+
+       /**
+        * Sets the isRestoredFromFailure flag of this fetcher.
+        *
+        * @param bool the new value of the flag
+        */
+       public void setIsRestoringFromFailure(boolean bool) {
+               isRestoredFromFailure = bool;
+       }
+
+       /**
+        * Returns the configuration properties held by this fetcher.
+        *
+        * @return the configuration properties (the same instance, not a copy)
+        */
+       protected Properties getConsumerConfiguration() {
+               return this.configProps;
+       }
+
+       protected KinesisDeserializationSchema<T> 
getClonedDeserializationSchema() {
+               try {
+                       return InstantiationUtil.clone(deserializationSchema, 
runtimeContext.getUserCodeClassLoader());
+               } catch (IOException | ClassNotFoundException ex) {
+                       // this really shouldn't happen; simply wrap it around 
a runtime exception
+                       throw new RuntimeException(ex);
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  Thread-safe operations for record emitting and shard state updating
+       //  that assure atomicity with respect to the checkpoint lock
+       // ------------------------------------------------------------------------
+
+       /**
+        * Collects a record and updates the shard state to the sequence number of that record.
+        * Both steps are performed while holding the checkpoint lock, so that they are atomic
+        * with respect to anything else synchronizing on that lock.
+        * This method is called by {@link ShardConsumer}s.
+        *
+        * @param record the record to collect
+        * @param recordTimestamp timestamp to attach to the collected record
+        * @param shardStateIndex index of the shard to update in subscribedShardsState;
+        *                        this index should be the returned value from
+        *                        {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)},
+        *                        called when the shard state was registered.
+        * @param lastSequenceNumber the last sequence number value to update
+        */
+       protected void emitRecordAndUpdateState(
+                       T record,
+                       long recordTimestamp,
+                       int shardStateIndex,
+                       SequenceNumber lastSequenceNumber) {
+
+               synchronized (checkpointLock) {
+                       // emit first, then record the sequence number of the emitted record
+                       sourceContext.collectWithTimestamp(record, recordTimestamp);
+                       updateState(shardStateIndex, lastSequenceNumber);
+               }
+       }
+
+       /**
+        * Update the shard to last processed sequence number state.
+        * This method is called by {@link ShardConsumer}s.
+        *
+        * @param shardStateIndex index of the shard to update in 
subscribedShardsState;
+        *                        this index should be the returned value from
+        *                        {@link 
KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, 
called
+        *                        when the shard state was registered.
+        * @param lastSequenceNumber the last sequence number value to update
+        */
+       protected void updateState(int shardStateIndex, SequenceNumber 
lastSequenceNumber) {
+               synchronized (checkpointLock) {
+                       
subscribedShardsState.get(shardStateIndex).setLastProcessedSequenceNum(lastSequenceNumber);
+
+                       // if a shard's state is updated to be 
SENTINEL_SHARD_ENDING_SEQUENCE_NUM by its consumer thread,
+                       // we've finished reading the shard and should 
determine it to be non-active
+                       if 
(lastSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()))
 {
+                               this.numberOfActiveShards.decrementAndGet();
+                               LOG.info("Subtask {} has reached the end of 
subscribed shard: {}",
+                                       indexOfThisConsumerSubtask, 
subscribedShardsState.get(shardStateIndex).getKinesisStreamShard());
+                       }
+               }
+       }
+
+       /**
+        * Register a new subscribed shard state, while holding the checkpoint lock.
+        * The number of active shards is incremented unless the registered state is already
+        * at SENTINEL_SHARD_ENDING_SEQUENCE_NUM.
+        *
+        * @param newSubscribedShardState the new shard state that this fetcher is to be subscribed to
+        * @return the index of the newly registered state in subscribedShardsState
+        */
+       public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribedShardState) {
+               synchronized (checkpointLock) {
+                       subscribedShardsState.add(newSubscribedShardState);
+
+                       // A state that starts out at SENTINEL_SHARD_ENDING_SEQUENCE_NUM belongs to a shard that
+                       // was already read to its end (e.g. before a failure and restore); any other
+                       // initial state means this subtask has a new active shard
+                       final boolean alreadyFullyRead = newSubscribedShardState.getLastProcessedSequenceNum()
+                               .equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
+                       if (!alreadyFullyRead) {
+                               this.numberOfActiveShards.incrementAndGet();
+                       }
+
+                       // the new state was appended, so its index is the last position
+                       final int registeredIndex = subscribedShardsState.size() - 1;
+                       return registeredIndex;
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  Miscellaneous utility functions
+       // ------------------------------------------------------------------------
+
+       /**
+        * Utility function to determine whether a shard should be subscribed 
by this consumer subtask.
+        *
+        * @param shard the shard to determine
+        * @param totalNumberOfConsumerSubtasks total number of consumer 
subtasks
+        * @param indexOfThisConsumerSubtask index of this consumer subtask
+        */
+       private static boolean 
isThisSubtaskShouldSubscribeTo(KinesisStreamShard shard,
+                                                                               
                                int totalNumberOfConsumerSubtasks,
+                                                                               
                                int indexOfThisConsumerSubtask) {
+               return (Math.abs(shard.hashCode() % 
totalNumberOfConsumerSubtasks)) == indexOfThisConsumerSubtask;
+       }
+
+       /**
+        * Creates the cached thread pool that the shard consumer threads run in.
+        * Threads are created as daemon threads and are named
+        * "shardConsumers-{subtaskName}-thread-{n}", where n is a counter that is
+        * incremented for every thread created by the returned pool.
+        *
+        * @param subtaskName name of the subtask, used as part of the thread names
+        * @return the thread pool for the shard consumers
+        */
+       private static ExecutorService createShardConsumersThreadPool(final String subtaskName) {
+               return Executors.newCachedThreadPool(new ThreadFactory() {
+                       // kept on the factory (not inside newThread) so that it is shared across all
+                       // created threads; otherwise every thread would be numbered 0
+                       private final AtomicLong threadCount = new AtomicLong(0);
+
+                       @Override
+                       public Thread newThread(Runnable runnable) {
+                               Thread thread = new Thread(runnable);
+                               thread.setName("shardConsumers-" + subtaskName + "-thread-" + threadCount.getAndIncrement());
+                               thread.setDaemon(true);
+                               return thread;
+                       }
+               });
+       }
+
+       /**
+        * Utility function to create an initial map of the last discovered 
shard id of each subscribed stream, set to null;
+        * This is called in the constructor; correct values will be set later 
on by calling advanceLastDiscoveredShardOfStream()
+        *
+        * @param streams the list of subscribed streams
+        * @return the initial map for subscribedStreamsToLastDiscoveredShardIds
+        */
+       protected static HashMap<String, String> 
createInitialSubscribedStreamsToLastDiscoveredShardsState(List<String> streams) 
{
+               HashMap<String, String> initial = new HashMap<>();
+               for (String stream : streams) {
+                       initial.put(stream, null);
+               }
+               return initial;
+       }
+}

Reply via email to