[FLINK-3375] [kafka connector] Rework/simplify Kafka Connector and have a 
WatermarkExtractor object per partition


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3c93103d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3c93103d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3c93103d

Branch: refs/heads/master
Commit: 3c93103d1476cb05ec0c018bfa6876e4ecad38e8
Parents: 0ac1549
Author: Stephan Ewen <[email protected]>
Authored: Wed Apr 6 23:21:17 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Wed Apr 13 01:10:54 2016 +0200

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumer08.java  | 358 ++------
 .../connectors/kafka/internals/Fetcher.java     |  72 --
 .../kafka/internals/Kafka08Fetcher.java         | 446 +++++++++
 .../kafka/internals/KillerWatchDog.java         |  62 ++
 .../kafka/internals/LegacyFetcher.java          | 896 -------------------
 .../kafka/internals/OffsetHandler.java          |  55 --
 .../kafka/internals/PartitionInfoFetcher.java   |  66 ++
 .../kafka/internals/PartitionerWrapper.java     |  49 -
 .../internals/PeriodicOffsetCommitter.java      |  85 ++
 .../kafka/internals/SimpleConsumerThread.java   | 504 +++++++++++
 .../kafka/internals/ZookeeperOffsetHandler.java |  58 +-
 .../connectors/kafka/Kafka08ITCase.java         | 176 ++--
 .../connectors/kafka/KafkaConsumer08Test.java   |  90 ++
 .../connectors/kafka/KafkaConsumerTest.java     | 156 ----
 .../kafka/KafkaShortRetention08ITCase.java      |   3 +-
 .../internals/ZookeeperOffsetHandlerTest.java   |  56 --
 .../src/test/resources/log4j-test.properties    |   5 +-
 .../connectors/kafka/FlinkKafkaConsumer09.java  | 398 ++------
 .../kafka/internal/Kafka09Fetcher.java          | 311 +++++++
 .../connectors/kafka/Kafka09ITCase.java         |  21 +-
 .../connectors/kafka/KafkaProducerTest.java     |   8 +-
 .../kafka/KafkaShortRetention09ITCase.java      |   1 +
 .../src/test/resources/log4j-test.properties    |   5 +-
 .../kafka/FlinkKafkaConsumerBase.java           | 668 ++++++--------
 .../kafka/internals/AbstractFetcher.java        | 439 +++++++++
 .../kafka/internals/ExceptionProxy.java         |  73 ++
 .../kafka/internals/KafkaPartitionState.java    |  65 --
 .../kafka/internals/KafkaTopicPartition.java    |  36 +-
 .../internals/KafkaTopicPartitionState.java     | 105 +++
 ...picPartitionStateWithPeriodicWatermarks.java |  71 ++
 ...cPartitionStateWithPunctuatedWatermarks.java |  84 ++
 .../kafka/partitioner/KafkaPartitioner.java     |   2 +-
 .../connectors/kafka/util/KafkaUtils.java       |  13 +-
 .../kafka/FlinkKafkaConsumerBaseTest.java       | 222 +++++
 .../KafkaConsumerPartitionAssignmentTest.java   |  96 +-
 .../connectors/kafka/KafkaConsumerTestBase.java | 175 ++--
 .../connectors/kafka/KafkaProducerTestBase.java |   8 +-
 .../kafka/KafkaShortRetentionTestBase.java      |  45 +-
 .../internals/KafkaTopicPartitionTest.java      |  57 ++
 .../testutils/JobManagerCommunicationUtils.java |  49 +-
 .../kafka/testutils/MockRuntimeContext.java     |  26 +-
 .../AssignerWithPeriodicWatermarks.java         |   3 +
 .../AssignerWithPunctuatedWatermarks.java       |   3 +
 43 files changed, 3407 insertions(+), 2714 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index 4748781..48cc461 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import kafka.api.OffsetRequest;
 import kafka.cluster.Broker;
 import kafka.common.ErrorMapping;
 import kafka.javaapi.PartitionMetadata;
@@ -24,40 +25,32 @@ import kafka.javaapi.TopicMetadata;
 import kafka.javaapi.TopicMetadataRequest;
 import kafka.javaapi.consumer.SimpleConsumer;
 
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kafka.internals.Fetcher;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionState;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.OffsetHandler;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
-import 
org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.SerializedValue;
+
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.Node;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 
-import static java.util.Objects.requireNonNull;
 import static 
org.apache.flink.streaming.connectors.kafka.util.KafkaUtils.getIntFromConfig;
 import static 
org.apache.flink.streaming.connectors.kafka.util.KafkaUtils.getLongFromConfig;
-
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * The Flink Kafka Consumer is a streaming data source that pulls a parallel 
data stream from
@@ -102,12 +95,8 @@ import static 
org.apache.flink.streaming.connectors.kafka.util.KafkaUtils.getLon
  * reach the Kafka brokers or ZooKeeper.</p>
  */
 public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
-       
-       // 
------------------------------------------------------------------------
-       
+
        private static final long serialVersionUID = -6272159445203409112L;
-       
-       private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKafkaConsumer08.class);
 
        /** Configuration key for the number of retries for getting the 
partition info */
        public static final String GET_PARTITIONS_RETRIES_KEY = 
