[FLINK-3058] Add support for Kafka 0.9.0.0

For adding Kafka 0.9.0.0 support, this commit changes the following:
- Split up of the kafka connector into a flink-connector-kafka-(base|0.9|0.8) 
with different dependencies
- The base package contains common test cases, classes and implementations (the 
producer for 0.9 and 0.8 relies on exactly the same code)
- the 0.8 package contains a kafka connector implementation against the 
SimpleConsumer (low level) API of Kafka 0.8. There are some additional tests 
for the ZK offset committing
- The 0.9 package relies on the new Consumer API of Kafka 0.9.0.0
- Support for metrics for all producers and the 0.9 consumer through Flink's 
accumulators.

This closes #1489


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/81320c1c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/81320c1c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/81320c1c

Branch: refs/heads/master
Commit: 81320c1c7ee98b9a663998df51cc4d5aa73d9b2a
Parents: 83fb2fa
Author: Robert Metzger <[email protected]>
Authored: Wed Dec 16 17:29:42 2015 +0100
Committer: Robert Metzger <[email protected]>
Committed: Wed Jan 20 20:30:50 2016 +0100

----------------------------------------------------------------------
 .gitignore                                      |    1 +
 docs/apis/streaming/connectors/kafka.md         |   67 +-
 docs/apis/streaming/index.md                    |    4 +-
 .../flink/api/java/typeutils/TypeExtractor.java |    4 +-
 .../webmonitor/RuntimeMonitorHandler.java       |    6 +
 .../flink-connector-kafka-0.8/pom.xml           |  160 ++
 .../connectors/kafka/FlinkKafkaConsumer08.java  |  592 +++++++
 .../connectors/kafka/FlinkKafkaConsumer081.java |   39 +
 .../connectors/kafka/FlinkKafkaConsumer082.java |   39 +
 .../connectors/kafka/FlinkKafkaProducer.java    |   64 +
 .../connectors/kafka/FlinkKafkaProducer08.java  |  128 ++
 .../connectors/kafka/internals/Fetcher.java     |   81 +
 .../kafka/internals/LegacyFetcher.java          |  652 ++++++++
 .../kafka/internals/OffsetHandler.java          |   56 +
 .../kafka/internals/PartitionerWrapper.java     |   49 +
 .../kafka/internals/ZookeeperOffsetHandler.java |  146 ++
 .../connectors/kafka/Kafka08ITCase.java         |  266 ++++
 .../connectors/kafka/Kafka08ProducerITCase.java |   33 +
 .../connectors/kafka/KafkaConsumerTest.java     |  152 ++
 .../connectors/kafka/KafkaLocalSystemTime.java  |   48 +
 .../connectors/kafka/KafkaProducerTest.java     |  114 ++
 .../kafka/KafkaTestEnvironmentImpl.java         |  337 ++++
 .../internals/ZookeeperOffsetHandlerTest.java   |   56 +
 .../src/test/resources/log4j-test.properties    |   29 +
 .../src/test/resources/logback-test.xml         |   30 +
 .../flink-connector-kafka-0.9/pom.xml           |  131 ++
 .../connectors/kafka/FlinkKafkaConsumer09.java  |  501 ++++++
 .../connectors/kafka/FlinkKafkaProducer09.java  |  130 ++
 .../kafka/examples/ReadFromKafka.java           |   56 +
 .../kafka/examples/WriteIntoKafka.java          |   70 +
 .../src/main/resources/log4j.properties         |   29 +
 .../connectors/kafka/Kafka09ITCase.java         |  114 ++
 .../connectors/kafka/Kafka09ProducerITCase.java |   33 +
 .../connectors/kafka/KafkaProducerTest.java     |  115 ++
 .../kafka/KafkaTestEnvironmentImpl.java         |  340 ++++
 .../src/test/resources/log4j-test.properties    |   29 +
 .../src/test/resources/logback-test.xml         |   30 +
 .../flink-connector-kafka-base/pom.xml          |  169 ++
 .../kafka/FlinkKafkaConsumerBase.java           |  225 +++
 .../kafka/FlinkKafkaProducerBase.java           |  291 ++++
 .../kafka/internals/KafkaTopicPartition.java    |  132 ++
 .../internals/KafkaTopicPartitionLeader.java    |  129 ++
 .../internals/ZooKeeperStringSerializer.java    |   51 +
 .../metrics/AvgKafkaMetricAccumulator.java      |  141 ++
 .../metrics/DefaultKafkaMetricAccumulator.java  |  159 ++
 .../metrics/MaxKafkaMetricAccumulator.java      |   57 +
 .../metrics/MinKafkaMetricAccumulator.java      |   57 +
 .../kafka/partitioner/FixedPartitioner.java     |   80 +
 .../kafka/partitioner/KafkaPartitioner.java     |   41 +
 .../KeyedDeserializationSchema.java             |   52 +
 .../KeyedDeserializationSchemaWrapper.java      |   51 +
 .../serialization/KeyedSerializationSchema.java |   48 +
 .../KeyedSerializationSchemaWrapper.java        |   43 +
 ...eInformationKeyValueSerializationSchema.java |  171 ++
 .../KafkaConsumerPartitionAssignmentTest.java   |  273 ++++
 .../connectors/kafka/KafkaConsumerTestBase.java | 1371 ++++++++++++++++
 .../connectors/kafka/KafkaProducerTestBase.java |  187 +++
 .../connectors/kafka/KafkaTestBase.java         |  170 ++
 .../connectors/kafka/KafkaTestEnvironment.java  |   83 +
 .../connectors/kafka/TestFixedPartitioner.java  |  104 ++
 .../kafka/testutils/DataGenerators.java         |  219 +++
 .../kafka/testutils/DiscardingSink.java         |   33 +
 .../kafka/testutils/FailingIdentityMapper.java  |  115 ++
 .../testutils/JobManagerCommunicationUtils.java |   76 +
 .../kafka/testutils/MockRuntimeContext.java     |  157 ++
 .../testutils/PartitionValidatingMapper.java    |   53 +
 .../kafka/testutils/ThrottledMapper.java        |   44 +
 .../kafka/testutils/Tuple2Partitioner.java      |   51 +
 .../testutils/ValidatingExactlyOnceSink.java    |   82 +
 .../src/test/resources/log4j-test.properties    |   29 +
 .../src/test/resources/logback-test.xml         |   30 +
 .../flink-connector-kafka/pom.xml               |  141 --
 .../connectors/kafka/FlinkKafkaConsumer.java    |  815 ----------
 .../connectors/kafka/FlinkKafkaConsumer081.java |   58 -
 .../connectors/kafka/FlinkKafkaConsumer082.java |   85 -
 .../connectors/kafka/FlinkKafkaProducer.java    |  340 ----
 .../connectors/kafka/internals/Fetcher.java     |   81 -
 .../kafka/internals/KafkaTopicPartition.java    |  124 --
 .../internals/KafkaTopicPartitionLeader.java    |  129 --
 .../kafka/internals/LegacyFetcher.java          |  648 --------
 .../kafka/internals/OffsetHandler.java          |   56 -
 .../kafka/internals/PartitionerWrapper.java     |   49 -
 .../internals/ZooKeeperStringSerializer.java    |   51 -
 .../kafka/internals/ZookeeperOffsetHandler.java |  143 --
 .../kafka/partitioner/FixedPartitioner.java     |   80 -
 .../kafka/partitioner/KafkaPartitioner.java     |   42 -
 .../KafkaConsumerPartitionAssignmentTest.java   |  273 ----
 .../connectors/kafka/KafkaConsumerTest.java     |  155 --
 .../connectors/kafka/KafkaConsumerTestBase.java | 1475 ------------------
 .../streaming/connectors/kafka/KafkaITCase.java |  133 --
 .../connectors/kafka/KafkaLocalSystemTime.java  |   48 -
 .../connectors/kafka/KafkaProducerITCase.java   |  189 ---
 .../connectors/kafka/KafkaProducerTest.java     |  114 --
 .../connectors/kafka/KafkaTestBase.java         |  387 -----
 .../connectors/kafka/TestFixedPartitioner.java  |  104 --
 .../internals/ZookeeperOffsetHandlerTest.java   |   67 -
 .../kafka/testutils/DataGenerators.java         |  214 ---
 .../kafka/testutils/DiscardingSink.java         |   33 -
 .../kafka/testutils/FailingIdentityMapper.java  |  115 --
 .../testutils/JobManagerCommunicationUtils.java |   76 -
 .../kafka/testutils/MockRuntimeContext.java     |  157 --
 .../testutils/PartitionValidatingMapper.java    |   53 -
 .../kafka/testutils/SuccessException.java       |   26 -
 .../kafka/testutils/ThrottledMapper.java        |   44 -
 .../kafka/testutils/Tuple2Partitioner.java      |   51 -
 .../testutils/ValidatingExactlyOnceSink.java    |   81 -
 .../src/test/resources/log4j-test.properties    |   29 -
 .../src/test/resources/logback-test.xml         |   30 -
 flink-streaming-connectors/pom.xml              |    4 +-
 .../api/environment/CheckpointConfig.java       |    2 +-
 .../KeyedDeserializationSchema.java             |   52 -
 .../KeyedDeserializationSchemaWrapper.java      |   51 -
 .../serialization/KeyedSerializationSchema.java |   48 -
 .../KeyedSerializationSchemaWrapper.java        |   43 -
 ...eInformationKeyValueSerializationSchema.java |  171 --
 .../TypeInformationSerializationSchema.java     |    1 -
 .../EventTimeAllWindowCheckpointingITCase.java  |   29 +-
 .../EventTimeWindowCheckpointingITCase.java     |   29 +-
 .../WindowCheckpointingITCase.java              |   27 +-
 .../flink/test/util/SuccessException.java       |   26 +
 .../org/apache/flink/test/util/TestUtils.java   |   52 +
 pom.xml                                         |    1 +
 122 files changed, 9770 insertions(+), 7167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 8e30de7..a73a9d3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -23,3 +23,4 @@ 
flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/gen
 flink-runtime-web/web-dashboard/node_modules/
 flink-runtime-web/web-dashboard/bower_components/
 atlassian-ide-plugin.xml
+out/

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/docs/apis/streaming/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/kafka.md 
b/docs/apis/streaming/connectors/kafka.md
index 9d9d6ba..f0757ce 100644
--- a/docs/apis/streaming/connectors/kafka.md
+++ b/docs/apis/streaming/connectors/kafka.md
@@ -34,14 +34,15 @@ exactly-once processing semantics. To achieve that, Flink 
does not purely rely o
 offset tracking, but tracks and checkpoints these offsets internally as well.
 
 Please pick a package (maven artifact id) and class name for your use-case and 
environment.
-For most users, the `FlinkKafkaConsumer082` (part of `flink-connector-kafka`) 
is appropriate.
+For most users, the `FlinkKafkaConsumer08` (part of `flink-connector-kafka-0.8`) 
is appropriate.
 
 <table class="table table-bordered">
   <thead>
     <tr>
       <th class="text-left">Maven Dependency</th>
       <th class="text-left">Supported since</th>
-      <th class="text-left">Class name</th>
+      <th class="text-left">Consumer and <br>
+      Producer Class name</th>
       <th class="text-left">Kafka version</th>
       <th class="text-left">Notes</th>
     </tr>
@@ -50,17 +51,27 @@ For most users, the `FlinkKafkaConsumer082` (part of 
`flink-connector-kafka`) is
     <tr>
         <td>flink-connector-kafka</td>
         <td>0.9.1, 0.10</td>
-        <td>FlinkKafkaConsumer081</td>
-        <td>0.8.1</td>
+        <td>FlinkKafkaConsumer082<br>
+        FlinkKafkaProducer</td>
+        <td>0.8.x</td>
         <td>Uses the <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example";>SimpleConsumer</a>
 API of Kafka internally. Offsets are committed to ZK by Flink.</td>
     </tr>
-    <tr>
-        <td>flink-connector-kafka</td>
-        <td>0.9.1, 0.10</td>
-        <td>FlinkKafkaConsumer082</td>
-        <td>0.8.2</td>
+     <tr>
+        <td>flink-connector-kafka-0.8</td>
+        <td>1.0.0</td>
+        <td>FlinkKafkaConsumer08<br>
+        FlinkKafkaProducer08</td>
+        <td>0.8.x</td>
         <td>Uses the <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example";>SimpleConsumer</a>
 API of Kafka internally. Offsets are committed to ZK by Flink.</td>
     </tr>
+     <tr>
+        <td>flink-connector-kafka-0.9</td>
+        <td>1.0.0</td>
+        <td>FlinkKafkaConsumer09<br>
+        FlinkKafkaProducer09</td>
+        <td>0.9.x</td>
+        <td>Uses the new <a 
href="http://kafka.apache.org/documentation.html#newconsumerapi";>Consumer 
API</a> of Kafka.</td>
+    </tr>
   </tbody>
 </table>
 
@@ -69,7 +80,7 @@ Then, import the connector in your maven project:
 {% highlight xml %}
 <dependency>
   <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-kafka</artifactId>
+  <artifactId>flink-connector-kafka-0.8</artifactId>
   <version>{{site.version }}</version>
 </dependency>
 {% endhighlight %}
@@ -84,14 +95,16 @@ Note that the streaming connectors are currently not part 
of the binary distribu
 
 #### Kafka Consumer
 
