http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
new file mode 100644
index 0000000..d015157
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import kafka.common.TopicAndPartition;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.kafka.common.Node;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.SerializedValue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A fetcher that fetches data from Kafka brokers via the Kafka 0.8 low-level 
consumer API.
+ * The fetcher also handles the explicit communication with ZooKeeper to fetch 
initial offsets
+ * and to write offsets to ZooKeeper.
+ *
+ * @param <T> The type of elements produced by the fetcher.
+ */
+public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
+       
+       static final KafkaTopicPartitionState<TopicAndPartition> MARKER = 
+                       new KafkaTopicPartitionState<>(new 
KafkaTopicPartition("n/a", -1), new TopicAndPartition("n/a", -1));
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(Kafka08Fetcher.class);
+
+       // 
------------------------------------------------------------------------
+
+       /** The schema to convert between Kafka's byte messages, and Flink's 
objects */
+       private final KeyedDeserializationSchema<T> deserializer;
+
+       /** The properties that configure the Kafka connection */
+       private final Properties kafkaConfig;
+
+       /** The subtask's runtime context */
+       private final RuntimeContext runtimeContext;
+
+       /** The queue of partitions that are currently not assigned to a broker 
connection */
+       private final 
ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> 
unassignedPartitionsQueue;
+
+       /** The behavior to use in case that an offset is not valid (any more) 
for a partition */
+       private final long invalidOffsetBehavior;
+
+       /** The interval in which to automatically commit (-1 if deactivated) */
+       private final long autoCommitInterval;
+
+       /** The handler that reads/writes offsets from/to ZooKeeper */
+       private volatile ZookeeperOffsetHandler zookeeperOffsetHandler;
+
+       /** Flag to track the main work loop as alive */
+       private volatile boolean running = true;
+
+
+       /**
+        * Creates the Kafka 0.8 fetcher and queues all subscribed partitions as
+        * "unassigned", i.e. not yet handed to any broker connection thread.
+        *
+        * @param sourceContext Forwarded to the {@link AbstractFetcher} constructor.
+        * @param assignedPartitions Forwarded to the {@link AbstractFetcher} constructor.
+        * @param watermarksPeriodic Forwarded to the {@link AbstractFetcher} constructor.
+        * @param watermarksPunctuated Forwarded to the {@link AbstractFetcher} constructor.
+        * @param runtimeContext The subtask's runtime context; it also supplies the processing
+        *                       time service, the auto-watermark interval and the user code
+        *                       class loader that are passed to the super constructor.
+        * @param deserializer The schema used to deserialize Kafka messages; must not be null.
+        * @param kafkaProperties The properties that configure the Kafka connection; must not be null.
+        * @param invalidOffsetBehavior The behavior to use when an offset is not valid (any more)
+        *                              for a partition.
+        * @param autoCommitInterval The periodic offset commit interval in milliseconds; the
+        *                           periodic committer is only started for values greater than zero.
+        * @param useMetrics Forwarded to the {@link AbstractFetcher} constructor.
+        * @throws Exception Presumably propagated from the {@link AbstractFetcher} constructor
+        *                   (not visible here; confirm against that class).
+        */
+       public Kafka08Fetcher(
+                       SourceContext<T> sourceContext,
+                       List<KafkaTopicPartition> assignedPartitions,
+                       SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
+                       SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
+                       StreamingRuntimeContext runtimeContext,
+                       KeyedDeserializationSchema<T> deserializer,
+                       Properties kafkaProperties,
+                       long invalidOffsetBehavior,
+                       long autoCommitInterval,
+                       boolean useMetrics) throws Exception
+       {
+               super(
+                               sourceContext,
+                               assignedPartitions,
+                               watermarksPeriodic,
+                               watermarksPunctuated,
+                               runtimeContext.getProcessingTimeService(),
+                               
runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
+                               runtimeContext.getUserCodeClassLoader(),
+                               useMetrics);
+
+               this.deserializer = checkNotNull(deserializer);
+               this.kafkaConfig = checkNotNull(kafkaProperties);
+               this.runtimeContext = runtimeContext;
+               this.invalidOffsetBehavior = invalidOffsetBehavior;
+               this.autoCommitInterval = autoCommitInterval;
+               this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
+
+               // initially, all these partitions are not assigned to a 
specific broker connection
+               for (KafkaTopicPartitionState<TopicAndPartition> partition : 
subscribedPartitions()) {
+                       unassignedPartitionsQueue.add(partition);
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Main Work Loop
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Runs the main work loop of this fetcher on the calling thread.
+        *
+        * <p>The method creates the ZooKeeper offset handler, reads committed offsets
+        * from ZooKeeper for all partitions whose offset is not yet defined, optionally
+        * starts the periodic offset committer and registers the offset gauges. It then
+        * loops: it polls the queue of unassigned partitions, looks up the leader broker
+        * of each polled partition and hands the partitions to one consumer thread per
+        * leader, starting new threads where needed. The loop ends when {@link #cancel()}
+        * clears the running flag, or when no broker threads are left and the queue of
+        * unassigned partitions is empty and could be closed.
+        *
+        * <p>On exit (normal or exceptional) the periodic committer is shut down, all
+        * consumer threads are cancelled and joined, and the ZooKeeper offset handler
+        * is closed.
+        *
+        * @throws Exception Any exception reported by the concurrent threads through the
+        *                   error handler, an {@link InterruptedException} if the loop was
+        *                   interrupted without such an error, or any other failure of the
+        *                   calls made in the loop.
+        */
+       @Override
+       public void runFetchLoop() throws Exception {
+               // the map from broker to the thread that is connected to that 
broker
+               final Map<Node, SimpleConsumerThread<T>> brokerToThread = new 
HashMap<>();
+
+               // this holds possible exceptions from the concurrent 
broker connection threads
+               final ExceptionProxy errorHandler = new 
ExceptionProxy(Thread.currentThread());
+
+               // the offset handler handles the communication with ZooKeeper, 
to commit externally visible offsets
+               final ZookeeperOffsetHandler zookeeperOffsetHandler = new 
ZookeeperOffsetHandler(kafkaConfig);
+               this.zookeeperOffsetHandler = zookeeperOffsetHandler;
+
+               PeriodicOffsetCommitter periodicCommitter = null;
+               try {
+                       // read offsets from ZooKeeper for partitions that did 
not restore offsets
+                       {
+                               List<KafkaTopicPartition> 
partitionsWithNoOffset = new ArrayList<>();
+                               for 
(KafkaTopicPartitionState<TopicAndPartition> partition : 
subscribedPartitions()) {
+                                       if (!partition.isOffsetDefined()) {
+                                               
partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
+                                       }
+                               }
+
+                               Map<KafkaTopicPartition, Long> zkOffsets = 
zookeeperOffsetHandler.getCommittedOffsets(partitionsWithNoOffset);
+                               for 
(KafkaTopicPartitionState<TopicAndPartition> partition : 
subscribedPartitions()) {
+                                       Long zkOffset = 
zkOffsets.get(partition.getKafkaTopicPartition());
+                                       if (zkOffset != null) {
+                                               // the offset in ZK represents 
the "next record to process", so we need to subtract it by 1
+                                               // to correctly represent our 
internally checkpointed offsets
+                                               partition.setOffset(zkOffset - 
1);
+                                       }
+                               }
+                       }
+
+                       // start the periodic offset committer thread, if 
necessary
+                       if (autoCommitInterval > 0) {
+                               LOG.info("Starting periodic offset committer, 
with commit interval of {}ms", autoCommitInterval);
+
+                               periodicCommitter = new 
PeriodicOffsetCommitter(zookeeperOffsetHandler, 
+                                               subscribedPartitions(), 
errorHandler, autoCommitInterval);
+                               periodicCommitter.setName("Periodic Kafka 
partition offset committer");
+                               periodicCommitter.setDaemon(true);
+                               periodicCommitter.start();
+                       }
+
+                       // register offset metrics
+                       if (useMetrics) {
+                               final MetricGroup kafkaMetricGroup = 
runtimeContext.getMetricGroup().addGroup("KafkaConsumer");
+                               addOffsetStateGauge(kafkaMetricGroup);
+                       }
+
+                       // Main loop polling elements from the 
unassignedPartitions queue to the threads
+                       while (running) {
+                               // re-throw any exception from the concurrent 
fetcher threads
+                               errorHandler.checkAndThrowException();
+
+                               // wait for max 5 seconds trying to get 
partitions to assign
+                               // if threads shut down, this poll returns 
earlier, because the threads inject the
+                               // special marker into the queue
+                               
List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToAssign = 
+                                               
unassignedPartitionsQueue.getBatchBlocking(5000);
+                               partitionsToAssign.remove(MARKER);
+
+                               if (!partitionsToAssign.isEmpty()) {
+                                       LOG.info("Assigning {} partitions to 
broker threads", partitionsToAssign.size());
+                                       Map<Node, 
List<KafkaTopicPartitionState<TopicAndPartition>>> partitionsWithLeaders = 
+                                                       
findLeaderForPartitions(partitionsToAssign, kafkaConfig);
+
+                                       // assign the partitions to the leaders 
(maybe start the threads)
+                                       for (Map.Entry<Node, 
List<KafkaTopicPartitionState<TopicAndPartition>>> partitionsWithLeader : 
+                                                       
partitionsWithLeaders.entrySet())
+                                       {
+                                               final Node leader = 
partitionsWithLeader.getKey();
+                                               final 
List<KafkaTopicPartitionState<TopicAndPartition>> partitions = 
partitionsWithLeader.getValue();
+                                               SimpleConsumerThread<T> 
brokerThread = brokerToThread.get(leader);
+
+                                               if (!running) {
+                                                       break;
+                                               }
+
+                                               if (brokerThread == null || 
!brokerThread.getNewPartitionsQueue().isOpen()) {
+                                                       // start new thread
+                                                       brokerThread = 
createAndStartSimpleConsumerThread(partitions, leader, errorHandler);
+                                                       
brokerToThread.put(leader, brokerThread);
+                                               }
+                                               else {
+                                                       // put elements into 
queue of thread
+                                                       
ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> 
newPartitionsQueue = 
+                                                                       
brokerThread.getNewPartitionsQueue();
+                                                       
+                                                       for 
(KafkaTopicPartitionState<TopicAndPartition> fp : partitions) {
+                                                               if 
(!newPartitionsQueue.addIfOpen(fp)) {
+                                                                       // we 
were unable to add the partition to the broker's queue
+                                                                       // the 
broker has closed in the meantime (the thread will shut down)
+                                                                       // 
create a new thread for connecting to this broker
+                                                                       
List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions = new 
ArrayList<>();
+                                                                       
seedPartitions.add(fp);
+                                                                       
brokerThread = createAndStartSimpleConsumerThread(seedPartitions, leader, 
errorHandler);
+                                                                       
brokerToThread.put(leader, brokerThread);
+                                                                       
newPartitionsQueue = brokerThread.getNewPartitionsQueue(); // update queue for 
the subsequent partitions
+                                                               }
+                                                       }
+                                               }
+                                       }
+                               }
+                               else {
+                                       // there were no partitions to assign. 
Check if any broker threads shut down.
+                                       // we get into this section of the 
code, if either the poll timed out, or the
+                                       // blocking poll was woken up by the 
marker element
+                                       Iterator<SimpleConsumerThread<T>> 
bttIterator = brokerToThread.values().iterator();
+                                       while (bttIterator.hasNext()) {
+                                               SimpleConsumerThread<T> thread 
= bttIterator.next();
+                                               if 
(!thread.getNewPartitionsQueue().isOpen()) {
+                                                       LOG.info("Removing 
stopped consumer thread {}", thread.getName());
+                                                       bttIterator.remove();
+                                               }
+                                       }
+                               }
+
+                               if (brokerToThread.size() == 0 && 
unassignedPartitionsQueue.isEmpty()) {
+                                       if (unassignedPartitionsQueue.close()) {
+                                               LOG.info("All consumer threads 
are finished, there are no more unassigned partitions. Stopping fetcher");
+                                               break;
+                                       }
+                                       // we end up here if somebody added 
something to the queue in the meantime --> continue to poll queue again
+                               }
+                       }
+               }
+               catch (InterruptedException e) {
+                       // this may be thrown because an exception on one of 
the concurrent fetcher threads
+                       // woke this thread up. make sure we throw the root 
exception instead in that case
+                       errorHandler.checkAndThrowException();
+
+                       // no other root exception, throw the interrupted 
exception
+                       throw e;
+               }
+               finally {
+                       this.running = false;
+                       this.zookeeperOffsetHandler = null;
+
+                       // if we run a periodic committer thread, shut that down
+                       if (periodicCommitter != null) {
+                               periodicCommitter.shutdown();
+                       }
+
+                       // clear the interruption flag
+                       // this allows the joining on consumer threads (on best 
effort) to happen in
+                       // case this thread had already been interrupted
+                       Thread.interrupted();
+
+                       // make sure that in any case (completion, abort, 
error), all spawned threads are stopped
+                       try {
+                               int runningThreads;
+                               do {
+                                       // check whether threads are alive and 
cancel them
+                                       runningThreads = 0;
+                                       Iterator<SimpleConsumerThread<T>> 
threads = brokerToThread.values().iterator();
+                                       while (threads.hasNext()) {
+                                               SimpleConsumerThread<?> t = 
threads.next();
+                                               if (t.isAlive()) {
+                                                       t.cancel();
+                                                       runningThreads++;
+                                               } else {
+                                                       threads.remove();
+                                               }
+                                       }
+
+                                       // wait for the threads to finish, 
before issuing a cancel call again
+                                       if (runningThreads > 0) {
+                                               for (SimpleConsumerThread<?> t 
: brokerToThread.values()) {
+                                                       t.join(500 / 
runningThreads + 1);
+                                               }
+                                       }
+                               }
+                               while (runningThreads > 0);
+                       }
+                       catch (InterruptedException ignored) {
+                               // waiting for the thread shutdown apparently 
got interrupted
+                               // restore interrupted state and continue
+                               Thread.currentThread().interrupt();
+                       }
+                       catch (Throwable t) {
+                               // we catch all here to preserve the original 
exception
+                               LOG.error("Exception while shutting down 
consumer threads", t);
+                       }
+
+                       try {
+                               zookeeperOffsetHandler.close();
+                       }
+                       catch (Throwable t) {
+                               // we catch all here to preserve the original 
exception
+                               LOG.error("Exception while shutting down 
ZookeeperOffsetHandler", t);
+                       }
+               }
+       }
+
+       /**
+        * Requests the fetch loop to stop. Clears the running flag and, if the queue
+        * of unassigned partitions is still open, adds the {@code MARKER} element to
+        * it so that a blocking poll in the main loop returns promptly.
+        */
+       @Override
+       public void cancel() {
+               // signal the main thread to exit
+               this.running = false;
+
+               // make sure the main thread wakes up soon
+               this.unassignedPartitionsQueue.addIfOpen(MARKER);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Kafka 0.8 specific class instantiation
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates the Kafka 0.8 specific partition handle for the given partition.
+        *
+        * @param partition The partition whose topic name and partition number are used
+        * @return A new {@code TopicAndPartition} with that topic and partition number
+        */
+       @Override
+       public TopicAndPartition createKafkaPartitionHandle(KafkaTopicPartition 
partition) {
+               return new TopicAndPartition(partition.getTopic(), 
partition.getPartition());
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Offset handling
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Commits the given offsets through the ZooKeeper offset handler, if one is
+        * currently set, and records them as committed offsets in the state of the
+        * matching subscribed partitions.
+        *
+        * <p>The handler field is read once into a local variable because it is set at
+        * the start of the fetch loop and reset to null when that loop exits. Note that
+        * the partition state is updated even when no handler is set, i.e. when nothing
+        * was written to ZooKeeper.
+        *
+        * @param offsets The offsets to commit, keyed by partition; subscribed partitions
+        *                without an entry in this map are left untouched
+        * @throws Exception If the ZooKeeper offset handler fails to commit the offsets
+        */
+       @Override
+       public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> 
offsets) throws Exception {
+               ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
+               if (zkHandler != null) {
+                       // the ZK handler takes care of incrementing the 
offsets by 1 before committing
+                       zkHandler.prepareAndCommitOffsets(offsets);
+               }
+
+               // Set committed offsets in topic partition state
+               KafkaTopicPartitionState<TopicAndPartition>[] partitions = 
subscribedPartitions();
+               for (KafkaTopicPartitionState<TopicAndPartition> partition : 
partitions) {
+                       Long offset = 
offsets.get(partition.getKafkaTopicPartition());
+                       if (offset != null) {
+                               partition.setCommittedOffset(offset);
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates, names and starts a daemon consumer thread for the given leader broker.
+        * The thread receives its own clone of the deserializer and is seeded with the
+        * given partitions.
+        *
+        * @param seedPartitions The partitions the new thread starts with
+        * @param leader The broker node the thread is created for; its id, host and port
+        *               are part of the thread name
+        * @param errorHandler The exception proxy passed to the new thread
+        * @return The started consumer thread
+        * @throws IOException Presumably raised when cloning the deserializer fails (confirm
+        *                     against {@code InstantiationUtil.clone})
+        * @throws ClassNotFoundException Presumably raised when cloning the deserializer fails
+        *                                (confirm against {@code InstantiationUtil.clone})
+        */
+       private SimpleConsumerThread<T> createAndStartSimpleConsumerThread(
+                       List<KafkaTopicPartitionState<TopicAndPartition>> 
seedPartitions,
+                       Node leader,
+                       ExceptionProxy errorHandler) throws IOException, 
ClassNotFoundException
+       {
+               // each thread needs its own copy of the deserializer, because 
the deserializer is
+               // not necessarily thread safe
+               final KeyedDeserializationSchema<T> clonedDeserializer =
+                               InstantiationUtil.clone(deserializer, 
runtimeContext.getUserCodeClassLoader());
+
+               // seed thread with list of fetch partitions (otherwise it 
would shut down immediately again
+               SimpleConsumerThread<T> brokerThread = new 
SimpleConsumerThread<>(
+                               this, errorHandler, kafkaConfig, leader, 
seedPartitions, unassignedPartitionsQueue, 
+                               clonedDeserializer, invalidOffsetBehavior);
+
+               brokerThread.setName(String.format("SimpleConsumer - %s - 
broker-%s (%s:%d)",
+                               runtimeContext.getTaskName(), leader.id(), 
leader.host(), leader.port()));
+               brokerThread.setDaemon(true);
+               brokerThread.start();
+
+               LOG.info("Starting thread {}", brokerThread.getName());
+               return brokerThread;
+       }
+
+       /**
+        * Returns a list of the unique topics of the given partitions.
+        *
+        * @param partitions The partitions to collect the topic names from
+        * @return A list of unique topics, in the iteration order of the intermediate hash set
+        */
+       private static List<String> 
getTopics(List<KafkaTopicPartitionState<TopicAndPartition>> partitions) {
+               HashSet<String> uniqueTopics = new HashSet<>();
+               for (KafkaTopicPartitionState<TopicAndPartition> fp: 
partitions) {
+                       uniqueTopics.add(fp.getTopic());
+               }
+               return new ArrayList<>(uniqueTopics);
+       }
+
+       /**
+        * Finds the leaders for the given partitions.
+        *
+        * <p>From a high level, the method does the following:
+        * <ul>
+        *   <li>Get a list of fetch partitions (usually only a few partitions).</li>
+        *   <li>Get the list of topics from the fetch partitions and request the partitions
+        *       for those topics (Kafka doesn't support getting leaders for a set of
+        *       partitions), so the response contains ALL partitions of the requested topics.</li>
+        *   <li>Build a map from leader to list of fetch partitions in which only the
+        *       requested partitions are contained.</li>
+        * </ul>
+        *
+        * <p>The request runs in a separate {@code PartitionInfoFetcher} thread that is
+        * guarded by a {@code KillerWatchDog} created with a timeout of 60000 milliseconds.
+        *
+        * @param partitionsToAssign The fetch partitions to find leaders for; must not be empty
+        * @param kafkaProperties The Kafka properties handed to the partition info fetcher
+        * @return A map from leader node to the requested partitions led by that node
+        * @throws IllegalArgumentException If {@code partitionsToAssign} is empty
+        * @throws RuntimeException If no leader was found for at least one requested partition
+        * @throws Exception If retrieving the partition information from the fetcher thread fails
+        */
+       private static Map<Node, 
List<KafkaTopicPartitionState<TopicAndPartition>>> findLeaderForPartitions(
+                       List<KafkaTopicPartitionState<TopicAndPartition>> 
partitionsToAssign,
+                       Properties kafkaProperties) throws Exception
+       {
+               if (partitionsToAssign.isEmpty()) {
+                       throw new IllegalArgumentException("Leader request for 
empty partitions list");
+               }
+
+               LOG.info("Refreshing leader information for partitions {}", 
partitionsToAssign);
+               
+               // this request is based on the topic names
+               PartitionInfoFetcher infoFetcher = new 
PartitionInfoFetcher(getTopics(partitionsToAssign), kafkaProperties);
+               infoFetcher.start();
+
+               // NOTE: The kafka client apparently locks itself up sometimes
+               // when it is interrupted, so we run it only in a separate 
thread.
+               // since it sometimes refuses to shut down, we resort to the 
admittedly harsh
+               // means of killing the thread after a timeout.
+               KillerWatchDog watchDog = new KillerWatchDog(infoFetcher, 
60000);
+               watchDog.start();
+
+               // this list contains ALL partitions of the requested topics
+               List<KafkaTopicPartitionLeader> topicPartitionWithLeaderList = 
infoFetcher.getPartitions();
+
+               // copy list to track unassigned partitions
+               List<KafkaTopicPartitionState<TopicAndPartition>> 
unassignedPartitions = new ArrayList<>(partitionsToAssign);
+
+               // final mapping from leader -> list(fetchPartition)
+               Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> 
leaderToPartitions = new HashMap<>();
+
+               for(KafkaTopicPartitionLeader partitionLeader: 
topicPartitionWithLeaderList) {
+                       if (unassignedPartitions.size() == 0) {
+                               // we are done: all partitions are assigned
+                               break;
+                       }
+
+                       Iterator<KafkaTopicPartitionState<TopicAndPartition>> 
unassignedPartitionsIterator = unassignedPartitions.iterator();
+                       while (unassignedPartitionsIterator.hasNext()) {
+                               KafkaTopicPartitionState<TopicAndPartition> 
unassignedPartition = unassignedPartitionsIterator.next();
+
+                               if 
(unassignedPartition.getKafkaTopicPartition().equals(partitionLeader.getTopicPartition()))
 {
+                                       // we found the leader for one of the 
fetch partitions
+                                       Node leader = 
partitionLeader.getLeader();
+
+                                       
List<KafkaTopicPartitionState<TopicAndPartition>> partitionsOfLeader = 
leaderToPartitions.get(leader);
+                                       if (partitionsOfLeader == null) {
+                                               partitionsOfLeader = new 
ArrayList<>();
+                                               leaderToPartitions.put(leader, 
partitionsOfLeader);
+                                       }
+                                       
partitionsOfLeader.add(unassignedPartition);
+                                       unassignedPartitionsIterator.remove(); 
// partition has been assigned
+                                       break;
+                               }
+                       }
+               }
+
+               if (unassignedPartitions.size() > 0) {
+                       throw new RuntimeException("Unable to find a leader for 
partitions: " + unassignedPartitions);
+               }
+
+               LOG.debug("Partitions with assigned leaders {}", 
leaderToPartitions);
+
+               return leaderToPartitions;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
new file mode 100644
index 0000000..4d61e53
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+/**
+ * A watch dog thread that forcibly kills another thread if that thread does not
+ * finish in time.
+ *
+ * <p>This uses the discouraged {@link Thread#stop()} method. While this is not
+ * advisable, this watch dog is only meant for extreme cases of threads that simply
+ * do not terminate otherwise.
+ *
+ * <p>The watch dog is a daemon thread. The deadline is computed when {@link #run()}
+ * begins executing, not when the watch dog is constructed.
+ */
+class KillerWatchDog extends Thread {
+
+       private final Thread toKill;
+       private final long timeout;
+
+       KillerWatchDog(Thread toKill, long timeout) {
+               super("KillerWatchDog");
+               setDaemon(true);
+
+               this.toKill = toKill;
+               this.timeout = timeout;
+       }
+
+       @SuppressWarnings("deprecation")
+       @Override
+       public void run() {
+               final long deadline = System.currentTimeMillis() + timeout;
+               long now;
+
+               while (toKill.isAlive() && (now = System.currentTimeMillis()) < 
deadline) {
+                       try {
+                               toKill.join(deadline - now);
+                       }
+                       catch (InterruptedException e) {
+                               // ignore here, our job is important! the loop re-checks the deadline and keeps waiting
+                       }
+               }
+
+               // this is harsh, but this watchdog is a last resort
+               if (toKill.isAlive()) {
+                       toKill.stop();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
new file mode 100644
index 0000000..d8d927d
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
+
+import java.util.List;
+import java.util.Properties;
+
+class PartitionInfoFetcher extends Thread {
+
+       private final List<String> topics;
+       private final Properties properties;
+
+       private volatile List<KafkaTopicPartitionLeader> result;
+       private volatile Throwable error;
+
+
+       PartitionInfoFetcher(List<String> topics, Properties properties) {
+               this.topics = topics;
+               this.properties = properties;
+       }
+
+       @Override
+       public void run() {
+               try {
+                       result = 
FlinkKafkaConsumer08.getPartitionsForTopic(topics, properties);
+               }
+               catch (Throwable t) {
+                       this.error = t;
+               }
+       }
+
+       public List<KafkaTopicPartitionLeader> getPartitions() throws Exception 
{
+               try {
+                       this.join();
+               }
+               catch (InterruptedException e) {
+                       throw new Exception("Partition fetching was cancelled 
before completion");
+               }
+
+               if (error != null) {
+                       throw new Exception("Failed to fetch partitions for 
topics " + topics.toString(), error);
+               }
+               if (result != null) {
+                       return result;
+               }
+               throw new Exception("Partition fetching failed");
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
new file mode 100644
index 0000000..27d90f2
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import java.util.HashMap;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A thread that periodically writes the current Kafka partition offsets to ZooKeeper.
+ */
+public class PeriodicOffsetCommitter extends Thread {
+
+       /** The ZooKeeper handler through which the offsets are committed */
+       private final ZookeeperOffsetHandler offsetHandler;
+
+       /** The partition states whose current offsets are committed */
+       private final KafkaTopicPartitionState<?>[] partitionStates;
+
+       /** The proxy to forward exceptions to the main thread */
+       private final ExceptionProxy errorHandler;
+
+       /** Interval in which to commit, in milliseconds */
+       private final long commitInterval;
+
+       /** Flag to mark the periodic committer as running */
+       private volatile boolean running = true;
+
+       /**
+        * Creates the committer; the thread is not started by the constructor.
+        *
+        * @param offsetHandler   the handler used to commit the offsets, must not be null
+        * @param partitionStates the partition states to read the offsets from, must not be null
+        * @param errorHandler    the proxy that receives errors of this thread, must not be null
+        * @param commitInterval  the pause between two commits in milliseconds, must be positive
+        */
+       PeriodicOffsetCommitter(
+                       ZookeeperOffsetHandler offsetHandler,
+                       KafkaTopicPartitionState<?>[] partitionStates,
+                       ExceptionProxy errorHandler,
+                       long commitInterval)
+       {
+               this.offsetHandler = checkNotNull(offsetHandler);
+               this.partitionStates = checkNotNull(partitionStates);
+               this.errorHandler = checkNotNull(errorHandler);
+               this.commitInterval = commitInterval;
+
+               checkArgument(commitInterval > 0);
+       }
+
+       /**
+        * Sleeps for the commit interval and then commits a snapshot of the current offsets,
+        * over and over, until the committer is shut down. Errors are forwarded to the
+        * error handler unless the committer has been shut down in the meantime.
+        */
+       @Override
+       public void run() {
+               try {
+                       while (running) {
+                               Thread.sleep(commitInterval);
+                               offsetHandler.prepareAndCommitOffsets(snapshotCurrentOffsets());
+                       }
+               }
+               catch (Throwable t) {
+                       if (!running) {
+                               // already shut down: nothing is reported (covers the interrupt sent by shutdown())
+                               return;
+                       }
+                       errorHandler.reportError(
+                                       new Exception("The periodic offset committer encountered an error: " + t.getMessage(), t));
+               }
+       }
+
+       /**
+        * Copies the current offset of every partition state into a new map.
+        */
+       private HashMap<KafkaTopicPartition, Long> snapshotCurrentOffsets() {
+               final HashMap<KafkaTopicPartition, Long> snapshot = new HashMap<>(partitionStates.length);
+               for (int i = 0; i < partitionStates.length; i++) {
+                       final KafkaTopicPartitionState<?> state = partitionStates[i];
+                       snapshot.put(state.getKafkaTopicPartition(), state.getOffset());
+               }
+               return snapshot;
+       }
+
+       /**
+        * Marks the committer as no longer running and interrupts the thread.
+        */
+       public void shutdown() {
+               this.running = false;
+               this.interrupt();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
new file mode 100644
index 0000000..35e491a
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import kafka.api.FetchRequestBuilder;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.ErrorMapping;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.MessageAndOffset;
+
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.kafka.common.Node;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.PropertiesUtil.getInt;
+
+/**
+ * This class implements a thread with a connection to a single Kafka broker. 
The thread
+ * pulls records for a set of topic partitions for which the connected broker 
is currently
+ * the leader. The thread deserializes these records and emits them. 
+ * 
+ * @param <T> The type of elements that this consumer thread creates from 
Kafka's byte messages
+ *            and emits into the Flink DataStream.
+ */
+class SimpleConsumerThread<T> extends Thread {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(SimpleConsumerThread.class);
+
+       private static final KafkaTopicPartitionState<TopicAndPartition> MARKER 
= Kafka08Fetcher.MARKER;
+       
+       // 
------------------------------------------------------------------------
+
+       /** The fetcher to which the deserialized records are handed for emission */
+       private final Kafka08Fetcher<T> owner;
+       
+       /** Turns the key/value bytes of a message into an element and detects the end of the stream */
+       private final KeyedDeserializationSchema<T> deserializer;
+
+       /** The partitions this thread currently fetches from; starts out as the seed partitions */
+       private final List<KafkaTopicPartitionState<TopicAndPartition>> 
partitions;
+
+       /** The broker this thread connects to */
+       private final Node broker;
+
+       /** Queue containing new fetch partitions for the consumer thread */
+       private final 
ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> 
newPartitionsQueue;
+       
+       /** Queue to which this thread hands back partitions it stops reading from this broker */
+       private final 
ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> 
unassignedPartitions;
+       
+       /** Receives every Throwable that escapes the main loop */
+       private final ExceptionProxy errorHandler;
+       
+       /** Passed to the offset lookup when a partition reports an out-of-range offset */
+       private final long invalidOffsetBehavior;
+       
+       /** Flag checked by the main loop; set to false when the thread stops itself */
+       private volatile boolean running = true;
+       
+
+       // ----------------- Simple Consumer ----------------------
+       /** The Kafka consumer used for fetching; created in run() and re-created on reconnect */
+       private volatile SimpleConsumer consumer;
+
+       /** Value of "socket.timeout.ms" (default 30000) */
+       private final int soTimeout;
+       /** Value of "fetch.min.bytes" (default 1) */
+       private final int minBytes;
+       /** Value of "fetch.wait.max.ms" (default 100) */
+       private final int maxWait;
+       /** Value of "fetch.message.max.bytes" (default 1048576) */
+       private final int fetchSize;
+       /** Value of "socket.receive.buffer.bytes" (default 65536) */
+       private final int bufferSize;
+       /** Value of "flink.simple-consumer-reconnectLimit" (default 3) */
+       private final int reconnectLimit;
+
+
+       /**
+        * Creates a consumer thread for the given broker. The constructor only stores its
+        * arguments and reads the connection settings from the config; the connection to
+        * the broker is opened in {@link #run()}.
+        *
+        * <p>Exceptions are thrown locally.
+        *
+        * @param owner                 the fetcher that the records are emitted through
+        * @param errorHandler          the proxy that errors of this thread are reported to
+        * @param config                the properties holding the Kafka consumer settings
+        * @param broker                the broker to connect to
+        * @param seedPartitions        the partitions the thread starts with
+        * @param unassignedPartitions  the queue for partitions handed back by this thread, must not be null
+        * @param deserializer          the schema used to deserialize the messages, must not be null
+        * @param invalidOffsetBehavior the value passed on when offsets have to be reset
+        */
+       public SimpleConsumerThread(
+                       Kafka08Fetcher<T> owner,
+                       ExceptionProxy errorHandler,
+                       Properties config,
+                       Node broker,
+                       List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions,
+                       ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions,
+                       KeyedDeserializationSchema<T> deserializer,
+                       long invalidOffsetBehavior)
+       {
+               // plain references and values
+               this.owner = owner;
+               this.broker = broker;
+               this.errorHandler = errorHandler;
+               this.invalidOffsetBehavior = invalidOffsetBehavior;
+               this.partitions = seedPartitions;
+
+               // mandatory collaborators and the thread's own intake queue
+               this.deserializer = requireNonNull(deserializer);
+               this.unassignedPartitions = requireNonNull(unassignedPartitions);
+               this.newPartitionsQueue = new ClosableBlockingQueue<>();
+
+               // these are the actual configuration values of Kafka + their original default values.
+               this.soTimeout = getInt(config, "socket.timeout.ms", 30000);
+               this.minBytes = getInt(config, "fetch.min.bytes", 1);
+               this.maxWait = getInt(config, "fetch.wait.max.ms", 100);
+               this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576);
+               this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536);
+               this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3);
+       }
+
+       /**
+        * Gets the queue through which additional partitions are handed to this thread.
+        * The main loop in {@link #run()} polls this queue and subscribes to what it finds.
+        *
+        * @return the queue of new partitions for this thread
+        */
+       public ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> getNewPartitionsQueue() {
+               return this.newPartitionsQueue;
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  main work loop
+       // 
------------------------------------------------------------------------
+       
+       /**
+        * Main loop of the consumer thread. Connects a SimpleConsumer to the broker, looks up
+        * missing start offsets, and then repeatedly: picks up newly assigned partitions from
+        * the new-partitions queue, issues a fetch request for all subscribed partitions,
+        * evaluates the per-partition error codes, and deserializes and emits the records.
+        *
+        * <p>The thread stops itself once it has no partitions left and its new-partitions
+        * queue could be closed. Partitions that it gives up are handed back through the
+        * unassigned-partitions queue. Every Throwable that escapes the loop is reported to
+        * the error handler, and the consumer connection is closed in all cases.
+        */
+       @Override
+       public void run() {
+               LOG.info("Starting to fetch from {}", this.partitions);
+
+               // set up the config values
+               final String clientId = "flink-kafka-consumer-legacy-" + 
broker.id();
+
+               try {
+                       // create the Kafka consumer that we actually use for 
fetching
+                       consumer = new SimpleConsumer(broker.host(), 
broker.port(), soTimeout, bufferSize, clientId);
+                       
+                       // make sure that all partitions have some offsets to 
start with
+                       // those partitions that do not have an offset from a 
checkpoint need to get
+                       // their start offset from ZooKeeper
+                       getMissingOffsetsFromKafka(partitions);
+
+                       // Now, the actual work starts :-)
+                       int offsetOutOfRangeCount = 0;
+                       int reconnects = 0;
+                       while (running) {
+
+                               // ----------------------------------- 
partitions list maintenance ----------------------------
+
+                               // check queue for new partitions to read from:
+                               
List<KafkaTopicPartitionState<TopicAndPartition>> newPartitions = 
newPartitionsQueue.pollBatch();
+                               if (newPartitions != null) {
+                                       // found some new partitions for this 
thread's broker
+                                       
+                                       // check if the new partitions need an 
offset lookup
+                                       
getMissingOffsetsFromKafka(newPartitions);
+                                       
+                                       // add the new partitions (and check 
they are not already in there)
+                                       for 
(KafkaTopicPartitionState<TopicAndPartition> newPartition: newPartitions) {
+                                               if 
(partitions.contains(newPartition)) {
+                                                       throw new 
IllegalStateException("Adding partition " + newPartition + 
+                                                                       " to 
subscribed partitions even though it is already subscribed");
+                                               }
+                                               partitions.add(newPartition);
+                                       }
+                                       
+                                       LOG.info("Adding {} new partitions to 
consumer thread {}", newPartitions.size(), getName());
+                                       LOG.debug("Partitions list: {}", 
newPartitions);
+                               }
+
+                               if (partitions.size() == 0) {
+                                       if (newPartitionsQueue.close()) {
+                                               // close succeeded. Closing 
thread
+                                               running = false;
+                                               
+                                               LOG.info("Consumer thread {} 
does not have any partitions assigned anymore. Stopping thread.", 
+                                                               getName());
+
+                                               // add the wake-up marker into 
the queue to make the main thread
+                                               // immediately wake up and 
termination faster
+                                               
unassignedPartitions.add(MARKER);
+
+                                               break;
+                                       } else {
+                                               // close failed: fetcher main 
thread concurrently added new partitions into the queue.
+                                               // go to top of loop again and 
get the new partitions
+                                               continue; 
+                                       }
+                               }
+
+                               // ----------------------------------- request 
/ response with kafka ----------------------------
+
+                               FetchRequestBuilder frb = new 
FetchRequestBuilder();
+                               frb.clientId(clientId);
+                               frb.maxWait(maxWait);
+                               frb.minBytes(minBytes);
+
+                               for (KafkaTopicPartitionState<?> partition : 
partitions) {
+                                       frb.addFetch(
+                                                       
partition.getKafkaTopicPartition().getTopic(),
+                                                       
partition.getKafkaTopicPartition().getPartition(),
+                                                       partition.getOffset() + 
1, // request the next record
+                                                       fetchSize);
+                               }
+                               
+                               kafka.api.FetchRequest fetchRequest = 
frb.build();
+                               LOG.debug("Issuing fetch request {}", 
fetchRequest);
+
+                               FetchResponse fetchResponse;
+                               try {
+                                       fetchResponse = 
consumer.fetch(fetchRequest);
+                               }
+                               catch (Throwable cce) {
+                                       //noinspection ConstantConditions
+                                       if (cce instanceof 
ClosedChannelException) {
+                                               LOG.warn("Fetch failed because 
of ClosedChannelException.");
+                                               LOG.debug("Full exception", 
cce);
+                                               
+                                               // we don't know if the broker 
is overloaded or unavailable.
+                                               // retry a few times, then 
return ALL partitions for new leader lookup
+                                               if (++reconnects >= 
reconnectLimit) {
+                                                       LOG.warn("Unable to 
reach broker after {} retries. Returning all current partitions", 
reconnectLimit);
+                                                       for 
(KafkaTopicPartitionState<TopicAndPartition> fp: this.partitions) {
+                                                               
unassignedPartitions.add(fp);
+                                                       }
+                                                       this.partitions.clear();
+                                                       continue; // jump to 
top of loop: will close thread or subscribe to new partitions
+                                               }
+                                               try {
+                                                       consumer.close();
+                                               } catch (Throwable t) {
+                                                       LOG.warn("Error while 
closing consumer connection", t);
+                                               }
+                                               // delay & retry
+                                               Thread.sleep(100);
+                                               consumer = new 
SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId);
+                                               continue; // retry
+                                       } else {
+                                               throw cce;
+                                       }
+                               }
+                               // the fetch call returned, so the reconnect counter starts over
+                               reconnects = 0;
+
+                               // ---------------------------------------- 
error handling ----------------------------
+
+                               if (fetchResponse == null) {
+                                       throw new IOException("Fetch from Kafka 
failed (request returned null)");
+                               }
+                               
+                               if (fetchResponse.hasError()) {
+                                       String exception = "";
+                                       
List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = 
new ArrayList<>();
+                                       
+                                       // iterate over partitions to get 
individual error codes
+                                       
Iterator<KafkaTopicPartitionState<TopicAndPartition>> partitionsIterator = 
partitions.iterator();
+                                       boolean partitionsRemoved = false;
+                                       
+                                       while (partitionsIterator.hasNext()) {
+                                               final 
KafkaTopicPartitionState<TopicAndPartition> fp = partitionsIterator.next();
+                                               short code = 
fetchResponse.errorCode(fp.getTopic(), fp.getPartition());
+
+                                               if (code == 
ErrorMapping.OffsetOutOfRangeCode()) {
+                                                       // we were asked to 
read from an out-of-range-offset (maybe set wrong in Zookeeper)
+                                                       // Kafka's high level 
consumer is resetting the offset according to 'auto.offset.reset'
+                                                       
partitionsToGetOffsetsFor.add(fp);
+                                               }
+                                               else if (code == 
ErrorMapping.NotLeaderForPartitionCode() ||
+                                                               code == 
ErrorMapping.LeaderNotAvailableCode() ||
+                                                               code == 
ErrorMapping.BrokerNotAvailableCode() ||
+                                                               code == 
ErrorMapping.UnknownCode())
+                                               {
+                                                       // the broker we are 
connected to is not the leader for the partition.
+                                                       LOG.warn("{} is not the 
leader of {}. Reassigning leader for partition", broker, fp);
+                                                       LOG.debug("Error code = 
{}", code);
+
+                                                       
unassignedPartitions.add(fp);
+
+                                                       
partitionsIterator.remove(); // unsubscribe the partition ourselves
+                                                       partitionsRemoved = 
true;
+                                               }
+                                               else if (code != 
ErrorMapping.NoError()) {
+                                                       exception += 
"\nException for " + fp.getTopic() +":"+ fp.getPartition() + ": " +
+                                                                       
StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
+                                               }
+                                       }
+                                       if (partitionsToGetOffsetsFor.size() > 
0) {
+                                               // safeguard against an 
infinite loop.
+                                               if (offsetOutOfRangeCount++ > 
3) {
+                                                       throw new 
RuntimeException("Found invalid offsets more than three times in partitions "
+                                                                       + 
partitionsToGetOffsetsFor + " Exceptions: " + exception);
+                                               }
+                                               // get valid offsets for these 
partitions and try again.
+                                               LOG.warn("The following 
partitions had an invalid offset: {}", partitionsToGetOffsetsFor);
+                                               
getLastOffsetFromKafka(consumer, partitionsToGetOffsetsFor, 
invalidOffsetBehavior);
+                                               
+                                               LOG.warn("The new partition 
offsets are {}", partitionsToGetOffsetsFor);
+                                               continue; // jump back to 
create a new fetch request. The offset has not been touched.
+                                       }
+                                       else if (partitionsRemoved) {
+                                               continue; // create new fetch 
request
+                                       }
+                                       else {
+                                               // partitions failed on an error
+                                               throw new IOException("Error 
while fetching from broker '" + broker +"': " + exception);
+                                       }
+                               } else {
+                                       // successful fetch, reset 
offsetOutOfRangeCount.
+                                       offsetOutOfRangeCount = 0;
+                               }
+
+                               // ----------------------------------- process 
fetch response ----------------------------
+
+                               int messagesInFetch = 0;
+                               int deletedMessages = 0;
+                               
Iterator<KafkaTopicPartitionState<TopicAndPartition>> partitionsIterator = 
partitions.iterator();
+                               
+                               partitionsLoop:
+                               while (partitionsIterator.hasNext()) {
+                                       final 
KafkaTopicPartitionState<TopicAndPartition> currentPartition = 
partitionsIterator.next();
+                                       
+                                       final ByteBufferMessageSet messageSet = 
fetchResponse.messageSet(
+                                                       
currentPartition.getTopic(), currentPartition.getPartition());
+
+                                       for (MessageAndOffset msg : messageSet) 
{
+                                               if (running) {
+                                                       messagesInFetch++;
+                                                       final ByteBuffer 
payload = msg.message().payload();
+                                                       final long offset = 
msg.offset();
+                                                       
+                                                       if (offset <= 
currentPartition.getOffset()) {
+                                                               // we have seen 
this message already
+                                                               
LOG.info("Skipping message with offset " + msg.offset()
+                                                                               
+ " because we have seen messages until (including) "
+                                                                               
+ currentPartition.getOffset()
+                                                                               
+ " from topic/partition " + currentPartition.getTopic() + '/'
+                                                                               
+ currentPartition.getPartition() + " already");
+                                                               continue;
+                                                       }
+
+                                                       // If the message value 
is null, this represents a delete command for the message key.
+                                                       // Log this and pass it 
on to the client who might want to also receive delete messages.
+                                                       byte[] valueBytes;
+                                                       if (payload == null) {
+                                                               
deletedMessages++;
+                                                               valueBytes = 
null;
+                                                       } else {
+                                                               valueBytes = 
new byte[payload.remaining()];
+                                                               
payload.get(valueBytes);
+                                                       }
+
+                                                       // put key into byte 
array
+                                                       byte[] keyBytes = null;
+                                                       int keySize = 
msg.message().keySize();
+
+                                                       if (keySize >= 0) { // 
message().hasKey() is doing the same. We save one int deserialization
+                                                               ByteBuffer 
keyPayload = msg.message().key();
+                                                               keyBytes = new 
byte[keySize];
+                                                               
keyPayload.get(keyBytes);
+                                                       }
+
+                                                       final T value = 
deserializer.deserialize(keyBytes, valueBytes, 
+                                                                       
currentPartition.getTopic(), currentPartition.getPartition(), offset);
+                                                       
+                                                       if 
(deserializer.isEndOfStream(value)) {
+                                                               // remove 
partition from subscribed partitions.
+                                                               
partitionsIterator.remove();
+                                                               continue 
partitionsLoop;
+                                                       }
+                                                       
+                                                       owner.emitRecord(value, 
currentPartition, offset);
+                                               }
+                                               else {
+                                                       // no longer running
+                                                       return;
+                                               }
+                                       }
+                               }
+                               LOG.debug("This fetch contained {} messages ({} 
deleted messages)", messagesInFetch, deletedMessages);
+                       } // end of fetch loop
+
+                       // a regular exit from the loop is only valid if the new-partitions queue is closed
+                       if (!newPartitionsQueue.close()) {
+                               throw new Exception("Bug: Cleanly leaving 
fetcher thread without having a closed queue.");
+                       }
+               }
+               catch (Throwable t) {
+                       // report to the fetcher's error handler
+                       errorHandler.reportError(t);
+               }
+               finally {
+                       if (consumer != null) {
+                               // closing the consumer should not fail the 
program
+                               try {
+                                       consumer.close();
+                               }
+                               catch (Throwable t) {
+                                       LOG.error("Error while closing the 
Kafka simple consumer", t);
+                               }
+                       }
+               }
+       }
+
+       private void getMissingOffsetsFromKafka(
+                       List<KafkaTopicPartitionState<TopicAndPartition>> 
partitions) throws IOException
+       {
+               // collect which partitions we should fetch offsets for
+               List<KafkaTopicPartitionState<TopicAndPartition>> 
partitionsToGetOffsetsFor = new ArrayList<>();
+               for (KafkaTopicPartitionState<TopicAndPartition> part : 
partitions) {
+                       if (!part.isOffsetDefined()) {
+                               // retrieve the offset from the consumer
+                               partitionsToGetOffsetsFor.add(part);
+                       }
+               }
+               
+               if (partitionsToGetOffsetsFor.size() > 0) {
+                       getLastOffsetFromKafka(consumer, 
partitionsToGetOffsetsFor, invalidOffsetBehavior);
+                       
+                       LOG.info("No checkpoint/savepoint offsets found for 
some partitions. " +
+                                       "Fetched the following start offsets 
{}", partitionsToGetOffsetsFor);
+               }
+       }
+
+       /**
+        * Cancels this fetch thread. The thread will release all resources and 
terminate.
+        */
+       public void cancel() {
+               this.running = false;
+
+               // interrupt whatever the consumer is doing
+               if (consumer != null) {
+                       consumer.close();
+               }
+
+               this.interrupt();
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Kafka Request Utils
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Requests offsets for a set of partitions via a Kafka consumer and stores them in the partition states.
+        *
+        * <p>If the response has an error, the request is attempted up to three times in total; when the third
+        * attempt also fails, an {@link IOException} listing the per-partition errors is thrown.
+        *
+        * <p>Each partition's state is set to the returned offset minus one, because the state reflects the
+        * latest successfully emitted record rather than the next record to fetch.
+        *
+        * @param consumer The consumer connected to lead broker
+        * @param partitions The list of partitions we need offsets for
+        * @param whichTime The type of time we are requesting. -1 and -2 are special constants (See OffsetRequest)
+        * @throws IOException Thrown if the response still has errors after three attempts, or if the response
+        *                     contains no offset for one of the partitions.
+        */
+       private static void getLastOffsetFromKafka(
+                       SimpleConsumer consumer,
+                       List<KafkaTopicPartitionState<TopicAndPartition>> partitions,
+                       long whichTime) throws IOException
+       {
+               // ask for exactly one offset per partition
+               Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
+               for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
+                       requestInfo.put(part.getKafkaPartitionHandle(), new PartitionOffsetRequestInfo(whichTime, 1));
+               }
+
+               int retries = 0;
+               OffsetResponse response;
+               while (true) {
+                       kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
+                                       requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
+                       response = consumer.getOffsetsBefore(request);
+
+                       if (response.hasError()) {
+                               // collect the error of every failed partition into one message
+                               StringBuilder exception = new StringBuilder();
+                               for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
+                                       short code;
+                                       if ((code = response.errorCode(part.getTopic(), part.getPartition())) != ErrorMapping.NoError()) {
+                                               exception.append("\nException for topic=").append(part.getTopic())
+                                                               .append(" partition=").append(part.getPartition()).append(": ")
+                                                               .append(StringUtils.stringifyException(ErrorMapping.exceptionFor(code)));
+                                       }
+                               }
+                               if (++retries >= 3) {
+                                       throw new IOException("Unable to get last offset for partitions " + partitions + ": "
+                                                       + exception.toString());
+                               } else {
+                                       LOG.warn("Unable to get last offset for partitions: Exception(s): {}", exception);
+                               }
+                       } else {
+                               break; // leave retry loop
+                       }
+               }
+
+               for (KafkaTopicPartitionState<TopicAndPartition> part: partitions) {
+                       final long[] offsets = response.offsets(part.getTopic(), part.getPartition());
+
+                       // an error-free response may still carry no offset for the requested time;
+                       // report that explicitly instead of failing with an ArrayIndexOutOfBoundsException
+                       if (offsets == null || offsets.length == 0) {
+                               throw new IOException("Kafka returned no offset for topic=" + part.getTopic()
+                                               + " partition=" + part.getPartition() + " (requested time: " + whichTime + ")");
+                       }
+
+                       // the offset returned is that of the next record to fetch. because our state reflects the latest
+                       // successfully emitted record, we subtract one
+                       part.setOffset(offsets[0] - 1);
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
new file mode 100644
index 0000000..8f2ef09
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import kafka.utils.ZKGroupTopicDirs;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Handler for committing Kafka offsets to Zookeeper and to retrieve them 
again.
+ */
+public class ZookeeperOffsetHandler {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ZookeeperOffsetHandler.class);
+
+       private final String groupId;
+
+       private final CuratorFramework curatorClient;
+
+
+       public ZookeeperOffsetHandler(Properties props) {
+               this.groupId = 
props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
+               if (this.groupId == null) {
+                       throw new IllegalArgumentException("Required property '"
+                                       + ConsumerConfig.GROUP_ID_CONFIG + "' 
has not been set");
+               }
+               
+               String zkConnect = props.getProperty("zookeeper.connect");
+               if (zkConnect == null) {
+                       throw new IllegalArgumentException("Required property 
'zookeeper.connect' has not been set");
+               }
+
+               // we use Curator's default timeouts
+               int sessionTimeoutMs =  
Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "60000"));
+               int connectionTimeoutMs = 
Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "15000"));
+               
+               // undocumented config options allowing users to configure the 
retry policy. (they are "flink." prefixed as they are no official kafka configs)
+               int backoffBaseSleepTime = 
Integer.valueOf(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100"));
+               int backoffMaxRetries =  
Integer.valueOf(props.getProperty("flink.zookeeper.max-retries", "10"));
+               
+               RetryPolicy retryPolicy = new 
ExponentialBackoffRetry(backoffBaseSleepTime, backoffMaxRetries);
+               curatorClient = CuratorFrameworkFactory.newClient(zkConnect, 
sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
+               curatorClient.start();
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Offset access and manipulation
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Commits offsets for Kafka partitions to ZooKeeper. The given offsets 
to this method should be the offsets of
+        * the last processed records; this method will take care of 
incrementing the offsets by 1 before committing them so
+        * that the committed offsets to Zookeeper represent the next record to 
process.
+        * 
+        * @param internalOffsets The internal offsets (representing last 
processed records) for the partitions to commit.
+        * @throws Exception The method forwards exceptions.
+        */
+       public void prepareAndCommitOffsets(Map<KafkaTopicPartition, Long> 
internalOffsets) throws Exception {
+               for (Map.Entry<KafkaTopicPartition, Long> entry : 
internalOffsets.entrySet()) {
+                       KafkaTopicPartition tp = entry.getKey();
+
+                       Long lastProcessedOffset = entry.getValue();
+                       if (lastProcessedOffset != null && lastProcessedOffset 
>= 0) {
+                               setOffsetInZooKeeper(curatorClient, groupId, 
tp.getTopic(), tp.getPartition(), lastProcessedOffset + 1);
+                       }
+               }
+       }
+
+       /**
+        * @param partitions The partitions to read offsets for.
+        * @return The mapping from partition to offset.
+        * @throws Exception This method forwards exceptions.
+        */
+       public Map<KafkaTopicPartition, Long> 
getCommittedOffsets(List<KafkaTopicPartition> partitions) throws Exception {
+               Map<KafkaTopicPartition, Long> ret = new 
HashMap<>(partitions.size());
+               for (KafkaTopicPartition tp : partitions) {
+                       Long offset = getOffsetFromZooKeeper(curatorClient, 
groupId, tp.getTopic(), tp.getPartition());
+
+                       if (offset != null) {
+                               LOG.info("Offset for TopicPartition {}:{} was 
set to {} in ZooKeeper. Seeking fetcher to that position.",
+                                               tp.getTopic(), 
tp.getPartition(), offset);
+                               ret.put(tp, offset);
+                       }
+               }
+               return ret;
+       }
+
+       /**
+        * Closes the offset handler.
+        * 
+        * @throws IOException Thrown, if the handler cannot be closed properly.
+        */
+       public void close() throws IOException {
+               curatorClient.close();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Communication with Zookeeper
+       // 
------------------------------------------------------------------------
+       
+       public static void setOffsetInZooKeeper(CuratorFramework curatorClient, 
String groupId, String topic, int partition, long offset) throws Exception {
+               ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, 
topic);
+               String path = topicDirs.consumerOffsetDir() + "/" + partition;
+               
curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
+               byte[] data = Long.toString(offset).getBytes();
+               curatorClient.setData().forPath(path, data);
+       }
+
+       public static Long getOffsetFromZooKeeper(CuratorFramework 
curatorClient, String groupId, String topic, int partition) throws Exception {
+               ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, 
topic);
+               String path = topicDirs.consumerOffsetDir() + "/" + partition;
+               
curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
+               
+               byte[] data = curatorClient.getData().forPath(path);
+               
+               if (data == null) {
+                       return null;
+               } else {
+                       String asString = new String(data);
+                       if (asString.length() == 0) {
+                               return null;
+                       } else {
+                               try {
+                                       return Long.valueOf(asString);
+                               }
+                               catch (NumberFormatException e) {
+                                       LOG.error(
+                                                       "The offset in 
ZooKeeper for group '{}', topic '{}', partition {} is a malformed string: {}",
+                                               groupId, topic, partition, 
asString);
+                                       return null;
+                               }
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
new file mode 100644
index 0000000..fabb0fe
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.runtime.client.JobCancellationException;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import 
org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
+import 
org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for the Kafka 0.8 consumer. Most tests delegate to the shared test
+ * implementations inherited from {@link KafkaConsumerTestBase}; the remaining ones exercise
+ * the offset handling in ZooKeeper directly.
+ */
+public class Kafka08ITCase extends KafkaConsumerTestBase {
+
+       // ------------------------------------------------------------------------
+       //  Suite of Tests
+       // ------------------------------------------------------------------------
+
+       @Test(timeout = 60000)
+       public void testFailOnNoBroker() throws Exception {
+               runFailOnNoBrokerTest();
+       }
+
+
+       @Test(timeout = 60000)
+       public void testConcurrentProducerConsumerTopology() throws Exception {
+               runSimpleConcurrentProducerConsumerTopology();
+       }
+
+//     @Test(timeout = 60000)
+//     public void testPunctuatedExplicitWMConsumer() throws Exception {
+//             runExplicitPunctuatedWMgeneratingConsumerTest(false);
+//     }
+
+//     @Test(timeout = 60000)
+//     public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception {
+//             runExplicitPunctuatedWMgeneratingConsumerTest(true);
+//     }
+
+       @Test(timeout = 60000)
+       public void testKeyValueSupport() throws Exception {
+               runKeyValueTest();
+       }
+
+       // --- canceling / failures ---
+
+       @Test(timeout = 60000)
+       public void testCancelingEmptyTopic() throws Exception {
+               runCancelingOnEmptyInputTest();
+       }
+
+       @Test(timeout = 60000)
+       public void testCancelingFullTopic() throws Exception {
+               runCancelingOnFullInputTest();
+       }
+
+       @Test(timeout = 60000)
+       public void testFailOnDeploy() throws Exception {
+               runFailOnDeployTest();
+       }
+
+       /**
+        * Writes 20 messages, stores an offset of 1234 for partition 0 in ZooKeeper, and then
+        * reads the full sequence of 20 values starting from 0.
+        */
+       @Test(timeout = 60000)
+       public void testInvalidOffset() throws Exception {
+
+               final int parallelism = 1;
+
+               // write 20 messages into topic:
+               final String topic = writeSequence("invalidOffsetTopic", 20, parallelism, 1);
+
+               // set invalid offset:
+               CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
+               try {
+                       ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"), topic, 0, 1234);
+               }
+               finally {
+                       // release the client even if writing the offset failed
+                       curatorClient.close();
+               }
+
+               // read from topic
+               final int valuesCount = 20;
+               final int startFrom = 0;
+
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               env.getConfig().disableSysoutLogging();
+
+               readSequence(env, standardProps, parallelism, topic, valuesCount, startFrom);
+
+               deleteTestTopic(topic);
+       }
+
+       // --- source to partition mappings and exactly once ---
+
+       @Test(timeout = 60000)
+       public void testOneToOneSources() throws Exception {
+               runOneToOneExactlyOnceTest();
+       }
+
+       @Test(timeout = 60000)
+       public void testOneSourceMultiplePartitions() throws Exception {
+               runOneSourceMultiplePartitionsExactlyOnceTest();
+       }
+
+       @Test(timeout = 60000)
+       public void testMultipleSourcesOnePartition() throws Exception {
+               runMultipleSourcesOnePartitionExactlyOnceTest();
+       }
+
+       // --- broker failure ---
+
+       @Test(timeout = 60000)
+       public void testBrokerFailure() throws Exception {
+               runBrokerFailureTest();
+       }
+
+       // --- offset committing ---
+
+       @Test(timeout = 60000)
+       public void testCommitOffsetsToZookeeper() throws Exception {
+               runCommitOffsetsToKafka();
+       }
+
+       @Test(timeout = 60000)
+       public void testStartFromZookeeperCommitOffsets() throws Exception {
+               runStartFromKafkaCommitOffsets();
+       }
+
+       @Test(timeout = 60000)
+       public void testAutoOffsetRetrievalAndCommitToZookeeper() throws Exception {
+               runAutoOffsetRetrievalAndCommitToKafka();
+       }
+
+       /**
+        * Writes a random offset into ZooKeeper through {@link ZookeeperOffsetHandler} and checks
+        * that reading it back yields the same value.
+        */
+       @Test
+       public void runOffsetManipulationInZooKeeperTest() {
+               try {
+                       final String topicName = "ZookeeperOffsetHandlerTest-Topic";
+                       final String groupId = "ZookeeperOffsetHandlerTest-Group";
+
+                       final Long offset = (long) (Math.random() * Long.MAX_VALUE);
+
+                       CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer ).createCuratorClient();
+                       final Long fetchedOffset;
+                       try {
+                               kafkaServer.createTestTopic(topicName, 3, 2);
+
+                               ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorFramework, groupId, topicName, 0, offset);
+
+                               fetchedOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, groupId, topicName, 0);
+                       }
+                       finally {
+                               // release the client even if one of the ZooKeeper operations failed
+                               curatorFramework.close();
+                       }
+
+                       assertEquals(offset, fetchedOffset);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       /**
+        * Reads a topic without checkpointing enabled and checks that at least one partition
+        * has an offset in the range (0, 100] stored in ZooKeeper afterwards.
+        */
+       @Test(timeout = 60000)
+       public void testOffsetAutocommitTest() throws Exception {
+               final int parallelism = 3;
+
+               // write a sequence from 0 to 99 to each of the 3 partitions.
+               final String topicName = writeSequence("testOffsetAutocommit", 100, parallelism, 1);
+
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               // NOTE: We are not enabling the checkpointing!
+               env.getConfig().disableSysoutLogging();
+               env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+               env.setParallelism(parallelism);
+
+               // the readSequence operation sleeps for 20 ms between each record.
+               // setting a delay of 25*20 = 500 for the commit interval makes
+               // sure that we commit roughly 3-4 times while reading, however
+               // at least once.
+               Properties readProps = new Properties();
+               readProps.putAll(standardProps);
+               readProps.setProperty("auto.commit.interval.ms", "500");
+
+               // read so that the offset can be committed to ZK
+               readSequence(env, readProps, parallelism, topicName, 100, 0);
+
+               // get the offset
+               CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
+
+               final Long o1;
+               final Long o2;
+               final Long o3;
+               try {
+                       o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 0);
+                       o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 1);
+                       o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 2);
+               }
+               finally {
+                       // release the client even if reading one of the offsets failed
+                       curatorFramework.close();
+               }
+               LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
+
+               // ensure that the offset has been committed
+               boolean atLeastOneOffsetSet = (o1 != null && o1 > 0 && o1 <= 100) ||
+                       (o2 != null && o2 > 0 && o2 <= 100) ||
+                       (o3 != null && o3 > 0 && o3 <= 100);
+               assertTrue("Expecting at least one offset to be set o1="+o1+" o2="+o2+" o3="+o3, atLeastOneOffsetSet);
+
+               deleteTestTopic(topicName);
+       }
+
+       // --- special executions ---
+
+       @Test(timeout = 60000)
+       public void testBigRecordJob() throws Exception {
+               runBigRecordTestTopology();
+       }
+
+       @Test(timeout = 60000)
+       public void testMultipleTopics() throws Exception {
+               runProduceConsumeMultipleTopics();
+       }
+
+       @Test(timeout = 60000)
+       public void testAllDeletes() throws Exception {
+               runAllDeletesTest();
+       }
+
+       @Test(timeout=60000)
+       public void testEndOfStream() throws Exception {
+               runEndOfStreamTest();
+       }
+
+       @Test(timeout = 60000)
+       public void testMetrics() throws Throwable {
+               runMetricsTest();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
new file mode 100644
index 0000000..6d0b140
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.table.Row;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import 
org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Runs the tests of {@link KafkaTableSinkTestBase} against {@link Kafka08JsonTableSink}.
+ */
+public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
+
+       /**
+        * Creates a {@link Kafka08JsonTableSink} whose producer factory method is overridden
+        * to hand out the given {@code kafkaProducer} instead of creating a new one.
+        */
+       @Override
+       protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
+                       final FlinkKafkaProducerBase<Row> kafkaProducer) {
+
+               final KafkaTableSink sinkWithGivenProducer = new Kafka08JsonTableSink(topic, properties, partitioner) {
+                       @Override
+                       protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
+                                       SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
+                               // ignore all arguments and return the producer supplied by the test
+                               return kafkaProducer;
+                       }
+               };
+               return sinkWithGivenProducer;
+       }
+
+       /**
+        * Returns a {@link JsonRowSerializationSchema} built from {@code FIELD_NAMES}.
+        */
+       @Override
+       @SuppressWarnings("unchecked")
+       protected SerializationSchema<Row> getSerializationSchema() {
+               final SerializationSchema<Row> jsonSchema = new JsonRowSerializationSchema(FIELD_NAMES);
+               return jsonSchema;
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
new file mode 100644
index 0000000..a2d66ac
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.Properties;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import 
org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+
+/**
+ * Runs the tests of {@link KafkaTableSourceTestBase} against {@link Kafka08JsonTableSource}.
+ */
+public class Kafka08JsonTableSourceTest extends KafkaTableSourceTestBase {
+
+       /**
+        * Creates the {@link Kafka08JsonTableSource} under test from the given arguments.
+        */
+       @Override
+       protected KafkaTableSource createTableSource(String topic, Properties properties, String[] fieldNames, TypeInformation<?>[] typeInfo) {
+               final KafkaTableSource source = new Kafka08JsonTableSource(topic, properties, fieldNames, typeInfo);
+               return source;
+       }
+
+       /**
+        * Returns {@link JsonRowDeserializationSchema} as the deserialization schema class.
+        */
+       @Override
+       @SuppressWarnings("unchecked")
+       protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
+               final Class<?> schemaClass = JsonRowDeserializationSchema.class;
+               // raw cast: the class literal cannot be expressed with the generic return type directly
+               return (Class) schemaClass;
+       }
+
+       /**
+        * Returns {@link FlinkKafkaConsumer08} as the consumer class.
+        */
+       @Override
+       @SuppressWarnings("unchecked")
+       protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
+               final Class<?> consumerClass = FlinkKafkaConsumer08.class;
+               // raw cast: the class literal cannot be expressed with the generic return type directly
+               return (Class) consumerClass;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
new file mode 100644
index 0000000..5c951db
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+
+import org.junit.Test;
+
+/**
+ * Kafka 0.8 binding of {@link KafkaProducerTestBase}; the single test only forwards
+ * to a scenario method that is not defined in this class.
+ */
+@SuppressWarnings("serial")
+public class Kafka08ProducerITCase extends KafkaProducerTestBase {
+
+       /** Forwards to {@code runCustomPartitioningTest()}. */
+       @Test
+       public void testCustomPartitioning() {
+               runCustomPartitioningTest();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
new file mode 100644
index 0000000..9520f55
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Collections;
+import java.util.Properties;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.junit.Test;
+
+public class KafkaConsumer08Test {
+
+       /**
+        * Checks that {@code FlinkKafkaConsumer08.validateZooKeeperConfig} throws an
+        * {@link IllegalArgumentException} for three incomplete configurations: empty
+        * properties, properties with only a group id, and properties with only a
+        * ZooKeeper connect string.
+        */
+       @Test
+       public void testValidateZooKeeperConfig() {
+               try {
+                       // nothing configured at all
+                       final Properties emptyProperties = new Properties();
+                       expectZooKeeperConfigRejected(emptyProperties);
+
+                       // group id present, connect string missing
+                       final Properties noConnect = new Properties();
+                       noConnect.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-test-group");
+                       expectZooKeeperConfigRejected(noConnect);
+
+                       // connect string present, group id missing
+                       final Properties noGroup = new Properties();
+                       noGroup.put("zookeeper.connect", "localhost:47574");
+                       expectZooKeeperConfigRejected(noGroup);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       /**
+        * Runs the validation on {@code candidate}. An {@link IllegalArgumentException} is the
+        * expected outcome; a normal return fails the test, and any other exception is passed
+        * on to the caller.
+        */
+       private static void expectZooKeeperConfigRejected(Properties candidate) throws Exception {
+               try {
+                       FlinkKafkaConsumer08.validateZooKeeperConfig(candidate);
+               }
+               catch (IllegalArgumentException expected) {
+                       return;
+               }
+               fail("should fail with an exception");
+       }
+       
+       /**
+        * Creates and opens a consumer whose ZooKeeper and broker addresses are the fixed
+        * localhost ports below, and expects an exception whose message contains
+        * "Unable to retrieve any partitions".
+        */
+       @Test
+       public void testCreateSourceWithoutCluster() {
+               final Properties props = createKafkaProps(
+                               "localhost:56794",
+                               "localhost:11111, localhost:22222",
+                               "non-existent-group");
+               try {
+                       final FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
+                                       Collections.singletonList("no op topic"), new SimpleStringSchema(), props);
+                       consumer.open(new Configuration());
+                       fail();
+               }
+               catch (Exception e) {
+                       final String message = e.getMessage();
+                       assertTrue(message.contains("Unable to retrieve any partitions"));
+               }
+       }
+
+       /**
+        * Configures {@code indexistentHost:11111} as the only bootstrap server and expects
+        * creating/opening the consumer to throw with the "All the servers provided in: ...
+        * config are invalid" message.
+        */
+       @Test
+       public void testAllBoostrapServerHostsAreInvalid() {
+               // text the thrown exception has to contain
+               final String expectedFragment = "All the servers provided in: '"
+                               + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + "' config are invalid";
+               try {
+                       final Properties props = createKafkaProps(
+                                       "localhost:56794", "indexistentHost:11111", "non-existent-group");
+                       final FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
+                                       Collections.singletonList("no op topic"), new SimpleStringSchema(), props);
+                       consumer.open(new Configuration());
+                       fail();
+               } catch (Exception e) {
+                       final boolean mentionsInvalidServers = e.getMessage().contains(expectedFragment);
+                       assertTrue(
+                                       "Exception should be thrown containing 'all bootstrap servers invalid' message!",
+                                       mentionsInvalidServers);
+               }
+       }
+
+       /**
+        * Configures two bootstrap servers, {@code indexistentHost:11111} and
+        * {@code localhost:22222}. Creating/opening the consumer is still expected to throw,
+        * but not with the "all servers invalid" message that
+        * {@link #testAllBoostrapServerHostsAreInvalid()} asserts on.
+        */
+       @Test
+       public void testAtLeastOneBootstrapServerHostIsValid() {
+               try {
+                       String zookeeperConnect = "localhost:56794";
+                       // one valid bootstrap server is declared, namely the one with 'localhost'
+                       String bootstrapServers = "indexistentHost:11111, localhost:22222";
+                       String groupId = "non-existent-group";
+                       Properties props = createKafkaProps(zookeeperConnect, bootstrapServers, groupId);
+                       FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
+                                       Collections.singletonList("no op topic"), new SimpleStringSchema(), props);
+                       consumer.open(new Configuration());
+                       fail();
+               } catch (Exception e) {
+                       // Some failure is expected here, just not the "all servers invalid" one.
+                       // The fragment must use the same wording that
+                       // testAllBoostrapServerHostsAreInvalid() checks for; with any other wording
+                       // this negative assertion could never fail.
+                       // A null message cannot be that error, so it is mapped to "null" rather
+                       // than triggering a NullPointerException.
+                       String message = String.valueOf(e.getMessage());
+                       assertTrue("The cause of the exception should not be 'all boostrap server are invalid'!",
+                                       !message.contains("All the servers provided in: '"
+                                                       + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + "' config are invalid"));
+               }
+       }
+       
+       private Properties createKafkaProps(String zookeeperConnect, String 
bootstrapServers, String groupId) {
+               Properties props = new Properties();
+               props.setProperty("zookeeper.connect", zookeeperConnect);
+               props.setProperty("bootstrap.servers", bootstrapServers);
+               props.setProperty("group.id", groupId);
+               return props;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
new file mode 100644
index 0000000..72d2772
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import kafka.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link Time} implementation backed by the JVM: milliseconds come from
+ * {@link System#currentTimeMillis()}, nanoseconds from {@link System#nanoTime()},
+ * and sleeping is done with {@link Thread#sleep(long)}.
+ */
+public class KafkaLocalSystemTime implements Time {
+
+       private static final Logger LOG = LoggerFactory.getLogger(KafkaLocalSystemTime.class);
+
+       /** Returns {@link System#currentTimeMillis()}. */
+       @Override
+       public long milliseconds() {
+               return System.currentTimeMillis();
+       }
+
+       /** Returns {@link System#nanoTime()}. */
+       @Override
+       public long nanoseconds() {
+               return System.nanoTime();
+       }
+
+       /**
+        * Sleeps for {@code ms} milliseconds. An interruption ends the sleep early, is
+        * logged as a warning, and leaves the thread's interrupt status set; no exception
+        * is thrown to the caller.
+        */
+       @Override
+       public void sleep(long ms) {
+               try {
+                       Thread.sleep(ms);
+               } catch (InterruptedException e) {
+                       LOG.warn("Interruption", e);
+                       // Thread.sleep cleared the interrupt status when it threw; set it again
+                       // so the code that called sleep() can still observe the interruption.
+                       Thread.currentThread().interrupt();
+               }
+       }
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
new file mode 100644
index 0000000..91fc286
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import 
org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+import java.util.concurrent.Future;
+
+
+import static org.mockito.Mockito.*;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+import static org.junit.Assert.*;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(FlinkKafkaProducerBase.class)
+public class KafkaProducerTest extends TestLogger {
+       
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testPropagateExceptions() {
+               try {
+                       // mock kafka producer
+                       KafkaProducer<?, ?> kafkaProducerMock = 
mock(KafkaProducer.class);
+                       
+                       // partition setup
+                       
when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
+                               // returning a unmodifiable list to mimic 
KafkaProducer#partitionsFor() behaviour
+                               Collections.singletonList(new 
PartitionInfo("mock_topic", 42, null, null, null)));
+
+                       // failure when trying to send an element
+                       when(kafkaProducerMock.send(any(ProducerRecord.class), 
any(Callback.class)))
+                               .thenAnswer(new 
Answer<Future<RecordMetadata>>() {
+                                       @Override
+                                       public Future<RecordMetadata> 
answer(InvocationOnMock invocation) throws Throwable {
+                                               Callback callback = (Callback) 
invocation.getArguments()[1];
+                                               callback.onCompletion(null, new 
Exception("Test error"));
+                                               return null;
+                                       }
+                               });
+                       
+                       // make sure the FlinkKafkaProducer instantiates our 
mock producer
+                       
whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);
+                       
+                       // (1) producer that propagates errors
+
+                       FlinkKafkaProducer08<String> producerPropagating = new 
FlinkKafkaProducer08<>(
+                                       "mock_topic", new SimpleStringSchema(), 
FakeStandardProducerConfig.get(), null);
+
+                       OneInputStreamOperatorTestHarness<String, Object> 
testHarness =
+                                       new 
OneInputStreamOperatorTestHarness<>(new StreamSink<>(producerPropagating));
+
+                       testHarness.open();
+
+                       try {
+                               testHarness.processElement(new 
StreamRecord<>("value"));
+                               testHarness.processElement(new 
StreamRecord<>("value"));
+                               fail("This should fail with an exception");
+                       }
+                       catch (Exception e) {
+                               assertNotNull(e.getCause());
+                               assertNotNull(e.getCause().getMessage());
+                               
assertTrue(e.getCause().getMessage().contains("Test error"));
+                       }
+
+                       testHarness.close();
+
+                       // (2) producer that only logs errors
+
+                       FlinkKafkaProducer08<String> producerLogging = new 
FlinkKafkaProducer08<>(
+                                       "mock_topic", new SimpleStringSchema(), 
FakeStandardProducerConfig.get(), null);
+                       producerLogging.setLogFailuresOnly(true);
+
+                       testHarness = new 
OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging));
+
+                       testHarness.open();
+
+                       testHarness.processElement(new StreamRecord<>("value"));
+                       testHarness.processElement(new StreamRecord<>("value"));
+
+                       testHarness.close();
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java
new file mode 100644
index 0000000..c28799c
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.Test;
+
+/**
+ * Kafka 0.8 binding of {@link KafkaShortRetentionTestBase}; each test forwards to a
+ * scenario method that is not defined in this class, under a 60 second timeout.
+ */
+@SuppressWarnings("serial")
+public class KafkaShortRetention08ITCase extends KafkaShortRetentionTestBase {
+
+       /** Forwards to {@code runAutoOffsetResetTest()}. */
+       @Test(timeout = 60000)
+       public void testAutoOffsetReset() throws Exception {
+               runAutoOffsetResetTest();
+       }
+
+       /** Forwards to {@code runFailOnAutoOffsetResetNoneEager()}. */
+       @Test(timeout = 60000)
+       public void testAutoOffsetResetNone() throws Exception {
+               runFailOnAutoOffsetResetNoneEager();
+       }
+}

Reply via email to