http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml b/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml
deleted file mode 100644
index 45b3b92..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-    <logger name="org.apache.flink.streaming" level="WARN"/>
-</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/pom.xml b/flink-streaming-connectors/flink-connector-kinesis/pom.xml
deleted file mode 100644
index 29170ad..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/pom.xml
+++ /dev/null
@@ -1,164 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-       <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-streaming-connectors</artifactId>
-               <version>1.2-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-connector-kinesis_2.10</artifactId>
-       <name>flink-connector-kinesis</name>
-       <properties>
-               <aws.sdk.version>1.10.71</aws.sdk.version>
-               <aws.kinesis-kcl.version>1.6.2</aws.kinesis-kcl.version>
-               <aws.kinesis-kpl.version>0.10.2</aws.kinesis-kpl.version>
-       </properties>
-
-       <packaging>jar</packaging>
-
-       <dependencies>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-java_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>com.google.guava</groupId>
-                       <artifactId>guava</artifactId>
-                       <version>${guava.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-tests_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-                       <type>test-jar</type>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-test-utils_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <!-- Note:
-                       The below dependencies are licenced under the Amazon Software License.
-                       Flink includes the "flink-connector-kinesis" only as an optional dependency for that reason.
-               -->
-               <dependency>
-                       <groupId>com.amazonaws</groupId>
-                       <artifactId>aws-java-sdk-kinesis</artifactId>
-                       <version>${aws.sdk.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>com.amazonaws</groupId>
-                       <artifactId>amazon-kinesis-producer</artifactId>
-                       <version>${aws.kinesis-kpl.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>com.amazonaws</groupId>
-                       <artifactId>amazon-kinesis-client</artifactId>
-                       <version>${aws.kinesis-kcl.version}</version>
-                       <!--
-                               We're excluding the below from the KCL since we'll only be using the
-                               com.amazonaws.services.kinesis.clientlibrary.types.UserRecord class, which will not need these dependencies.
-                       -->
-                       <exclusions>
-                               <exclusion>
-                                       <groupId>com.amazonaws</groupId>
-                                       <artifactId>aws-java-sdk-dynamodb</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>com.amazonaws</groupId>
-                                       <artifactId>aws-java-sdk-cloudwatch</artifactId>
-                               </exclusion>
-                       </exclusions>
-               </dependency>
-
-       </dependencies>
-
-       <build>
-               <plugins>
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-jar-plugin</artifactId>
-                               <executions>
-                                       <execution>
-                                               <goals>
-                                                       <goal>test-jar</goal>
-                                               </goals>
-                                       </execution>
-                               </executions>
-                       </plugin>
-
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-shade-plugin</artifactId>
-                               <executions>
-                                       <execution>
-                                               <id>shade-flink</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>shade</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
-                                                       <artifactSet combine.children="append">
-                                                               <includes>
-                                                                       <include>com.amazonaws:*</include>
-                                                                       <include>com.google.protobuf:*</include>
-                                                               </includes>
-                                                       </artifactSet>
-                                                       <relocations combine.children="override">
-                                                               <!-- DO NOT RELOCATE GUAVA IN THIS PACKAGE -->
-                                                               <relocation>
-                                                                       <pattern>org.objectweb.asm</pattern>
-                                                                       <shadedPattern>org.apache.flink.shaded.org.objectweb.asm</shadedPattern>
-                                                               </relocation>
-                                                               <relocation>
-                                                                       <pattern>com.google.protobuf</pattern>
-                                                                       <shadedPattern>org.apache.flink.kinesis.shaded.com.google.protobuf</shadedPattern>
-                                                               </relocation>
-                                                               <relocation>
-                                                                       <pattern>com.amazonaws</pattern>
-                                                                       <shadedPattern>org.apache.flink.kinesis.shaded.com.amazonaws</shadedPattern>
-                                                               </relocation>
-                                                       </relocations>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-               </plugins>
-       </build>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
deleted file mode 100644
index a62dc10..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
-import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
-import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
-import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
-import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.List;
-import java.util.Properties;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The Flink Kinesis Consumer is an exactly-once parallel streaming data source that subscribes to multiple AWS Kinesis
- * streams within the same AWS service region, and can handle resharding of streams. Each subtask of the consumer is
- * responsible for fetching data records from multiple Kinesis shards. The number of shards fetched by each subtask will
- * change as shards are closed and created by Kinesis.
- *
- * <p>To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis
- * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for
- * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial
- * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST.</p>
- *
- * @param <T> the type of data emitted
- */
-public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
-       implements CheckpointedAsynchronously<HashMap<KinesisStreamShard, SequenceNumber>>, ResultTypeQueryable<T> {
-
-       private static final long serialVersionUID = 4724006128720664870L;
-
-       private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class);
-
-       // ------------------------------------------------------------------------
-       //  Consumer properties
-       // ------------------------------------------------------------------------
-
-       /** The names of the Kinesis streams that we will be consuming from */
-       private final List<String> streams;
-
-       /** Properties to parametrize settings such as AWS service region, initial position in stream,
-        * shard list retrieval behaviours, etc */
-       private final Properties configProps;
-
-       /** User supplied deseriliazation schema to convert Kinesis byte messages to Flink objects */
-       private final KinesisDeserializationSchema<T> deserializer;
-
-       // ------------------------------------------------------------------------
-       //  Runtime state
-       // ------------------------------------------------------------------------
-
-       /** Per-task fetcher for Kinesis data records, where each fetcher pulls data from one or more Kinesis shards */
-       private transient KinesisDataFetcher<T> fetcher;
-
-       /** The sequence numbers in the last state snapshot of this subtask */
-       private transient HashMap<KinesisStreamShard, SequenceNumber> lastStateSnapshot;
-
-       /** The sequence numbers to restore to upon restore from failure */
-       private transient HashMap<KinesisStreamShard, SequenceNumber> sequenceNumsToRestore;
-
-       private volatile boolean running = true;
-
-
-       // ------------------------------------------------------------------------
-       //  Constructors
-       // ------------------------------------------------------------------------
-
-       /**
-        * Creates a new Flink Kinesis Consumer.
-        *
-        * <p>The AWS credentials to be used, AWS region of the Kinesis streams, initial position to start streaming
-        * from are configured with a {@link Properties} instance.</p>
-        *
-        * @param stream
-        *           The single AWS Kinesis stream to read from.
-        * @param deserializer
-        *           The deserializer used to convert raw bytes of Kinesis records to Java objects (without key).
-        * @param configProps
-        *           The properties used to configure AWS credentials, AWS region, and initial starting position.
-        */
-       public FlinkKinesisConsumer(String stream, DeserializationSchema<T> deserializer, Properties configProps) {
-               this(stream, new KinesisDeserializationSchemaWrapper<>(deserializer), configProps);
-       }
-
-       /**
-        * Creates a new Flink Kinesis Consumer.
-        *
-        * <p>The AWS credentials to be used, AWS region of the Kinesis streams, initial position to start streaming
-        * from are configured with a {@link Properties} instance.</p>
-        *
-        * @param stream
-        *           The single AWS Kinesis stream to read from.
-        * @param deserializer
-        *           The keyed deserializer used to convert raw bytes of Kinesis records to Java objects.
-        * @param configProps
-        *           The properties used to configure AWS credentials, AWS region, and initial starting position.
-        */
-       public FlinkKinesisConsumer(String stream, KinesisDeserializationSchema<T> deserializer, Properties configProps) {
-               this(Collections.singletonList(stream), deserializer, configProps);
-       }
-
-       /**
-        * Creates a new Flink Kinesis Consumer.
-        *
-        * <p>The AWS credentials to be used, AWS region of the Kinesis streams, initial position to start streaming
-        * from are configured with a {@link Properties} instance.</p>
-        *
-        * @param streams
-        *           The AWS Kinesis streams to read from.
-        * @param deserializer
-        *           The keyed deserializer used to convert raw bytes of Kinesis records to Java objects.
-        * @param configProps
-        *           The properties used to configure AWS credentials, AWS region, and initial starting position.
-        */
-       public FlinkKinesisConsumer(List<String> streams, KinesisDeserializationSchema<T> deserializer, Properties configProps) {
-               checkNotNull(streams, "streams can not be null");
-               checkArgument(streams.size() != 0, "must be consuming at least 1 stream");
-               checkArgument(!streams.contains(""), "stream names cannot be empty Strings");
-               this.streams = streams;
-
-               this.configProps = checkNotNull(configProps, "configProps can not be null");
-
-               // check the configuration properties for any conflicting settings
-               KinesisConfigUtil.validateConsumerConfiguration(this.configProps);
-
-               this.deserializer = checkNotNull(deserializer, "deserializer can not be null");
-
-               if (LOG.isInfoEnabled()) {
-                       StringBuilder sb = new StringBuilder();
-                       for (String stream : streams) {
-                               sb.append(stream).append(", ");
-                       }
-                       LOG.info("Flink Kinesis Consumer is going to read the following streams: {}", sb.toString());
-               }
-       }
-
-       // ------------------------------------------------------------------------
-       //  Source life cycle
-       // ------------------------------------------------------------------------
-
-       @Override
-       public void open(Configuration parameters) throws Exception {
-               super.open(parameters);
-
-               // restore to the last known sequence numbers from the latest complete snapshot
-               if (sequenceNumsToRestore != null) {
-                       if (LOG.isInfoEnabled()) {
-                               LOG.info("Subtask {} is restoring sequence numbers {} from previous checkpointed state",
-                                       getRuntimeContext().getIndexOfThisSubtask(), sequenceNumsToRestore.toString());
-                       }
-
-                       // initialize sequence numbers with restored state
-                       lastStateSnapshot = sequenceNumsToRestore;
-               } else {
-                       // start fresh with empty sequence numbers if there are no snapshots to restore from.
-                       lastStateSnapshot = new HashMap<>();
-               }
-       }
-
-       @Override
-       public void run(SourceContext<T> sourceContext) throws Exception {
-
-               // all subtasks will run a fetcher, regardless of whether or not the subtask will initially have
-               // shards to subscribe to; fetchers will continuously poll for changes in the shard list, so all subtasks
-               // can potentially have new shards to subscribe to later on
-               fetcher = new KinesisDataFetcher<>(
-                       streams, sourceContext, getRuntimeContext(), configProps, deserializer);
-
-               boolean isRestoringFromFailure = (sequenceNumsToRestore != null);
-               fetcher.setIsRestoringFromFailure(isRestoringFromFailure);
-
-               // if we are restoring from a checkpoint, we iterate over the restored
-               // state and accordingly seed the fetcher with subscribed shards states
-               if (isRestoringFromFailure) {
-                       for (Map.Entry<KinesisStreamShard, SequenceNumber> restored : lastStateSnapshot.entrySet()) {
-                               fetcher.advanceLastDiscoveredShardOfStream(
-                                       restored.getKey().getStreamName(), restored.getKey().getShard().getShardId());
-
-                               if (LOG.isInfoEnabled()) {
-                                       LOG.info("Subtask {} is seeding the fetcher with restored shard {}," +
-                                                       " starting state set to the restored sequence number {}",
-                                               getRuntimeContext().getIndexOfThisSubtask(), restored.getKey().toString(), restored.getValue());
-                               }
-                               fetcher.registerNewSubscribedShardState(
-                                       new KinesisStreamShardState(restored.getKey(), restored.getValue()));
-                       }
-               }
-
-               // check that we are running before starting the fetcher
-               if (!running) {
-                       return;
-               }
-
-               // start the fetcher loop. The fetcher will stop running only when cancel() or
-               // close() is called, or an error is thrown by threads created by the fetcher
-               fetcher.runFetcher();
-
-               // check that the fetcher has terminated before fully closing
-               fetcher.awaitTermination();
-               sourceContext.close();
-       }
-
-       @Override
-       public void cancel() {
-               running = false;
-
-               KinesisDataFetcher fetcher = this.fetcher;
-               this.fetcher = null;
-
-               // this method might be called before the subtask actually starts running,
-               // so we must check if the fetcher is actually created
-               if (fetcher != null) {
-                       try {
-                               // interrupt the fetcher of any work
-                               fetcher.shutdownFetcher();
-                               fetcher.awaitTermination();
-                       } catch (Exception e) {
-                               LOG.warn("Error while closing Kinesis data fetcher", e);
-                       }
-               }
-       }
-
-       @Override
-       public void close() throws Exception {
-               cancel();
-               super.close();
-       }
-
-       @Override
-       public TypeInformation<T> getProducedType() {
-               return deserializer.getProducedType();
-       }
-
-       // ------------------------------------------------------------------------
-       //  State Snapshot & Restore
-       // ------------------------------------------------------------------------
-
-       @Override
-       public HashMap<KinesisStreamShard, SequenceNumber> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-               if (lastStateSnapshot == null) {
-                       LOG.debug("snapshotState() requested on not yet opened source; returning null.");
-                       return null;
-               }
-
-               if (fetcher == null) {
-                       LOG.debug("snapshotState() requested on not yet running source; returning null.");
-                       return null;
-               }
-
-               if (!running) {
-                       LOG.debug("snapshotState() called on closed source; returning null.");
-                       return null;
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("Snapshotting state ...");
-               }
-
-               lastStateSnapshot = fetcher.snapshotState();
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
-                               lastStateSnapshot.toString(), checkpointId, checkpointTimestamp);
-               }
-
-               return lastStateSnapshot;
-       }
-
-       @Override
-       public void restoreState(HashMap<KinesisStreamShard, SequenceNumber> restoredState) throws Exception {
-               sequenceNumsToRestore = restoredState;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
deleted file mode 100644
index 579bd6b..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kinesis;
-
-import com.amazonaws.services.kinesis.producer.Attempt;
-import com.amazonaws.services.kinesis.producer.KinesisProducer;
-import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
-import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
-import com.amazonaws.services.kinesis.producer.UserRecordResult;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import 
org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
-import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
-import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
-import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flink.util.PropertiesUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Objects;
-import java.util.Properties;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The FlinkKinesisProducer allows to produce from a Flink DataStream into 
Kinesis.
- *
- * @param <OUT> Data type to produce into Kinesis Streams
- */
-public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKinesisProducer.class);
-
-       /** Properties to parametrize settings such as AWS service region, 
access key etc. */
-       private final Properties configProps;
-
-       /* Flag controlling the error behavior of the producer */
-       private boolean failOnError = false;
-
-       /* Name of the default stream to produce to. Can be overwritten by the 
serialization schema */
-       private String defaultStream;
-
-       /* Default partition id. Can be overwritten by the serialization schema 
*/
-       private String defaultPartition;
-
-       /* Schema for turning the OUT type into a byte array. */
-       private final KinesisSerializationSchema<OUT> schema;
-
-       /* Optional custom partitioner */
-       private KinesisPartitioner<OUT> customPartitioner = null;
-
-
-       // --------------------------- Runtime fields 
---------------------------
-
-
-       /* Our Kinesis instance for each parallel Flink sink */
-       private transient KinesisProducer producer;
-
-       /* Callback handling failures */
-       private transient FutureCallback<UserRecordResult> callback;
-
-       /* Field for async exception */
-       private transient volatile Throwable thrownException;
-
-
-       // --------------------------- Initialization and configuration  
---------------------------
-
-
-       /**
-        * Create a new FlinkKinesisProducer.
-        * This is a constructor supporting Flink's {@see SerializationSchema}.
-        *
-        * @param schema Serialization schema for the data type
-        * @param configProps The properties used to configure AWS credentials 
and AWS region
-        */
-       public FlinkKinesisProducer(final SerializationSchema<OUT> schema, 
Properties configProps) {
-
-               // create a simple wrapper for the serialization schema
-               this(new KinesisSerializationSchema<OUT>() {
-                       @Override
-                       public ByteBuffer serialize(OUT element) {
-                               // wrap into ByteBuffer
-                               return 
ByteBuffer.wrap(schema.serialize(element));
-                       }
-                       // use default stream and hash key
-                       @Override
-                       public String getTargetStream(OUT element) {
-                               return null;
-                       }
-               }, configProps);
-       }
-
-       /**
-        * Create a new FlinkKinesisProducer.
-        * This is a constructor supporting {@see KinesisSerializationSchema}.
-        *
-        * @param schema Kinesis serialization schema for the data type
-        * @param configProps The properties used to configure AWS credentials 
and AWS region
-        */
-       public FlinkKinesisProducer(KinesisSerializationSchema<OUT> schema, 
Properties configProps) {
-               this.configProps = checkNotNull(configProps, "configProps can 
not be null");
-
-               // check the configuration properties for any conflicting 
settings
-               
KinesisConfigUtil.validateProducerConfiguration(this.configProps);
-
-               
ClosureCleaner.ensureSerializable(Objects.requireNonNull(schema));
-               this.schema = schema;
-       }
-
-       /**
-        * If set to true, the producer will immediately fail with an exception 
on any error.
-        * Otherwise, the errors are logged and the producer goes on.
-        *
-        * @param failOnError Error behavior flag
-        */
-       public void setFailOnError(boolean failOnError) {
-               this.failOnError = failOnError;
-       }
-
-       /**
-        * Set a default stream name.
-        * @param defaultStream Name of the default Kinesis stream
-        */
-       public void setDefaultStream(String defaultStream) {
-               this.defaultStream = defaultStream;
-       }
-
-       /**
-        * Set default partition id
-        * @param defaultPartition Name of the default partition
-        */
-       public void setDefaultPartition(String defaultPartition) {
-               this.defaultPartition = defaultPartition;
-       }
-
-       public void setCustomPartitioner(KinesisPartitioner<OUT> partitioner) {
-               Objects.requireNonNull(partitioner);
-               ClosureCleaner.ensureSerializable(partitioner);
-               this.customPartitioner = partitioner;
-       }
-
-
-       // --------------------------- Lifecycle methods 
---------------------------
-
-
-       @Override
-       public void open(Configuration parameters) throws Exception {
-               super.open(parameters);
-
-               KinesisProducerConfiguration producerConfig = new 
KinesisProducerConfiguration();
-
-               
producerConfig.setRegion(configProps.getProperty(ProducerConfigConstants.AWS_REGION));
-               
producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
-               if 
(configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) {
-                       
producerConfig.setCollectionMaxCount(PropertiesUtil.getLong(configProps,
-                                       
ProducerConfigConstants.COLLECTION_MAX_COUNT, 
producerConfig.getCollectionMaxCount(), LOG));
-               }
-               if 
(configProps.containsKey(ProducerConfigConstants.AGGREGATION_MAX_COUNT)) {
-                       
producerConfig.setAggregationMaxCount(PropertiesUtil.getLong(configProps,
-                                       
ProducerConfigConstants.AGGREGATION_MAX_COUNT, 
producerConfig.getAggregationMaxCount(), LOG));
-               }
-
-               producer = new KinesisProducer(producerConfig);
-               callback = new FutureCallback<UserRecordResult>() {
-                       @Override
-                       public void onSuccess(UserRecordResult result) {
-                               if (!result.isSuccessful()) {
-                                       if(failOnError) {
-                                               thrownException = new 
RuntimeException("Record was not sent successful");
-                                       } else {
-                                               LOG.warn("Record was not sent 
successful");
-                                       }
-                               }
-                       }
-
-                       @Override
-                       public void onFailure(Throwable t) {
-                               if (failOnError) {
-                                       thrownException = t;
-                               } else {
-                                       LOG.warn("An exception occurred while 
processing a record", t);
-                               }
-                       }
-               };
-
-               if (this.customPartitioner != null) {
-                       
this.customPartitioner.initialize(getRuntimeContext().getIndexOfThisSubtask(), 
getRuntimeContext().getNumberOfParallelSubtasks());
-               }
-
-               LOG.info("Started Kinesis producer instance for region '{}'", 
producerConfig.getRegion());
-       }
-
-       @Override
-       public void invoke(OUT value) throws Exception {
-               if (this.producer == null) {
-                       throw new RuntimeException("Kinesis producer has been 
closed");
-               }
-               if (thrownException != null) {
-                       String errorMessages = "";
-                       if (thrownException instanceof 
UserRecordFailedException) {
-                               List<Attempt> attempts = 
((UserRecordFailedException) thrownException).getResult().getAttempts();
-                               for (Attempt attempt: attempts) {
-                                       if (attempt.getErrorMessage() != null) {
-                                               errorMessages += 
attempt.getErrorMessage() +"\n";
-                                       }
-                               }
-                       }
-                       if (failOnError) {
-                               throw new RuntimeException("An exception was 
thrown while processing a record: " + errorMessages, thrownException);
-                       } else {
-                               LOG.warn("An exception was thrown while 
processing a record: {}", thrownException, errorMessages);
-                               thrownException = null; // reset
-                       }
-               }
-
-               String stream = defaultStream;
-               String partition = defaultPartition;
-
-               ByteBuffer serialized = schema.serialize(value);
-
-               // maybe set custom stream
-               String customStream = schema.getTargetStream(value);
-               if (customStream != null) {
-                       stream = customStream;
-               }
-
-               String explicitHashkey = null;
-               // maybe set custom partition
-               if (customPartitioner != null) {
-                       partition = customPartitioner.getPartitionId(value);
-                       explicitHashkey = 
customPartitioner.getExplicitHashKey(value);
-               }
-
-               if (stream == null) {
-                       if (failOnError) {
-                               throw new RuntimeException("No target stream 
set");
-                       } else {
-                               LOG.warn("No target stream set. Skipping 
record");
-                               return;
-                       }
-               }
-
-               ListenableFuture<UserRecordResult> cb = 
producer.addUserRecord(stream, partition, explicitHashkey, serialized);
-               Futures.addCallback(cb, callback);
-       }
-
-       @Override
-       public void close() throws Exception {
-               LOG.info("Closing producer");
-               super.close();
-               KinesisProducer kp = this.producer;
-               this.producer = null;
-               if (kp != null) {
-                       LOG.info("Flushing outstanding {} records", 
kp.getOutstandingRecordsCount());
-                       // try to flush all outstanding records
-                       while (kp.getOutstandingRecordsCount() > 0) {
-                               kp.flush();
-                               try {
-                                       Thread.sleep(500);
-                               } catch (InterruptedException e) {
-                                       LOG.warn("Flushing was interrupted.");
-                                       // stop the blocking flushing and 
destroy producer immediately
-                                       break;
-                               }
-                       }
-                       LOG.info("Flushing done. Destroying producer 
instance.");
-                       kp.destroy();
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
deleted file mode 100644
index bd23abe..0000000
--- 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kinesis;
-
-
-import java.io.Serializable;
-
-public abstract class KinesisPartitioner<T> implements Serializable {
-
-       /**
-        * Return a partition id based on the input
-        * @param element Element to partition
-        * @return A string representing the partition id
-        */
-       public abstract String getPartitionId(T element);
-
-       /**
-        * Optional method for setting an explicit hash key
-        * @param element Element to get the hash key for
-        * @return the hash key for the element
-        */
-       public String getExplicitHashKey(T element) {
-               return null;
-       }
-
-       /**
-        * Optional initializer.
-        *
-        * @param indexOfThisSubtask Index of this partitioner instance
-        * @param numberOfParallelSubtasks Total number of parallel instances
-        */
-       public void initialize(int indexOfThisSubtask, int 
numberOfParallelSubtasks) {
-               //
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
deleted file mode 100644
index 01d4f00..0000000
--- 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.config;
-
-import com.amazonaws.auth.AWSCredentialsProvider;
-
-/**
- * Configuration keys for AWS service usage
- */
-public class AWSConfigConstants {
-
-       /**
-        * Possible configuration values for the type of credential provider to 
use when accessing AWS Kinesis.
-        * Internally, a corresponding implementation of {@link 
AWSCredentialsProvider} will be used.
-        */
-       public enum CredentialProvider {
-
-               /** Look for the environment variables AWS_ACCESS_KEY_ID and 
AWS_SECRET_ACCESS_KEY to create AWS credentials */
-               ENV_VAR,
-
-               /** Look for Java system properties aws.accessKeyId and 
aws.secretKey to create AWS credentials */
-               SYS_PROP,
-
-               /** Use a AWS credentials profile file to create the AWS 
credentials */
-               PROFILE,
-
-               /** Simply create AWS credentials by supplying the AWS access 
key ID and AWS secret key in the configuration properties */
-               BASIC,
-
-               /** A credentials provider chain will be used that searches for 
credentials in this order: ENV_VARS, SYS_PROPS, PROFILE in the AWS instance 
metadata **/
-               AUTO,
-       }
-
-       /** The AWS region of the Kinesis streams to be pulled ("us-east-1" is 
used if not set) */
-       public static final String AWS_REGION = "aws.region";
-
-       /** The AWS access key ID to use when setting credentials provider type 
to BASIC */
-       public static final String AWS_ACCESS_KEY_ID = 
"aws.credentials.provider.basic.accesskeyid";
-
-       /** The AWS secret key to use when setting credentials provider type to 
BASIC */
-       public static final String AWS_SECRET_ACCESS_KEY = 
"aws.credentials.provider.basic.secretkey";
-
-       /** The credential provider type to use when AWS credentials are 
required (BASIC is used if not set)*/
-       public static final String AWS_CREDENTIALS_PROVIDER = 
"aws.credentials.provider";
-
-       /** Optional configuration for profile path if credential provider type 
is set to be PROFILE */
-       public static final String AWS_PROFILE_PATH = 
"aws.credentials.provider.profile.path";
-
-       /** Optional configuration for profile name if credential provider type 
is set to be PROFILE */
-       public static final String AWS_PROFILE_NAME = 
"aws.credentials.provider.profile.name";
-
-       /** The AWS endpoint for Kinesis (derived from the AWS region setting 
if not set) */
-       public static final String AWS_ENDPOINT = "aws.endpoint";
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
deleted file mode 100644
index 76c20ed..0000000
--- 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.config;
-
-import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
-import org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer;
-import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
-
-/**
- * Optional consumer specific configuration keys and default values for {@link 
FlinkKinesisConsumer}
- */
-public class ConsumerConfigConstants extends AWSConfigConstants {
-
-       /**
-        * The initial position to start reading shards from. This will affect 
the {@link ShardIteratorType} used
-        * when the consumer tasks retrieve the first shard iterator for each 
Kinesis shard.
-        */
-       public enum InitialPosition {
-
-               /** Start reading from the earliest possible record in the 
stream (excluding expired data records) */
-               
TRIM_HORIZON(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM),
-
-               /** Start reading from the latest incoming record */
-               LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM);
-
-               private SentinelSequenceNumber sentinelSequenceNumber;
-
-               InitialPosition(SentinelSequenceNumber sentinelSequenceNumber) {
-                       this.sentinelSequenceNumber = sentinelSequenceNumber;
-               }
-
-               public SentinelSequenceNumber toSentinelSequenceNumber() {
-                       return this.sentinelSequenceNumber;
-               }
-       }
-
-       /** The initial position to start reading Kinesis streams from (LATEST 
is used if not set) */
-       public static final String STREAM_INITIAL_POSITION = 
"flink.stream.initpos";
-
-       /** The base backoff time between each describeStream attempt */
-       public static final String STREAM_DESCRIBE_BACKOFF_BASE = 
"flink.stream.describe.backoff.base";
-
-       /** The maximum backoff time between each describeStream attempt */
-       public static final String STREAM_DESCRIBE_BACKOFF_MAX = 
"flink.stream.describe.backoff.max";
-
-       /** The power constant for exponential backoff between each 
describeStream attempt */
-       public static final String STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT 
= "flink.stream.describe.backoff.expconst";
-
-       /** The maximum number of records to try to get each time we fetch 
records from a AWS Kinesis shard */
-       public static final String SHARD_GETRECORDS_MAX = 
"flink.shard.getrecords.maxrecordcount";
-
-       /** The maximum number of getRecords attempts if we get 
ProvisionedThroughputExceededException */
-       public static final String SHARD_GETRECORDS_RETRIES = 
"flink.shard.getrecords.maxretries";
-
-       /** The base backoff time between getRecords attempts if we get a 
ProvisionedThroughputExceededException */
-       public static final String SHARD_GETRECORDS_BACKOFF_BASE = 
"flink.shard.getrecords.backoff.base";
-
-       /** The maximum backoff time between getRecords attempts if we get a 
ProvisionedThroughputExceededException */
-       public static final String SHARD_GETRECORDS_BACKOFF_MAX = 
"flink.shard.getrecords.backoff.max";
-
-       /** The power constant for exponential backoff between each getRecords 
attempt */
-       public static final String 
SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = 
"flink.shard.getrecords.backoff.expconst";
-
-       /** The interval between each getRecords request to a AWS Kinesis shard 
in milliseconds */
-       public static final String SHARD_GETRECORDS_INTERVAL_MILLIS = 
"flink.shard.getrecords.intervalmillis";
-
-       /** The maximum number of getShardIterator attempts if we get 
ProvisionedThroughputExceededException */
-       public static final String SHARD_GETITERATOR_RETRIES = 
"flink.shard.getiterator.maxretries";
-
-       /** The base backoff time between getShardIterator attempts if we get a 
ProvisionedThroughputExceededException */
-       public static final String SHARD_GETITERATOR_BACKOFF_BASE = 
"flink.shard.getiterator.backoff.base";
-
-       /** The maximum backoff time between getShardIterator attempts if we 
get a ProvisionedThroughputExceededException */
-       public static final String SHARD_GETITERATOR_BACKOFF_MAX = 
"flink.shard.getiterator.backoff.max";
-
-       /** The power constant for exponential backoff between each 
getShardIterator attempt */
-       public static final String 
SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT = 
"flink.shard.getiterator.backoff.expconst";
-
-       /** The interval between each attempt to discover new shards */
-       public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = 
"flink.shard.discovery.intervalmillis";
-
-       // 
------------------------------------------------------------------------
-       //  Default values for consumer configuration
-       // 
------------------------------------------------------------------------
-
-       public static final String DEFAULT_STREAM_INITIAL_POSITION = 
InitialPosition.LATEST.toString();
-
-       public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE = 1000L;
-
-       public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX = 5000L;
-
-       public static final double 
DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
-
-       public static final int DEFAULT_SHARD_GETRECORDS_MAX = 100;
-
-       public static final int DEFAULT_SHARD_GETRECORDS_RETRIES = 3;
-
-       public static final long DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE = 300L;
-
-       public static final long DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX = 1000L;
-
-       public static final double 
DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
-
-       public static final long DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS = 0L;
-
-       public static final int DEFAULT_SHARD_GETITERATOR_RETRIES = 3;
-
-       public static final long DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE = 300L;
-
-       public static final long DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX = 1000L;
-
-       public static final double 
DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
-
-       public static final long DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS = 
10000L;
-
-       /**
-        * To avoid shard iterator expires in {@link ShardConsumer}s, the value 
for the configured
-        * getRecords interval can not exceed 5 minutes, which is the expire 
time for retrieved iterators.
-        */
-       public static final long MAX_SHARD_GETRECORDS_INTERVAL_MILLIS = 300000L;
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
deleted file mode 100644
index 1edddfc..0000000
--- 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.config;
-
-import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
-
-/**
- * Optional producer specific configuration keys for {@link 
FlinkKinesisProducer}
- */
-public class ProducerConfigConstants extends AWSConfigConstants {
-
-       /** Maximum number of items to pack into an PutRecords request. **/
-       public static final String COLLECTION_MAX_COUNT = 
"aws.producer.collectionMaxCount";
-
-       /** Maximum number of items to pack into an aggregated record. **/
-       public static final String AGGREGATION_MAX_COUNT = 
"aws.producer.aggregationMaxCount";
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
deleted file mode 100644
index 55668c6..0000000
--- 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kinesis.examples;
-
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
-import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-
-import java.util.Properties;
-
-/**
- * This is an example on how to consume data from Kinesis
- */
-public class ConsumeFromKinesis {
-
-       public static void main(String[] args) throws Exception {
-               ParameterTool pt = ParameterTool.fromArgs(args);
-
-               StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
-               see.setParallelism(1);
-
-               Properties kinesisConsumerConfig = new Properties();
-               kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, pt.getRequired("region"));
-               kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accesskey"));
-               kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretkey"));
-
-               DataStream<String> kinesis = see.addSource(new FlinkKinesisConsumer<>(
-                       "flink-test",
-                       new SimpleStringSchema(),
-                       kinesisConsumerConfig));
-
-               kinesis.print();
-
-               see.execute();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
deleted file mode 100644
index d178137..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kinesis.examples;
-
-import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
-import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-
-import java.util.Properties;
-
-/**
- * This is an example on how to produce data into Kinesis
- */
-public class ProduceIntoKinesis {
-
-       public static void main(String[] args) throws Exception {
-               ParameterTool pt = ParameterTool.fromArgs(args);
-
-               StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
-               see.setParallelism(1);
-
-               DataStream<String> simpleStringStream = see.addSource(new EventsGenerator());
-
-               Properties kinesisProducerConfig = new Properties();
-               kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_REGION, pt.getRequired("region"));
-               kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
-               kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
-
-               FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(
-                               new SimpleStringSchema(), kinesisProducerConfig);
-
-               kinesis.setFailOnError(true);
-               kinesis.setDefaultStream("flink-test");
-               kinesis.setDefaultPartition("0");
-
-               simpleStringStream.addSink(kinesis);
-
-               see.execute();
-       }
-
-       public static class EventsGenerator implements SourceFunction<String> {
-               private boolean running = true;
-
-               @Override
-               public void run(SourceContext<String> ctx) throws Exception {
-                       long seq = 0;
-                       while(running) {
-                               Thread.sleep(10);
-                               ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12));
-                       }
-               }
-
-               @Override
-               public void cancel() {
-                       running = false;
-               }
-       }
-}

Reply via email to