-The standard `FlinkKafkaConsumer082` is a Kafka consumer providing access to 
one topic. It takes the following parameters to the constructor:
+Flink's Kafka consumer is called `FlinkKafkaConsumer08` (or `09`). It provides 
access to one or more Kafka topics.
+
+The constructor accepts the following arguments:
 
-1. The topic name
-2. A DeserializationSchema
+1. The topic name / list of topic names
+2. A DeserializationSchema / KeyedDeserializationSchema for deserializing the 
data from Kafka
 3. Properties for the Kafka consumer.
   The following properties are required:
   - "bootstrap.servers" (comma separated list of Kafka brokers)
-  - "zookeeper.connect" (comma separated list of Zookeeper servers)
+  - "zookeeper.connect" (comma separated list of Zookeeper servers) (**only 
required for Kafka 0.8**)
   - "group.id" the id of the consumer group
 
 Example:
@@ -101,10 +114,11 @@ Example:
 {% highlight java %}
 Properties properties = new Properties();
 properties.setProperty("bootstrap.servers", "localhost:9092");
+// only required for Kafka 0.8
 properties.setProperty("zookeeper.connect", "localhost:2181");
 properties.setProperty("group.id", "test");
 DataStream<String> stream = env
-       .addSource(new FlinkKafkaConsumer082<>("topic", new 
SimpleStringSchema(), properties))
+       .addSource(new FlinkKafkaConsumer08<>("topic", new 
SimpleStringSchema(), properties))
        .print();
 {% endhighlight %}
 </div>
@@ -112,15 +126,28 @@ DataStream<String> stream = env
 {% highlight scala %}
 val properties = new Properties();
 properties.setProperty("bootstrap.servers", "localhost:9092");
+// only required for Kafka 0.8
 properties.setProperty("zookeeper.connect", "localhost:2181");
 properties.setProperty("group.id", "test");
 stream = env
-    .addSource(new FlinkKafkaConsumer082[String]("topic", new 
SimpleStringSchema(), properties))
+    .addSource(new FlinkKafkaConsumer08[String]("topic", new 
SimpleStringSchema(), properties))
     .print
 {% endhighlight %}
 </div>
 </div>
 
+
+##### The `DeserializationSchema`
+
+The `FlinkKafkaConsumer08` needs to know how to turn the data in Kafka into 
Java objects. The 
+`DeserializationSchema` allows users to specify such a schema. The `T 
deserialize(byte[] message)`
+method gets called for each Kafka message, passing the value from Kafka.
+For accessing both the key and value of the Kafka message, the 
`KeyedDeserializationSchema` has
+the following deserialize method ` T deserialize(byte[] messageKey, byte[] 
message, String topic, int partition, long offset)`.
+
+For convenience, Flink provides a `TypeInformationSerializationSchema` (and 
`TypeInformationKeyValueSerializationSchema`) 
+which creates a schema based on a Flink `TypeInformation`.
+
 #### Kafka Consumers and Fault Tolerance
 
 With Flink's checkpointing enabled, the Flink Kafka Consumer will consume 
records from a topic and periodically checkpoint all
@@ -155,20 +182,20 @@ If checkpointing is not enabled, the Kafka consumer will 
periodically commit the
 
 #### Kafka Producer
 
-The `FlinkKafkaProducer` writes data to a Kafka topic. The producer can 
specify a custom partitioner that assigns
-recors to partitions.
+The `FlinkKafkaProducer08` writes data to a Kafka topic. The producer can 
specify a custom partitioner that assigns
+records to partitions.
 
 Example:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-stream.addSink(new FlinkKafkaProducer<String>("localhost:9092", "my-topic", 
new SimpleStringSchema()));
+stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "my-topic", 
new SimpleStringSchema()));
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-stream.addSink(new FlinkKafkaProducer[String]("localhost:9092", "my-topic", 
new SimpleStringSchema()))
+stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", "my-topic", 
new SimpleStringSchema()))
 {% endhighlight %}
 </div>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/docs/apis/streaming/index.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/index.md b/docs/apis/streaming/index.md
index 06c0014..9d2481c 100644
--- a/docs/apis/streaming/index.md
+++ b/docs/apis/streaming/index.md
@@ -1630,7 +1630,7 @@ Collection-based:
 Custom:
 
 - `addSource` - Attache a new source function. For example, to read from 
Apache Kafka you can use
-    `addSource(new FlinkKafkaConsumer082<>(...))`. See [connectors]({{ 
site.baseurl }}/apis/streaming/connectors/) for more details.
+    `addSource(new FlinkKafkaConsumer08<>(...))`. See [connectors]({{ 
site.baseurl }}/apis/streaming/connectors/) for more details.
 
 </div>
 
@@ -1682,7 +1682,7 @@ Collection-based:
 Custom:
 
 - `addSource` - Attache a new source function. For example, to read from 
Apache Kafka you can use
-    `addSource(new FlinkKafkaConsumer082<>(...))`. See [connectors]({{ 
site.baseurl }}/apis/streaming/connectors/) for more details.
+    `addSource(new FlinkKafkaConsumer08<>(...))`. See [connectors]({{ 
site.baseurl }}/apis/streaming/connectors/) for more details.
 
 </div>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 1a4ccae..ddb4a48 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -820,7 +820,7 @@ public class TypeExtractor {
                        validateInfo(typeHierarchy, t, inType);
                }
                catch(InvalidTypesException e) {
-                       throw new InvalidTypesException("Input mismatch: " + 
e.getMessage());
+                       throw new InvalidTypesException("Input mismatch: " + 
e.getMessage(), e);
                }
        }
        
@@ -840,7 +840,7 @@ public class TypeExtractor {
                        validateInfo(typeHierarchy, inType, inTypeInfo);
                }
                catch(InvalidTypesException e) {
-                       throw new InvalidTypesException("Input mismatch: " + 
e.getMessage());
+                       throw new InvalidTypesException("Input mismatch: " + 
e.getMessage(), e);
                }
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index 4143434..c304abb 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -36,6 +36,8 @@ import 
org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
 import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
 import org.apache.flink.util.ExceptionUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Option;
 import scala.Tuple2;
 import scala.concurrent.Await;
@@ -58,6 +60,8 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 @ChannelHandler.Sharable
 public class RuntimeMonitorHandler extends SimpleChannelInboundHandler<Routed> 
{
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(RuntimeMonitorHandler.class);
+
        private static final Charset ENCODING = Charset.forName("UTF-8");
 
