http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
deleted file mode 100644
index d015157..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ /dev/null
@@ -1,481 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import kafka.common.TopicAndPartition;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.kafka.common.Node;
-
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.SerializedValue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A fetcher that fetches data from Kafka brokers via the Kafka 0.8 low-level 
consumer API.
- * The fetcher also handles the explicit communication with ZooKeeper to fetch 
initial offsets
- * and to write offsets to ZooKeeper.
- *
- * @param <T> The type of elements produced by the fetcher.
- */
-public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
-       
-       static final KafkaTopicPartitionState<TopicAndPartition> MARKER = 
-                       new KafkaTopicPartitionState<>(new 
KafkaTopicPartition("n/a", -1), new TopicAndPartition("n/a", -1));
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(Kafka08Fetcher.class);
-
-       // 
------------------------------------------------------------------------
-
-       /** The schema to convert between Kafka's byte messages, and Flink's 
objects */
-       private final KeyedDeserializationSchema<T> deserializer;
-
-       /** The properties that configure the Kafka connection */
-       private final Properties kafkaConfig;
-
-       /** The subtask's runtime context */
-       private final RuntimeContext runtimeContext;
-
-       /** The queue of partitions that are currently not assigned to a broker 
connection */
-       private final 
ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> 
unassignedPartitionsQueue;
-
-       /** The behavior to use in case that an offset is not valid (any more) 
for a partition */
-       private final long invalidOffsetBehavior;
-
-       /** The interval in which to automatically commit (-1 if deactivated) */
-       private final long autoCommitInterval;
-
-       /** The handler that reads/writes offsets from/to ZooKeeper */
-       private volatile ZookeeperOffsetHandler zookeeperOffsetHandler;
-
-       /** Flag to track the main work loop as alive */
-       private volatile boolean running = true;
-
-
-       /**
-        * Creates a fetcher for the given partitions. Initially, none of the subscribed
-        * partitions is assigned to a broker connection, so all of them are put into the
-        * queue of unassigned partitions.
-        *
-        * @param sourceContext Forwarded to the {@link AbstractFetcher} constructor
-        * @param assignedPartitions Forwarded to the {@link AbstractFetcher} constructor
-        * @param watermarksPeriodic Forwarded to the {@link AbstractFetcher} constructor
-        * @param watermarksPunctuated Forwarded to the {@link AbstractFetcher} constructor
-        * @param runtimeContext The subtask's runtime context; supplies the processing time service,
-        *                       the auto watermark interval and the user code class loader
-        * @param deserializer The schema to convert Kafka's byte messages, must not be null
-        * @param kafkaProperties The properties that configure the Kafka connection, must not be null
-        * @param invalidOffsetBehavior The behavior to use in case that an offset is not valid (any more)
-        * @param autoCommitInterval The interval in which to automatically commit (-1 if deactivated)
-        * @param useMetrics Forwarded to the {@link AbstractFetcher} constructor
-        * @throws Exception Declared for the initialization of the base fetcher
-        */
-       public Kafka08Fetcher(
-                       SourceContext<T> sourceContext,
-                       List<KafkaTopicPartition> assignedPartitions,
-                       SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
-                       SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
-                       StreamingRuntimeContext runtimeContext,
-                       KeyedDeserializationSchema<T> deserializer,
-                       Properties kafkaProperties,
-                       long invalidOffsetBehavior,
-                       long autoCommitInterval,
-                       boolean useMetrics) throws Exception
-       {
-               super(
-                               sourceContext,
-                               assignedPartitions,
-                               watermarksPeriodic,
-                               watermarksPunctuated,
-                               runtimeContext.getProcessingTimeService(),
-                               runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
-                               runtimeContext.getUserCodeClassLoader(),
-                               useMetrics);
-
-               // mandatory collaborators (checked in the same order as before)
-               this.deserializer = checkNotNull(deserializer);
-               this.kafkaConfig = checkNotNull(kafkaProperties);
-
-               // plain configuration values
-               this.autoCommitInterval = autoCommitInterval;
-               this.invalidOffsetBehavior = invalidOffsetBehavior;
-               this.runtimeContext = runtimeContext;
-
-               // no partition has a broker connection yet, so every subscribed partition starts out unassigned
-               this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
-               final KafkaTopicPartitionState<TopicAndPartition>[] subscribed = subscribedPartitions();
-               for (int i = 0; i < subscribed.length; i++) {
-                       unassignedPartitionsQueue.add(subscribed[i]);
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Main Work Loop
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Runs the main work loop of the fetcher on the calling thread.
-        *
-        * <p>The loop takes partitions from the queue of unassigned partitions, looks up their
-        * leader brokers and hands them to one {@link SimpleConsumerThread} per broker. It ends
-        * when {@link #cancel()} was called, when an exception is thrown, or when there are
-        * neither consumer threads nor unassigned partitions left. In every case the periodic
-        * offset committer, the consumer threads and the ZooKeeper offset handler are shut down
-        * before the method returns.
-        *
-        * @throws Exception Exceptions re-thrown via the {@link ExceptionProxy} from the concurrent
-        *                   threads, or the {@link InterruptedException} if this thread was
-        *                   interrupted and no other root exception is available.
-        */
-       @Override
-       public void runFetchLoop() throws Exception {
-               // the map from broker to the thread that is connected to that 
broker
-               final Map<Node, SimpleConsumerThread<T>> brokerToThread = new 
HashMap<>();
-
-               // this holds possible exceptions from the concurrent 
broker connection threads
-               final ExceptionProxy errorHandler = new 
ExceptionProxy(Thread.currentThread());
-
-               // the offset handler handles the communication with ZooKeeper, 
to commit externally visible offsets
-               final ZookeeperOffsetHandler zookeeperOffsetHandler = new 
ZookeeperOffsetHandler(kafkaConfig);
-               this.zookeeperOffsetHandler = zookeeperOffsetHandler;
-
-               PeriodicOffsetCommitter periodicCommitter = null;
-               try {
-                       // read offsets from ZooKeeper for partitions that did 
not restore offsets
-                       {
-                               List<KafkaTopicPartition> 
partitionsWithNoOffset = new ArrayList<>();
-                               for 
(KafkaTopicPartitionState<TopicAndPartition> partition : 
subscribedPartitions()) {
-                                       if (!partition.isOffsetDefined()) {
-                                               
partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
-                                       }
-                               }
-
-                               Map<KafkaTopicPartition, Long> zkOffsets = 
zookeeperOffsetHandler.getCommittedOffsets(partitionsWithNoOffset);
-                               for 
(KafkaTopicPartitionState<TopicAndPartition> partition : 
subscribedPartitions()) {
-                                       Long zkOffset = 
zkOffsets.get(partition.getKafkaTopicPartition());
-                                       if (zkOffset != null) {
-                                               // the offset in ZK represents 
the "next record to process", so we need to subtract it by 1
-                                               // to correctly represent our 
internally checkpointed offsets
-                                               partition.setOffset(zkOffset - 
1);
-                                       }
-                               }
-                       }
-
-                       // start the periodic offset committer thread, if 
necessary
-                       if (autoCommitInterval > 0) {
-                               LOG.info("Starting periodic offset committer, 
with commit interval of {}ms", autoCommitInterval);
-
-                               periodicCommitter = new 
PeriodicOffsetCommitter(zookeeperOffsetHandler, 
-                                               subscribedPartitions(), 
errorHandler, autoCommitInterval);
-                               periodicCommitter.setName("Periodic Kafka 
partition offset committer");
-                               periodicCommitter.setDaemon(true);
-                               periodicCommitter.start();
-                       }
-
-                       // register offset metrics
-                       if (useMetrics) {
-                               final MetricGroup kafkaMetricGroup = 
runtimeContext.getMetricGroup().addGroup("KafkaConsumer");
-                               addOffsetStateGauge(kafkaMetricGroup);
-                       }
-
-                       // Main loop polling elements from the 
unassignedPartitions queue to the threads
-                       while (running) {
-                               // re-throw any exception from the concurrent 
fetcher threads
-                               errorHandler.checkAndThrowException();
-
-                               // wait for max 5 seconds trying to get 
partitions to assign
-                               // if threads shut down, this poll returns 
earlier, because the threads inject the
-                               // special marker into the queue
-                               
List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToAssign = 
-                                               
unassignedPartitionsQueue.getBatchBlocking(5000);
-                               partitionsToAssign.remove(MARKER);
-
-                               if (!partitionsToAssign.isEmpty()) {
-                                       LOG.info("Assigning {} partitions to 
broker threads", partitionsToAssign.size());
-                                       Map<Node, 
List<KafkaTopicPartitionState<TopicAndPartition>>> partitionsWithLeaders = 
-                                                       
findLeaderForPartitions(partitionsToAssign, kafkaConfig);
-
-                                       // assign the partitions to the leaders 
(maybe start the threads)
-                                       for (Map.Entry<Node, 
List<KafkaTopicPartitionState<TopicAndPartition>>> partitionsWithLeader : 
-                                                       
partitionsWithLeaders.entrySet())
-                                       {
-                                               final Node leader = 
partitionsWithLeader.getKey();
-                                               final 
List<KafkaTopicPartitionState<TopicAndPartition>> partitions = 
partitionsWithLeader.getValue();
-                                               SimpleConsumerThread<T> 
brokerThread = brokerToThread.get(leader);
-
-                                               if (!running) {
-                                                       break;
-                                               }
-
-                                               if (brokerThread == null || 
!brokerThread.getNewPartitionsQueue().isOpen()) {
-                                                       // start new thread
-                                                       brokerThread = 
createAndStartSimpleConsumerThread(partitions, leader, errorHandler);
-                                                       
brokerToThread.put(leader, brokerThread);
-                                               }
-                                               else {
-                                                       // put elements into 
queue of thread
-                                                       
ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> 
newPartitionsQueue = 
-                                                                       
brokerThread.getNewPartitionsQueue();
-                                                       
-                                                       for 
(KafkaTopicPartitionState<TopicAndPartition> fp : partitions) {
-                                                               if 
(!newPartitionsQueue.addIfOpen(fp)) {
-                                                                       // we 
were unable to add the partition to the broker's queue
-                                                                       // the 
broker has closed in the meantime (the thread will shut down)
-                                                                       // 
create a new thread for connecting to this broker
-                                                                       
List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions = new 
ArrayList<>();
-                                                                       
seedPartitions.add(fp);
-                                                                       
brokerThread = createAndStartSimpleConsumerThread(seedPartitions, leader, 
errorHandler);
-                                                                       
brokerToThread.put(leader, brokerThread);
-                                                                       
newPartitionsQueue = brokerThread.getNewPartitionsQueue(); // update queue for 
the subsequent partitions
-                                                               }
-                                                       }
-                                               }
-                                       }
-                               }
-                               else {
-                                       // there were no partitions to assign. 
Check if any broker threads shut down.
-                                       // we get into this section of the 
code, if either the poll timed out, or the
-                                       // blocking poll was woken up by the 
marker element
-                                       Iterator<SimpleConsumerThread<T>> 
bttIterator = brokerToThread.values().iterator();
-                                       while (bttIterator.hasNext()) {
-                                               SimpleConsumerThread<T> thread 
= bttIterator.next();
-                                               if 
(!thread.getNewPartitionsQueue().isOpen()) {
-                                                       LOG.info("Removing 
stopped consumer thread {}", thread.getName());
-                                                       bttIterator.remove();
-                                               }
-                                       }
-                               }
-
-                               if (brokerToThread.size() == 0 && 
unassignedPartitionsQueue.isEmpty()) {
-                                       if (unassignedPartitionsQueue.close()) {
-                                               LOG.info("All consumer threads 
are finished, there are no more unassigned partitions. Stopping fetcher");
-                                               break;
-                                       }
-                                       // we end up here if somebody added 
something to the queue in the meantime --> continue to poll queue again
-                               }
-                       }
-               }
-               catch (InterruptedException e) {
-                       // this may be thrown because an exception on one of 
the concurrent fetcher threads
-                       // woke this thread up. make sure we throw the root 
exception instead in that case
-                       errorHandler.checkAndThrowException();
-
-                       // no other root exception, throw the interrupted 
exception
-                       throw e;
-               }
-               finally {
-                       this.running = false;
-                       // clear the field so that commitInternalOffsetsToKafka() no longer uses the handler;
-                       // the local reference is still needed below to close the handler
-                       this.zookeeperOffsetHandler = null;
-
-                       // if we run a periodic committer thread, shut that down
-                       if (periodicCommitter != null) {
-                               periodicCommitter.shutdown();
-                       }
-
-                       // clear the interruption flag
-                       // this allows the joining on consumer threads (on best 
effort) to happen in
-                       // case this thread was already interrupted before reaching this point
-                       Thread.interrupted();
-
-                       // make sure that in any case (completion, abort, 
error), all spawned threads are stopped
-                       try {
-                               int runningThreads;
-                               do {
-                                       // check whether threads are alive and 
cancel them
-                                       runningThreads = 0;
-                                       Iterator<SimpleConsumerThread<T>> 
threads = brokerToThread.values().iterator();
-                                       while (threads.hasNext()) {
-                                               SimpleConsumerThread<?> t = 
threads.next();
-                                               if (t.isAlive()) {
-                                                       t.cancel();
-                                                       runningThreads++;
-                                               } else {
-                                                       threads.remove();
-                                               }
-                                       }
-
-                                       // wait for the threads to finish, 
before issuing a cancel call again
-                                       if (runningThreads > 0) {
-                                               for (SimpleConsumerThread<?> t 
: brokerToThread.values()) {
-                                                       t.join(500 / 
runningThreads + 1);
-                                               }
-                                       }
-                               }
-                               while (runningThreads > 0);
-                       }
-                       catch (InterruptedException ignored) {
-                               // waiting for the thread shutdown apparently 
got interrupted
-                               // restore interrupted state and continue
-                               Thread.currentThread().interrupt();
-                       }
-                       catch (Throwable t) {
-                               // we catch all here to preserve the original 
exception
-                               LOG.error("Exception while shutting down 
consumer threads", t);
-                       }
-
-                       try {
-                               zookeeperOffsetHandler.close();
-                       }
-                       catch (Throwable t) {
-                               // we catch all here to preserve the original 
exception
-                               LOG.error("Exception while shutting down 
ZookeeperOffsetHandler", t);
-                       }
-               }
-       }
-
-       /**
-        * Asks the main work loop to stop. The loop checks the {@code running} flag, and the
-        * marker element wakes it up in case it is currently blocked on the queue of
-        * unassigned partitions.
-        */
-       @Override
-       public void cancel() {
-               // flip the flag first, so that the main thread already sees it when it wakes up
-               running = false;
-
-               // the marker carries no partition, it only makes the blocking poll return early
-               unassignedPartitionsQueue.addIfOpen(MARKER);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Kafka 0.8 specific class instantiation
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Creates the Kafka 0.8 specific handle for a partition.
-        *
-        * @param partition The partition to create the handle for
-        * @return A {@link TopicAndPartition} with the topic and partition id of the given partition
-        */
-       @Override
-       public TopicAndPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
-               final String topic = partition.getTopic();
-               final int partitionId = partition.getPartition();
-               return new TopicAndPartition(topic, partitionId);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Offset handling
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Commits the given offsets to ZooKeeper (if the offset handler is currently available)
-        * and records them as committed offsets in the state of the subscribed partitions.
-        *
-        * @param offsets The offsets to commit, per partition
-        * @throws Exception Forwarded from the ZooKeeper offset handler
-        */
-       @Override
-       public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
-               // read the volatile field only once, the fetch loop sets it to null when it shuts down
-               final ZookeeperOffsetHandler handler = zookeeperOffsetHandler;
-               if (handler != null) {
-                       // the ZK handler takes care of incrementing the offsets by 1 before committing
-                       handler.prepareAndCommitOffsets(offsets);
-               }
-
-               // remember the committed offset in the state of every partition that has one
-               for (KafkaTopicPartitionState<TopicAndPartition> state : subscribedPartitions()) {
-                       final Long committed = offsets.get(state.getKafkaTopicPartition());
-                       if (committed == null) {
-                               continue;
-                       }
-                       state.setCommittedOffset(committed);
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Utilities
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Creates a consumer thread for the given broker, seeds it with the given partitions
-        * and starts it as a daemon thread.
-        *
-        * @param seedPartitions The partitions the thread starts with
-        * @param leader The broker the thread is created for
-        * @param errorHandler The proxy that is handed to the thread for reporting exceptions
-        * @return The started consumer thread
-        * @throws IOException Declared for cloning the deserializer
-        * @throws ClassNotFoundException Declared for cloning the deserializer
-        */
-       private SimpleConsumerThread<T> createAndStartSimpleConsumerThread(
-                       List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions,
-                       Node leader,
-                       ExceptionProxy errorHandler) throws IOException, ClassNotFoundException
-       {
-               // the deserializer is not necessarily thread safe, so every thread works on its own copy
-               final ClassLoader userCodeClassLoader = runtimeContext.getUserCodeClassLoader();
-               final KeyedDeserializationSchema<T> threadDeserializer =
-                               InstantiationUtil.clone(deserializer, userCodeClassLoader);
-
-               // the thread gets its first partitions right away (otherwise it would shut down immediately again)
-               final SimpleConsumerThread<T> consumerThread = new SimpleConsumerThread<>(
-                               this,
-                               errorHandler,
-                               kafkaConfig,
-                               leader,
-                               seedPartitions,
-                               unassignedPartitionsQueue,
-                               threadDeserializer,
-                               invalidOffsetBehavior);
-
-               final String threadName = String.format("SimpleConsumer - %s - broker-%s (%s:%d)",
-                               runtimeContext.getTaskName(), leader.id(), leader.host(), leader.port());
-               consumerThread.setName(threadName);
-               consumerThread.setDaemon(true);
-               consumerThread.start();
-
-               LOG.info("Starting thread {}", consumerThread.getName());
-               return consumerThread;
-       }
-
-       /**
-        * Collects the distinct topics of the given partitions.
-        *
-        * @param partitions The partitions to take the topics from
-        * @return A list that contains every topic of the given partitions exactly once
-        */
-       private static List<String> getTopics(List<KafkaTopicPartitionState<TopicAndPartition>> partitions) {
-               // the set drops the duplicates of topics with more than one partition
-               final HashSet<String> topics = new HashSet<>();
-
-               final Iterator<KafkaTopicPartitionState<TopicAndPartition>> states = partitions.iterator();
-               while (states.hasNext()) {
-                       topics.add(states.next().getTopic());
-               }
-
-               return new ArrayList<>(topics);
-       }
-
-       /**
-        * Determines the leader broker for each of the given partitions.
-        *
-        * <p>Kafka doesn't support getting leaders for a set of partitions, so the lookup
-        * works on topic granularity:
-        * <ol>
-        *   <li>collect the topics of the partitions to assign (usually only a few partitions),</li>
-        *   <li>request the information for ALL partitions of these topics,</li>
-        *   <li>build the mapping from leader to partitions, containing only the requested partitions.</li>
-        * </ol>
-        *
-        * @param partitionsToAssign The partitions to find the leaders for, must not be empty
-        * @param kafkaProperties The properties that are handed to the partition lookup
-        * @return The map from leader to the requested partitions of that leader
-        * @throws IllegalArgumentException If the list of partitions is empty
-        * @throws RuntimeException If no leader was found for at least one of the partitions
-        * @throws Exception If fetching the partition information fails
-        */
-       private static Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> findLeaderForPartitions(
-                       List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToAssign,
-                       Properties kafkaProperties) throws Exception
-       {
-               if (partitionsToAssign.isEmpty()) {
-                       throw new IllegalArgumentException("Leader request for empty partitions list");
-               }
-
-               LOG.info("Refreshing leader information for partitions {}", partitionsToAssign);
-
-               // the lookup is based on the topic names and runs in its own thread
-               final List<String> topics = getTopics(partitionsToAssign);
-               final PartitionInfoFetcher infoFetcher = new PartitionInfoFetcher(topics, kafkaProperties);
-               infoFetcher.start();
-
-               // NOTE: The kafka client apparently locks itself up sometimes
-               // when it is interrupted, so we run it only in a separate thread.
-               // since it sometimes refuses to shut down, we resort to the admittedly harsh
-               // means of killing the thread after a timeout.
-               final KillerWatchDog watchDog = new KillerWatchDog(infoFetcher, 60000);
-               watchDog.start();
-
-               // leader information for ALL partitions of the requested topics
-               final List<KafkaTopicPartitionLeader> allPartitionsWithLeader = infoFetcher.getPartitions();
-
-               // working copy: entries are removed as soon as their leader is known
-               final List<KafkaTopicPartitionState<TopicAndPartition>> remaining = new ArrayList<>(partitionsToAssign);
-
-               // the result: leader -> list(fetchPartition)
-               final Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> leaderToPartitions = new HashMap<>();
-
-               for (KafkaTopicPartitionLeader leaderInfo : allPartitionsWithLeader) {
-                       if (remaining.isEmpty()) {
-                               // every requested partition has its leader, the rest is of no interest
-                               break;
-                       }
-
-                       final Iterator<KafkaTopicPartitionState<TopicAndPartition>> candidates = remaining.iterator();
-                       while (candidates.hasNext()) {
-                               final KafkaTopicPartitionState<TopicAndPartition> candidate = candidates.next();
-                               if (!candidate.getKafkaTopicPartition().equals(leaderInfo.getTopicPartition())) {
-                                       continue;
-                               }
-
-                               // this is the leader information for one of the requested partitions
-                               final Node leader = leaderInfo.getLeader();
-                               List<KafkaTopicPartitionState<TopicAndPartition>> partitionsOfLeader = leaderToPartitions.get(leader);
-                               if (partitionsOfLeader == null) {
-                                       partitionsOfLeader = new ArrayList<>();
-                                       leaderToPartitions.put(leader, partitionsOfLeader);
-                               }
-                               partitionsOfLeader.add(candidate);
-
-                               // the partition is assigned now, continue with the next leader information
-                               candidates.remove();
-                               break;
-                       }
-               }
-
-               if (!remaining.isEmpty()) {
-                       throw new RuntimeException("Unable to find a leader for partitions: " + remaining);
-               }
-
-               LOG.debug("Partitions with assigned leaders {}", leaderToPartitions);
-
-               return leaderToPartitions;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
deleted file mode 100644
index 4d61e53..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-/**
- * A watch dog thread that forcibly kills another thread, if that thread does not
- * finish in time.
- *
- * <p>This uses the discouraged {@link Thread#stop()} method. While this is not
- * advisable, this watch dog is only for extreme cases of threads that simply
- * do not terminate otherwise.
- */
-class KillerWatchDog extends Thread {
-
-       /** The thread that is watched and, as a last resort, stopped. */
-       private final Thread toKill;
-
-       /** The time (in milliseconds) that the watched thread gets to finish by itself. */
-       private final long timeout;
-
-       /**
-        * Creates the watch dog as a daemon thread. The timeout starts to run when the
-        * watch dog is started, not when it is created.
-        *
-        * @param toKill The thread to watch
-        * @param timeout The time in milliseconds after which the watched thread is stopped
-        */
-       KillerWatchDog(Thread toKill, long timeout) {
-               super("KillerWatchDog");
-               setDaemon(true);
-
-               this.toKill = toKill;
-               this.timeout = timeout;
-       }
-
-       /**
-        * Waits until the watched thread has finished or the timeout has expired, and stops
-        * the watched thread if it is still alive after that.
-        */
-       @SuppressWarnings("deprecation")
-       @Override
-       public void run() {
-               // measure the elapsed time with the monotonic clock, so that adjustments of the wall
-               // clock neither shorten nor extend the timeout. toNanos() saturates at Long.MAX_VALUE,
-               // so a very large timeout cannot overflow into an immediate kill
-               final long timeoutNanos = java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(timeout);
-               final long startNanos = System.nanoTime();
-
-               boolean interrupted = false;
-               long remainingNanos;
-
-               while (toKill.isAlive() && (remainingNanos = timeoutNanos - (System.nanoTime() - startNanos)) > 0) {
-                       try {
-                               // remainingNanos is positive here, so this is never join(0, 0), which would wait forever
-                               toKill.join(remainingNanos / 1000000L, (int) (remainingNanos % 1000000L));
-                       }
-                       catch (InterruptedException e) {
-                               // keep waiting, our job is important! the interruption is restored at the end
-                               interrupted = true;
-                       }
-               }
-
-               // this is harsh, but this watchdog is a last resort
-               if (toKill.isAlive()) {
-                       toKill.stop();
-               }
-
-               if (interrupted) {
-                       // the job is done, do not hide the interruption from whoever looks at this thread
-                       Thread.currentThread().interrupt();
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
deleted file mode 100644
index d8d927d..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
-
-import java.util.List;
-import java.util.Properties;
-
-class PartitionInfoFetcher extends Thread {
-
-       private final List<String> topics;
-       private final Properties properties;
-
-       private volatile List<KafkaTopicPartitionLeader> result;
-       private volatile Throwable error;
-
-
-       PartitionInfoFetcher(List<String> topics, Properties properties) {
-               this.topics = topics;
-               this.properties = properties;
-       }
-
-       @Override
-       public void run() {
-               try {
-                       result = 
FlinkKafkaConsumer08.getPartitionsForTopic(topics, properties);
-               }
-               catch (Throwable t) {
-                       this.error = t;
-               }
-       }
-
-       public List<KafkaTopicPartitionLeader> getPartitions() throws Exception 
{
-               try {
-                       this.join();
-               }
-               catch (InterruptedException e) {
-                       throw new Exception("Partition fetching was cancelled 
before completion");
-               }
-
-               if (error != null) {
-                       throw new Exception("Failed to fetch partitions for 
topics " + topics.toString(), error);
-               }
-               if (result != null) {
-                       return result;
-               }
-               throw new Exception("Partition fetching failed");
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
deleted file mode 100644
index 27d90f2..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import java.util.HashMap;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A thread that periodically writes the current Kafka partition offsets to 
Zookeeper.
- */
-public class PeriodicOffsetCommitter extends Thread {
-
-       /** The ZooKeeper handler */
-       private final ZookeeperOffsetHandler offsetHandler;
-       
-       private final KafkaTopicPartitionState<?>[] partitionStates;
-       
-       /** The proxy to forward exceptions to the main thread */
-       private final ExceptionProxy errorHandler;
-       
-       /** Interval in which to commit, in milliseconds */
-       private final long commitInterval;
-       
-       /** Flag to mark the periodic committer as running */
-       private volatile boolean running = true;
-
-       PeriodicOffsetCommitter(ZookeeperOffsetHandler offsetHandler,
-                       KafkaTopicPartitionState<?>[] partitionStates,
-                       ExceptionProxy errorHandler,
-                       long commitInterval)
-       {
-               this.offsetHandler = checkNotNull(offsetHandler);
-               this.partitionStates = checkNotNull(partitionStates);
-               this.errorHandler = checkNotNull(errorHandler);
-               this.commitInterval = commitInterval;
-               
-               checkArgument(commitInterval > 0);
-       }
-
-       @Override
-       public void run() {
-               try {
-                       while (running) {
-                               Thread.sleep(commitInterval);
-
-                               // create a deep copy of the current 
offsets
-                               HashMap<KafkaTopicPartition, Long> 
offsetsToCommit = new HashMap<>(partitionStates.length);
-                               for (KafkaTopicPartitionState<?> partitionState 
: partitionStates) {
-                                       
offsetsToCommit.put(partitionState.getKafkaTopicPartition(), 
partitionState.getOffset());
-                               }
-                               
-                               
offsetHandler.prepareAndCommitOffsets(offsetsToCommit);
-                       }
-               }
-               catch (Throwable t) {
-                       if (running) {
-                               errorHandler.reportError(
-                                               new Exception("The periodic 
offset committer encountered an error: " + t.getMessage(), t));
-                       }
-               }
-       }
-
-       public void shutdown() {
-               this.running = false;
-               this.interrupt();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
deleted file mode 100644
index 35e491a..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
+++ /dev/null
@@ -1,504 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import kafka.api.FetchRequestBuilder;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.common.ErrorMapping;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.OffsetResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.MessageAndOffset;
-
-import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.util.StringUtils;
-
-import org.apache.kafka.common.Node;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import static java.util.Objects.requireNonNull;
-import static org.apache.flink.util.PropertiesUtil.getInt;
-
-/**
- * This class implements a thread with a connection to a single Kafka broker. 
The thread
- * pulls records for a set of topic partitions for which the connected broker 
is currently
- * the leader. The thread deserializes these records and emits them. 
- * 
- * @param <T> The type of elements that this consumer thread creates from 
Kafka's byte messages
- *            and emits into the Flink DataStream.
- */
-class SimpleConsumerThread<T> extends Thread {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(SimpleConsumerThread.class);
-
-       private static final KafkaTopicPartitionState<TopicAndPartition> MARKER 
= Kafka08Fetcher.MARKER;
-       
-       // 
------------------------------------------------------------------------
-
-       private final Kafka08Fetcher<T> owner;
-       
-       private final KeyedDeserializationSchema<T> deserializer;
-
-       private final List<KafkaTopicPartitionState<TopicAndPartition>> 
partitions;
-
-       private final Node broker;
-
-       /** Queue containing new fetch partitions for the consumer thread */
-       private final 
ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> 
newPartitionsQueue;
-       
-       private final 
ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> 
unassignedPartitions;
-       
-       private final ExceptionProxy errorHandler;
-       
-       private final long invalidOffsetBehavior;
-       
-       private volatile boolean running = true;
-       
-
-       // ----------------- Simple Consumer ----------------------
-       private volatile SimpleConsumer consumer;
-
-       private final int soTimeout;
-       private final int minBytes;
-       private final int maxWait;
-       private final int fetchSize;
-       private final int bufferSize;
-       private final int reconnectLimit;
-
-
-       // exceptions are thrown locally
-       public SimpleConsumerThread(
-                       Kafka08Fetcher<T> owner,
-                       ExceptionProxy errorHandler,
-                       Properties config,
-                       Node broker,
-                       List<KafkaTopicPartitionState<TopicAndPartition>> 
seedPartitions,
-                       
ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> 
unassignedPartitions,
-                       KeyedDeserializationSchema<T> deserializer,
-                       long invalidOffsetBehavior)
-       {
-               this.owner = owner;
-               this.errorHandler = errorHandler;
-               this.broker = broker;
-               this.partitions = seedPartitions;
-               this.deserializer = requireNonNull(deserializer);
-               this.unassignedPartitions = 
requireNonNull(unassignedPartitions);
-               this.newPartitionsQueue = new ClosableBlockingQueue<>();
-               this.invalidOffsetBehavior = invalidOffsetBehavior;
-               
-               // these are the actual configuration values of Kafka + their 
original default values.
-               this.soTimeout = getInt(config, "socket.timeout.ms", 30000);
-               this.minBytes = getInt(config, "fetch.min.bytes", 1);
-               this.maxWait = getInt(config, "fetch.wait.max.ms", 100);
-               this.fetchSize = getInt(config, "fetch.message.max.bytes", 
1048576);
-               this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 
65536);
-               this.reconnectLimit = getInt(config, 
"flink.simple-consumer-reconnectLimit", 3);
-       }
-
-       public 
ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> 
getNewPartitionsQueue() {
-               return newPartitionsQueue;
-       }
-       
-       // 
------------------------------------------------------------------------
-       //  main work loop
-       // 
------------------------------------------------------------------------
-       
-       @Override
-       public void run() {
-               LOG.info("Starting to fetch from {}", this.partitions);
-
-               // set up the config values
-               final String clientId = "flink-kafka-consumer-legacy-" + 
broker.id();
-
-               try {
-                       // create the Kafka consumer that we actually use for 
fetching
-                       consumer = new SimpleConsumer(broker.host(), 
broker.port(), soTimeout, bufferSize, clientId);
-                       
-                       // make sure that all partitions have some offsets to 
start with
-                       // those partitions that do not have an offset from a 
checkpoint need to get
-                       // their start offset from ZooKeeper
-                       getMissingOffsetsFromKafka(partitions);
-
-                       // Now, the actual work starts :-)
-                       int offsetOutOfRangeCount = 0;
-                       int reconnects = 0;
-                       while (running) {
-
-                               // ----------------------------------- 
partitions list maintenance ----------------------------
-
-                               // check queue for new partitions to read from:
-                               
List<KafkaTopicPartitionState<TopicAndPartition>> newPartitions = 
newPartitionsQueue.pollBatch();
-                               if (newPartitions != null) {
-                                       // found some new partitions for this 
thread's broker
-                                       
-                                       // check if the new partitions need an 
offset lookup
-                                       
getMissingOffsetsFromKafka(newPartitions);
-                                       
-                                       // add the new partitions (and check 
they are not already in there)
-                                       for 
(KafkaTopicPartitionState<TopicAndPartition> newPartition: newPartitions) {
-                                               if 
(partitions.contains(newPartition)) {
-                                                       throw new 
IllegalStateException("Adding partition " + newPartition + 
-                                                                       " to 
subscribed partitions even though it is already subscribed");
-                                               }
-                                               partitions.add(newPartition);
-                                       }
-                                       
-                                       LOG.info("Adding {} new partitions to 
consumer thread {}", newPartitions.size(), getName());
-                                       LOG.debug("Partitions list: {}", 
newPartitions);
-                               }
-
-                               if (partitions.size() == 0) {
-                                       if (newPartitionsQueue.close()) {
-                                               // close succeeded. Closing 
thread
-                                               running = false;
-                                               
-                                               LOG.info("Consumer thread {} 
does not have any partitions assigned anymore. Stopping thread.", 
-                                                               getName());
-
-                                               // add the wake-up marker into 
the queue to make the main thread
-                                               // immediately wake up and 
terminate faster
-                                               
unassignedPartitions.add(MARKER);
-
-                                               break;
-                                       } else {
-                                               // close failed: fetcher main 
thread concurrently added new partitions into the queue.
-                                               // go to top of loop again and 
get the new partitions
-                                               continue; 
-                                       }
-                               }
-
-                               // ----------------------------------- request 
/ response with kafka ----------------------------
-
-                               FetchRequestBuilder frb = new 
FetchRequestBuilder();
-                               frb.clientId(clientId);
-                               frb.maxWait(maxWait);
-                               frb.minBytes(minBytes);
-
-                               for (KafkaTopicPartitionState<?> partition : 
partitions) {
-                                       frb.addFetch(
-                                                       
partition.getKafkaTopicPartition().getTopic(),
-                                                       
partition.getKafkaTopicPartition().getPartition(),
-                                                       partition.getOffset() + 
1, // request the next record
-                                                       fetchSize);
-                               }
-                               
-                               kafka.api.FetchRequest fetchRequest = 
frb.build();
-                               LOG.debug("Issuing fetch request {}", 
fetchRequest);
-
-                               FetchResponse fetchResponse;
-                               try {
-                                       fetchResponse = 
consumer.fetch(fetchRequest);
-                               }
-                               catch (Throwable cce) {
-                                       //noinspection ConstantConditions
-                                       if (cce instanceof 
ClosedChannelException) {
-                                               LOG.warn("Fetch failed because 
of ClosedChannelException.");
-                                               LOG.debug("Full exception", 
cce);
-                                               
-                                               // we don't know if the broker 
is overloaded or unavailable.
-                                               // retry a few times, then 
return ALL partitions for new leader lookup
-                                               if (++reconnects >= 
reconnectLimit) {
-                                                       LOG.warn("Unable to 
reach broker after {} retries. Returning all current partitions", 
reconnectLimit);
-                                                       for 
(KafkaTopicPartitionState<TopicAndPartition> fp: this.partitions) {
-                                                               
unassignedPartitions.add(fp);
-                                                       }
-                                                       this.partitions.clear();
-                                                       continue; // jump to 
top of loop: will close thread or subscribe to new partitions
-                                               }
-                                               try {
-                                                       consumer.close();
-                                               } catch (Throwable t) {
-                                                       LOG.warn("Error while 
closing consumer connection", t);
-                                               }
-                                               // delay & retry
-                                               Thread.sleep(100);
-                                               consumer = new 
SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId);
-                                               continue; // retry
-                                       } else {
-                                               throw cce;
-                                       }
-                               }
-                               reconnects = 0;
-
-                               // ---------------------------------------- 
error handling ----------------------------
-
-                               if (fetchResponse == null) {
-                                       throw new IOException("Fetch from Kafka 
failed (request returned null)");
-                               }
-                               
-                               if (fetchResponse.hasError()) {
-                                       String exception = "";
-                                       
List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = 
new ArrayList<>();
-                                       
-                                       // iterate over partitions to get 
individual error codes
-                                       
Iterator<KafkaTopicPartitionState<TopicAndPartition>> partitionsIterator = 
partitions.iterator();
-                                       boolean partitionsRemoved = false;
-                                       
-                                       while (partitionsIterator.hasNext()) {
-                                               final 
KafkaTopicPartitionState<TopicAndPartition> fp = partitionsIterator.next();
-                                               short code = 
fetchResponse.errorCode(fp.getTopic(), fp.getPartition());
-
-                                               if (code == 
ErrorMapping.OffsetOutOfRangeCode()) {
-                                                       // we were asked to 
read from an out-of-range-offset (maybe set wrong in Zookeeper)
-                                                       // Kafka's high level 
consumer is resetting the offset according to 'auto.offset.reset'
-                                                       
partitionsToGetOffsetsFor.add(fp);
-                                               }
-                                               else if (code == 
ErrorMapping.NotLeaderForPartitionCode() ||
-                                                               code == 
ErrorMapping.LeaderNotAvailableCode() ||
-                                                               code == 
ErrorMapping.BrokerNotAvailableCode() ||
-                                                               code == 
ErrorMapping.UnknownCode())
-                                               {
-                                                       // the broker we are 
connected to is not the leader for the partition.
-                                                       LOG.warn("{} is not the 
leader of {}. Reassigning leader for partition", broker, fp);
-                                                       LOG.debug("Error code = 
{}", code);
-
-                                                       
unassignedPartitions.add(fp);
-
-                                                       
partitionsIterator.remove(); // unsubscribe the partition ourselves
-                                                       partitionsRemoved = 
true;
-                                               }
-                                               else if (code != 
ErrorMapping.NoError()) {
-                                                       exception += 
"\nException for " + fp.getTopic() +":"+ fp.getPartition() + ": " +
-                                                                       
StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
-                                               }
-                                       }
-                                       if (partitionsToGetOffsetsFor.size() > 
0) {
-                                               // safeguard against an 
infinite loop.
-                                               if (offsetOutOfRangeCount++ > 
3) {
-                                                       throw new 
RuntimeException("Found invalid offsets more than three times in partitions "
-                                                                       + 
partitionsToGetOffsetsFor + " Exceptions: " + exception);
-                                               }
-                                               // get valid offsets for these 
partitions and try again.
-                                               LOG.warn("The following 
partitions had an invalid offset: {}", partitionsToGetOffsetsFor);
-                                               
getLastOffsetFromKafka(consumer, partitionsToGetOffsetsFor, 
invalidOffsetBehavior);
-                                               
-                                               LOG.warn("The new partition 
offsets are {}", partitionsToGetOffsetsFor);
-                                               continue; // jump back to 
create a new fetch request. The offset has not been touched.
-                                       }
-                                       else if (partitionsRemoved) {
-                                               continue; // create new fetch 
request
-                                       }
-                                       else {
-                                               // partitions failed on an error
-                                               throw new IOException("Error 
while fetching from broker '" + broker +"': " + exception);
-                                       }
-                               } else {
-                                       // successful fetch, reset 
offsetOutOfRangeCount.
-                                       offsetOutOfRangeCount = 0;
-                               }
-
-                               // ----------------------------------- process 
fetch response ----------------------------
-
-                               int messagesInFetch = 0;
-                               int deletedMessages = 0;
-                               
Iterator<KafkaTopicPartitionState<TopicAndPartition>> partitionsIterator = 
partitions.iterator();
-                               
-                               partitionsLoop:
-                               while (partitionsIterator.hasNext()) {
-                                       final 
KafkaTopicPartitionState<TopicAndPartition> currentPartition = 
partitionsIterator.next();
-                                       
-                                       final ByteBufferMessageSet messageSet = 
fetchResponse.messageSet(
-                                                       
currentPartition.getTopic(), currentPartition.getPartition());
-
-                                       for (MessageAndOffset msg : messageSet) 
{
-                                               if (running) {
-                                                       messagesInFetch++;
-                                                       final ByteBuffer 
payload = msg.message().payload();
-                                                       final long offset = 
msg.offset();
-                                                       
-                                                       if (offset <= 
currentPartition.getOffset()) {
-                                                               // we have seen 
this message already
-                                                               
LOG.info("Skipping message with offset " + msg.offset()
-                                                                               
+ " because we have seen messages until (including) "
-                                                                               
+ currentPartition.getOffset()
-                                                                               
+ " from topic/partition " + currentPartition.getTopic() + '/'
-                                                                               
+ currentPartition.getPartition() + " already");
-                                                               continue;
-                                                       }
-
-                                                       // If the message value 
is null, this represents a delete command for the message key.
-                                                       // Log this and pass it 
on to the client who might want to also receive delete messages.
-                                                       byte[] valueBytes;
-                                                       if (payload == null) {
-                                                               
deletedMessages++;
-                                                               valueBytes = 
null;
-                                                       } else {
-                                                               valueBytes = 
new byte[payload.remaining()];
-                                                               
payload.get(valueBytes);
-                                                       }
-
-                                                       // put key into byte 
array
-                                                       byte[] keyBytes = null;
-                                                       int keySize = 
msg.message().keySize();
-
-                                                       if (keySize >= 0) { // 
message().hasKey() is doing the same. We save one int deserialization
-                                                               ByteBuffer 
keyPayload = msg.message().key();
-                                                               keyBytes = new 
byte[keySize];
-                                                               
keyPayload.get(keyBytes);
-                                                       }
-
-                                                       final T value = 
deserializer.deserialize(keyBytes, valueBytes, 
-                                                                       
currentPartition.getTopic(), currentPartition.getPartition(), offset);
-                                                       
-                                                       if 
(deserializer.isEndOfStream(value)) {
-                                                               // remove 
partition from subscribed partitions.
-                                                               
partitionsIterator.remove();
-                                                               continue 
partitionsLoop;
-                                                       }
-                                                       
-                                                       owner.emitRecord(value, 
currentPartition, offset);
-                                               }
-                                               else {
-                                                       // no longer running
-                                                       return;
-                                               }
-                                       }
-                               }
-                               LOG.debug("This fetch contained {} messages ({} 
deleted messages)", messagesInFetch, deletedMessages);
-                       } // end of fetch loop
-
-                       if (!newPartitionsQueue.close()) {
-                               throw new Exception("Bug: Cleanly leaving 
fetcher thread without having a closed queue.");
-                       }
-               }
-               catch (Throwable t) {
-                       // report to the fetcher's error handler
-                       errorHandler.reportError(t);
-               }
-               finally {
-                       if (consumer != null) {
-                               // closing the consumer should not fail the 
program
-                               try {
-                                       consumer.close();
-                               }
-                               catch (Throwable t) {
-                                       LOG.error("Error while closing the 
Kafka simple consumer", t);
-                               }
-                       }
-               }
-       }
-
-       private void getMissingOffsetsFromKafka(
-                       List<KafkaTopicPartitionState<TopicAndPartition>> 
partitions) throws IOException
-       {
-               // collect which partitions we should fetch offsets for
-               List<KafkaTopicPartitionState<TopicAndPartition>> 
partitionsToGetOffsetsFor = new ArrayList<>();
-               for (KafkaTopicPartitionState<TopicAndPartition> part : 
partitions) {
-                       if (!part.isOffsetDefined()) {
-                               // retrieve the offset from the consumer
-                               partitionsToGetOffsetsFor.add(part);
-                       }
-               }
-               
-               if (partitionsToGetOffsetsFor.size() > 0) {
-                       getLastOffsetFromKafka(consumer, 
partitionsToGetOffsetsFor, invalidOffsetBehavior);
-                       
-                       LOG.info("No checkpoint/savepoint offsets found for 
some partitions. " +
-                                       "Fetched the following start offsets 
{}", partitionsToGetOffsetsFor);
-               }
-       }
-
-       /**
-        * Cancels this fetch thread. The thread will release all resources and 
terminate.
-        */
-       public void cancel() {
-               this.running = false;
-
-               // interrupt whatever the consumer is doing
-               if (consumer != null) {
-                       consumer.close();
-               }
-
-               this.interrupt();
-       }
-       
-       // 
------------------------------------------------------------------------
-       //  Kafka Request Utils
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Request latest offsets for a set of partitions, via a Kafka consumer.
-        *
-        * <p>This method retries three times if the response has an error.
-        *
-        * @param consumer The consumer connected to lead broker
-        * @param partitions The list of partitions we need offsets for
-        * @param whichTime The type of time we are requesting. -1 and -2 are 
special constants (See OffsetRequest)
-        */
-       private static void getLastOffsetFromKafka(
-                       SimpleConsumer consumer,
-                       List<KafkaTopicPartitionState<TopicAndPartition>> 
partitions,
-                       long whichTime) throws IOException
-       {
-               Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo 
= new HashMap<>();
-               for (KafkaTopicPartitionState<TopicAndPartition> part : 
partitions) {
-                       requestInfo.put(part.getKafkaPartitionHandle(), new 
PartitionOffsetRequestInfo(whichTime, 1));
-               }
-
-               int retries = 0;
-               OffsetResponse response;
-               while (true) {
-                       kafka.javaapi.OffsetRequest request = new 
kafka.javaapi.OffsetRequest(
-                                       requestInfo, 
kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
-                       response = consumer.getOffsetsBefore(request);
-
-                       if (response.hasError()) {
-                               StringBuilder exception = new StringBuilder();
-                               for 
(KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
-                                       short code;
-                                       if ((code = 
response.errorCode(part.getTopic(), part.getPartition())) != 
ErrorMapping.NoError()) {
-                                               exception.append("\nException 
for topic=").append(part.getTopic())
-                                                               .append(" 
partition=").append(part.getPartition()).append(": ")
-                                                               
.append(StringUtils.stringifyException(ErrorMapping.exceptionFor(code)));
-                                       }
-                               }
-                               if (++retries >= 3) {
-                                       throw new IOException("Unable to get 
last offset for partitions " + partitions + ": "
-                                                       + exception.toString());
-                               } else {
-                                       LOG.warn("Unable to get last offset for 
partitions: Exception(s): {}", exception);
-                               }
-                       } else {
-                               break; // leave retry loop
-                       }
-               }
-
-               for (KafkaTopicPartitionState<TopicAndPartition> part: 
partitions) {
-                       final long offset = response.offsets(part.getTopic(), 
part.getPartition())[0];
-                       
-                       // the offset returned is that of the next record to 
fetch. because our state reflects the latest
-                       // successfully emitted record, we subtract one
-                       part.setOffset(offset - 1);
-               }
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
deleted file mode 100644
index 8f2ef09..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import kafka.utils.ZKGroupTopicDirs;
-
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Handler for committing Kafka offsets to Zookeeper and to retrieve them 
again.
- */
-public class ZookeeperOffsetHandler {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(ZookeeperOffsetHandler.class);
-
-       private final String groupId;
-
-       private final CuratorFramework curatorClient;
-
-
-       public ZookeeperOffsetHandler(Properties props) {
-               this.groupId = 
props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
-               if (this.groupId == null) {
-                       throw new IllegalArgumentException("Required property '"
-                                       + ConsumerConfig.GROUP_ID_CONFIG + "' 
has not been set");
-               }
-               
-               String zkConnect = props.getProperty("zookeeper.connect");
-               if (zkConnect == null) {
-                       throw new IllegalArgumentException("Required property 
'zookeeper.connect' has not been set");
-               }
-
-               // we use Curator's default timeouts
-               int sessionTimeoutMs =  
Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "60000"));
-               int connectionTimeoutMs = 
Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "15000"));
-               
-               // undocumented config options allowing users to configure the 
retry policy. (they are "flink." prefixed as they are no official kafka configs)
-               int backoffBaseSleepTime = 
Integer.valueOf(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100"));
-               int backoffMaxRetries =  
Integer.valueOf(props.getProperty("flink.zookeeper.max-retries", "10"));
-               
-               RetryPolicy retryPolicy = new 
ExponentialBackoffRetry(backoffBaseSleepTime, backoffMaxRetries);
-               curatorClient = CuratorFrameworkFactory.newClient(zkConnect, 
sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
-               curatorClient.start();
-       }
-       
-       // 
------------------------------------------------------------------------
-       //  Offset access and manipulation
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Commits offsets for Kafka partitions to ZooKeeper. The given offsets 
to this method should be the offsets of
-        * the last processed records; this method will take care of 
incrementing the offsets by 1 before committing them so
-        * that the committed offsets to Zookeeper represent the next record to 
process.
-        * 
-        * @param internalOffsets The internal offsets (representing last 
processed records) for the partitions to commit.
-        * @throws Exception The method forwards exceptions.
-        */
-       public void prepareAndCommitOffsets(Map<KafkaTopicPartition, Long> 
internalOffsets) throws Exception {
-               for (Map.Entry<KafkaTopicPartition, Long> entry : 
internalOffsets.entrySet()) {
-                       KafkaTopicPartition tp = entry.getKey();
-
-                       Long lastProcessedOffset = entry.getValue();
-                       if (lastProcessedOffset != null && lastProcessedOffset 
>= 0) {
-                               setOffsetInZooKeeper(curatorClient, groupId, 
tp.getTopic(), tp.getPartition(), lastProcessedOffset + 1);
-                       }
-               }
-       }
-
-       /**
-        * @param partitions The partitions to read offsets for.
-        * @return The mapping from partition to offset.
-        * @throws Exception This method forwards exceptions.
-        */
-       public Map<KafkaTopicPartition, Long> 
getCommittedOffsets(List<KafkaTopicPartition> partitions) throws Exception {
-               Map<KafkaTopicPartition, Long> ret = new 
HashMap<>(partitions.size());
-               for (KafkaTopicPartition tp : partitions) {
-                       Long offset = getOffsetFromZooKeeper(curatorClient, 
groupId, tp.getTopic(), tp.getPartition());
-
-                       if (offset != null) {
-                               LOG.info("Offset for TopicPartition {}:{} was 
set to {} in ZooKeeper. Seeking fetcher to that position.",
-                                               tp.getTopic(), 
tp.getPartition(), offset);
-                               ret.put(tp, offset);
-                       }
-               }
-               return ret;
-       }
-
-       /**
-        * Closes the offset handler.
-        * 
-        * @throws IOException Thrown, if the handler cannot be closed properly.
-        */
-       public void close() throws IOException {
-               curatorClient.close();
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Communication with Zookeeper
-       // 
------------------------------------------------------------------------
-       
-       public static void setOffsetInZooKeeper(CuratorFramework curatorClient, 
String groupId, String topic, int partition, long offset) throws Exception {
-               ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, 
topic);
-               String path = topicDirs.consumerOffsetDir() + "/" + partition;
-               
curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
-               byte[] data = Long.toString(offset).getBytes();
-               curatorClient.setData().forPath(path, data);
-       }
-
-       public static Long getOffsetFromZooKeeper(CuratorFramework 
curatorClient, String groupId, String topic, int partition) throws Exception {
-               ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, 
topic);
-               String path = topicDirs.consumerOffsetDir() + "/" + partition;
-               
curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
-               
-               byte[] data = curatorClient.getData().forPath(path);
-               
-               if (data == null) {
-                       return null;
-               } else {
-                       String asString = new String(data);
-                       if (asString.length() == 0) {
-                               return null;
-                       } else {
-                               try {
-                                       return Long.valueOf(asString);
-                               }
-                               catch (NumberFormatException e) {
-                                       LOG.error(
-                                                       "The offset in 
ZooKeeper for group '{}', topic '{}', partition {} is a malformed string: {}",
-                                               groupId, topic, partition, 
asString);
-                                       return null;
-                               }
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
deleted file mode 100644
index fabb0fe..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.runtime.client.JobCancellationException;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import 
org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
-import 
org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class Kafka08ITCase extends KafkaConsumerTestBase {
-
-       // 
------------------------------------------------------------------------
-       //  Suite of Tests
-       // 
------------------------------------------------------------------------
-
-       @Test(timeout = 60000)
-       public void testFailOnNoBroker() throws Exception {
-               runFailOnNoBrokerTest();
-       }
-
-
-       @Test(timeout = 60000)
-       public void testConcurrentProducerConsumerTopology() throws Exception {
-               runSimpleConcurrentProducerConsumerTopology();
-       }
-
-//     @Test(timeout = 60000)
-//     public void testPunctuatedExplicitWMConsumer() throws Exception {
-//             runExplicitPunctuatedWMgeneratingConsumerTest(false);
-//     }
-
-//     @Test(timeout = 60000)
-//     public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws 
Exception {
-//             runExplicitPunctuatedWMgeneratingConsumerTest(true);
-//     }
-
-       @Test(timeout = 60000)
-       public void testKeyValueSupport() throws Exception {
-               runKeyValueTest();
-       }
-
-       // --- canceling / failures ---
-
-       @Test(timeout = 60000)
-       public void testCancelingEmptyTopic() throws Exception {
-               runCancelingOnEmptyInputTest();
-       }
-
-       @Test(timeout = 60000)
-       public void testCancelingFullTopic() throws Exception {
-               runCancelingOnFullInputTest();
-       }
-
-       @Test(timeout = 60000)
-       public void testFailOnDeploy() throws Exception {
-               runFailOnDeployTest();
-       }
-
-       @Test(timeout = 60000)
-       public void testInvalidOffset() throws Exception {
-               
-               final int parallelism = 1;
-               
-               // write 20 messages into topic:
-               final String topic = writeSequence("invalidOffsetTopic", 20, 
parallelism, 1);
-
-               // set invalid offset:
-               CuratorFramework curatorClient = 
((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
-               ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, 
standardProps.getProperty("group.id"), topic, 0, 1234);
-               curatorClient.close();
-
-               // read from topic
-               final int valuesCount = 20;
-               final int startFrom = 0;
-
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-               env.getConfig().disableSysoutLogging();
-               
-               readSequence(env, standardProps, parallelism, topic, 
valuesCount, startFrom);
-
-               deleteTestTopic(topic);
-       }
-
-       // --- source to partition mappings and exactly once ---
-
-       @Test(timeout = 60000)
-       public void testOneToOneSources() throws Exception {
-               runOneToOneExactlyOnceTest();
-       }
-
-       @Test(timeout = 60000)
-       public void testOneSourceMultiplePartitions() throws Exception {
-               runOneSourceMultiplePartitionsExactlyOnceTest();
-       }
-
-       @Test(timeout = 60000)
-       public void testMultipleSourcesOnePartition() throws Exception {
-               runMultipleSourcesOnePartitionExactlyOnceTest();
-       }
-
-       // --- broker failure ---
-
-       @Test(timeout = 60000)
-       public void testBrokerFailure() throws Exception {
-               runBrokerFailureTest();
-       }
-
-       // --- offset committing ---
-
-       @Test(timeout = 60000)
-       public void testCommitOffsetsToZookeeper() throws Exception {
-               runCommitOffsetsToKafka();
-       }
-
-       @Test(timeout = 60000)
-       public void testStartFromZookeeperCommitOffsets() throws Exception {
-               runStartFromKafkaCommitOffsets();
-       }
-
-       @Test(timeout = 60000)
-       public void testAutoOffsetRetrievalAndCommitToZookeeper() throws 
Exception {
-               runAutoOffsetRetrievalAndCommitToKafka();
-       }
-
-       @Test
-       public void runOffsetManipulationInZooKeeperTest() {
-               try {
-                       final String topicName = 
"ZookeeperOffsetHandlerTest-Topic";
-                       final String groupId = 
"ZookeeperOffsetHandlerTest-Group";
-
-                       final Long offset = (long) (Math.random() * 
Long.MAX_VALUE);
-
-                       CuratorFramework curatorFramework = 
((KafkaTestEnvironmentImpl)kafkaServer ).createCuratorClient();
-                       kafkaServer.createTestTopic(topicName, 3, 2);
-
-                       
ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorFramework, groupId, 
topicName, 0, offset);
-
-                       Long fetchedOffset = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, groupId, 
topicName, 0);
-
-                       curatorFramework.close();
-
-                       assertEquals(offset, fetchedOffset);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test(timeout = 60000)
-       public void testOffsetAutocommitTest() throws Exception {
-               final int parallelism = 3;
-
-               // write a sequence from 0 to 99 to each of the 3 partitions.
-               final String topicName = writeSequence("testOffsetAutocommit", 
100, parallelism, 1);
-
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-               // NOTE: We are not enabling the checkpointing!
-               env.getConfig().disableSysoutLogging();
-               
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-               env.setParallelism(parallelism);
-
-               // the readSequence operation sleeps for 20 ms between each 
record.
-               // setting a delay of 25*20 = 500 for the commit interval makes
-               // sure that we commit roughly 3-4 times while reading, however
-               // at least once.
-               Properties readProps = new Properties();
-               readProps.putAll(standardProps);
-               readProps.setProperty("auto.commit.interval.ms", "500");
-
-               // read so that the offset can be committed to ZK
-               readSequence(env, readProps, parallelism, topicName, 100, 0);
-
-               // get the offset
-               CuratorFramework curatorFramework = 
((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
-
-               Long o1 = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, 
standardProps.getProperty("group.id"), topicName, 0);
-               Long o2 = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, 
standardProps.getProperty("group.id"), topicName, 1);
-               Long o3 = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, 
standardProps.getProperty("group.id"), topicName, 2);
-               curatorFramework.close();
-               LOG.info("Got final offsets from zookeeper o1={}, o2={}, 
o3={}", o1, o2, o3);
-
-               // ensure that the offset has been committed
-               boolean atLeastOneOffsetSet = (o1 != null && o1 > 0 && o1 <= 
100) ||
-                       (o2 != null && o2 > 0 && o2 <= 100) ||
-                       (o3 != null && o3 > 0 && o3 <= 100);
-               assertTrue("Expecting at least one offset to be set o1="+o1+" 
o2="+o2+" o3="+o3, atLeastOneOffsetSet);
-
-               deleteTestTopic(topicName);
-       }
-
-       // --- special executions ---
-
-       @Test(timeout = 60000)
-       public void testBigRecordJob() throws Exception {
-               runBigRecordTestTopology();
-       }
-
-       @Test(timeout = 60000)
-       public void testMultipleTopics() throws Exception {
-               runProduceConsumeMultipleTopics();
-       }
-
-       @Test(timeout = 60000)
-       public void testAllDeletes() throws Exception {
-               runAllDeletesTest();
-       }
-
-       @Test(timeout=60000)
-       public void testEndOfStream() throws Exception {
-               runEndOfStreamTest();
-       }
-
-       @Test(timeout = 60000)
-       public void testMetrics() throws Throwable {
-               runMetricsTest();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
deleted file mode 100644
index 6d0b140..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.table.Row;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import 
org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-
-import java.util.Properties;
-
-public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
-
-       @Override
-       protected KafkaTableSink createTableSink(String topic, Properties 
properties, KafkaPartitioner<Row> partitioner,
-                       final FlinkKafkaProducerBase<Row> kafkaProducer) {
-
-               return new Kafka08JsonTableSink(topic, properties, partitioner) 
{
-                       @Override
-                       protected FlinkKafkaProducerBase<Row> 
createKafkaProducer(String topic, Properties properties,
-                                       SerializationSchema<Row> 
serializationSchema, KafkaPartitioner<Row> partitioner) {
-                               return kafkaProducer;
-                       }
-               };
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       protected SerializationSchema<Row> getSerializationSchema() {
-               return new JsonRowSerializationSchema(FIELD_NAMES);
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
deleted file mode 100644
index a2d66ac..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import java.util.Properties;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import 
org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
-
-public class Kafka08JsonTableSourceTest extends KafkaTableSourceTestBase {
-
-       @Override
-       protected KafkaTableSource createTableSource(String topic, Properties 
properties, String[] fieldNames, TypeInformation<?>[] typeInfo) {
-               return new Kafka08JsonTableSource(topic, properties, 
fieldNames, typeInfo);
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
-               return (Class) JsonRowDeserializationSchema.class;
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
-               return (Class) FlinkKafkaConsumer08.class;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
deleted file mode 100644
index 5c951db..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class Kafka08ProducerITCase extends KafkaProducerTestBase {
-
-       @Test
-       public void testCustomPartitioning() {
-               runCustomPartitioningTest();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
deleted file mode 100644
index 9520f55..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.Collections;
-import java.util.Properties;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.junit.Test;
-
-public class KafkaConsumer08Test {
-
-       @Test
-       public void testValidateZooKeeperConfig() {
-               try {
-                       // empty
-                       Properties emptyProperties = new Properties();
-                       try {
-                               FlinkKafkaConsumer08.validateZooKeeperConfig(emptyProperties);
-                               fail("should fail with an exception");
-                       }
-                       catch (IllegalArgumentException e) {
-                               // expected
-                       }
-
-                       // no connect string (only group string)
-                       Properties noConnect = new Properties();
-                       noConnect.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-test-group");
-                       try {
-                               FlinkKafkaConsumer08.validateZooKeeperConfig(noConnect);
-                               fail("should fail with an exception");
-                       }
-                       catch (IllegalArgumentException e) {
-                               // expected
-                       }
-
-                       // no group string (only connect string)
-                       Properties noGroup = new Properties();
-                       noGroup.put("zookeeper.connect", "localhost:47574");
-                       try {
-                               FlinkKafkaConsumer08.validateZooKeeperConfig(noGroup);
-                               fail("should fail with an exception");
-                       }
-                       catch (IllegalArgumentException e) {
-                               // expected
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testCreateSourceWithoutCluster() {
-               try {
-                       Properties props = new Properties();
-                       props.setProperty("zookeeper.connect", "localhost:56794");
-                       props.setProperty("bootstrap.servers", "localhost:11111, localhost:22222");
-                       props.setProperty("group.id", "non-existent-group");
-
-                       FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"), new SimpleStringSchema(), props);
-                       consumer.open(new Configuration());
-                       fail();
-               }
-               catch (Exception e) {
-                       assertTrue(e.getMessage().contains("Unable to retrieve any partitions"));
-               }
-       }
-
-       @Test
-       public void testAllBoostrapServerHostsAreInvalid() {
-               try {
-                       String zookeeperConnect = "localhost:56794";
-                       String bootstrapServers = "indexistentHost:11111";
-                       String groupId = "non-existent-group";
-                       Properties props = createKafkaProps(zookeeperConnect, bootstrapServers, groupId);
-                       FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"),
-                                       new SimpleStringSchema(), props);
-                       consumer.open(new Configuration());
-                       fail();
-               } catch (Exception e) {
-                       assertTrue("Exception should be thrown containing 'all bootstrap servers invalid' message!",
-                                       e.getMessage().contains("All the servers provided in: '" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
-                                                       + "' config are invalid"));
-               }
-       }
-
-       @Test
-       public void testAtLeastOneBootstrapServerHostIsValid() {
-               try {
-                       String zookeeperConnect = "localhost:56794";
-                       // we declare one valid boostrap server, namely the one with
-                       // 'localhost'
-                       String bootstrapServers = "indexistentHost:11111, localhost:22222";
-                       String groupId = "non-existent-group";
-                       Properties props = createKafkaProps(zookeeperConnect, bootstrapServers, groupId);
-                       FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"),
-                                       new SimpleStringSchema(), props);
-                       consumer.open(new Configuration());
-                       fail();
-               } catch (Exception e) {
-                       // test is not failing because we have one valid boostrap server
-                       assertTrue("The cause of the exception should not be 'all boostrap server are invalid'!",
-                                       !e.getMessage().contains("All the hosts provided in: " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
-                                                       + " config are invalid"));
-               }
-       }
-       
-       private Properties createKafkaProps(String zookeeperConnect, String bootstrapServers, String groupId) {
-               Properties props = new Properties();
-               props.setProperty("zookeeper.connect", zookeeperConnect);
-               props.setProperty("bootstrap.servers", bootstrapServers);
-               props.setProperty("group.id", groupId);
-               return props;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
deleted file mode 100644
index 72d2772..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import kafka.utils.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KafkaLocalSystemTime implements Time {
-
-       private static final Logger LOG = LoggerFactory.getLogger(KafkaLocalSystemTime.class);
-
-       @Override
-       public long milliseconds() {
-               return System.currentTimeMillis();
-       }
-
-       @Override
-       public long nanoseconds() {
-               return System.nanoTime();
-       }
-
-       @Override
-       public void sleep(long ms) {
-               try {
-                       Thread.sleep(ms);
-               } catch (InterruptedException e) {
-                       LOG.warn("Interruption", e);
-               }
-       }
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
deleted file mode 100644
index 91fc286..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.flink.util.TestLogger;
-
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.PartitionInfo;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.Collections;
-import java.util.concurrent.Future;
-
-
-import static org.mockito.Mockito.*;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
-import static org.junit.Assert.*;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(FlinkKafkaProducerBase.class)
-public class KafkaProducerTest extends TestLogger {
-       
-       @Test
-       @SuppressWarnings("unchecked")
-       public void testPropagateExceptions() {
-               try {
-                       // mock kafka producer
-                       KafkaProducer<?, ?> kafkaProducerMock = mock(KafkaProducer.class);
-                       
-                       // partition setup
-                       when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
-                               // returning a unmodifiable list to mimic KafkaProducer#partitionsFor() behaviour
-                               Collections.singletonList(new PartitionInfo("mock_topic", 42, null, null, null)));
-
-                       // failure when trying to send an element
-                       when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class)))
-                               .thenAnswer(new Answer<Future<RecordMetadata>>() {
-                                       @Override
-                                       public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
-                                               Callback callback = (Callback) invocation.getArguments()[1];
-                                               callback.onCompletion(null, new Exception("Test error"));
-                                               return null;
-                                       }
-                               });
-                       
-                       // make sure the FlinkKafkaProducer instantiates our mock producer
-                       whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);
-                       
-                       // (1) producer that propagates errors
-
-                       FlinkKafkaProducer08<String> producerPropagating = new FlinkKafkaProducer08<>(
-                                       "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
-
-                       OneInputStreamOperatorTestHarness<String, Object> testHarness =
-                                       new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producerPropagating));
-
-                       testHarness.open();
-
-                       try {
-                               testHarness.processElement(new StreamRecord<>("value"));
-                               testHarness.processElement(new StreamRecord<>("value"));
-                               fail("This should fail with an exception");
-                       }
-                       catch (Exception e) {
-                               assertNotNull(e.getCause());
-                               assertNotNull(e.getCause().getMessage());
-                               assertTrue(e.getCause().getMessage().contains("Test error"));
-                       }
-
-                       testHarness.close();
-
-                       // (2) producer that only logs errors
-
-                       FlinkKafkaProducer08<String> producerLogging = new FlinkKafkaProducer08<>(
-                                       "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
-                       producerLogging.setLogFailuresOnly(true);
-
-                       testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging));
-
-                       testHarness.open();
-
-                       testHarness.processElement(new StreamRecord<>("value"));
-                       testHarness.processElement(new StreamRecord<>("value"));
-
-                       testHarness.close();
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-}

Reply via email to