"flink.get-partitions.retry";
@@ -115,30 +104,16 @@ public class FlinkKafkaConsumer08<T> extends 
FlinkKafkaConsumerBase<T> {
        /** Default number of retries for getting the partition info. One retry 
means going through the full list of brokers */
        public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3;
 
-       
-       // ------  Configuration of the Consumer -------
+       // 
------------------------------------------------------------------------
 
-       /** Initial list of topics and partitions to consume  */
-       private final List<KafkaTopicPartition> partitionInfos;
-       
        /** The properties to parametrize the Kafka consumer and ZooKeeper 
client */ 
-       private final Properties props;
+       private final Properties kafkaProperties;
 
+       /** The behavior when encountering an invalid offset (see {@link 
OffsetRequest}) */
+       private final long invalidOffsetBehavior;
 
-       // ------  Runtime State  -------
-       
-       /** The fetcher used to pull data from the Kafka brokers */
-       private transient Fetcher fetcher;
-       
-       /** The committer that persists the committed offsets */
-       private transient OffsetHandler offsetHandler;
-       
-       /** The partitions actually handled by this consumer at runtime */
-       private transient List<KafkaTopicPartition> subscribedPartitions;
-
-       /** The latest offsets that have been committed to Kafka or ZooKeeper. 
These are never
-        * newer then the last offsets (Flink's internal view is fresher) */
-       private transient HashMap<KafkaTopicPartition, Long> committedOffsets;
+       /** The interval in which to automatically commit (-1 if deactivated) */
+       private final long autoCommitInterval;
 
        // 
------------------------------------------------------------------------
 
@@ -202,287 +177,49 @@ public class FlinkKafkaConsumer08<T> extends 
FlinkKafkaConsumerBase<T> {
         *           The properties that are used to configure both the fetcher 
and the offset handler.
         */
        public FlinkKafkaConsumer08(List<String> topics, 
KeyedDeserializationSchema<T> deserializer, Properties props) {
-               super(deserializer, props);
+               super(deserializer);
 
-               requireNonNull(topics, "topics");
-               this.props = requireNonNull(props, "props");
+               checkNotNull(topics, "topics");
+               this.kafkaProperties = checkNotNull(props, "props");
 
                // validate the zookeeper properties
                validateZooKeeperConfig(props);
 
+               this.invalidOffsetBehavior = getInvalidOffsetBehavior(props);
+               this.autoCommitInterval = getLongFromConfig(props, 
"auto.commit.interval.ms", 60000);
+
                // Connect to a broker to get the partitions for all topics
-               this.partitionInfos = 
KafkaTopicPartition.dropLeaderData(getPartitionsForTopic(topics, props));
+               List<KafkaTopicPartition> partitionInfos = 
+                               
KafkaTopicPartition.dropLeaderData(getPartitionsForTopic(topics, props));
 
                if (partitionInfos.size() == 0) {
-                       throw new RuntimeException("Unable to retrieve any 
partitions for the requested topics " + topics.toString() + "." +
-                                       "Please check previous log entries");
-               }
-
-               if (LOG.isInfoEnabled()) {
-                       logPartitionInfo(partitionInfos);
+                       throw new RuntimeException(
+                                       "Unable to retrieve any partitions for 
the requested topics " + topics + 
+                                                       ". Please check 
previous log entries");
                }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Source life cycle
-       // 
------------------------------------------------------------------------
 
-       @Override
-       public void open(Configuration parameters) throws Exception {
-               super.open(parameters);
-               
-               final int numConsumers = 
getRuntimeContext().getNumberOfParallelSubtasks();
-               final int thisConsumerIndex = 
getRuntimeContext().getIndexOfThisSubtask();
-               
-               // pick which partitions we work on
-               subscribedPartitions = assignPartitions(this.partitionInfos, 
numConsumers, thisConsumerIndex);
-               
                if (LOG.isInfoEnabled()) {
-                       LOG.info("Kafka consumer {} will read partitions {} out 
of partitions {}",
-                                       thisConsumerIndex, 
KafkaTopicPartition.toString(subscribedPartitions), this.partitionInfos.size());
-               }
-
-               // we leave the fetcher as null, if we have no partitions
-               if (subscribedPartitions.isEmpty()) {
-                       LOG.info("Kafka consumer {} has no partitions (empty 
source)", thisConsumerIndex);
-                       this.fetcher = null; // fetcher remains null
-                       return;
-               }
-
-               // offset handling
-               offsetHandler = new ZookeeperOffsetHandler(props);
-
-               committedOffsets = new HashMap<>();
-
-               // initially load the map with "offset not set", last max read 
timestamp set to Long.MIN_VALUE
-               // and mark the partition as in-active, until we receive the 
first element
-               Map<KafkaTopicPartition, KafkaPartitionState> 
subscribedPartitionsWithOffsets =
-                       new HashMap<>(subscribedPartitions.size());
-               for(KafkaTopicPartition ktp: subscribedPartitions) {
-                       subscribedPartitionsWithOffsets.put(ktp,
-                               new KafkaPartitionState(ktp.getPartition(), 
FlinkKafkaConsumerBase.OFFSET_NOT_SET));
-               }
-
-               // seek to last known pos, from restore request
-               if (restoreToOffset != null) {
-                       if (LOG.isInfoEnabled()) {
-                               LOG.info("Consumer {} is restored from previous 
checkpoint: {}",
-                                               thisConsumerIndex, 
KafkaTopicPartition.toString(restoreToOffset));
-                       }
-                       // initialize offsets with restored state
-                       this.partitionState = restoreInfoFromCheckpoint();
-                       subscribedPartitionsWithOffsets.putAll(partitionState);
-                       restoreToOffset = null;
-               }
-               else {
-                       // start with empty partition state
-                       partitionState = new HashMap<>();
-
-                       // no restore request: overwrite offsets.
-                       for(Map.Entry<KafkaTopicPartition, Long> offsetInfo: 
offsetHandler.getOffsets(subscribedPartitions).entrySet()) {
-                               KafkaTopicPartition key = offsetInfo.getKey();
-                               subscribedPartitionsWithOffsets.put(key,
-                                       new 
KafkaPartitionState(key.getPartition(), offsetInfo.getValue()));
-                       }
-               }
-               if(subscribedPartitionsWithOffsets.size() != 
subscribedPartitions.size()) {
-                       throw new IllegalStateException("The subscribed 
partitions map has more entries than the subscribed partitions " +
-                                       "list: " + 
subscribedPartitionsWithOffsets.size() + "," + subscribedPartitions.size());
-               }
-
-               // create fetcher
-               fetcher = new LegacyFetcher<T>(this, 
subscribedPartitionsWithOffsets, props,
-                               getRuntimeContext().getTaskName(), 
getRuntimeContext().getUserCodeClassLoader());
-       }
-
-       @Override
-       public void run(SourceContext<T> sourceContext) throws Exception {
-               if (fetcher != null) {
-                       StreamingRuntimeContext streamingRuntimeContext = 
(StreamingRuntimeContext) getRuntimeContext();
-
-                       // if we have a non-checkpointed source, start a thread 
which periodically commits
-                       // the current offset into ZK.
-
-                       PeriodicOffsetCommitter<T> offsetCommitter = null;
-                       if (!streamingRuntimeContext.isCheckpointingEnabled()) {
-                               // we use Kafka's own configuration parameter 
key for this.
-                               // Note that the default configuration value in 
Kafka is 60 * 1000, so we use the same here.
-                               long commitInterval = getLongFromConfig(props, 
"auto.commit.interval.ms", 60000);
-                               offsetCommitter = new 
PeriodicOffsetCommitter<>(commitInterval, this);
-                               offsetCommitter.setDaemon(true);
-                               offsetCommitter.start();
-                               LOG.info("Starting periodic offset committer, 
with commit interval of {}ms", commitInterval);
-                       }
-
-                       try {
-                               fetcher.run(sourceContext, deserializer, 
partitionState);
-                       } finally {
-                               if (offsetCommitter != null) {
-                                       offsetCommitter.close();
-                                       try {
-                                               offsetCommitter.join();
-                                       } catch(InterruptedException ie) {
-                                               // ignore interrupt
-                                       }
-                               }
-                       }
-               }
-               else {
-                       // this source never completes, so emit a 
Long.MAX_VALUE watermark
-                       // to not block watermark forwarding
-                       sourceContext.emitWatermark(new 
Watermark(Long.MAX_VALUE));
-
-                       final Object waitLock = new Object();
-                       while (running) {
-                               // wait until we are canceled
-                               try {
-                                       //noinspection 
SynchronizationOnLocalVariableOrMethodParameter
-                                       synchronized (waitLock) {
-                                               waitLock.wait();
-                                       }
-                               }
-                               catch (InterruptedException e) {
-                                       // do nothing, check our "running" 
status
-                               }
-                       }
-               }
-               
-               // close the context after the work was done. this can actually 
only
-               // happen when the fetcher decides to stop fetching
-               sourceContext.close();
-       }
-
-       @Override
-       public void cancel() {
-               // set ourselves as not running
-               running = false;
-               
-               // close the fetcher to interrupt any work
-               Fetcher fetcher = this.fetcher;
-               this.fetcher = null;
-               if (fetcher != null) {
-                       try {
-                               fetcher.close();
-                       }
-                       catch (IOException e) {
-                               LOG.warn("Error while closing Kafka connector 
data fetcher", e);
-                       }
+                       logPartitionInfo(LOG, partitionInfos);
                }
-               
-               OffsetHandler offsetHandler = this.offsetHandler;
-               this.offsetHandler = null;
-               if (offsetHandler != null) {
-                       try {
-                               offsetHandler.close();
-                       }
-                       catch (IOException e) {
-                               LOG.warn("Error while closing Kafka connector 
offset handler", e);
-                       }
-               }
-       }
 
-       @Override
-       public void close() throws Exception {
-               cancel();
-               super.close();
+               setSubscribedPartitions(partitionInfos);
        }
 
-       // 
------------------------------------------------------------------------
-       //  Checkpoint and restore
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Utility method to commit offsets.
-        *
-        * @param toCommit the offsets to commit
-        * @throws Exception
-        */
        @Override
-       protected void commitOffsets(HashMap<KafkaTopicPartition, Long> 
toCommit) throws Exception {
-               Map<KafkaTopicPartition, Long> offsetsToCommit = new 
HashMap<>();
-               for (KafkaTopicPartition tp : this.subscribedPartitions) {
-                       Long offset = toCommit.get(tp);
-                       if(offset == null) {
-                               // There was no data ever consumed from this 
topic, that's why there is no entry
-                               // for this topicPartition in the map.
-                               continue;
-                       }
-                       Long lastCommitted = this.committedOffsets.get(tp);
-                       if (lastCommitted == null) {
-                               lastCommitted = OFFSET_NOT_SET;
-                       }
-                       if (offset != OFFSET_NOT_SET) {
-                               if (offset > lastCommitted) {
-                                       offsetsToCommit.put(tp, offset);
-                                       this.committedOffsets.put(tp, offset);
-                                       LOG.debug("Committing offset {} for 
partition {}", offset, tp);
-                               } else {
-                                       LOG.debug("Ignoring offset {} for 
partition {} because it is already committed", offset, tp);
-                               }
-                       }
-               }
-
-               if (LOG.isDebugEnabled() && offsetsToCommit.size() > 0) {
-                       LOG.debug("Committing offsets {} to Zookeeper", 
KafkaTopicPartition.toString(offsetsToCommit));
-               }
-
-               this.offsetHandler.commit(offsetsToCommit);
+       protected AbstractFetcher<T, ?> createFetcher(
+                       SourceContext<T> sourceContext,
+                       List<KafkaTopicPartition> thisSubtaskPartitions,
+                       SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
+                       SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
+                       StreamingRuntimeContext runtimeContext) throws 
Exception {
+
+               return new Kafka08Fetcher<>(sourceContext, 
thisSubtaskPartitions,
+                               watermarksPeriodic, watermarksPunctuated,
+                               runtimeContext, deserializer, kafkaProperties,
+                               invalidOffsetBehavior, autoCommitInterval);
        }
 
        // 
------------------------------------------------------------------------
-       //  Miscellaneous utilities 
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Thread to periodically commit the current read offset into Zookeeper.
-        */
-       private static class PeriodicOffsetCommitter<T> extends Thread {
-               private final long commitInterval;
-               private final FlinkKafkaConsumer08<T> consumer;
-               private volatile boolean running = true;
-
-               PeriodicOffsetCommitter(long commitInterval, 
FlinkKafkaConsumer08<T> consumer) {
-                       this.commitInterval = commitInterval;
-                       this.consumer = consumer;
-               }
-
-               @Override
-               public void run() {
-                       try {
-
-                               while (running) {
-                                       try {
-                                               Thread.sleep(commitInterval);
-                                               //  ------------  commit 
current offsets ----------------
-
-                                               // create copy a deep copy of 
the current offsets
-                                               @SuppressWarnings("unchecked")
-                                               HashMap<KafkaTopicPartition, 
Long> currentOffsets = new HashMap<>(consumer.partitionState.size());
-                                               for 
(Map.Entry<KafkaTopicPartition, KafkaPartitionState> entry: 
consumer.partitionState.entrySet()) {
-                                                       
currentOffsets.put(entry.getKey(), entry.getValue().getOffset());
-                                               }
-                                               
consumer.commitOffsets(currentOffsets);
-                                       } catch (InterruptedException e) {
-                                               if (running) {
-                                                       // throw unexpected 
interruption
-                                                       throw e;
-                                               }
-                                       }
-                               }
-                       } catch (Throwable t) {
-                               LOG.warn("Periodic checkpoint committer is 
stopping the fetcher because of an error", t);
-                               consumer.fetcher.stopWithError(t);
-                       }
-               }
-
-               public void close() {
-                       this.running = false;
-                       this.interrupt();
-               }
-       }
-
-
-       // 
------------------------------------------------------------------------
        //  Kafka / ZooKeeper communication utilities
        // 
------------------------------------------------------------------------
 
@@ -492,11 +229,11 @@ public class FlinkKafkaConsumer08<T> extends 
FlinkKafkaConsumerBase<T> {
         * @param topics The name of the topics.
         * @param properties The properties for the Kafka Consumer that is used 
to query the partitions for the topic. 
         */
-       public static List<KafkaTopicPartitionLeader> 
getPartitionsForTopic(final List<String> topics, final Properties properties) {
+       public static List<KafkaTopicPartitionLeader> 
getPartitionsForTopic(List<String> topics, Properties properties) {
                String seedBrokersConfString = 
properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
                final int numRetries = getIntFromConfig(properties, 
GET_PARTITIONS_RETRIES_KEY, DEFAULT_GET_PARTITIONS_RETRIES);
 
-               requireNonNull(seedBrokersConfString, "Configuration property " 
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " not set");
+               checkNotNull(seedBrokersConfString, "Configuration property %s 
not set", ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
                String[] seedBrokers = seedBrokersConfString.split(",");
                List<KafkaTopicPartitionLeader> partitions = new ArrayList<>();
 
@@ -605,4 +342,17 @@ public class FlinkKafkaConsumer08<T> extends 
FlinkKafkaConsumerBase<T> {
                        throw new IllegalArgumentException("Property 
'zookeeper.connection.timeout.ms' is not a valid integer");
                }
        }
+
+       private static long getInvalidOffsetBehavior(Properties config) {
+               final String val = 
config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest");
+               if (val.equals("none")) {
+                       throw new IllegalArgumentException("Cannot use '" + 
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
+                                       + "' value 'none'. Possible values: 
'latest', 'largest', or 'earliest'.");
+               }
+               else if (val.equals("largest") || val.equals("latest")) { // 
largest is kafka 0.8, latest is kafka 0.9
+                       return OffsetRequest.LatestTime();
+               } else {
+                       return OffsetRequest.EarliestTime();
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
deleted file mode 100644
index f86687e..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-
-import java.io.IOException;
-import java.util.HashMap;
-
-/**
- * A fetcher pulls data from Kafka, from a fix set of partitions.
- * The fetcher supports "seeking" inside the partitions, i.e., moving to a 
different offset.
- */
-public interface Fetcher {
-
-       /**
-        * Closes the fetcher. This will stop any operation in the
-        * {@link #run(SourceFunction.SourceContext, 
KeyedDeserializationSchema, HashMap)} method and eventually
-        * close underlying connections and release all resources.
-        */
-       void close() throws IOException;
-
-       /**
-        * Starts fetch data from Kafka and emitting it into the stream.
-        * 
-        * <p>To provide exactly once guarantees, the fetcher needs emit a 
record and update the last
-        * consumed offset in one atomic operation. This is done in the
-        * {@link 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase#processElement(SourceFunction.SourceContext,
 KafkaTopicPartition, Object, long)}
-        * which is called from within the {@link 
SourceFunction.SourceContext#getCheckpointLock()}, as shown below:</p>
-        * <pre>{@code
-        * 
-        * while (running) {
-        *     T next = ...
-        *     long offset = ...
-        *     int partition = ...
-        *     synchronized (sourceContext.getCheckpointLock()) {
-        *         processElement(sourceContext, partition, next, offset)
-        *     }
-        * }
-        * }</pre>
-        *
-        * @param <T> The type of elements produced by the fetcher and emitted 
to the source context.
-        * @param sourceContext The source context to emit elements to.
-        * @param valueDeserializer The deserializer to decode the raw values 
with.
-        * @param lastOffsets The map into which to store the offsets for which 
elements are emitted (operator state)
-        */
-       <T> void run(SourceFunction.SourceContext<T> sourceContext, 
KeyedDeserializationSchema<T> valueDeserializer,
-                               HashMap<KafkaTopicPartition, 
KafkaPartitionState> lastOffsets) throws Exception;
-
-       /**
-        * Exit run loop with given error and release all resources.
-        *
-        * @param t Error cause
-        */
-       void stopWithError(Throwable t);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
new file mode 100644
index 0000000..91fdc71
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import kafka.common.TopicAndPartition;
+import org.apache.kafka.common.Node;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.SerializedValue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A fetcher that fetches data from Kafka brokers via the Kafka 0.8 low-level 
consumer API.
+ * The fetcher also handles the explicit communication with ZooKeeper to fetch 
initial offsets
+ * and to write offsets to ZooKeeper.
+ *
+ * @param <T> The type of elements produced by the fetcher.
+ */
+public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
+       
+       static final KafkaTopicPartitionState<TopicAndPartition> MARKER = 
+                       new KafkaTopicPartitionState<>(new 
KafkaTopicPartition("n/a", -1), new TopicAndPartition("n/a", -1));
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(Kafka08Fetcher.class);
+
+       // 
------------------------------------------------------------------------
+
+       /** The schema to convert between Kafka's byte messages, and Flink's 
objects */
+       private final KeyedDeserializationSchema<T> deserializer;
+
+       /** The properties that configure the Kafka connection */
+       private final Properties kafkaConfig;
+
+       /** The task name, to give more readable names to the spawned threads */
+       private final String taskName;
+
+       /** The class loader for dynamically loaded classes */
+       private final ClassLoader userCodeClassLoader;
+
+       /** The queue of partitions that are currently not assigned to a broker 
connection */
+       private final 
ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> 
unassignedPartitionsQueue;
+
+       /** The behavior to use in case that an offset is not valid (any more) 
for a partition */
+       private final long invalidOffsetBehavior;
+
+       /** The interval in which to automatically commit (-1 if deactivated) */
+       private final long autoCommitInterval; 
+
+       /** The handler that reads/writes offsets from/to ZooKeeper */
+       private volatile ZookeeperOffsetHandler zookeeperOffsetHandler;
+
+       /** Flag to track the main work loop as alive */
+       private volatile boolean running = true;
+
+
+       public Kafka08Fetcher(
+                       SourceContext<T> sourceContext,
+                       List<KafkaTopicPartition> assignedPartitions,
+                       SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
+                       SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
+                       StreamingRuntimeContext runtimeContext,
+                       KeyedDeserializationSchema<T> deserializer,
+                       Properties kafkaProperties,
+                       long invalidOffsetBehavior,
+                       long autoCommitInterval) throws Exception
+       {
+               super(sourceContext, assignedPartitions, watermarksPeriodic, 
watermarksPunctuated, runtimeContext);
+
+               this.deserializer = checkNotNull(deserializer);
+               this.kafkaConfig = checkNotNull(kafkaProperties);
+               this.taskName = runtimeContext.getTaskNameWithSubtasks();
+               this.userCodeClassLoader = 
runtimeContext.getUserCodeClassLoader();
+               this.invalidOffsetBehavior = invalidOffsetBehavior;
+               this.autoCommitInterval = autoCommitInterval;
+               this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
+
+               // initially, all these partitions are not assigned to a 
specific broker connection
+               for (KafkaTopicPartitionState<TopicAndPartition> partition : 
subscribedPartitions()) {
+                       unassignedPartitionsQueue.add(partition);
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Main Work Loop
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void runFetchLoop() throws Exception {
+               // the map from broker to the thread that is connected to that 
broker
+               final Map<Node, SimpleConsumerThread<T>> brokerToThread = new 
HashMap<>();
+
+               // this holds the possible exceptions from the concurrent 
broker connection threads
+               final ExceptionProxy errorHandler = new 
ExceptionProxy(Thread.currentThread());
+
+               // the offset handler handles the communication with ZooKeeper, 
to commit externally visible offsets
+               final ZookeeperOffsetHandler zookeeperOffsetHandler = new 
ZookeeperOffsetHandler(kafkaConfig);
+               this.zookeeperOffsetHandler = zookeeperOffsetHandler;
+
+               PeriodicOffsetCommitter periodicCommitter = null;
+               try {
+                       // read offsets from ZooKeeper for partitions that did 
not restore offsets
+                       {
+                               List<KafkaTopicPartition> 
partitionsWithNoOffset = new ArrayList<>();
+                               for 
(KafkaTopicPartitionState<TopicAndPartition> partition : 
subscribedPartitions()) {
+                                       if (!partition.isOffsetDefined()) {
+                                               
partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
+                                       }
+                               }
+
+                               Map<KafkaTopicPartition, Long> zkOffsets = 
zookeeperOffsetHandler.getOffsets(partitionsWithNoOffset);
+                               for 
(KafkaTopicPartitionState<TopicAndPartition> partition : 
subscribedPartitions()) {
+                                       Long offset = 
zkOffsets.get(partition.getKafkaTopicPartition());
+                                       if (offset != null) {
+                                               partition.setOffset(offset);
+                                       }
+                               }
+                       }
+
+                       // start the periodic offset committer thread, if 
necessary
+                       if (autoCommitInterval > 0) {
+                               LOG.info("Starting periodic offset committer, 
with commit interval of {}ms", autoCommitInterval);
+
+                               periodicCommitter = new 
PeriodicOffsetCommitter(zookeeperOffsetHandler, 
+                                               subscribedPartitions(), 
errorHandler, autoCommitInterval);
+                               periodicCommitter.setName("Periodic Kafka 
partition offset committer");
+                               periodicCommitter.setDaemon(true);
+                               periodicCommitter.start();
+                       }
+
+                       // Main loop polling elements from the 
unassignedPartitions queue to the threads
+                       while (running) {
+                               // re-throw any exception from the concurrent 
fetcher threads
+                               errorHandler.checkAndThrowException();
+
+                               // wait for max 5 seconds trying to get 
partitions to assign
+                               // if threads shut down, this poll returns 
earlier, because the threads inject the
+                               // special marker into the queue
+                               
List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToAssign = 
+                                               
unassignedPartitionsQueue.getBatchBlocking(5000);
+                               partitionsToAssign.remove(MARKER);
+
+                               if (!partitionsToAssign.isEmpty()) {
+                                       LOG.info("Assigning {} partitions to 
broker threads", partitionsToAssign.size());
+                                       Map<Node, 
List<KafkaTopicPartitionState<TopicAndPartition>>> partitionsWithLeaders = 
+                                                       
findLeaderForPartitions(partitionsToAssign, kafkaConfig);
+
+                                       // assign the partitions to the leaders 
(maybe start the threads)
+                                       for (Map.Entry<Node, 
List<KafkaTopicPartitionState<TopicAndPartition>>> partitionsWithLeader : 
+                                                       
partitionsWithLeaders.entrySet())
+                                       {
+                                               final Node leader = 
partitionsWithLeader.getKey();
+                                               final 
List<KafkaTopicPartitionState<TopicAndPartition>> partitions = 
partitionsWithLeader.getValue();
+                                               SimpleConsumerThread<T> 
brokerThread = brokerToThread.get(leader);
+
+                                               if (!running) {
+                                                       break;
+                                               }
+
+                                               if (brokerThread == null || 
!brokerThread.getNewPartitionsQueue().isOpen()) {
+                                                       // start new thread
+                                                       brokerThread = 
createAndStartSimpleConsumerThread(partitions, leader, errorHandler);
+                                                       
brokerToThread.put(leader, brokerThread);
+                                               }
+                                               else {
+                                                       // put elements into 
queue of thread
+                                                       
ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> 
newPartitionsQueue = 
+                                                                       
brokerThread.getNewPartitionsQueue();
+                                                       
+                                                       for 
(KafkaTopicPartitionState<TopicAndPartition> fp : partitions) {
+                                                               if 
(!newPartitionsQueue.addIfOpen(fp)) {
+                                                                       // we 
were unable to add the partition to the broker's queue
+                                                                       // the 
broker has closed in the meantime (the thread will shut down)
+                                                                       // 
create a new thread for connecting to this broker
+                                                                       
List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions = new 
ArrayList<>();
+                                                                       
seedPartitions.add(fp);
+                                                                       
brokerThread = createAndStartSimpleConsumerThread(seedPartitions, leader, 
errorHandler);
+                                                                       
brokerToThread.put(leader, brokerThread);
+                                                                       
newPartitionsQueue = brokerThread.getNewPartitionsQueue(); // update queue for 
the subsequent partitions
+                                                               }
+                                                       }
+                                               }
+                                       }
+                               }
+                               else {
+                                       // there were no partitions to assign. 
Check if any broker threads shut down.
+                                       // we get into this section of the 
code, if either the poll timed out, or the
+                                       // blocking poll was woken up by the 
marker element
+                                       Iterator<SimpleConsumerThread<T>> 
bttIterator = brokerToThread.values().iterator();
+                                       while (bttIterator.hasNext()) {
+                                               SimpleConsumerThread<T> thread 
= bttIterator.next();
+                                               if 
(!thread.getNewPartitionsQueue().isOpen()) {
+                                                       LOG.info("Removing 
stopped consumer thread {}", thread.getName());
+                                                       bttIterator.remove();
+                                               }
+                                       }
+                               }
+
+                               if (brokerToThread.size() == 0 && 
unassignedPartitionsQueue.isEmpty()) {
+                                       if (unassignedPartitionsQueue.close()) {
+                                               LOG.info("All consumer threads 
are finished, there are no more unassigned partitions. Stopping fetcher");
+                                               break;
+                                       }
+                                       // we end up here if somebody added 
something to the queue in the meantime --> continue to poll queue again
+                               }
+                       }
+               }
+               catch (InterruptedException e) {
+                       // this may be thrown because an exception on one of 
the concurrent fetcher threads
+                       // woke this thread up. make sure we throw the root 
exception instead in that case
+                       errorHandler.checkAndThrowException();
+
+                       // no other root exception, throw the interrupted 
exception
+                       throw e;
+               }
+               finally {
+                       this.running = false;
+                       this.zookeeperOffsetHandler = null;
+
+                       // if we run a periodic committer thread, shut that down
+                       if (periodicCommitter != null) {
+                               periodicCommitter.shutdown();
+                       }
+
+                       // make sure that in any case (completion, abort, 
error), all spawned threads are stopped
+                       try {
+                               int runningThreads;
+                               do {
+                                       // check whether threads are alive and 
cancel them
+                                       runningThreads = 0;
+                                       Iterator<SimpleConsumerThread<T>> 
threads = brokerToThread.values().iterator();
+                                       while (threads.hasNext()) {
+                                               SimpleConsumerThread<?> t = 
threads.next();
+                                               if (t.isAlive()) {
+                                                       t.cancel();
+                                                       runningThreads++;
+                                               } else {
+                                                       threads.remove();
+                                               }
+                                       }
+
+                                       // wait for the threads to finish, 
before issuing a cancel call again
+                                       if (runningThreads > 0) {
+                                               for (SimpleConsumerThread<?> t 
: brokerToThread.values()) {
+                                                       t.join(500 / 
runningThreads + 1);
+                                               }
+                                       }
+                               }
+                               while (runningThreads > 0);
+                       }
+                       catch (Throwable t) {
+                               // we catch all here to preserve the original 
exception
+                               LOG.error("Exception while shutting down 
consumer threads", t);
+                       }
+
+                       try {
+                               zookeeperOffsetHandler.close();
+                       }
+                       catch (Throwable t) {
+                               // we catch all here to preserve the original 
exception
+                               LOG.error("Exception while shutting down 
ZookeeperOffsetHandler", t);
+                       }
+               }
+       }
+
+       @Override
+       public void cancel() {
+               // signal the main thread to exit
+               this.running = false;
+
+               // make sure the main thread wakes up soon
+               this.unassignedPartitionsQueue.addIfOpen(MARKER);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Kafka 0.8 specific class instantiation
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public TopicAndPartition createKafkaPartitionHandle(KafkaTopicPartition 
partition) {
+               return new TopicAndPartition(partition.getTopic(), 
partition.getPartition());
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Offset handling
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> 
offsets) throws Exception {
+               ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
+               if (zkHandler != null) {
+                       zkHandler.writeOffsets(offsets);
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
+       private SimpleConsumerThread<T> createAndStartSimpleConsumerThread(
+                       List<KafkaTopicPartitionState<TopicAndPartition>> 
seedPartitions,
+                       Node leader,
+                       ExceptionProxy errorHandler) throws IOException, 
ClassNotFoundException
+       {
+               // each thread needs its own copy of the deserializer, because 
the deserializer is
+               // not necessarily thread safe
+               final KeyedDeserializationSchema<T> clonedDeserializer =
+                               InstantiationUtil.clone(deserializer, 
userCodeClassLoader);
+
+               // seed thread with list of fetch partitions (otherwise it 
would shut down immediately again)
+               SimpleConsumerThread<T> brokerThread = new 
SimpleConsumerThread<>(
+                               this, errorHandler, kafkaConfig, leader, 
seedPartitions, unassignedPartitionsQueue, 
+                               clonedDeserializer, invalidOffsetBehavior);
+
+               brokerThread.setName(String.format("SimpleConsumer - %s - 
broker-%s (%s:%d)",
+                               taskName, leader.id(), leader.host(), 
leader.port()));
+               brokerThread.setDaemon(true);
+               brokerThread.start();
+
+               LOG.info("Starting thread {}", brokerThread.getName());
+               return brokerThread;
+       }
+
+       /**
+        * Returns a list of unique topics for the given partitions
+        *
+        * @param partitions The partitions
+        * @return A list of unique topics
+        */
+       private static List<String> 
getTopics(List<KafkaTopicPartitionState<TopicAndPartition>> partitions) {
+               HashSet<String> uniqueTopics = new HashSet<>();
+               for (KafkaTopicPartitionState<TopicAndPartition> fp: 
partitions) {
+                       uniqueTopics.add(fp.getTopic());
+               }
+               return new ArrayList<>(uniqueTopics);
+       }
+
+       /**
+        * Find leaders for the partitions
+        *
+        * From a high level, the method does the following:
+        *       - Get a list of FetchPartitions (usually only a few partitions)
+        *       - Get the list of topics from the FetchPartitions list and 
request the partitions for the topics. (Kafka doesn't support getting leaders 
for a set of partitions)
+        *       - Build a Map<Leader, List<FetchPartition>> where only the 
requested partitions are contained.
+        *
+        * @param partitionsToAssign fetch partitions list
+        * @return leader to partitions map
+        */
+       private static Map<Node, 
List<KafkaTopicPartitionState<TopicAndPartition>>> findLeaderForPartitions(
+                       List<KafkaTopicPartitionState<TopicAndPartition>> 
partitionsToAssign,
+                       Properties kafkaProperties) throws Exception
+       {
+               if (partitionsToAssign.isEmpty()) {
+                       throw new IllegalArgumentException("Leader request for 
empty partitions list");
+               }
+
+               LOG.info("Refreshing leader information for partitions {}", 
partitionsToAssign);
+               
+               // this request is based on the topic names
+               PartitionInfoFetcher infoFetcher = new 
PartitionInfoFetcher(getTopics(partitionsToAssign), kafkaProperties);
+               infoFetcher.start();
+
+               // NOTE: The kafka client apparently locks itself up sometimes
+               // when it is interrupted, so we run it only in a separate 
thread.
+               // since it sometimes refuses to shut down, we resort to the 
admittedly harsh
+               // means of killing the thread after a timeout.
+               KillerWatchDog watchDog = new KillerWatchDog(infoFetcher, 
60000);
+               watchDog.start();
+
+               // this list contains ALL partitions of the requested topics
+               List<KafkaTopicPartitionLeader> topicPartitionWithLeaderList = 
infoFetcher.getPartitions();
+
+               // copy list to track unassigned partitions
+               List<KafkaTopicPartitionState<TopicAndPartition>> 
unassignedPartitions = new ArrayList<>(partitionsToAssign);
+
+               // final mapping from leader -> list(fetchPartition)
+               Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> 
leaderToPartitions = new HashMap<>();
+
+               for(KafkaTopicPartitionLeader partitionLeader: 
topicPartitionWithLeaderList) {
+                       if (unassignedPartitions.size() == 0) {
+                               // we are done: all partitions are assigned
+                               break;
+                       }
+
+                       Iterator<KafkaTopicPartitionState<TopicAndPartition>> 
unassignedPartitionsIterator = unassignedPartitions.iterator();
+                       while (unassignedPartitionsIterator.hasNext()) {
+                               KafkaTopicPartitionState<TopicAndPartition> 
unassignedPartition = unassignedPartitionsIterator.next();
+
+                               if 
(unassignedPartition.getKafkaTopicPartition().equals(partitionLeader.getTopicPartition()))
 {
+                                       // we found the leader for one of the 
fetch partitions
+                                       Node leader = 
partitionLeader.getLeader();
+
+                                       
List<KafkaTopicPartitionState<TopicAndPartition>> partitionsOfLeader = 
leaderToPartitions.get(leader);
+                                       if (partitionsOfLeader == null) {
+                                               partitionsOfLeader = new 
ArrayList<>();
+                                               leaderToPartitions.put(leader, 
partitionsOfLeader);
+                                       }
+                                       
partitionsOfLeader.add(unassignedPartition);
+                                       unassignedPartitionsIterator.remove(); 
// partition has been assigned
+                                       break;
+                               }
+                       }
+               }
+
+               if (unassignedPartitions.size() > 0) {
+                       throw new RuntimeException("Unable to find a leader for 
partitions: " + unassignedPartitions);
+               }
+
+               LOG.debug("Partitions with assigned leaders {}", 
leaderToPartitions);
+
+               return leaderToPartitions;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
new file mode 100644
index 0000000..4d61e53
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+/**
+ * A watch dog thread that forcibly kills another thread, if that thread does 
not
+ * finish in time.
+ * 
+ * <p>This uses the discouraged {@link Thread#stop()} method. While this is not
+ * advisable, this watch dog is only for extreme cases of threads that simply
+ * do not terminate otherwise.
+ */
+class KillerWatchDog extends Thread {
+
+       private final Thread toKill;
+       private final long timeout;
+
+       KillerWatchDog(Thread toKill, long timeout) {
+               super("KillerWatchDog");
+               setDaemon(true);
+
+               this.toKill = toKill;
+               this.timeout = timeout;
+       }
+
+       @SuppressWarnings("deprecation")
+       @Override
+       public void run() {
+               final long deadline = System.currentTimeMillis() + timeout;
+               long now;
+
+               while (toKill.isAlive() && (now = System.currentTimeMillis()) < 
deadline) {
+                       try {
+                               toKill.join(deadline - now);
+                       }
+                       catch (InterruptedException e) {
+                               // ignore here, our job is important!
+                       }
+               }
+
+               // this is harsh, but this watchdog is a last resort
+               if (toKill.isAlive()) {
+                       toKill.stop();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
deleted file mode 100644
index c4dd55c..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
+++ /dev/null
@@ -1,896 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import kafka.api.FetchRequestBuilder;
-import kafka.api.OffsetRequest;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.common.ErrorMapping;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.OffsetResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.MessageAndOffset;
-
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
-import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.StringUtils;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.Node;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static java.util.Objects.requireNonNull;
-import static 
org.apache.flink.streaming.connectors.kafka.util.KafkaUtils.getIntFromConfig;
-
-/**
- * This fetcher uses Kafka's low-level API to pull data from a specific
- * set of topics and partitions.
- * 
- * <p>This code is in parts based on the tutorial code for the low-level Kafka 
consumer.</p>
- */
-public class LegacyFetcher<T> implements Fetcher {
-       
-       private static final Logger LOG = 
LoggerFactory.getLogger(LegacyFetcher.class);
-
-       private static final FetchPartition MARKER = new FetchPartition("n/a", 
-1, -1);
-
-       /** The properties that configure the Kafka connection */
-       private final Properties config;
-       
-       /** The task name, to give more readable names to the spawned threads */
-       private final String taskName;
-       
-       /** The first error that occurred in a connection thread */
-       private final AtomicReference<Throwable> error;
-       
-       /** The classloader for dynamically loaded classes */
-       private final ClassLoader userCodeClassloader;
-       
-       /** Reference the the thread that executed the run() method. */
-       private volatile Thread mainThread;
-       
-       /** Flag to shut the fetcher down */
-       private volatile boolean running = true;
-
-       /**
-        * Queue of partitions which need to find a (new) leader. Elements are 
added to the queue
-        * from the consuming threads. The LegacyFetcher thread is finding new 
leaders and assigns the partitions
-        * to the respective threads.
-        */
-       private final ClosableBlockingQueue<FetchPartition> 
unassignedPartitions = new ClosableBlockingQueue<>();
-
-       /** The {@link FlinkKafkaConsumer08} to whom this Fetcher belongs. */
-       private final FlinkKafkaConsumer08<T> flinkKafkaConsumer;
-
-       /**
-        * Create a LegacyFetcher instance.
-        *
-        * @param initialPartitionsToRead Map of partitions to read. The offset 
passed is the last-fetched-offset (not the next-offset-to-fetch).
-        * @param props kafka properties
-        * @param taskName name of the parent task
-        * @param userCodeClassloader classloader for loading user code
-        */
-       public LegacyFetcher(
-                               FlinkKafkaConsumer08<T> owner,
-                               Map<KafkaTopicPartition, KafkaPartitionState> 
initialPartitionsToRead,
-                               Properties props,
-                               String taskName, ClassLoader 
userCodeClassloader) {
-
-               this.flinkKafkaConsumer = requireNonNull(owner);
-               this.config = requireNonNull(props, "The config properties 
cannot be null");
-               this.userCodeClassloader = requireNonNull(userCodeClassloader);
-               if (initialPartitionsToRead.size() == 0) {
-                       throw new IllegalArgumentException("List of initial 
partitions is empty");
-               }
-
-               for (Map.Entry<KafkaTopicPartition, KafkaPartitionState> 
partitionToRead: initialPartitionsToRead.entrySet()) {
-                       KafkaTopicPartition ktp = partitionToRead.getKey();
-                       // we increment the offset by one so that we fetch the 
next message in the partition.
-                       long offset = partitionToRead.getValue().getOffset();
-                       if (offset >= 0 && offset != 
FlinkKafkaConsumerBase.OFFSET_NOT_SET) {
-                               offset += 1L;
-                       }
-                       unassignedPartitions.add(new 
FetchPartition(ktp.getTopic(), ktp.getPartition(), offset));
-               }
-               this.taskName = taskName;
-               this.error = new AtomicReference<>();
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Fetcher methods
-       // 
------------------------------------------------------------------------
-
-
-       @Override
-       public void close() {
-               // flag needs to be check by the run() method that creates the 
spawned threads
-               this.running = false;
-               
-               // all other cleanup is made by the run method itself
-       }
-
-       @Override
-       public <T> void run(SourceFunction.SourceContext<T> sourceContext,
-                                               KeyedDeserializationSchema<T> 
deserializer,
-                                               HashMap<KafkaTopicPartition, 
KafkaPartitionState> partitionState) throws Exception {
-
-               // NOTE: This method needs to always release all resources it 
acquires
-
-               this.mainThread = Thread.currentThread();
-
-               // keep presumably dead threads in the list until we are sure 
the thread is not alive anymore.
-               final Map<Node, SimpleConsumerThread<T>> brokerToThread = new 
HashMap<>();
-               try {
-                       // Main loop polling elements from the 
unassignedPartitions queue to the threads
-                       while (running && error.get() == null) {
-                               try {
-                                       // wait for 5 seconds trying to get 
partitions to assign
-                                       List<FetchPartition> partitionsToAssign 
= unassignedPartitions.getBatchBlocking(5000);
-                                       partitionsToAssign.remove(MARKER);
-                                       
-                                       if(!partitionsToAssign.isEmpty()) {
-                                               LOG.info("Assigning {} 
partitions to broker threads", partitionsToAssign.size());
-                                               Map<Node, List<FetchPartition>> 
partitionsWithLeaders = findLeaderForPartitions(partitionsToAssign);
-                                               
-                                               // assign the partitions to the 
leaders (maybe start the threads)
-                                               for (Map.Entry<Node, 
List<FetchPartition>> partitionsWithLeader : partitionsWithLeaders.entrySet()) {
-                                                       final Node leader = 
partitionsWithLeader.getKey();
-                                                       final 
List<FetchPartition> partitions = partitionsWithLeader.getValue();
-                                                       SimpleConsumerThread<T> 
brokerThread = brokerToThread.get(leader);
-
-                                                       if (!running) {
-                                                               break;
-                                                       }
-                                                       
-                                                       if (brokerThread == 
null || !brokerThread.getNewPartitionsQueue().isOpen()) {
-                                                               // start new 
thread
-                                                               brokerThread = 
createAndStartSimpleConsumerThread(sourceContext, deserializer, partitions, 
leader);
-                                                               
brokerToThread.put(leader, brokerThread);
-                                                       } else {
-                                                               // put elements 
into queue of thread
-                                                               
ClosableBlockingQueue<FetchPartition> newPartitionsQueue = 
brokerThread.getNewPartitionsQueue();
-                                                               for 
(FetchPartition fp : partitions) {
-                                                                       if 
(!newPartitionsQueue.addIfOpen(fp)) {
-                                                                               
// we were unable to add the partition to the broker's queue
-                                                                               
// the broker has closed in the meantime (the thread will shut down)
-                                                                               
// create a new thread for connecting to this broker
-                                                                               
List<FetchPartition> seedPartitions = new ArrayList<>();
-                                                                               
seedPartitions.add(fp);
-                                                                               
brokerThread = createAndStartSimpleConsumerThread(sourceContext, deserializer, 
seedPartitions, leader);
-                                                                               
brokerToThread.put(leader, brokerThread);
-                                                                               
newPartitionsQueue = brokerThread.getNewPartitionsQueue(); // update queue for 
the subsequent partitions
-                                                                       }
-                                                               }
-                                                       }
-                                               }
-                                       } else {
-                                               // there were no partitions to 
assign. Check if any broker threads shut down.
-                                               
Iterator<SimpleConsumerThread<T>> bttIterator = 
brokerToThread.values().iterator();
-                                               while (bttIterator.hasNext()) {
-                                                       SimpleConsumerThread<T> 
thread = bttIterator.next();
-                                                       
if(!thread.getNewPartitionsQueue().isOpen()) {
-                                                               
LOG.info("Removing stopped consumer thread {}", thread.getName());
-                                                               
bttIterator.remove();
-                                                       }
-                                               }
-                                       }
-
-                                       if (brokerToThread.size() == 0 && 
unassignedPartitions.isEmpty()) {
-                                               if 
(unassignedPartitions.close()) {
-                                                       LOG.info("All consumer 
threads are finished, there are no more unassigned partitions. Stopping 
fetcher");
-                                                       break;
-                                               }
-                                               // we end up here if somebody 
added something to the queue in the meantime --> continue to poll queue again
-                                       }
-                               } catch (InterruptedException e) {
-                                       // ignore. we should notice what 
happened in the next loop check
-                               }
-                       }
-
-                       // make sure any asynchronous error is noticed
-                       Throwable error = this.error.get();
-                       if (error != null) {
-                               throw new Exception(error.getMessage(), error);
-                       }
-               } finally {
-                       // make sure that in any case (completion, abort, 
error), all spawned threads are stopped
-                       int runningThreads;
-                       do {
-                               runningThreads = 0;
-                               for (SimpleConsumerThread<?> t : 
brokerToThread.values()) {
-                                       if (t.isAlive()) {
-                                               t.cancel();
-                                               runningThreads++;
-                                       }
-                               }
-                               if(runningThreads > 0) {
-                                       Thread.sleep(500);
-                               }
-                       } while(runningThreads > 0);
-               }
-       }
-
-       private <T> SimpleConsumerThread<T> 
createAndStartSimpleConsumerThread(SourceFunction.SourceContext<T> 
sourceContext,
-                                                                               
                                                                
KeyedDeserializationSchema<T> deserializer,
-                                                                               
                                                                
List<FetchPartition> seedPartitions, Node leader) throws IOException, 
ClassNotFoundException {
-               final KeyedDeserializationSchema<T> clonedDeserializer =
-                               InstantiationUtil.clone(deserializer, 
userCodeClassloader);
-
-               // seed thread with list of fetch partitions (otherwise it 
would shut down immediately again
-               SimpleConsumerThread<T> brokerThread = new 
SimpleConsumerThread<>(this, config,
-                               leader, seedPartitions, unassignedPartitions, 
sourceContext, clonedDeserializer);
-
-               brokerThread.setName(String.format("SimpleConsumer - %s - 
broker-%s (%s:%d)",
-                               taskName, leader.id(), leader.host(), 
leader.port()));
-               brokerThread.setDaemon(true);
-               brokerThread.start();
-               LOG.info("Starting thread {}", brokerThread.getName());
-               return brokerThread;
-       }
-
-       /**
-        * Find leaders for the partitions
-        *
-        * From a high level, the method does the following:
-        *       - Get a list of FetchPartitions (usually only a few partitions)
-        *       - Get the list of topics from the FetchPartitions list and 
request the partitions for the topics. (Kafka doesn't support getting leaders 
for a set of partitions)
-        *       - Build a Map<Leader, List<FetchPartition>> where only the 
requested partitions are contained.
-        *
-        * @param partitionsToAssign fetch partitions list
-        * @return leader to partitions map
-        */
-       private Map<Node, List<FetchPartition>> 
findLeaderForPartitions(List<FetchPartition> partitionsToAssign) throws 
Exception {
-               if(partitionsToAssign.size() == 0) {
-                       throw new IllegalArgumentException("Leader request for 
empty partitions list");
-               }
-
-               LOG.info("Refreshing leader information for partitions {}", 
partitionsToAssign);
-               // NOTE: The kafka client apparently locks itself in an 
infinite loop sometimes
-               // when it is interrupted, so we run it only in a separate 
thread.
-               // since it sometimes refuses to shut down, we resort to the 
admittedly harsh
-               // means of killing the thread after a timeout.
-               PartitionInfoFetcher infoFetcher = new 
PartitionInfoFetcher(getTopics(partitionsToAssign), config); // this request is 
based on the topic names
-               infoFetcher.start();
-
-               KillerWatchDog watchDog = new KillerWatchDog(infoFetcher, 
60000);
-               watchDog.start();
-
-               // this list contains ALL partitions of the requested topics
-               List<KafkaTopicPartitionLeader> topicPartitionWithLeaderList = 
infoFetcher.getPartitions();
-               // copy list to track unassigned partitions
-               List<FetchPartition> unassignedPartitions = new 
ArrayList<>(partitionsToAssign);
-
-               // final mapping from leader -> list(fetchPartition)
-               Map<Node, List<FetchPartition>> leaderToPartitions = new 
HashMap<>();
-
-               for(KafkaTopicPartitionLeader partitionLeader: 
topicPartitionWithLeaderList) {
-                       if (unassignedPartitions.size() == 0) {
-                               // we are done: all partitions are assigned
-                               break;
-                       }
-                       Iterator<FetchPartition> unassignedPartitionsIterator = 
unassignedPartitions.iterator();
-                       while (unassignedPartitionsIterator.hasNext()) {
-                               FetchPartition unassignedPartition = 
unassignedPartitionsIterator.next();
-                               if 
(unassignedPartition.topic.equals(partitionLeader.getTopicPartition().getTopic())
-                                               && 
unassignedPartition.partition == 
partitionLeader.getTopicPartition().getPartition()) {
-
-                                       // we found the leader for one of the 
fetch partitions
-                                       Node leader = 
partitionLeader.getLeader();
-                                       List<FetchPartition> partitionsOfLeader 
= leaderToPartitions.get(leader);
-                                       if (partitionsOfLeader == null) {
-                                               partitionsOfLeader = new 
ArrayList<>();
-                                               leaderToPartitions.put(leader, 
partitionsOfLeader);
-                                       }
-                                       
partitionsOfLeader.add(unassignedPartition);
-                                       unassignedPartitionsIterator.remove(); 
// partition has been assigned
-                                       break;
-                               }
-                       }
-               }
-
-               if(unassignedPartitions.size() > 0) {
-                       throw new RuntimeException("Unable to find a leader for 
partitions: " + unassignedPartitions);
-               }
-
-               LOG.debug("Partitions with assigned leaders {}", 
leaderToPartitions);
-
-               return leaderToPartitions;
-       }
-
-       /**
-        * Reports an error from a fetch thread. This will cause the main 
thread to see this error,
-        * abort, and cancel all other fetch threads.
-        * 
-        * @param error The error to report.
-        */
-       @Override
-       public void stopWithError(Throwable error) {
-               if (this.error.compareAndSet(null, error)) {
-                       // we are the first to report an error
-                       if (mainThread != null) {
-                               mainThread.interrupt();
-                       }
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Representation of a partition to fetch.
-        */
-       private static class FetchPartition {
-
-               final String topic;
-               
-               /** ID of the partition within the topic (0 indexed, as given 
by Kafka) */
-               final int partition;
-               
-               /** Offset pointing at the next element to read from that 
partition. */
-               long nextOffsetToRead;
-
-               FetchPartition(String topic, int partition, long 
nextOffsetToRead) {
-                       this.topic = topic;
-                       this.partition = partition;
-                       this.nextOffsetToRead = nextOffsetToRead;
-               }
-               
-               @Override
-               public String toString() {
-                       return "FetchPartition {topic=" + topic +", partition=" 
+ partition + ", offset=" + nextOffsetToRead + '}';
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Per broker fetcher
-       // 
------------------------------------------------------------------------
-       
-       /**
-        * Each broker needs its separate connection. This thread implements 
the connection to
-        * one broker. The connection can fetch multiple partitions from the 
broker.
-        * 
-        * @param <T> The data type fetched.
-        */
-       private static class SimpleConsumerThread<T> extends Thread {
-
-               private static final Logger LOG = 
LoggerFactory.getLogger(SimpleConsumerThread.class);
-               
-               private final SourceFunction.SourceContext<T> sourceContext;
-               private final KeyedDeserializationSchema<T> deserializer;
-
-               private final List<FetchPartition> partitions;
-               
-               private final Node broker;
-
-               private final Properties config;
-
-               private final LegacyFetcher owner;
-
-               private final ClosableBlockingQueue<FetchPartition> 
unassignedPartitions;
-
-               private volatile boolean running = true;
-
-               /** Queue containing new fetch partitions for the consumer 
thread */
-               private final ClosableBlockingQueue<FetchPartition> 
newPartitionsQueue = new ClosableBlockingQueue<>();
-
-               // ----------------- Simple Consumer ----------------------
-               private SimpleConsumer consumer;
-
-               private final int soTimeout;
-               private final int minBytes;
-               private final int maxWait;
-               private final int fetchSize;
-               private final int bufferSize;
-               private final int reconnectLimit;
-
-
-               // exceptions are thrown locally
-               public SimpleConsumerThread(LegacyFetcher owner,
-                                                                       
Properties config,
-                                                                       Node 
broker,
-                                                                       
List<FetchPartition> seedPartitions,
-                                                                       
ClosableBlockingQueue<FetchPartition> unassignedPartitions,
-                                                                       
SourceFunction.SourceContext<T> sourceContext,
-                                                                       
KeyedDeserializationSchema<T> deserializer) {
-                       this.owner = owner;
-                       this.config = config;
-                       this.broker = broker;
-                       this.partitions = seedPartitions;
-                       this.sourceContext = requireNonNull(sourceContext);
-                       this.deserializer = requireNonNull(deserializer);
-                       this.unassignedPartitions = 
requireNonNull(unassignedPartitions);
-
-                       this.soTimeout = getIntFromConfig(config, 
"socket.timeout.ms", 30000);
-                       this.minBytes = getIntFromConfig(config, 
"fetch.min.bytes", 1);
-                       this.maxWait = getIntFromConfig(config, 
"fetch.wait.max.ms", 100);
-                       this.fetchSize = getIntFromConfig(config, 
"fetch.message.max.bytes", 1048576);
-                       this.bufferSize = getIntFromConfig(config, 
"socket.receive.buffer.bytes", 65536);
-                       this.reconnectLimit = getIntFromConfig(config, 
"flink.simple-consumer-reconnectLimit", 3);
-               }
-
-               @Override
-               public void run() {
-                       LOG.info("Starting to fetch from {}", this.partitions);
-
-                       // set up the config values
-                       final String clientId = "flink-kafka-consumer-legacy-" 
+ broker.id();
-                       int reconnects = 0;
-                       // these are the actual configuration values of Kafka + 
their original default values.
-
-                       try {
-                               // create the Kafka consumer that we actually 
use for fetching
-                               consumer = new SimpleConsumer(broker.host(), 
broker.port(), soTimeout, bufferSize, clientId);
-
-                               // make sure that all partitions have some 
offsets to start with
-                               // those partitions that do not have an offset 
from a checkpoint need to get
-                               // their start offset from ZooKeeper
-                               getMissingOffsetsFromKafka(partitions);
-
-                               // Now, the actual work starts :-)
-                               int offsetOutOfRangeCount = 0;
-                               while (running) {
-
-                                       // ----------------------------------- 
partitions list maintenance ----------------------------
-
-                                       // check queue for new partitions to 
read from:
-                                       List<FetchPartition> newPartitions = 
newPartitionsQueue.pollBatch();
-                                       if(newPartitions != null) {
-                                               // only this thread is taking 
elements from the queue, so the next call will never block
-
-                                               // check if the new partitions 
need an offset lookup
-                                               
getMissingOffsetsFromKafka(newPartitions);
-                                               // add the new partitions (and 
check they are not already in there)
-                                               for(FetchPartition 
newPartition: newPartitions) {
-                                                       
if(partitions.contains(newPartition)) {
-                                                               throw new 
IllegalStateException("Adding partition " + newPartition + " to subscribed 
partitions even though it is already subscribed");
-                                                       }
-                                                       
partitions.add(newPartition);
-                                               }
-                                               LOG.info("Adding {} new 
partitions to consumer thread {}", newPartitions.size(), getName());
-                                               if(LOG.isDebugEnabled()) {
-                                                       LOG.debug("Partitions 
list: {}", newPartitions);
-                                               }
-                                       }
-
-                                       if(partitions.size() == 0) {
-                                               if(newPartitionsQueue.close()) {
-                                                       // close succeeded. 
Closing thread
-                                                       LOG.info("Consumer 
thread {} does not have any partitions assigned anymore. Stopping thread.", 
getName());
-                                                       running = false;
-                                                       
-                                                       // add the wake-up 
marker into the queue to make the main thread
-                                                       // immediately wake up 
and termination faster
-                                                       
unassignedPartitions.add(MARKER);
-                                                       
-                                                       break;
-                                               } else {
-                                                       // close failed: 
LegacyFetcher main thread added new partitions into the queue.
-                                                       continue; // go to top 
of loop again and get the new partitions
-                                               }
-                                       }
-
-                                       // ----------------------------------- 
request / response with kafka ----------------------------
-
-                                       FetchRequestBuilder frb = new 
FetchRequestBuilder();
-                                       frb.clientId(clientId);
-                                       frb.maxWait(maxWait);
-                                       frb.minBytes(minBytes);
-                                       
-                                       for (FetchPartition fp : partitions) {
-                                               frb.addFetch(fp.topic, 
fp.partition, fp.nextOffsetToRead, fetchSize);
-                                       }
-                                       kafka.api.FetchRequest fetchRequest = 
frb.build();
-                                       LOG.debug("Issuing fetch request {}", 
fetchRequest);
-
-                                       FetchResponse fetchResponse;
-                                       try {
-                                               fetchResponse = 
consumer.fetch(fetchRequest);
-                                       } catch(Throwable cce) {
-                                               //noinspection 
ConstantConditions
-                                               if(cce instanceof 
ClosedChannelException) {
-                                                       LOG.warn("Fetch failed 
because of ClosedChannelException.");
-                                                       LOG.debug("Full 
exception", cce);
-                                                       // we don't know if the 
broker is overloaded or unavailable.
-                                                       // retry a few times, 
then return ALL partitions for new leader lookup
-                                                       if(++reconnects >= 
reconnectLimit) {
-                                                               
LOG.warn("Unable to reach broker after {} retries. Returning all current 
partitions", reconnectLimit);
-                                                               for 
(FetchPartition fp: this.partitions) {
-                                                                       
unassignedPartitions.add(fp);
-                                                               }
-                                                               
this.partitions.clear();
-                                                               continue; // 
jump to top of loop: will close thread or subscribe to new partitions
-                                                       }
-                                                       try {
-                                                               
consumer.close();
-                                                       } catch(Throwable t) {
-                                                               LOG.warn("Error 
while closing consumer connection", t);
-                                                       }
-                                                       // delay & retry
-                                                       Thread.sleep(500);
-                                                       consumer = new 
SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId);
-                                                       continue; // retry
-                                               } else {
-                                                       throw cce;
-                                               }
-                                       }
-                                       reconnects = 0;
-
-                                       // 
---------------------------------------- error handling 
----------------------------
-
-                                       if(fetchResponse == null) {
-                                               throw new 
RuntimeException("Fetch failed");
-                                       }
-                                       if (fetchResponse.hasError()) {
-                                               String exception = "";
-                                               List<FetchPartition> 
partitionsToGetOffsetsFor = new ArrayList<>();
-                                               // iterate over partitions to 
get individual error codes
-                                               Iterator<FetchPartition> 
partitionsIterator = partitions.iterator();
-                                               boolean partitionsRemoved = 
false;
-                                               
while(partitionsIterator.hasNext()) {
-                                                       final FetchPartition fp 
= partitionsIterator.next();
-                                                       short code = 
fetchResponse.errorCode(fp.topic, fp.partition);
-
-                                                       if (code == 
ErrorMapping.OffsetOutOfRangeCode()) {
-                                                               // we were 
asked to read from an out-of-range-offset (maybe set wrong in Zookeeper)
-                                                               // Kafka's high 
level consumer is resetting the offset according to 'auto.offset.reset'
-                                                               
partitionsToGetOffsetsFor.add(fp);
-                                                       } else if(code == 
ErrorMapping.NotLeaderForPartitionCode() ||
-                                                                       code == 
ErrorMapping.LeaderNotAvailableCode() ||
-                                                                       code == 
ErrorMapping.BrokerNotAvailableCode() ||
-                                                                       code == 
ErrorMapping.UnknownCode()) {
-                                                               // the broker 
we are connected to is not the leader for the partition.
-                                                               LOG.warn("{} is 
not the leader of {}. Reassigning leader for partition", broker, fp);
-                                                               
LOG.debug("Error code = {}", code);
-                                                               
-                                                               
unassignedPartitions.add(fp);
-                                                               
-                                                               
partitionsIterator.remove(); // unsubscribe the partition ourselves
-                                                               
partitionsRemoved = true;
-                                                       } else if (code != 
ErrorMapping.NoError()) {
-                                                               exception += 
"\nException for " + fp.topic +":"+ fp.partition + ": " +
-                                                                               
StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
-                                                       }
-                                               }
-                                               if 
(partitionsToGetOffsetsFor.size() > 0) {
-                                                       // safeguard against an 
infinite loop.
-                                                       if 
(offsetOutOfRangeCount++ > 3) {
-                                                               throw new 
RuntimeException("Found invalid offsets more than three times in partitions 
"+partitionsToGetOffsetsFor.toString()+" " +
-                                                                               
"Exceptions: "+exception);
-                                                       }
-                                                       // get valid offsets 
for these partitions and try again.
-                                                       LOG.warn("The following 
partitions had an invalid offset: {}", partitionsToGetOffsetsFor);
-                                                       getLastOffset(consumer, 
partitionsToGetOffsetsFor, getInvalidOffsetBehavior(config));
-                                                       LOG.warn("The new 
partition offsets are {}", partitionsToGetOffsetsFor);
-                                                       continue; // jump back 
to create a new fetch request. The offset has not been touched.
-                                               } else if(partitionsRemoved) {
-                                                       continue; // create new 
fetch request
-                                               } else {
-                                                       // partitions failed on 
an error
-                                                       throw new 
IOException("Error while fetching from broker '" + broker +"':" + exception);
-                                               }
-                                       } else {
-                                               // successful fetch, reset 
offsetOutOfRangeCount.
-                                               offsetOutOfRangeCount = 0;
-                                       }
-
-                                       // ----------------------------------- 
process fetch response ----------------------------
-
-                                       int messagesInFetch = 0;
-                                       int deletedMessages = 0;
-                                       Iterator<FetchPartition> 
partitionsIterator = partitions.iterator();
-                                       partitionsLoop: while 
(partitionsIterator.hasNext()) {
-                                               final FetchPartition fp = 
partitionsIterator.next();
-                                               final ByteBufferMessageSet 
messageSet = fetchResponse.messageSet(fp.topic, fp.partition);
-                                               final KafkaTopicPartition 
topicPartition = new KafkaTopicPartition(fp.topic, fp.partition);
-                                               
-                                               for (MessageAndOffset msg : 
messageSet) {
-                                                       if (running) {
-                                                               
messagesInFetch++;
-                                                               if 
(msg.offset() < fp.nextOffsetToRead) {
-                                                                       // we 
have seen this message already
-                                                                       
LOG.info("Skipping message with offset " + msg.offset()
-                                                                               
        + " because we have seen messages until " + fp.nextOffsetToRead
-                                                                               
        + " from partition " + fp.partition + " already");
-                                                                       
continue;
-                                                               }
-
-                                                               final long 
offset = msg.offset();
-
-                                                               ByteBuffer 
payload = msg.message().payload();
-
-                                                               // If the 
message value is null, this represents a delete command for the message key.
-                                                               // Log this and 
pass it on to the client who might want to also receive delete messages.
-                                                               byte[] 
valueBytes;
-                                                               if (payload == 
null) {
-                                                                       
deletedMessages++;
-                                                                       
valueBytes = null;
-                                                               } else {
-                                                                       
valueBytes = new byte[payload.remaining()];
-                                                                       
payload.get(valueBytes);
-                                                               }
-
-                                                               // put key into 
byte array
-                                                               byte[] keyBytes 
= null;
-                                                               int keySize = 
msg.message().keySize();
-
-                                                               if (keySize >= 
0) { // message().hasKey() is doing the same. We save one int deserialization
-                                                                       
ByteBuffer keyPayload = msg.message().key();
-                                                                       
keyBytes = new byte[keySize];
-                                                                       
keyPayload.get(keyBytes);
-                                                               }
-
-                                                               final T value = 
deserializer.deserialize(keyBytes, valueBytes, fp.topic, fp.partition, offset);
-                                                               
if(deserializer.isEndOfStream(value)) {
-                                                                       // 
remove partition from subscribed partitions.
-                                                                       
partitionsIterator.remove();
-                                                                       
continue partitionsLoop;
-                                                               }
-                                                               synchronized 
(sourceContext.getCheckpointLock()) {
-                                                                       
owner.flinkKafkaConsumer.processElement(sourceContext, topicPartition, value, 
offset);
-                                                               }
-                                                               
-                                                               // advance 
offset for the next request
-                                                               
fp.nextOffsetToRead = offset + 1;
-                                                       }
-                                                       else {
-                                                               // no longer 
running
-                                                               return;
-                                                       }
-                                               }
-                                       }
-                                       LOG.debug("This fetch contained {} 
messages ({} deleted messages)", messagesInFetch, deletedMessages);
-                               } // end of fetch loop
-                               
-                               if (!newPartitionsQueue.close()) {
-                                       throw new Exception("Bug: Cleanly 
leaving fetcher thread without having a closed queue.");
-                               }
-                       }
-                       catch (Throwable t) {
-                               // report to the main thread
-                               owner.stopWithError(t);
-                       }
-                       finally {
-                               // end of run loop. close connection to consumer
-                               if (consumer != null) {
-                                       // closing the consumer should not fail 
the program
-                                       try {
-                                               consumer.close();
-                                       }
-                                       catch (Throwable t) {
-                                               LOG.error("Error while closing 
the Kafka simple consumer", t);
-                                       }
-                               }
-                       }
-               }
-
-               private void getMissingOffsetsFromKafka(List<FetchPartition> 
partitions) {
-                       List<FetchPartition> partitionsToGetOffsetsFor = new 
ArrayList<>();
-
-                       for (FetchPartition fp : partitions) {
-                               if (fp.nextOffsetToRead == 
FlinkKafkaConsumerBase.OFFSET_NOT_SET) {
-                                       // retrieve the offset from the consumer
-                                       partitionsToGetOffsetsFor.add(fp);
-                               }
-                       }
-                       if (partitionsToGetOffsetsFor.size() > 0) {
-                               getLastOffset(consumer, 
partitionsToGetOffsetsFor, getInvalidOffsetBehavior(config));
-                               LOG.info("No prior offsets found for some 
partitions. Fetched the following start offsets {}", partitionsToGetOffsetsFor);
-
-                               // setting the fetched offset also in the 
offset state.
-                               // we subtract -1 from the offset
-                               synchronized 
(sourceContext.getCheckpointLock()) {
-                                       for(FetchPartition fp: 
partitionsToGetOffsetsFor) {
-                                               
owner.flinkKafkaConsumer.updateOffsetForPartition(new 
KafkaTopicPartition(fp.topic, fp.partition), fp.nextOffsetToRead - 1L);
-                                       }
-                               }
-                       }
-               }
-
-               /**
-                * Cancels this fetch thread. The thread will release all 
resources and terminate.
-                */
-               public void cancel() {
-                       this.running = false;
-                       
-                       // interrupt whatever the consumer is doing
-                       if (consumer != null) {
-                               consumer.close();
-                       }
-                       
-                       this.interrupt();
-               }
-
-               public ClosableBlockingQueue<FetchPartition> 
getNewPartitionsQueue() {
-                       return newPartitionsQueue;
-               }
-
-               /**
-                * Request latest offsets for a set of partitions, via a Kafka 
consumer.
-                *
-                * This method retries three times if the response has an error
-                *
-                * @param consumer The consumer connected to lead broker
-                * @param partitions The list of partitions we need offsets for
-                * @param whichTime The type of time we are requesting. -1 and 
-2 are special constants (See OffsetRequest)
-                */
-               private static void getLastOffset(SimpleConsumer consumer, 
List<FetchPartition> partitions, long whichTime) {
-
-                       Map<TopicAndPartition, PartitionOffsetRequestInfo> 
requestInfo = new HashMap<>();
-                       for (FetchPartition fp: partitions) {
-                               TopicAndPartition topicAndPartition = new 
TopicAndPartition(fp.topic, fp.partition);
-                               requestInfo.put(topicAndPartition, new 
PartitionOffsetRequestInfo(whichTime, 1));
-                       }
-
-                       int retries = 0;
-                       OffsetResponse response;
-                       while(true) {
-                               kafka.javaapi.OffsetRequest request = new 
kafka.javaapi.OffsetRequest(requestInfo, 
kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
-                               response = consumer.getOffsetsBefore(request);
-
-                               if (response.hasError()) {
-                                       String exception = "";
-                                       for (FetchPartition fp : partitions) {
-                                               short code;
-                                               if ((code = 
response.errorCode(fp.topic, fp.partition)) != ErrorMapping.NoError()) {
-                                                       exception += 
"\nException for topic=" + fp.topic + " partition=" + fp.partition + ": " + 
StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
-                                               }
-                                       }
-                                       if(++retries >= 3) {
-                                               throw new 
RuntimeException("Unable to get last offset for partitions " + partitions + ". 
" + exception);
-                                       } else {
-                                               LOG.warn("Unable to get last 
offset for partitions: Exception(s): {}", exception);
-                                       }
-
-                               } else {
-                                       break; // leave retry loop
-                               }
-                       }
-
-                       for (FetchPartition fp: partitions) {
-                               // the resulting offset is the next offset we 
are going to read
-                               // for not-yet-consumed partitions, it is 0.
-                               fp.nextOffsetToRead = 
response.offsets(fp.topic, fp.partition)[0];
-                       }
-               }
-
-               private static long getInvalidOffsetBehavior(Properties config) 
{
-                       long timeType;
-                       String val = 
config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest");
-                       if(val.equals("none")) {
-                               throw new RuntimeException("Unable to find 
previous offset in consumer group. " +
-                                               "Set 'auto.offset.reset' to 
'latest' or 'earliest' to automatically get the offset from Kafka");
-                       }
-                       if (val.equals("largest") || val.equals("latest")) { // 
largest is kafka 0.8, latest is kafka 0.9
-                               timeType = OffsetRequest.LatestTime();
-                       } else {
-                               timeType = OffsetRequest.EarliestTime();
-                       }
-                       return timeType;
-               }
-       }
-
-
-       private static class PartitionInfoFetcher extends Thread {
-
-               private final List<String> topics;
-               private final Properties properties;
-
-               private volatile List<KafkaTopicPartitionLeader> result;
-               private volatile Throwable error;
-
-
-               PartitionInfoFetcher(List<String> topics, Properties 
properties) {
-                       this.topics = topics;
-                       this.properties = properties;
-               }
-
-               @Override
-               public void run() {
-                       try {
-                               result = 
FlinkKafkaConsumer08.getPartitionsForTopic(topics, properties);
-                       }
-                       catch (Throwable t) {
-                               this.error = t;
-                       }
-               }
-
-               public List<KafkaTopicPartitionLeader> getPartitions() throws 
Exception {
-                       try {
-                               this.join();
-                       }
-                       catch (InterruptedException e) {
-                               throw new Exception("Partition fetching was 
cancelled before completion");
-                       }
-
-                       if (error != null) {
-                               throw new Exception("Failed to fetch partitions 
for topics " + topics.toString(), error);
-                       }
-                       if (result != null) {
-                               return result;
-                       }
-                       throw new Exception("Partition fetching failed");
-               }
-       }
-
-       private static class KillerWatchDog extends Thread {
-
-               private final Thread toKill;
-               private final long timeout;
-
-               private KillerWatchDog(Thread toKill, long timeout) {
-                       super("KillerWatchDog");
-                       setDaemon(true);
-
-                       this.toKill = toKill;
-                       this.timeout = timeout;
-               }
-
-               @SuppressWarnings("deprecation")
-               @Override
-               public void run() {
-                       final long deadline = System.currentTimeMillis() + 
timeout;
-                       long now;
-
-                       while (toKill.isAlive() && (now = 
System.currentTimeMillis()) < deadline) {
-                               try {
-                                       toKill.join(deadline - now);
-                               }
-                               catch (InterruptedException e) {
-                                       // ignore here, our job is important!
-                               }
-                       }
-
-                       // this is harsh, but this watchdog is a last resort
-                       if (toKill.isAlive()) {
-                               toKill.stop();
-                       }
-               }
-       }
-
-       /**
-        * Returns a unique list of topics from the topic partition list
-        *
-        * @param partitionsList A lost of FetchPartitions's
-        * @return A unique list of topics from the input map
-        */
-       public static List<String> getTopics(List<FetchPartition> 
partitionsList) {
-               HashSet<String> uniqueTopics = new HashSet<>();
-               for (FetchPartition fp: partitionsList) {
-                       uniqueTopics.add(fp.topic);
-               }
-               return new ArrayList<>(uniqueTopics);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java
deleted file mode 100644
index ecc7609..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * The offset handler is responsible for locating the initial partition 
offsets 
- * where the source should start reading, as well as committing offsets from 
completed
- * checkpoints.
- */
-public interface OffsetHandler {
-
-       /**
-        * Commits the given offset for the partitions. May commit the offsets 
to the Kafka broker,
-        * or to ZooKeeper, based on its configured behavior.
-        *
-        * @param offsetsToCommit The offset to commit, per partition.
-        */
-       void commit(Map<KafkaTopicPartition, Long> offsetsToCommit) throws 
Exception;
-
-       /**
-        * Positions the given fetcher to the initial read offsets where the 
stream consumption
-        * will start from.
-        * 
-        * @param partitions The partitions for which to seeks the fetcher to 
the beginning.
-        */
-       Map<KafkaTopicPartition, Long> getOffsets(List<KafkaTopicPartition> 
partitions) throws Exception;
-
-       /**
-        * Closes the offset handler, releasing all resources.
-        * 
-        * @throws IOException Thrown, if the closing fails.
-        */
-       void close() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
new file mode 100644
index 0000000..d8d927d
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
+
+import java.util.List;
+import java.util.Properties;
+
+class PartitionInfoFetcher extends Thread {
+
+       private final List<String> topics;
+       private final Properties properties;
+
+       private volatile List<KafkaTopicPartitionLeader> result;
+       private volatile Throwable error;
+
+
+       PartitionInfoFetcher(List<String> topics, Properties properties) {
+               this.topics = topics;
+               this.properties = properties;
+       }
+
+       @Override
+       public void run() {
+               try {
+                       result = 
FlinkKafkaConsumer08.getPartitionsForTopic(topics, properties);
+               }
+               catch (Throwable t) {
+                       this.error = t;
+               }
+       }
+
+       public List<KafkaTopicPartitionLeader> getPartitions() throws Exception 
{
+               try {
+                       this.join();
+               }
+               catch (InterruptedException e) {
+                       throw new Exception("Partition fetching was cancelled 
before completion");
+               }
+
+               if (error != null) {
+                       throw new Exception("Failed to fetch partitions for 
topics " + topics.toString(), error);
+               }
+               if (result != null) {
+                       return result;
+               }
+               throw new Exception("Partition fetching failed");
+       }
+}
\ No newline at end of file

Reply via email to