        public static final String WEB_MONITOR_ADDRESS_KEY = 
"web.monitor.address";
@@ -143,12 +147,14 @@ public class RuntimeMonitorHandler extends 
SimpleChannelInboundHandler<Routed> {
                                        : 
Unpooled.wrappedBuffer(e.getMessage().getBytes(ENCODING));
                        response = new 
DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND, 
message);
                        response.headers().set(HttpHeaders.Names.CONTENT_TYPE, 
"text/plain");
+                       LOG.warn("Error while handling request", e);
                }
                catch (Exception e) {
                        byte[] bytes = 
ExceptionUtils.stringifyException(e).getBytes(ENCODING);
                        response = new 
DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
                                        
HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(bytes));
                        response.headers().set(HttpHeaders.Names.CONTENT_TYPE, 
"text/plain");
+                       LOG.warn("Error while handling request", e);
                }
 
                response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, 
"utf-8");

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml 
b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
new file mode 100644
index 0000000..aae4847
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
@@ -0,0 +1,160 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-streaming-connectors-parent</artifactId>
+               <version>1.0-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-connector-kafka-0.8</artifactId>
+       <name>flink-connector-kafka-0.8</name>
+
+       <packaging>jar</packaging>
+
+       <!-- Allow users to pass custom connector versions -->
+       <properties>
+               <kafka.version>0.8.2.0</kafka.version>
+       </properties>
+
+       <dependencies>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-kafka-base</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-kafka-base</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.kafka</groupId>
+                       <artifactId>kafka_${scala.binary.version}</artifactId>
+                       <version>${kafka.version}</version>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>com.sun.jmx</groupId>
+                                       <artifactId>jmxri</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>com.sun.jdmk</groupId>
+                                       <artifactId>jmxtools</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>log4j</groupId>
+                                       <artifactId>log4j</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.slf4j</groupId>
+                                       <artifactId>slf4j-simple</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>net.sf.jopt-simple</groupId>
+                                       <artifactId>jopt-simple</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.scala-lang</groupId>
+                                       <artifactId>scala-reflect</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.scala-lang</groupId>
+                                       <artifactId>scala-compiler</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>com.yammer.metrics</groupId>
+                                       
<artifactId>metrics-annotation</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.xerial.snappy</groupId>
+                                       <artifactId>snappy-java</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.google.guava</groupId>
+                       <artifactId>guava</artifactId>
+                       <version>${guava.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.curator</groupId>
+                       <artifactId>curator-test</artifactId>
+                       <version>${curator.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-shaded-curator-recipes</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-tests</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+       </dependencies>
+
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-failsafe-plugin</artifactId>
+                               <configuration>
+                                       <!-- Enforce single fork execution due 
to heavy mini cluster use in the tests -->
+                                       <forkCount>1</forkCount>
+                               </configuration>
+                       </plugin>
+               </plugins>
+       </build>
+       
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
new file mode 100644
index 0000000..543e0ff
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import kafka.cluster.Broker;
+import kafka.common.ErrorMapping;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+import kafka.javaapi.TopicMetadataRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.Fetcher;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.OffsetHandler;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
+import 
org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.util.NetUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kafka Consumer is a streaming data source that pulls a parallel 
data stream from
+ * Apache Kafka 0.8.x. The consumer can run in multiple parallel instances, 
each of which will pull
+ * data from one or more Kafka partitions. 
+ * 
+ * <p>The Flink Kafka Consumer participates in checkpointing and guarantees 
that no data is lost
+ * during a failure, and that the computation processes elements "exactly 
once". 
+ * (Note: These guarantees naturally assume that Kafka itself does not lose 
+ * any data.)</p>
+ * 
+ * <p>Flink's Kafka Consumer is designed to be compatible with Kafka's 
High-Level Consumer API (0.8.x).
+ * Most of Kafka's configuration variables can be used with this consumer as 
well:
+ *         <ul>
+ *             <li>socket.timeout.ms</li>
+ *             <li>socket.receive.buffer.bytes</li>
+ *             <li>fetch.message.max.bytes</li>
+ *             <li>auto.offset.reset with the values "latest", "earliest" 
(unlike 0.8.2 behavior)</li>
+ *             <li>fetch.wait.max.ms</li>
+ *         </ul>
+ *
+ *
+ * 
+ * <h1>Offset handling</h1>
+ * 
+ * <p>Offsets whose records have been read and are checkpointed will be 
committed back to ZooKeeper
+ * by the offset handler. In addition, the offset handler finds the point 
where the source initially
+ * starts reading from the stream, when the streaming job is started.</p>
+ *
+ * <p>Please note that Flink snapshots the offsets internally as part of its 
distributed checkpoints. The offsets
+ * committed to Kafka / ZooKeeper are only to bring the outside view of 
progress in sync with Flink's view
+ * of the progress. That way, monitoring and other jobs can get a view of how 
far the Flink Kafka consumer
+ * has consumed a topic.</p>
+ *
+ * <p>If checkpointing is disabled, the consumer will periodically commit the 
current offset
+ * to Zookeeper.</p>
+ *
+ * <p>When using a Kafka topic to send data between Flink jobs, we recommend 
using the
+ * {@link TypeInformationSerializationSchema} and {@link 
TypeInformationKeyValueSerializationSchema}.</p>
+ * 
+ * <p><b>NOTE:</b> The implementation currently accesses partition metadata 
when the consumer
+ * is constructed. That means that the client that submits the program needs 
to be able to
+ * reach the Kafka brokers or ZooKeeper.</p>
+ */
+public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
+       
+       // 
------------------------------------------------------------------------
+       
+       private static final long serialVersionUID = -6272159445203409112L;
+       
+       private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKafkaConsumer08.class);
+
+       /** Magic number to define an unset offset. Negative offsets are not 
used by Kafka (invalid),
+        * and we pick a number that is probably (hopefully) not used by Kafka 
as a magic number for anything else. */
+       public static final long OFFSET_NOT_SET = -915623761776L;
+
+
+       /** Configuration key for the number of retries for getting the 
partition info */
+       public static final String GET_PARTITIONS_RETRIES_KEY = 
"flink.get-partitions.retry";
+
+       /** Default number of retries for getting the partition info. One retry 
means going through the full list of brokers */
+       public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3;
+
+       
+       // ------  Configuration of the Consumer -------
+
+       /** List of partitions (including topics and leaders) to consume  */
+       private final List<KafkaTopicPartitionLeader> partitionInfos;
+       
+       /** The properties to parametrize the Kafka consumer and ZooKeeper 
client */ 
+       private final Properties props;
+
+
+       // ------  Runtime State  -------
+       
+       /** The fetcher used to pull data from the Kafka brokers */
+       private transient Fetcher fetcher;
+       
+       /** The committer that persists the committed offsets */
+       private transient OffsetHandler offsetHandler;
+       
+       /** The partitions actually handled by this consumer at runtime */
+       private transient List<KafkaTopicPartitionLeader> subscribedPartitions;
+
+       /** The latest offsets that have been committed to Kafka or ZooKeeper. 
These are never
+        * newer than the last offsets (Flink's internal view is fresher) */
+       private transient HashMap<KafkaTopicPartition, Long> committedOffsets;
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a new Kafka streaming source consumer for Kafka 0.8.x
+        *
+        * @param topic
+        *           The name of the topic that should be consumed.
+        * @param valueDeserializer
+        *           The de-/serializer used to convert between Kafka's byte 
messages and Flink's objects.
+        * @param props
+        *           The properties used to configure the Kafka consumer 
client, and the ZooKeeper client.
+        */
+       public FlinkKafkaConsumer08(String topic, DeserializationSchema<T> 
valueDeserializer, Properties props) {
+               this(Collections.singletonList(topic), valueDeserializer, 
props);
+       }
+
+       /**
+        * Creates a new Kafka streaming source consumer for Kafka 0.8.x
+        *
+        * This constructor allows passing a {@link KeyedDeserializationSchema} 
for reading key/value
+        * pairs, offsets, and topic names from Kafka.
+        *
+        * @param topic
+        *           The name of the topic that should be consumed.
+        * @param deserializer
+        *           The keyed de-/serializer used to convert between Kafka's 
byte messages and Flink's objects.
+        * @param props
+        *           The properties used to configure the Kafka consumer 
client, and the ZooKeeper client.
+        */
+       public FlinkKafkaConsumer08(String topic, KeyedDeserializationSchema<T> 
deserializer, Properties props) {
+               this(Collections.singletonList(topic), deserializer, props);
+       }
+
+       /**
+        * Creates a new Kafka streaming source consumer for Kafka 0.8.x
+        *
+        * This constructor allows passing multiple topics to the consumer.
+        *
+        * @param topics
+        *           The Kafka topics to read from.
+        * @param deserializer
+        *           The de-/serializer used to convert between Kafka's byte 
messages and Flink's objects.
+        * @param props
+        *           The properties that are used to configure both the fetcher 
and the offset handler.
+        */
+       public FlinkKafkaConsumer08(List<String> topics, 
DeserializationSchema<T> deserializer, Properties props) {
+               this(topics, new 
KeyedDeserializationSchemaWrapper<>(deserializer), props);
+       }
+
+       /**
+        * Creates a new Kafka streaming source consumer for Kafka 0.8.x
+        *
+        * This constructor allows passing multiple topics and a key/value 
deserialization schema.
+        * 
+        * @param topics
+        *           The Kafka topics to read from.
+        * @param deserializer
+        *           The keyed de-/serializer used to convert between Kafka's 
byte messages and Flink's objects.
+        * @param props
+        *           The properties that are used to configure both the fetcher 
and the offset handler.
+        */
+       public FlinkKafkaConsumer08(List<String> topics, 
KeyedDeserializationSchema<T> deserializer, Properties props) {
+               super(deserializer, props);
+
+               checkNotNull(topics, "topics");
+               this.props = checkNotNull(props, "props");
+
+               // validate the zookeeper properties
+               validateZooKeeperConfig(props);
+
+               // Connect to a broker to get the partitions for all topics
+               this.partitionInfos = getPartitionsForTopic(topics, props);
+
+               if (partitionInfos.size() == 0) {
+                       throw new RuntimeException("Unable to retrieve any 
partitions for the requested topics " + topics.toString() + "." +
+                                       "Please check previous log entries");
+               }
+
+               if (LOG.isInfoEnabled()) {
+                       
logPartitionInfo(KafkaTopicPartition.convertToPartitionInfo(partitionInfos));
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Source life cycle
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+               
+               final int numConsumers = 
getRuntimeContext().getNumberOfParallelSubtasks();
+               final int thisConsumerIndex = 
getRuntimeContext().getIndexOfThisSubtask();
+               
+               // pick which partitions we work on
+               subscribedPartitions = assignPartitions(this.partitionInfos, 
numConsumers, thisConsumerIndex);
+               
+               if (LOG.isInfoEnabled()) {
+                       LOG.info("Kafka consumer {} will read partitions {} out 
of partitions {}",
+                                       thisConsumerIndex, 
KafkaTopicPartitionLeader.toString(subscribedPartitions), 
this.partitionInfos.size());
+               }
+
+               // we leave the fetcher as null, if we have no partitions
+               if (subscribedPartitions.isEmpty()) {
+                       LOG.info("Kafka consumer {} has no partitions (empty 
source)", thisConsumerIndex);
+                       this.fetcher = null; // fetcher remains null
+                       return;
+               }
+               
+               // create fetcher
+               fetcher = new LegacyFetcher(this.subscribedPartitions, props, 
getRuntimeContext().getTaskName());
+
+               // offset handling
+               offsetHandler = new ZookeeperOffsetHandler(props);
+
+               committedOffsets = new HashMap<>();
+
+               // seek to last known pos, from restore request
+               if (restoreToOffset != null) {
+                       if (LOG.isInfoEnabled()) {
+                               LOG.info("Consumer {} is restored from previous 
checkpoint: {}",
+                                               thisConsumerIndex, 
KafkaTopicPartition.toString(restoreToOffset));
+                       }
+                       
+                       for (Map.Entry<KafkaTopicPartition, Long> 
restorePartition: restoreToOffset.entrySet()) {
+                               // seek fetcher to restore position
+                               // we set the offset +1 here, because seek() is 
accepting the next offset to read,
+                               // but the restore offset is the last read 
offset
+                               fetcher.seek(restorePartition.getKey(), 
restorePartition.getValue() + 1);
+                       }
+                       // initialize offsets with restored state
+                       this.offsetsState = restoreToOffset;
+                       restoreToOffset = null;
+               }
+               else {
+                       // start with empty offsets
+                       offsetsState = new HashMap<>();
+
+                       // no restore request. Let the offset handler take care 
of the initial offset seeking
+                       
offsetHandler.seekFetcherToInitialOffsets(subscribedPartitions, fetcher);
+               }
+       }
+
+       @Override
+       public void run(SourceContext<T> sourceContext) throws Exception {
+               if (fetcher != null) {
+                       // For non-checkpointed sources, a thread which 
periodically commits the current offset into ZK.
+                       PeriodicOffsetCommitter<T> offsetCommitter = null;
+
+                       // check whether we need to start the periodic 
checkpoint committer
+                       StreamingRuntimeContext streamingRuntimeContext = 
(StreamingRuntimeContext) getRuntimeContext();
+                       if (!streamingRuntimeContext.isCheckpointingEnabled()) {
+                               // we use Kafka's own configuration parameter 
key for this.
+                               // Note that the default configuration value in 
Kafka is 60 * 1000, so we use the
+                               // same here.
+                               long commitInterval = 
Long.valueOf(props.getProperty("auto.commit.interval.ms", "60000"));
+                               offsetCommitter = new 
PeriodicOffsetCommitter<>(commitInterval, this);
+                               offsetCommitter.setDaemon(true);
+                               offsetCommitter.start();
+                               LOG.info("Starting periodic offset committer, 
with commit interval of {}ms", commitInterval);
+                       }
+
+                       try {
+                               fetcher.run(sourceContext, deserializer, 
offsetsState);
+                       } finally {
+                               if (offsetCommitter != null) {
+                                       offsetCommitter.close();
+                                       try {
+                                               offsetCommitter.join();
+                                       } catch(InterruptedException ie) {
+                                               // ignore interrupt
+                                       }
+                               }
+                       }
+               }
+               else {
+                       // this source never completes, so emit a 
Long.MAX_VALUE watermark
+                       // to not block watermark forwarding
+                       if 
(getRuntimeContext().getExecutionConfig().areTimestampsEnabled()) {
+                               sourceContext.emitWatermark(new 
Watermark(Long.MAX_VALUE));
+                       }
+
+                       final Object waitLock = new Object();
+                       while (running) {
+                               // wait until we are canceled
+                               try {
+                                       //noinspection 
SynchronizationOnLocalVariableOrMethodParameter
+                                       synchronized (waitLock) {
+                                               waitLock.wait();
+                                       }
+                               }
+                               catch (InterruptedException e) {
+                                       // do nothing, check our "running" 
status
+                               }
+                       }
+               }
+               
+               // close the context after the work was done. this can actually 
only
+               // happen when the fetcher decides to stop fetching
+               sourceContext.close();
+       }
+
+       @Override
+       public void cancel() {
+               // set ourselves as not running
+               running = false;
+               
+               // close the fetcher to interrupt any work
+               Fetcher fetcher = this.fetcher;
+               this.fetcher = null;
+               if (fetcher != null) {
+                       try {
+                               fetcher.close();
+                       }
+                       catch (IOException e) {
+                               LOG.warn("Error while closing Kafka connector 
data fetcher", e);
+                       }
+               }
+               
+               OffsetHandler offsetHandler = this.offsetHandler;
+               this.offsetHandler = null;
+               if (offsetHandler != null) {
+                       try {
+                               offsetHandler.close();
+                       }
+                       catch (IOException e) {
+                               LOG.warn("Error while closing Kafka connector 
offset handler", e);
+                       }
+               }
+       }
+
+       @Override
+       public void close() throws Exception {
+               cancel();
+               super.close();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Checkpoint and restore
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Utility method to commit offsets.
+        *
+        * @param toCommit the offsets to commit
+        * @throws Exception
+        */
+       protected void commitOffsets(HashMap<KafkaTopicPartition, Long> 
toCommit) throws Exception {
+               Map<KafkaTopicPartition, Long> offsetsToCommit = new 
HashMap<>();
+               for (KafkaTopicPartitionLeader tp : this.subscribedPartitions) {
+                       Long offset = toCommit.get(tp.getTopicPartition());
+                       if(offset == null) {
+                               // There was no data ever consumed from this 
topic, that's why there is no entry
+                               // for this topicPartition in the map.
+                               continue;
+                       }
+                       Long lastCommitted = 
this.committedOffsets.get(tp.getTopicPartition());
+                       if (lastCommitted == null) {
+                               lastCommitted = OFFSET_NOT_SET;
+                       }
+                       if (offset != OFFSET_NOT_SET) {
+                               if (offset > lastCommitted) {
+                                       
offsetsToCommit.put(tp.getTopicPartition(), offset);
+                                       
this.committedOffsets.put(tp.getTopicPartition(), offset);
+                                       LOG.debug("Committing offset {} for 
partition {}", offset, tp.getTopicPartition());
+                               } else {
+                                       LOG.debug("Ignoring offset {} for 
partition {} because it is already committed", offset, tp.getTopicPartition());
+                               }
+                       }
+               }
+
+               if (LOG.isDebugEnabled() && offsetsToCommit.size() > 0) {
+                       LOG.debug("Committing offsets {} to Zookeeper", 
KafkaTopicPartition.toString(offsetsToCommit));
+               }
+
+               this.offsetHandler.commit(offsetsToCommit);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Miscellaneous utilities 
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Thread to periodically commit the current read offset into Zookeeper.
+        */
+       private static class PeriodicOffsetCommitter<T> extends Thread {
+               private final long commitInterval;
+               private final FlinkKafkaConsumer08<T> consumer;
+               private volatile boolean running = true;
+
+               public PeriodicOffsetCommitter(long commitInterval, 
FlinkKafkaConsumer08<T> consumer) {
+                       this.commitInterval = commitInterval;
+                       this.consumer = consumer;
+               }
+
+               @Override
+               public void run() {
+                       try {
+
+                               while (running) {
+                                       try {
+                                               Thread.sleep(commitInterval);
+                                               //  ------------  commit 
current offsets ----------------
+
+                                               // create copy of current 
offsets
+                                               @SuppressWarnings("unchecked")
+                                               HashMap<KafkaTopicPartition, 
Long> currentOffsets = (HashMap<KafkaTopicPartition, Long>) 
consumer.offsetsState.clone();
+                                               
consumer.commitOffsets(currentOffsets);
+                                       } catch (InterruptedException e) {
+                                               if (running) {
+                                                       // throw unexpected 
interruption
+                                                       throw e;
+                                               }
+                                       }
+                               }
+                       } catch (Throwable t) {
+                               LOG.warn("Periodic checkpoint committer is 
stopping the fetcher because of an error", t);
+                               consumer.fetcher.stopWithError(t);
+                       }
+               }
+
+               public void close() {
+                       this.running = false;
+                       this.interrupt();
+               }
+
+       }
+
+
+       // 
------------------------------------------------------------------------
+       //  Kafka / ZooKeeper communication utilities
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Send request to Kafka to get partitions for topic.
+        * 
+        * @param topics The name of the topics.
+        * @param properties The properties for the Kafka Consumer that is used 
to query the partitions for the topic. 
+        */
+       public static List<KafkaTopicPartitionLeader> 
getPartitionsForTopic(final List<String> topics, final Properties properties) {
+               String seedBrokersConfString = 
properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
+               final int numRetries = 
Integer.valueOf(properties.getProperty(GET_PARTITIONS_RETRIES_KEY, 
Integer.toString(DEFAULT_GET_PARTITIONS_RETRIES)));
+
+               checkNotNull(seedBrokersConfString, "Configuration property " + 
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " not set");
+               String[] seedBrokers = seedBrokersConfString.split(",");
+               List<KafkaTopicPartitionLeader> partitions = new ArrayList<>();
+
+               Random rnd = new Random();
+               retryLoop: for (int retry = 0; retry < numRetries; retry++) {
+                       // we pick a seed broker randomly to avoid overloading 
the first broker with all the requests when the
+                       // parallel source instances start. Still, we try all 
available brokers.
+                       int index = rnd.nextInt(seedBrokers.length);
+                       brokersLoop: for (int arrIdx = 0; arrIdx < 
seedBrokers.length; arrIdx++) {
+                               String seedBroker = seedBrokers[index];
+                               LOG.info("Trying to get topic metadata from 
broker {} in try {}/{}", seedBroker, retry, numRetries);
+                               if (++index == seedBrokers.length) {
+                                       index = 0;
+                               }
+
+                               URL brokerUrl = 
NetUtils.getCorrectHostnamePort(seedBroker);
+                               SimpleConsumer consumer = null;
+                               try {
+                                       final String clientId = 
"flink-kafka-consumer-partition-lookup";
+                                       final int soTimeout = 
Integer.valueOf(properties.getProperty("socket.timeout.ms", "30000"));
+                                       final int bufferSize = 
Integer.valueOf(properties.getProperty("socket.receive.buffer.bytes", "65536"));
+                                       consumer = new 
SimpleConsumer(brokerUrl.getHost(), brokerUrl.getPort(), soTimeout, bufferSize, 
clientId);
+
+                                       TopicMetadataRequest req = new 
TopicMetadataRequest(topics);
+                                       kafka.javaapi.TopicMetadataResponse 
resp = consumer.send(req);
+
+                                       List<TopicMetadata> metaData = 
resp.topicsMetadata();
+
+                                       // clear in case we have an incomplete 
list from previous tries
+                                       partitions.clear();
+                                       for (TopicMetadata item : metaData) {
+                                               if (item.errorCode() != 
ErrorMapping.NoError()) {
+                                                       if (item.errorCode() == 
ErrorMapping.InvalidTopicCode() || item.errorCode() == 
ErrorMapping.UnknownTopicOrPartitionCode()) {
+                                                               // fail hard if 
topic is unknown
+                                                               throw new 
RuntimeException("Requested partitions for unknown topic", 
ErrorMapping.exceptionFor(item.errorCode()));
+                                                       }
+                                                       // warn and try more 
brokers
+                                                       LOG.warn("Error while 
getting metadata from broker " + seedBroker + " to find partitions " +
+                                                                       "for " 
+ topics.toString() + ". Error: " + 
ErrorMapping.exceptionFor(item.errorCode()).getMessage());
+                                                       continue brokersLoop;
+                                               }
+                                               if 
(!topics.contains(item.topic())) {
+                                                       LOG.warn("Received 
metadata from topic " + item.topic() + " even though it was not requested. 
Skipping ...");
+                                                       continue brokersLoop;
+                                               }
+                                               for (PartitionMetadata part : 
item.partitionsMetadata()) {
+                                                       Node leader = 
brokerToNode(part.leader());
+                                                       KafkaTopicPartition ktp 
= new KafkaTopicPartition(item.topic(), part.partitionId());
+                                                       
KafkaTopicPartitionLeader pInfo = new KafkaTopicPartitionLeader(ktp, leader);
+                                                       partitions.add(pInfo);
+                                               }
+                                       }
+                                       break retryLoop; // leave the loop 
through the brokers
+                               } catch (Exception e) {
+                                       LOG.warn("Error communicating with 
broker " + seedBroker + " to find partitions for " + topics.toString() + ". 
Message: " + e.getMessage());
+                                       LOG.debug("Detailed trace", e);
+                               } finally {
+                                       if (consumer != null) {
+                                               consumer.close();
+                                       }
+                               }
+                       } // brokers loop
+               } // retries loop
+               return partitions;
+       }
+
+       /**
+        * Turn a broker instance into a node instance
+        * @param broker broker instance
+        * @return Node representing the given broker
+        */
+       private static Node brokerToNode(Broker broker) {
+               return new Node(broker.id(), broker.host(), broker.port());
+       }
+
+       /**
+        * Validate the ZK configuration, checking for required parameters
+        * @param props Properties to check
+        */
+       protected static void validateZooKeeperConfig(Properties props) {
+               if (props.getProperty("zookeeper.connect") == null) {
+                       throw new IllegalArgumentException("Required property 
'zookeeper.connect' has not been set in the properties");
+               }
+               if (props.getProperty(ConsumerConfig.GROUP_ID_CONFIG) == null) {
+                       throw new IllegalArgumentException("Required property 
'" + ConsumerConfig.GROUP_ID_CONFIG
+                                       + "' has not been set in the 
properties");
+               }
+               
+               try {
+                       //noinspection ResultOfMethodCallIgnored
+                       
Integer.parseInt(props.getProperty("zookeeper.session.timeout.ms", "0"));
+               }
+               catch (NumberFormatException e) {
+                       throw new IllegalArgumentException("Property 
'zookeeper.session.timeout.ms' is not a valid integer");
+               }
+               
+               try {
+                       //noinspection ResultOfMethodCallIgnored
+                       
Integer.parseInt(props.getProperty("zookeeper.connection.timeout.ms", "0"));
+               }
+               catch (NumberFormatException e) {
+                       throw new IllegalArgumentException("Property 
'zookeeper.connection.timeout.ms' is not a valid integer");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
new file mode 100644
index 0000000..56ccd0b
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.util.Properties;
+
/**
 * THIS CLASS IS DEPRECATED. Use FlinkKafkaConsumer08 instead.
 *
 * @deprecated Use {@link FlinkKafkaConsumer08} instead. This class is a pure
 *             forwarding shim kept for API backwards compatibility; it adds no
 *             behavior of its own.
 */
@Deprecated
public class FlinkKafkaConsumer081<T> extends FlinkKafkaConsumer08<T> {

	private static final long serialVersionUID = -5649906773771949146L;

	/**
	 * THIS CONSTRUCTOR IS DEPRECATED. Use FlinkKafkaConsumer08 instead.
	 *
	 * @param topic The Kafka topic to read from.
	 * @param valueDeserializer The deserializer turning raw Kafka messages into objects of type {@code T}.
	 * @param props The consumer configuration properties.
	 *
	 * @deprecated Use the equivalent {@link FlinkKafkaConsumer08} constructor instead.
	 */
	@Deprecated
	public FlinkKafkaConsumer081(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
		super(topic, valueDeserializer, props);
	}
}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
new file mode 100644
index 0000000..0520336
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.util.Properties;
+
/**
 * THIS CLASS IS DEPRECATED. Use FlinkKafkaConsumer08 instead.
 *
 * @deprecated Use {@link FlinkKafkaConsumer08} instead. This class is a pure
 *             forwarding shim kept for API backwards compatibility; it adds no
 *             behavior of its own.
 */
@Deprecated
public class FlinkKafkaConsumer082<T> extends FlinkKafkaConsumer08<T> {

	private static final long serialVersionUID = -5649906773771949146L;

	/**
	 * THIS CONSTRUCTOR IS DEPRECATED. Use FlinkKafkaConsumer08 instead.
	 *
	 * @param topic The Kafka topic to read from.
	 * @param valueDeserializer The deserializer turning raw Kafka messages into objects of type {@code T}.
	 * @param props The consumer configuration properties.
	 *
	 * @deprecated Use the equivalent {@link FlinkKafkaConsumer08} constructor instead.
	 */
	@Deprecated
	public FlinkKafkaConsumer082(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
		super(topic, valueDeserializer, props);
	}
}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
new file mode 100644
index 0000000..1c2e0b7
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import java.util.Properties;
+
+
+/**
+ * THIS CLASS IS DEPRECATED. Use FlinkKafkaProducer08 instead.
+ */
+@Deprecated
+public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN>  {
+
+       @Deprecated
+       public FlinkKafkaProducer(String brokerList, String topicId, 
SerializationSchema<IN> serializationSchema) {
+               super(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), 
getPropertiesFromBrokerList(brokerList), null);
+       }
+
+       @Deprecated
+       public FlinkKafkaProducer(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig) {
+               super(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, null);
+       }
+
+       @Deprecated
+       public FlinkKafkaProducer(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig, KafkaPartitioner 
customPartitioner) {
+               super(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, 
customPartitioner);
+
+       }
+
+       @Deprecated
+       public FlinkKafkaProducer(String brokerList, String topicId, 
KeyedSerializationSchema<IN> serializationSchema) {
+               super(topicId, serializationSchema, 
getPropertiesFromBrokerList(brokerList), null);
+       }
+
+       @Deprecated
+       public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> 
serializationSchema, Properties producerConfig) {
+               super(topicId, serializationSchema, producerConfig, null);
+       }
+
+       @Deprecated
+       public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> 
serializationSchema, Properties producerConfig, KafkaPartitioner 
customPartitioner) {
+               super(topicId, serializationSchema, producerConfig, 
customPartitioner);
+       }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
new file mode 100644
index 0000000..4975f9a
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.util.Properties;
+
+
/**
 * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.8.
 *
 * <p>Please note that this producer does not have any reliability guarantees.</p>
 *
 * @param <IN> Type of the messages to write into Kafka.
 */
public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {

	private static final long serialVersionUID = 1L;

	// ------------------- Keyless serialization schema constructors ----------------------

	/**
	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
	 * the topic.
	 *
	 * @param brokerList
	 *			Comma separated addresses of the brokers
	 * @param topicId
	 *			ID of the Kafka topic.
	 * @param serializationSchema
	 *			User defined (keyless) serialization schema.
	 */
	public FlinkKafkaProducer08(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
	}

	/**
	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
	 * the topic.
	 *
	 * @param topicId
	 *			ID of the Kafka topic.
	 * @param serializationSchema
	 *			User defined (keyless) serialization schema.
	 * @param producerConfig
	 *			Properties with the producer configuration.
	 */
	public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<IN>());
	}

	/**
	 * The main constructor for creating a FlinkKafkaProducer.
	 *
	 * @param topicId The topic to write data to
	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
	 */
	public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
	}

	// ------------------- Key/Value serialization schema constructors ----------------------

	/**
	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
	 * the topic.
	 *
	 * @param brokerList
	 *			Comma separated addresses of the brokers
	 * @param topicId
	 *			ID of the Kafka topic.
	 * @param serializationSchema
	 *			User defined serialization schema supporting key/value messages
	 */
	public FlinkKafkaProducer08(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
	}

	/**
	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
	 * the topic.
	 *
	 * @param topicId
	 *			ID of the Kafka topic.
	 * @param serializationSchema
	 *			User defined serialization schema supporting key/value messages
	 * @param producerConfig
	 *			Properties with the producer configuration.
	 */
	public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
		this(topicId, serializationSchema, producerConfig, new FixedPartitioner<IN>());
	}

	/**
	 * The main constructor for creating a FlinkKafkaProducer.
	 *
	 * @param topicId The topic to write data to
	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
	 */
	public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
		super(topicId, serializationSchema, producerConfig, customPartitioner);
	}

}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
new file mode 100644
index 0000000..4f1a2a6
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+
+import java.io.IOException;
+import java.util.HashMap;
+
/**
 * A fetcher pulls data from Kafka, from a fixed set of partitions.
 * The fetcher supports "seeking" inside the partitions, i.e., moving to a different offset.
 */
public interface Fetcher {

	/**
	 * Closes the fetcher. This will stop any operation in the
	 * {@link #run(SourceFunction.SourceContext, KeyedDeserializationSchema, HashMap)} method and eventually
	 * close underlying connections and release all resources.
	 *
	 * @throws IOException If closing the underlying connections fails.
	 */
	void close() throws IOException;

	/**
	 * Starts fetching data from Kafka and emitting it into the stream.
	 * 
	 * <p>To provide exactly-once guarantees, the fetcher needs to emit a record and update the
	 * last consumed offset in one atomic operation:</p>
	 * <pre>{@code
	 * 
	 * while (running) {
	 *     T next = ...
	 *     long offset = ...
	 *     int partition = ...
	 *     synchronized (sourceContext.getCheckpointLock()) {
	 *         sourceContext.collect(next);
	 *         lastOffsets[partition] = offset;
	 *     }
	 * }
	 * }</pre>
	 *
	 * @param <T> The type of elements produced by the fetcher and emitted to the source context.
	 * @param sourceContext The source context to emit elements to.
	 * @param valueDeserializer The deserializer to decode the raw values with.
	 * @param lastOffsets The map into which to store the offsets for which elements are emitted (operator state)
	 *
	 * @throws Exception If fetching or emitting fails.
	 */
	<T> void run(SourceFunction.SourceContext<T> sourceContext, KeyedDeserializationSchema<T> valueDeserializer,
				HashMap<KafkaTopicPartition, Long> lastOffsets) throws Exception;
	
	/**
	 * Sets the next offset to read from for the given partition.
	 * For example, if the partition <i>i</i> offset is set to <i>n</i>, the Fetcher's next result
	 * will be the message with <i>offset=n</i>.
	 * 
	 * @param topicPartition The partition for which to seek the offset.
	 * @param offsetToRead The offset to seek to.
	 */
	void seek(KafkaTopicPartition topicPartition, long offsetToRead);

	/**
	 * Exits the run loop with the given error and releases all resources.
	 *
	 * @param t Error cause
	 */
	void stopWithError(Throwable t);
}

Reply via email to