http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
new file mode 100644
index 0000000..9fec52d
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
@@ -0,0 +1,652 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import kafka.api.FetchRequestBuilder;
+import kafka.api.OffsetRequest;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.ErrorMapping;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.MessageAndOffset;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+
+import org.apache.flink.util.StringUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * This fetcher uses Kafka's low-level API to pull data from a specific
+ * set of topics and partitions.
+ * 
+ * <p>This code is in parts based on the tutorial code for the low-level Kafka 
consumer.</p>
+ */
+public class LegacyFetcher implements Fetcher {
+       
+       private static final Logger LOG = 
LoggerFactory.getLogger(LegacyFetcher.class);
+
+       
+       /** The properties that configure the Kafka connection */
+       private final Properties config;
+       
+       /** The task name, to give more readable names to the spawned threads */
+       private final String taskName;
+       
+       /** The first error that occurred in a connection thread */
+       private final AtomicReference<Throwable> error;
+
+       /** The partitions that the fetcher should read, with their starting 
offsets */
+       private Map<KafkaTopicPartitionLeader, Long> partitionsToRead;
+
+       /** The seek() method might receive KafkaTopicPartition's without 
leader information
+        * (for example when restoring).
+        * If there are elements in this list, we'll fetch the leader from 
Kafka.
+        **/
+       private Map<KafkaTopicPartition, Long> partitionsToReadWithoutLeader;
+       
	/** Reference to the thread that executed the run() method. */
+       private volatile Thread mainThread;
+       
	/** Flag to shut the fetcher down */
+       private volatile boolean running = true;
+
+       public LegacyFetcher(List<KafkaTopicPartitionLeader> partitions, 
Properties props, String taskName) {
+               this.config = checkNotNull(props, "The config properties cannot 
be null");
+               //this.topic = checkNotNull(topic, "The topic cannot be null");
+               this.partitionsToRead = new HashMap<>();
+               for (KafkaTopicPartitionLeader p: partitions) {
+                       partitionsToRead.put(p, 
FlinkKafkaConsumer08.OFFSET_NOT_SET);
+               }
+               this.taskName = taskName;
+               this.error = new AtomicReference<>();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Fetcher methods
+       // 
------------------------------------------------------------------------
+       
+       @Override
+       public void seek(KafkaTopicPartition topicPartition, long offsetToRead) 
{
+               if (partitionsToRead == null) {
+                       throw new IllegalArgumentException("No partitions to 
read set");
+               }
+               if (!topicPartition.isContained(partitionsToRead)) {
+                       throw new IllegalArgumentException("Can not set offset 
on a partition (" + topicPartition
+                                       + ") we are not going to read. 
Partitions to read " + partitionsToRead);
+               }
+               if (partitionsToReadWithoutLeader == null) {
+                       partitionsToReadWithoutLeader = new HashMap<>();
+               }
+               partitionsToReadWithoutLeader.put(topicPartition, offsetToRead);
+       }
+       
	@Override
	public void close() {
		// the flag needs to be checked by the run() method that creates the spawned threads
		this.running = false;
		
		// all other cleanup is made by the run method itself
	}
+
	/**
	 * Runs the fetch loop: refreshes leader information for partitions that were seek()'d
	 * without a leader, groups the partitions by lead broker, spawns one
	 * {@link SimpleConsumerThread} per broker, and then blocks until all threads finish,
	 * the fetcher is closed, or one thread reports an error via {@link #stopWithError(Throwable)}.
	 *
	 * @param sourceContext The context to emit deserialized records to.
	 * @param deserializer The schema turning raw Kafka bytes into records.
	 * @param lastOffsets Map that the consumer threads update with the last emitted offset per partition.
	 * @throws Exception Re-throws (wrapped) any error reported asynchronously by a consumer thread.
	 */
	@Override
	public <T> void run(SourceFunction.SourceContext<T> sourceContext,
						KeyedDeserializationSchema<T> deserializer,
						HashMap<KafkaTopicPartition, Long> lastOffsets) throws Exception {
		
		if (partitionsToRead == null || partitionsToRead.size() == 0) {
			throw new IllegalArgumentException("No partitions set");
		}
		
		// NOTE: This method needs to always release all resources it acquires
		
		// remember the main thread so stopWithError() can interrupt the join loop below
		this.mainThread = Thread.currentThread();

		LOG.info("Reading from partitions " + partitionsToRead + " using the legacy fetcher");

		// get lead broker if necessary
		if (partitionsToReadWithoutLeader != null && partitionsToReadWithoutLeader.size() > 0) {
			LOG.info("Refreshing leader information for partitions {}", KafkaTopicPartition.toString(partitionsToReadWithoutLeader));
			// NOTE: The kafka client apparently locks itself in an infinite loop sometimes
			// when it is interrupted, so we run it only in a separate thread.
			// since it sometimes refuses to shut down, we resort to the admittedly harsh
			// means of killing the thread after a timeout.
			PartitionInfoFetcher infoFetcher = new PartitionInfoFetcher(KafkaTopicPartition.getTopics(partitionsToReadWithoutLeader), config);
			infoFetcher.start();

			// hard-kills the info fetcher thread if it has not finished after 60 seconds
			KillerWatchDog watchDog = new KillerWatchDog(infoFetcher, 60000);
			watchDog.start();

			List<KafkaTopicPartitionLeader> topicPartitionWithLeaderList = infoFetcher.getPartitions();

			// replace potentially outdated leader information in partitionsToRead with fresh data from topicPartitionWithLeader
			for (Map.Entry<KafkaTopicPartition, Long> pt: partitionsToReadWithoutLeader.entrySet()) {
				KafkaTopicPartitionLeader topicPartitionWithLeader = null;
				// go through list
				for (KafkaTopicPartitionLeader withLeader: topicPartitionWithLeaderList) {
					if (withLeader.getTopicPartition().equals(pt.getKey())) {
						topicPartitionWithLeader = withLeader;
						break;
					}
				}
				if (topicPartitionWithLeader == null) {
					throw new IllegalStateException("Unable to find topic/partition leader information");
				}
				Long removed = KafkaTopicPartitionLeader.replaceIgnoringLeader(topicPartitionWithLeader, pt.getValue(), partitionsToRead);
				if (removed == null) {
					throw new IllegalStateException("Seek request on unknown topic partition");
				}
			}
		}


		// build a map for each broker with its partitions
		Map<Node, List<FetchPartition>> fetchBrokers = new HashMap<>();

		for (Map.Entry<KafkaTopicPartitionLeader, Long> entry : partitionsToRead.entrySet()) {
			final KafkaTopicPartitionLeader topicPartition = entry.getKey();
			final long offset = entry.getValue();

			List<FetchPartition> partitions = fetchBrokers.get(topicPartition.getLeader());
			if (partitions == null) {
				partitions = new ArrayList<>();
				fetchBrokers.put(topicPartition.getLeader(), partitions);
			}

			partitions.add(new FetchPartition(topicPartition.getTopicPartition().getTopic(), topicPartition.getTopicPartition().getPartition(), offset));
		}

		// create SimpleConsumers for each broker (one connection / thread per broker)
		ArrayList<SimpleConsumerThread<?>> consumers = new ArrayList<>(fetchBrokers.size());
		
		for (Map.Entry<Node, List<FetchPartition>> brokerInfo : fetchBrokers.entrySet()) {
			final Node broker = brokerInfo.getKey();
			final List<FetchPartition> partitionsList = brokerInfo.getValue();
			
			FetchPartition[] partitions = partitionsList.toArray(new FetchPartition[partitionsList.size()]);

			SimpleConsumerThread<T> thread = new SimpleConsumerThread<>(this, config,
					broker, partitions, sourceContext, deserializer, lastOffsets);

			thread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)",
					taskName, broker.id(), broker.host(), broker.port()));
			thread.setDaemon(true);
			consumers.add(thread);
		}
		
		// last check whether we should abort.
		if (!running) {
			return;
		}
		
		// start all consumer threads
		for (SimpleConsumerThread<?> t : consumers) {
			LOG.info("Starting thread {}", t.getName());
			t.start();
		}
		
		// wait until all consumer threads are done, or until we are aborted, or until
		// an error occurred in one of the fetcher threads
		try {
			boolean someConsumersRunning = true;
			while (running && error.get() == null && someConsumersRunning) {
				try {
					// wait for the consumer threads. if an error occurs, we are interrupted
					// (by stopWithError()) and re-evaluate the loop condition
					for (SimpleConsumerThread<?> t : consumers) {
						t.join();
					}

					// safety net
					someConsumersRunning = false;
					for (SimpleConsumerThread<?> t : consumers) {
						someConsumersRunning |= t.isAlive();
					}
				}
				catch (InterruptedException e) {
					// ignore. we should notice what happened in the next loop check
				}
			}
			
			// make sure any asynchronous error is noticed
			Throwable error = this.error.get();
			if (error != null) {
				throw new Exception(error.getMessage(), error);
			}
		}
		finally {
			// make sure that in any case (completion, abort, error), all spawned threads are stopped
			for (SimpleConsumerThread<?> t : consumers) {
				if (t.isAlive()) {
					t.cancel();
				}
			}
		}
	}
+       
+       /**
+        * Reports an error from a fetch thread. This will cause the main 
thread to see this error,
+        * abort, and cancel all other fetch threads.
+        * 
+        * @param error The error to report.
+        */
+       @Override
+       public void stopWithError(Throwable error) {
+               if (this.error.compareAndSet(null, error)) {
+                       // we are the first to report an error
+                       if (mainThread != null) {
+                               mainThread.interrupt();
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Representation of a partition to fetch.
+        */
+       private static class FetchPartition {
+
+               final String topic;
+               
+               /** ID of the partition within the topic (0 indexed, as given 
by Kafka) */
+               final int partition;
+               
+               /** Offset pointing at the next element to read from that 
partition. */
+               long nextOffsetToRead;
+
+               FetchPartition(String topic, int partition, long 
nextOffsetToRead) {
+                       this.topic = topic;
+                       this.partition = partition;
+                       this.nextOffsetToRead = nextOffsetToRead;
+               }
+               
+               @Override
+               public String toString() {
+                       return "FetchPartition {topic=" + topic +", partition=" 
+ partition + ", offset=" + nextOffsetToRead + '}';
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Per broker fetcher
+       // 
------------------------------------------------------------------------
+       
+       /**
+        * Each broker needs its separate connection. This thread implements 
the connection to
+        * one broker. The connection can fetch multiple partitions from the 
broker.
+        * 
+        * @param <T> The data type fetched.
+        */
+       private static class SimpleConsumerThread<T> extends Thread {
+               
+               private final SourceFunction.SourceContext<T> sourceContext;
+               private final KeyedDeserializationSchema<T> deserializer;
+               private final HashMap<KafkaTopicPartition, Long> offsetsState;
+               
+               private final FetchPartition[] partitions;
+               
+               private final Node broker;
+
+               private final Properties config;
+
+               private final LegacyFetcher owner;
+
+               private SimpleConsumer consumer;
+               
+               private volatile boolean running = true;
+
		/**
		 * Creates a consumer thread that fetches the given partitions from one specific broker.
		 *
		 * @param owner The fetcher that spawned this thread; receives error reports.
		 * @param config The properties that configure the Kafka connection.
		 * @param broker The broker this thread connects to (leader for the given partitions).
		 * @param partitions The partitions to fetch from that broker, with their next offsets to read.
		 * @param sourceContext The context to emit deserialized records to.
		 * @param deserializer The schema turning raw key/value bytes into records.
		 * @param offsetsState Map updated (under the checkpoint lock) with the last emitted offset per partition.
		 */
		// exceptions are thrown locally
		public SimpleConsumerThread(LegacyFetcher owner,
									Properties config,
									Node broker,
									FetchPartition[] partitions,
									SourceFunction.SourceContext<T> sourceContext,
									KeyedDeserializationSchema<T> deserializer,
									HashMap<KafkaTopicPartition, Long> offsetsState) {
			this.owner = owner;
			this.config = config;
			this.broker = broker;
			this.partitions = partitions;
			this.sourceContext = checkNotNull(sourceContext);
			this.deserializer = checkNotNull(deserializer);
			this.offsetsState = checkNotNull(offsetsState);
		}
+
+               @Override
+               public void run() {
+                       LOG.info("Starting to fetch from {}", 
Arrays.toString(this.partitions));
+                       try {
+                               // set up the config values
+                               final String clientId = 
"flink-kafka-consumer-legacy-" + broker.id();
+
+                               // these are the actual configuration values of 
Kafka + their original default values.
+                               final int soTimeout = 
Integer.valueOf(config.getProperty("socket.timeout.ms", "30000"));
+                               final int bufferSize = 
Integer.valueOf(config.getProperty("socket.receive.buffer.bytes", "65536"));
+                               final int fetchSize = 
Integer.valueOf(config.getProperty("fetch.message.max.bytes", "1048576"));
+                               final int maxWait = 
Integer.valueOf(config.getProperty("fetch.wait.max.ms", "100"));
+                               final int minBytes = 
Integer.valueOf(config.getProperty("fetch.min.bytes", "1"));
+                               
+                               // create the Kafka consumer that we actually 
use for fetching
+                               consumer = new SimpleConsumer(broker.host(), 
broker.port(), soTimeout, bufferSize, clientId);
+
+                               // make sure that all partitions have some 
offsets to start with
+                               // those partitions that do not have an offset 
from a checkpoint need to get
+                               // their start offset from ZooKeeper
+                               {
+                                       List<FetchPartition> 
partitionsToGetOffsetsFor = new ArrayList<>();
+
+                                       for (FetchPartition fp : partitions) {
+                                               if (fp.nextOffsetToRead == 
FlinkKafkaConsumer08.OFFSET_NOT_SET) {
+                                                       // retrieve the offset 
from the consumer
+                                                       
partitionsToGetOffsetsFor.add(fp);
+                                               }
+                                       }
+                                       if (partitionsToGetOffsetsFor.size() > 
0) {
+                                               getLastOffset(consumer, 
partitionsToGetOffsetsFor, getInvalidOffsetBehavior(config));
+                                               LOG.info("No prior offsets 
found for some partitions. Fetched the following start offsets {}", 
partitionsToGetOffsetsFor);
+                                       }
+                               }
+                               
+                               // Now, the actual work starts :-)
+                               int offsetOutOfRangeCount = 0;
+                               fetchLoop: while (running) {
+                                       FetchRequestBuilder frb = new 
FetchRequestBuilder();
+                                       frb.clientId(clientId);
+                                       frb.maxWait(maxWait);
+                                       frb.minBytes(minBytes);
+                                       
+                                       for (FetchPartition fp : partitions) {
+                                               frb.addFetch(fp.topic, 
fp.partition, fp.nextOffsetToRead, fetchSize);
+                                       }
+                                       kafka.api.FetchRequest fetchRequest = 
frb.build();
+                                       LOG.debug("Issuing fetch request {}", 
fetchRequest);
+
+                                       FetchResponse fetchResponse = 
consumer.fetch(fetchRequest);
+
+                                       if (fetchResponse.hasError()) {
+                                               String exception = "";
+                                               List<FetchPartition> 
partitionsToGetOffsetsFor = new ArrayList<>();
+                                               for (FetchPartition fp : 
partitions) {
+                                                       short code = 
fetchResponse.errorCode(fp.topic, fp.partition);
+
+                                                       if (code == 
ErrorMapping.OffsetOutOfRangeCode()) {
+                                                               // we were 
asked to read from an out-of-range-offset (maybe set wrong in Zookeeper)
+                                                               // Kafka's high 
level consumer is resetting the offset according to 'auto.offset.reset'
+                                                               
partitionsToGetOffsetsFor.add(fp);
+                                                       } else if (code != 
ErrorMapping.NoError()) {
+                                                               exception += 
"\nException for partition " + fp.partition + ": " +
+                                                                               
StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
+                                                       }
+                                               }
+                                               if 
(partitionsToGetOffsetsFor.size() > 0) {
+                                                       // safeguard against an 
infinite loop.
+                                                       if 
(offsetOutOfRangeCount++ > 0) {
+                                                               throw new 
RuntimeException("Found invalid offsets more than once in partitions 
"+partitionsToGetOffsetsFor.toString()+" " +
+                                                                               
"Exceptions: "+exception);
+                                                       }
+                                                       // get valid offsets 
for these partitions and try again.
+                                                       LOG.warn("The following 
partitions had an invalid offset: {}", partitionsToGetOffsetsFor);
+                                                       getLastOffset(consumer, 
partitionsToGetOffsetsFor, getInvalidOffsetBehavior(config));
+                                                       LOG.warn("The new 
partition offsets are {}", partitionsToGetOffsetsFor);
+                                                       continue; // jump back 
to create a new fetch request. The offset has not been touched.
+                                               } else {
+                                                       // all partitions 
failed on an error
+                                                       throw new 
IOException("Error while fetching from broker: " + exception);
+                                               }
+                                       }
+
+                                       int messagesInFetch = 0;
+                                       int deletedMessages = 0;
+                                       for (FetchPartition fp : partitions) {
+                                               final ByteBufferMessageSet 
messageSet = fetchResponse.messageSet(fp.topic, fp.partition);
+                                               final KafkaTopicPartition 
topicPartition = new KafkaTopicPartition(fp.topic, fp.partition);
+                                               
+                                               for (MessageAndOffset msg : 
messageSet) {
+                                                       if (running) {
+                                                               
messagesInFetch++;
+                                                               if 
(msg.offset() < fp.nextOffsetToRead) {
+                                                                       // we 
have seen this message already
+                                                                       
LOG.info("Skipping message with offset " + msg.offset()
+                                                                               
        + " because we have seen messages until " + fp.nextOffsetToRead
+                                                                               
        + " from partition " + fp.partition + " already");
+                                                                       
continue;
+                                                               }
+
+                                                               final long 
offset = msg.offset();
+
+                                                               ByteBuffer 
payload = msg.message().payload();
+
+                                                               // If the 
message value is null, this represents a delete command for the message key.
+                                                               // Log this and 
pass it on to the client who might want to also receive delete messages.
+                                                               byte[] 
valueBytes;
+                                                               if (payload == 
null) {
+                                                                       
deletedMessages++;
+                                                                       
valueBytes = null;
+                                                               } else {
+                                                                       
valueBytes = new byte[payload.remaining()];
+                                                                       
payload.get(valueBytes);
+                                                               }
+
+                                                               // put key into 
byte array
+                                                               byte[] keyBytes 
= null;
+                                                               int keySize = 
msg.message().keySize();
+
+                                                               if (keySize >= 
0) { // message().hasKey() is doing the same. We save one int deserialization
+                                                                       
ByteBuffer keyPayload = msg.message().key();
+                                                                       
keyBytes = new byte[keySize];
+                                                                       
keyPayload.get(keyBytes);
+                                                               }
+
+                                                               final T value = 
deserializer.deserialize(keyBytes, valueBytes, fp.topic, fp.partition, offset);
+                                                               
if(deserializer.isEndOfStream(value)) {
+                                                                       running 
= false;
+                                                                       break 
fetchLoop; // leave running loop
+                                                               }
+                                                               synchronized 
(sourceContext.getCheckpointLock()) {
+                                                                       
sourceContext.collect(value);
+                                                                       
offsetsState.put(topicPartition, offset);
+                                                               }
+                                                               
+                                                               // advance 
offset for the next request
+                                                               
fp.nextOffsetToRead = offset + 1;
+                                                       }
+                                                       else {
+                                                               // no longer 
running
+                                                               return;
+                                                       }
+                                               }
+                                       }
+                                       LOG.debug("This fetch contained {} 
messages ({} deleted messages)", messagesInFetch, deletedMessages);
+                               }
+                       }
+                       catch (Throwable t) {
+                               // report to the main thread
+                               owner.stopWithError(t);
+                       }
+                       finally {
+                               // end of run loop. close connection to consumer
+                               if (consumer != null) {
+                                       // closing the consumer should not fail 
the program
+                                       try {
+                                               consumer.close();
+                                       }
+                                       catch (Throwable t) {
+                                               LOG.error("Error while closing 
the Kafka simple consumer", t);
+                                       }
+                               }
+                       }
+               }
+
+               /**
+                * Cancels this fetch thread. The thread will release all 
resources and terminate.
+                */
+               public void cancel() {
+                       this.running = false;
+                       
+                       // interrupt whatever the consumer is doing
+                       if (consumer != null) {
+                               consumer.close();
+                       }
+                       
+                       this.interrupt();
+               }
+
+               /**
+                * Request latest offsets for a set of partitions, via a Kafka 
consumer.
+                *
+                * @param consumer The consumer connected to lead broker
+                * @param partitions The list of partitions we need offsets for
+                * @param whichTime The type of time we are requesting. -1 and 
-2 are special constants (See OffsetRequest)
+                */
+               private static void getLastOffset(SimpleConsumer consumer, 
List<FetchPartition> partitions, long whichTime) {
+
+                       Map<TopicAndPartition, PartitionOffsetRequestInfo> 
requestInfo = new HashMap<>();
+                       for (FetchPartition fp: partitions) {
+                               TopicAndPartition topicAndPartition = new 
TopicAndPartition(fp.topic, fp.partition);
+                               requestInfo.put(topicAndPartition, new 
PartitionOffsetRequestInfo(whichTime, 1));
+                       }
+
+                       kafka.javaapi.OffsetRequest request = new 
kafka.javaapi.OffsetRequest(requestInfo, 
kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
+                       OffsetResponse response = 
consumer.getOffsetsBefore(request);
+
+                       if (response.hasError()) {
+                               String exception = "";
+                               for (FetchPartition fp: partitions) {
+                                       short code;
+                                       if ( (code=response.errorCode(fp.topic, 
fp.partition)) != ErrorMapping.NoError()) {
+                                               exception += "\nException for 
partition "+fp.partition+": "+ 
StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
+                                       }
+                               }
+                               throw new RuntimeException("Unable to get last 
offset for partitions " + partitions + ". " + exception);
+                       }
+
+                       for (FetchPartition fp: partitions) {
+                               // the resulting offset is the next offset we 
are going to read
+                               // for not-yet-consumed partitions, it is 0.
+                               fp.nextOffsetToRead = 
response.offsets(fp.topic, fp.partition)[0];
+                       }
+               }
+
+               private static long getInvalidOffsetBehavior(Properties config) 
{
+                       long timeType;
+                       if 
(config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"latest").equals("latest")) {
+                               timeType = OffsetRequest.LatestTime();
+                       } else {
+                               timeType = OffsetRequest.EarliestTime();
+                       }
+                       return timeType;
+               }
+       }
+
+
+       private static class PartitionInfoFetcher extends Thread {
+
+               private final List<String> topics;
+               private final Properties properties;
+
+               private volatile List<KafkaTopicPartitionLeader> result;
+               private volatile Throwable error;
+
+
+               PartitionInfoFetcher(List<String> topics, Properties 
properties) {
+                       this.topics = topics;
+                       this.properties = properties;
+               }
+
+               @Override
+               public void run() {
+                       try {
+                               result = 
FlinkKafkaConsumer08.getPartitionsForTopic(topics, properties);
+                       }
+                       catch (Throwable t) {
+                               this.error = t;
+                       }
+               }
+
+               public List<KafkaTopicPartitionLeader> getPartitions() throws 
Exception {
+                       try {
+                               this.join();
+                       }
+                       catch (InterruptedException e) {
+                               throw new Exception("Partition fetching was 
cancelled before completion");
+                       }
+
+                       if (error != null) {
+                               throw new Exception("Failed to fetch partitions 
for topics " + topics.toString(), error);
+                       }
+                       if (result != null) {
+                               return result;
+                       }
+                       throw new Exception("Partition fetching failed");
+               }
+       }
+
+       private static class KillerWatchDog extends Thread {
+
+               private final Thread toKill;
+               private final long timeout;
+
+               private KillerWatchDog(Thread toKill, long timeout) {
+                       super("KillerWatchDog");
+                       setDaemon(true);
+
+                       this.toKill = toKill;
+                       this.timeout = timeout;
+               }
+
+               @SuppressWarnings("deprecation")
+               @Override
+               public void run() {
+                       final long deadline = System.currentTimeMillis() + 
timeout;
+                       long now;
+
+                       while (toKill.isAlive() && (now = 
System.currentTimeMillis()) < deadline) {
+                               try {
+                                       toKill.join(deadline - now);
+                               }
+                               catch (InterruptedException e) {
+                                       // ignore here, our job is important!
+                               }
+                       }
+
+                       // this is harsh, but this watchdog is a last resort
+                       if (toKill.isAlive()) {
+                               toKill.stop();
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java
new file mode 100644
index 0000000..fdd89c6
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
/**
 * The offset handler is responsible for locating the initial partition offsets 
 * where the source should start reading, as well as committing offsets from completed
 * checkpoints.
 *
 * <p>Implementations may store the offsets in ZooKeeper, in the Kafka brokers,
 * or elsewhere, depending on their configuration.
 */
public interface OffsetHandler {

	/**
	 * Commits the given offset for the partitions. May commit the offsets to the Kafka broker,
	 * or to ZooKeeper, based on its configured behavior.
	 *
	 * @param offsetsToCommit The offset to commit, per partition.
	 * @throws Exception Thrown, if committing the offsets fails.
	 */
	void commit(Map<KafkaTopicPartition, Long> offsetsToCommit) throws Exception;

	/**
	 * Positions the given fetcher to the initial read offsets where the stream consumption
	 * will start from.
	 * 
	 * @param partitions The partitions for which to seek the fetcher to the initial offset.
	 * @param fetcher The fetcher that will pull data from Kafka and must be positioned.
	 * @throws Exception Thrown, if retrieving the initial offsets fails.
	 */
	void seekFetcherToInitialOffsets(List<KafkaTopicPartitionLeader> partitions, Fetcher fetcher) throws Exception;

	/**
	 * Closes the offset handler, releasing all resources.
	 * 
	 * @throws IOException Thrown, if the closing fails.
	 */
	void close() throws IOException;
}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java
new file mode 100644
index 0000000..a38c3bd
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import kafka.producer.Partitioner;
+import kafka.utils.VerifiableProperties;
+
+/**
+ * Hacky wrapper to send an object instance through a Properties - map.
+ *
+ * This works as follows:
+ * The recommended way of creating a KafkaSink is specifying a classname for 
the partitioner.
+ *
+ * Otherwise (if the user gave a (serializable) class instance), we give Kafka 
the PartitionerWrapper class of Flink.
+ * This is set in the key-value (java.util.Properties) map.
+ * In addition to that, we use the Properties.put(Object, Object) to store the 
instance of the (serializable).
+ * This is a hack because the put() method is called on the underlying Hashmap.
+ *
+ * This PartitionerWrapper is called with the Properties. From there, we 
extract the wrapped Partitioner instance.
+ *
+ * The serializable PartitionerWrapper is serialized into the Properties 
Hashmap and also deserialized from there.
+ */
+public class PartitionerWrapper implements Partitioner {
+       public final static String SERIALIZED_WRAPPER_NAME = 
"flink.kafka.wrapper.serialized";
+
+       private Partitioner wrapped;
+       public PartitionerWrapper(VerifiableProperties properties) {
+               wrapped = (Partitioner) 
properties.props().get(SERIALIZED_WRAPPER_NAME);
+       }
+
+       @Override
+       public int partition(Object value, int numberOfPartitions) {
+               return wrapped.partition(value, numberOfPartitions);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
new file mode 100644
index 0000000..1eca4dd
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import kafka.utils.ZKGroupTopicDirs;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+
+/**
+ * Handler for committing Kafka offsets to Zookeeper and to retrieve them 
again.
+ */
+public class ZookeeperOffsetHandler implements OffsetHandler {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ZookeeperOffsetHandler.class);
+       
+       private static final long OFFSET_NOT_SET = 
FlinkKafkaConsumer08.OFFSET_NOT_SET;
+
+       private final String groupId;
+
+       private final CuratorFramework curatorClient;
+
+
+       public ZookeeperOffsetHandler(Properties props) {
+               this.groupId = 
props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
+               if (this.groupId == null) {
+                       throw new IllegalArgumentException("Required property '"
+                                       + ConsumerConfig.GROUP_ID_CONFIG + "' 
has not been set");
+               }
+               
+               String zkConnect = props.getProperty("zookeeper.connect");
+               if (zkConnect == null) {
+                       throw new IllegalArgumentException("Required property 
'zookeeper.connect' has not been set");
+               }
+
+               // we use Curator's default timeouts
+               int sessionTimeoutMs =  
Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "60000"));
+               int connectionTimeoutMs = 
Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "15000"));
+               
+               // undocumented config options allowing users to configure the 
retry policy. (they are "flink." prefixed as they are no official kafka configs)
+               int backoffBaseSleepTime = 
Integer.valueOf(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100"));
+               int backoffMaxRetries =  
Integer.valueOf(props.getProperty("flink.zookeeper.max-retries", "10"));
+               
+               RetryPolicy retryPolicy = new 
ExponentialBackoffRetry(backoffBaseSleepTime, backoffMaxRetries);
+               curatorClient = CuratorFrameworkFactory.newClient(zkConnect, 
sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
+               curatorClient.start();
+       }
+
+
+       @Override
+       public void commit(Map<KafkaTopicPartition, Long> offsetsToCommit) 
throws Exception {
+               for (Map.Entry<KafkaTopicPartition, Long> entry : 
offsetsToCommit.entrySet()) {
+                       KafkaTopicPartition tp = entry.getKey();
+                       long offset = entry.getValue();
+                       
+                       if (offset >= 0) {
+                               setOffsetInZooKeeper(curatorClient, groupId, 
tp.getTopic(), tp.getPartition(), offset);
+                       }
+               }
+       }
+
+       @Override
+       public void seekFetcherToInitialOffsets(List<KafkaTopicPartitionLeader> 
partitions, Fetcher fetcher) throws Exception {
+               for (KafkaTopicPartitionLeader tp : partitions) {
+                       long offset = getOffsetFromZooKeeper(curatorClient, 
groupId, tp.getTopicPartition().getTopic(), 
tp.getTopicPartition().getPartition());
+
+                       if (offset != OFFSET_NOT_SET) {
+                               LOG.info("Offset for partition {} was set to {} 
in ZooKeeper. Seeking fetcher to that position.",
+                                               
tp.getTopicPartition().getPartition(), offset);
+
+                               // the offset in Zookeeper was the last read 
offset, seek is accepting the next-to-read-offset.
+                               fetcher.seek(tp.getTopicPartition(), offset + 
1);
+                       }
+               }
+       }
+
+       @Override
+       public void close() throws IOException {
+               curatorClient.close();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Communication with Zookeeper
+       // 
------------------------------------------------------------------------
+       
+       public static void setOffsetInZooKeeper(CuratorFramework curatorClient, 
String groupId, String topic, int partition, long offset) throws Exception {
+               ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, 
topic);
+               String path = topicDirs.consumerOffsetDir() + "/" + partition;
+               
curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
+               byte[] data = Long.toString(offset).getBytes();
+               curatorClient.setData().forPath(path, data);
+       }
+
+       public static long getOffsetFromZooKeeper(CuratorFramework 
curatorClient, String groupId, String topic, int partition) throws Exception {
+               ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, 
topic);
+               String path = topicDirs.consumerOffsetDir() + "/" + partition;
+               
curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
+               
+               byte[] data = curatorClient.getData().forPath(path);
+               
+               if (data == null) {
+                       return OFFSET_NOT_SET;
+               } else {
+                       String asString = new String(data);
+                       if (asString.length() == 0) {
+                               return OFFSET_NOT_SET;
+                       } else {
+                               try {
+                                       return Long.parseLong(asString);
+                               } catch (NumberFormatException e) {
+                                       throw new Exception(String.format(
+                                               "The offset in ZooKeeper for 
group '%s', topic '%s', partition %d is a malformed string: %s",
+                                               groupId, topic, partition, 
asString));
+                               }
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
new file mode 100644
index 0000000..26e31f5
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.api.java.functions.FlatMapIterator;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import 
org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.Properties;
+
+import static org.junit.Assert.assertTrue;
+
+
/**
 * Integration tests for the Kafka 0.8 connector: runs the shared consumer test suite
 * from {@link KafkaConsumerTestBase} and adds Kafka-0.8-specific tests around
 * ZooKeeper-based offset storage.
 */
public class Kafka08ITCase extends KafkaConsumerTestBase {

	// ------------------------------------------------------------------------
	//  Suite of Tests
	// ------------------------------------------------------------------------

	@Test(timeout = 60000)
	public void testCheckpointing() throws Exception {
		runCheckpointingTest();
	}

	@Test(timeout = 60000)
	public void testFailOnNoBroker() throws Exception {
		runFailOnNoBrokerTest();
	}


	@Test(timeout = 60000)
	public void testConcurrentProducerConsumerTopology() throws Exception {
		runSimpleConcurrentProducerConsumerTopology();
	}

	@Test(timeout = 60000)
	public void testKeyValueSupport() throws Exception {
		runKeyValueTest();
	}

	// --- canceling / failures ---

	@Test(timeout = 60000)
	public void testCancelingEmptyTopic() throws Exception {
		runCancelingOnEmptyInputTest();
	}

	@Test(timeout = 60000)
	public void testCancelingFullTopic() throws Exception {
		runCancelingOnFullInputTest();
	}

	@Test(timeout = 60000)
	public void testFailOnDeploy() throws Exception {
		runFailOnDeployTest();
	}

	/**
	 * Writes a sequence to a topic, then plants an out-of-range offset in ZooKeeper
	 * and verifies that the consumer falls back to a valid position and still reads
	 * the full sequence.
	 */
	@Test(timeout = 60000)
	public void testInvalidOffset() throws Exception {
		final String topic = "invalidOffsetTopic";
		final int parallelism = 1;

		// create topic
		createTestTopic(topic, parallelism, 1);

		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);

		// write 20 messages into topic:
		writeSequence(env, topic, 20, parallelism);

		// set invalid offset:
		// (1234 is far beyond the 20 written messages)
		CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
		ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardCC.groupId(), topic, 0, 1234);
		curatorClient.close();

		// read from topic
		final int valuesCount = 20;
		final int startFrom = 0;
		readSequence(env, standardCC.props().props(), parallelism, topic, valuesCount, startFrom);

		deleteTestTopic(topic);
	}

	// --- source to partition mappings and exactly once ---

	@Test(timeout = 60000)
	public void testOneToOneSources() throws Exception {
		runOneToOneExactlyOnceTest();
	}

	@Test(timeout = 60000)
	public void testOneSourceMultiplePartitions() throws Exception {
		runOneSourceMultiplePartitionsExactlyOnceTest();
	}

	@Test(timeout = 60000)
	public void testMultipleSourcesOnePartition() throws Exception {
		runMultipleSourcesOnePartitionExactlyOnceTest();
	}

	// --- broker failure ---

	@Test(timeout = 60000)
	public void testBrokerFailure() throws Exception {
		runBrokerFailureTest();
	}

	// --- special executions ---

	@Test(timeout = 60000)
	public void testBigRecordJob() throws Exception {
		runBigRecordTestTopology();
	}

	@Test(timeout = 60000)
	public void testMultipleTopics() throws Exception {
		runConsumeMultipleTopics();
	}

	@Test(timeout = 60000)
	public void testAllDeletes() throws Exception {
		runAllDeletesTest();
	}

	@Test(timeout=60000)
	public void testMetricsAndEndOfStream() throws Exception {
		runMetricsAndEndOfStreamTest();
	}


	/**
	 * Tests that offsets are properly committed to ZooKeeper and initial offsets are read from ZooKeeper.
	 *
	 * This test is only applicable if the Flink Kafka Consumer uses the ZooKeeperOffsetHandler.
	 */
	@Test(timeout = 60000)
	public void testOffsetInZookeeper() throws Exception {
		final String topicName = "testOffsetInZK";
		final int parallelism = 3;

		createTestTopic(topicName, parallelism, 1);

		// three separate environments: one to write, one to read (committing offsets),
		// and one to read again after the offsets have been manipulated
		StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
		env1.getConfig().disableSysoutLogging();
		env1.enableCheckpointing(50);
		env1.setNumberOfExecutionRetries(0);
		env1.setParallelism(parallelism);

		StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
		env2.getConfig().disableSysoutLogging();
		env2.enableCheckpointing(50);
		env2.setNumberOfExecutionRetries(0);
		env2.setParallelism(parallelism);

		StreamExecutionEnvironment env3 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
		env3.getConfig().disableSysoutLogging();
		env3.enableCheckpointing(50);
		env3.setNumberOfExecutionRetries(0);
		env3.setParallelism(parallelism);

		// write a sequence from 0 to 99 to each of the 3 partitions.
		writeSequence(env1, topicName, 100, parallelism);

		readSequence(env2, standardProps, parallelism, topicName, 100, 0);

		CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();

		long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardCC.groupId(), topicName, 0);
		long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardCC.groupId(), topicName, 1);
		long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardCC.groupId(), topicName, 2);

		LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);

		// offsets may be unset (if no checkpoint completed for a partition) or within the read range
		assertTrue(o1 == FlinkKafkaConsumer08.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
		assertTrue(o2 == FlinkKafkaConsumer08.OFFSET_NOT_SET || (o2 >= 0 && o2 <= 100));
		assertTrue(o3 == FlinkKafkaConsumer08.OFFSET_NOT_SET || (o3 >= 0 && o3 <= 100));

		LOG.info("Manipulating offsets");

		// set the offset to 50 for the three partitions
		ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardCC.groupId(), topicName, 0, 49);
		ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardCC.groupId(), topicName, 1, 49);
		ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardCC.groupId(), topicName, 2, 49);

		curatorClient.close();

		// create new env
		// the consumer must resume from the manipulated offsets (last read = 49, so start at 50)
		readSequence(env3, standardProps, parallelism, topicName, 50, 50);

		deleteTestTopic(topicName);
	}

	/**
	 * Tests that offsets are committed to ZooKeeper by the periodic auto-commit
	 * mechanism, i.e. without checkpointing being enabled.
	 */
	@Test(timeout = 60000)
	public void testOffsetAutocommitTest() throws Exception {
		final String topicName = "testOffsetAutocommit";
		final int parallelism = 3;

		createTestTopic(topicName, parallelism, 1);

		StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
		env1.getConfig().disableSysoutLogging();
		env1.setNumberOfExecutionRetries(0);
		env1.setParallelism(parallelism);

		StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
		// NOTE: We are not enabling the checkpointing!
		env2.getConfig().disableSysoutLogging();
		env2.setNumberOfExecutionRetries(0);
		env2.setParallelism(parallelism);


		// write a sequence from 0 to 99 to each of the 3 partitions.
		writeSequence(env1, topicName, 100, parallelism);


		// the readSequence operation sleeps for 20 ms between each record.
		// setting a delay of 25*20 = 500 for the commit interval makes
		// sure that we commit roughly 3-4 times while reading, however
		// at least once.
		Properties readProps = new Properties();
		readProps.putAll(standardProps);
		readProps.setProperty("auto.commit.interval.ms", "500");

		// read so that the offset can be committed to ZK
		readSequence(env2, readProps, parallelism, topicName, 100, 0);

		// get the offset
		CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();

		long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardCC.groupId(), topicName, 0);
		long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardCC.groupId(), topicName, 1);
		long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardCC.groupId(), topicName, 2);

		LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);

		// ensure that the offset has been committed
		assertTrue("Offset of o1=" + o1 + " was not in range", o1 > 0 && o1 <= 100);
		assertTrue("Offset of o2=" + o2 + " was not in range", o2 > 0 && o2 <= 100);
		assertTrue("Offset of o3=" + o3 + " was not in range", o3 > 0 && o3 <= 100);

		deleteTestTopic(topicName);
	}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
new file mode 100644
index 0000000..fc13719
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+
+import org.junit.Test;
+
+
+@SuppressWarnings("serial")
+public class Kafka08ProducerITCase extends KafkaProducerTestBase {
+
+       @Test
+       public void testCustomPartitioning() {
+               runCustomPartitioningTest();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
new file mode 100644
index 0000000..113ad71
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.commons.collections.map.LinkedMap;
+
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+public class KafkaConsumerTest {
+
+       @Test
+       public void testValidateZooKeeperConfig() {
+               try {
+                       // empty
+                       Properties emptyProperties = new Properties();
+                       try {
+                               
FlinkKafkaConsumer08.validateZooKeeperConfig(emptyProperties);
+                               fail("should fail with an exception");
+                       }
+                       catch (IllegalArgumentException e) {
+                               // expected
+                       }
+
+                       // no connect string (only group string)
+                       Properties noConnect = new Properties();
+                       noConnect.put(ConsumerConfig.GROUP_ID_CONFIG, 
"flink-test-group");
+                       try {
+                               
FlinkKafkaConsumer08.validateZooKeeperConfig(noConnect);
+                               fail("should fail with an exception");
+                       }
+                       catch (IllegalArgumentException e) {
+                               // expected
+                       }
+
+                       // no group string (only connect string)
+                       Properties noGroup = new Properties();
+                       noGroup.put("zookeeper.connect", "localhost:47574");
+                       try {
+                               
FlinkKafkaConsumer08.validateZooKeeperConfig(noGroup);
+                               fail("should fail with an exception");
+                       }
+                       catch (IllegalArgumentException e) {
+                               // expected
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testSnapshot() {
+               try {
+                       Field offsetsField = 
FlinkKafkaConsumerBase.class.getDeclaredField("offsetsState");
+                       Field runningField = 
FlinkKafkaConsumerBase.class.getDeclaredField("running");
+                       Field mapField = 
FlinkKafkaConsumerBase.class.getDeclaredField("pendingCheckpoints");
+
+                       offsetsField.setAccessible(true);
+                       runningField.setAccessible(true);
+                       mapField.setAccessible(true);
+
+                       FlinkKafkaConsumer08<?> consumer = 
mock(FlinkKafkaConsumer08.class);
+                       when(consumer.snapshotState(anyLong(), 
anyLong())).thenCallRealMethod();
+
+
+                       HashMap<KafkaTopicPartition, Long> testOffsets = new 
HashMap<>();
+                       long[] offsets = new long[] { 43, 6146, 133, 16, 162, 
616 };
+                       int j = 0;
+                       for (long i: offsets) {
+                               KafkaTopicPartition ktp = new 
KafkaTopicPartition("topic", j++);
+                               testOffsets.put(ktp, i);
+                       }
+
+                       LinkedMap map = new LinkedMap();
+
+                       offsetsField.set(consumer, testOffsets);
+                       runningField.set(consumer, true);
+                       mapField.set(consumer, map);
+
+                       assertTrue(map.isEmpty());
+
+                       // make multiple checkpoints
+                       for (long checkpointId = 10L; checkpointId <= 2000L; 
checkpointId += 9L) {
+                               HashMap<KafkaTopicPartition, Long> checkpoint = 
consumer.snapshotState(checkpointId, 47 * checkpointId);
+                               assertEquals(testOffsets, checkpoint);
+
+                               // change the offsets, make sure the snapshot 
did not change
+                               HashMap<KafkaTopicPartition, Long> 
checkpointCopy = (HashMap<KafkaTopicPartition, Long>) checkpoint.clone();
+
+                               for (Map.Entry<KafkaTopicPartition, Long> e: 
testOffsets.entrySet()) {
+                                       testOffsets.put(e.getKey(), 
e.getValue() + 1);
+                               }
+
+                               assertEquals(checkpointCopy, checkpoint);
+
+                               assertTrue(map.size() > 0);
+                               assertTrue(map.size() <= 
FlinkKafkaConsumer08.MAX_NUM_PENDING_CHECKPOINTS);
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       @Ignore("Kafka consumer internally makes an infinite loop")
+       public void testCreateSourceWithoutCluster() {
+               try {
+                       Properties props = new Properties();
+                       props.setProperty("zookeeper.connect", 
"localhost:56794");
+                       props.setProperty("bootstrap.servers", 
"localhost:11111, localhost:22222");
+                       props.setProperty("group.id", "non-existent-group");
+
+                       new 
FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"), new 
SimpleStringSchema(), props);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
new file mode 100644
index 0000000..72d2772
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import kafka.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaLocalSystemTime implements Time {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(KafkaLocalSystemTime.class);
+
+       @Override
+       public long milliseconds() {
+               return System.currentTimeMillis();
+       }
+
+       @Override
+       public long nanoseconds() {
+               return System.nanoTime();
+       }
+
+       @Override
+       public void sleep(long ms) {
+               try {
+                       Thread.sleep(ms);
+               } catch (InterruptedException e) {
+                       LOG.warn("Interruption", e);
+               }
+       }
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
new file mode 100644
index 0000000..8602ffe
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+
+import static org.mockito.Mockito.*;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+import static org.junit.Assert.*;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(FlinkKafkaProducerBase.class)
+public class KafkaProducerTest extends TestLogger {
+       
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testPropagateExceptions() {
+               try {
+                       // mock kafka producer
+                       KafkaProducer<?, ?> kafkaProducerMock = 
mock(KafkaProducer.class);
+                       
+                       // partition setup
+                       
when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
+                                       Arrays.asList(new 
PartitionInfo("mock_topic", 42, null, null, null)));
+
+                       // failure when trying to send an element
+                       when(kafkaProducerMock.send(any(ProducerRecord.class), 
any(Callback.class)))
+                               .thenAnswer(new 
Answer<Future<RecordMetadata>>() {
+                                       @Override
+                                       public Future<RecordMetadata> 
answer(InvocationOnMock invocation) throws Throwable {
+                                               Callback callback = (Callback) 
invocation.getArguments()[1];
+                                               callback.onCompletion(null, new 
Exception("Test error"));
+                                               return null;
+                                       }
+                               });
+                       
+                       // make sure the FlinkKafkaProducer instantiates our 
mock producer
+                       
whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);
+                       
+                       // (1) producer that propagates errors
+
+                       FlinkKafkaProducer08<String> producerPropagating = new 
FlinkKafkaProducer08<>(
+                                       "mock_topic", new SimpleStringSchema(), 
new Properties(), null);
+
+                       producerPropagating.setRuntimeContext(new 
MockRuntimeContext(17, 3));
+                       producerPropagating.open(new Configuration());
+                       
+                       try {
+                               producerPropagating.invoke("value");
+                               producerPropagating.invoke("value");
+                               fail("This should fail with an exception");
+                       }
+                       catch (Exception e) {
+                               assertNotNull(e.getCause());
+                               assertNotNull(e.getCause().getMessage());
+                               
assertTrue(e.getCause().getMessage().contains("Test error"));
+                       }
+
+                       // (2) producer that only logs errors
+
+                       FlinkKafkaProducer08<String> producerLogging = new 
FlinkKafkaProducer08<>(
+                                       "mock_topic", new SimpleStringSchema(), 
new Properties(), null);
+                       producerLogging.setLogFailuresOnly(true);
+                       
+                       producerLogging.setRuntimeContext(new 
MockRuntimeContext(17, 3));
+                       producerLogging.open(new Configuration());
+
+                       producerLogging.invoke("value");
+                       producerLogging.invoke("value");
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
new file mode 100644
index 0000000..348b75d
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import kafka.admin.AdminUtils;
+import kafka.api.PartitionMetadata;
+import kafka.common.KafkaException;
+import kafka.consumer.ConsumerConfig;
+import kafka.network.SocketServer;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingServer;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
+import 
org.apache.flink.streaming.connectors.kafka.internals.ZooKeeperStringSerializer;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.Seq;
+
+import java.io.File;
+import java.net.BindException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * An implementation of the KafkaServerProvider for Kafka 0.8
+ */
+public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
+
+       protected static final Logger LOG = 
LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
+       private File tmpZkDir;
+       private File tmpKafkaParent;
+       private List<File> tmpKafkaDirs;
+       private List<KafkaServer> brokers;
+       private TestingServer zookeeper;
+       private String zookeeperConnectionString;
+       private String brokerConnectionString = "";
+       private Properties standardProps;
+       private ConsumerConfig standardCC;
+
+
+       public String getBrokerConnectionString() {
+               return brokerConnectionString;
+       }
+
+
+       @Override
+       public ConsumerConfig getStandardConsumerConfig() {
+               return standardCC;
+       }
+
+       @Override
+       public Properties getStandardProperties() {
+               return standardProps;
+       }
+
+       @Override
+       public String getVersion() {
+               return "0.8";
+       }
+
+       @Override
+       public List<KafkaServer> getBrokers() {
+               return brokers;
+       }
+
+       @Override
+       public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, 
KeyedDeserializationSchema<T> readSchema, Properties props) {
+               return new FlinkKafkaConsumer08<>(topics, readSchema, props);
+       }
+
+       @Override
+       public <T> FlinkKafkaProducerBase<T> getProducer(String topic, 
KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> 
partitioner) {
+               return new FlinkKafkaProducer08<T>(topic, serSchema, props, 
partitioner);
+       }
+
+       @Override
+       public void restartBroker(int leaderId) throws Exception {
+               brokers.set(leaderId, getKafkaServer(leaderId, 
tmpKafkaDirs.get(leaderId), KAFKA_HOST, zookeeperConnectionString));
+       }
+
+       @Override
+       public int getLeaderToShutDown(String topic) throws Exception {
+               ZkClient zkClient = createZkClient();
+               PartitionMetadata firstPart = null;
+               do {
+                       if (firstPart != null) {
+                               LOG.info("Unable to find leader. error code 
{}", firstPart.errorCode());
+                               // not the first try. Sleep a bit
+                               Thread.sleep(150);
+                       }
+
+                       Seq<PartitionMetadata> partitionMetadata = 
AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
+                       firstPart = partitionMetadata.head();
+               }
+               while (firstPart.errorCode() != 0);
+               zkClient.close();
+
+               return firstPart.leader().get().id();
+       }
+
+       @Override
+       public int getBrokerId(KafkaServer server) {
+               return server.socketServer().brokerId();
+       }
+
+
+       @Override
+       public void prepare(int numKafkaServers) {
+               File tempDir = new File(System.getProperty("java.io.tmpdir"));
+
+               tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + 
(UUID.randomUUID().toString()));
+               assertTrue("cannot create zookeeper temp dir", 
tmpZkDir.mkdirs());
+
+               tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir*" + 
(UUID.randomUUID().toString()));
+               assertTrue("cannot create kafka temp dir", 
tmpKafkaParent.mkdirs());
+
+               tmpKafkaDirs = new ArrayList<>(numKafkaServers);
+               for (int i = 0; i < numKafkaServers; i++) {
+                       File tmpDir = new File(tmpKafkaParent, "server-" + i);
+                       assertTrue("cannot create kafka temp dir", 
tmpDir.mkdir());
+                       tmpKafkaDirs.add(tmpDir);
+               }
+
+               zookeeper = null;
+               brokers = null;
+
+               try {
+                       LOG.info("Starting Zookeeper");
+                       zookeeper = new TestingServer(-1, tmpZkDir);
+                       zookeeperConnectionString = 
zookeeper.getConnectString();
+
+                       LOG.info("Starting KafkaServer");
+                       brokers = new ArrayList<>(numKafkaServers);
+
+                       for (int i = 0; i < numKafkaServers; i++) {
+                               brokers.add(getKafkaServer(i, 
tmpKafkaDirs.get(i), KafkaTestEnvironment.KAFKA_HOST, 
zookeeperConnectionString));
+                               SocketServer socketServer = 
brokers.get(i).socketServer();
+
+                               String host = socketServer.host() == null ? 
"localhost" : socketServer.host();
+                               brokerConnectionString += 
hostAndPortToUrlString(host, socketServer.port()) + ",";
+                       }
+
+                       LOG.info("ZK and KafkaServer started.");
+               }
+               catch (Throwable t) {
+                       t.printStackTrace();
+                       fail("Test setup failed: " + t.getMessage());
+               }
+
+               standardProps = new Properties();
+               standardProps.setProperty("zookeeper.connect", 
zookeeperConnectionString);
+               standardProps.setProperty("bootstrap.servers", 
brokerConnectionString);
+               standardProps.setProperty("group.id", "flink-tests");
+               standardProps.setProperty("auto.commit.enable", "false");
+               standardProps.setProperty("zookeeper.session.timeout.ms", 
"12000"); // 6 seconds is default. Seems to be too small for travis.
+               standardProps.setProperty("zookeeper.connection.timeout.ms", 
"20000");
+               standardProps.setProperty("auto.offset.reset", "earliest"); // 
read from the beginning.
+               standardProps.setProperty("fetch.message.max.bytes", "256"); // 
make a lot of fetches (MESSAGES MUST BE SMALLER!)
+
+               Properties consumerConfigProps = new Properties();
+               consumerConfigProps.putAll(standardProps);
+               consumerConfigProps.setProperty("auto.offset.reset", 
"smallest");
+               standardCC = new ConsumerConfig(consumerConfigProps);
+       }
+
+       @Override
+       public void shutdown() {
+               for (KafkaServer broker : brokers) {
+                       if (broker != null) {
+                               broker.shutdown();
+                       }
+               }
+               brokers.clear();
+
+               if (zookeeper != null) {
+                       try {
+                               zookeeper.stop();
+                       }
+                       catch (Exception e) {
+                               LOG.warn("ZK.stop() failed", e);
+                       }
+                       zookeeper = null;
+               }
+
+               // clean up the temp spaces
+
+               if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
+                       try {
+                               FileUtils.deleteDirectory(tmpKafkaParent);
+                       }
+                       catch (Exception e) {
+                               // ignore
+                       }
+               }
+               if (tmpZkDir != null && tmpZkDir.exists()) {
+                       try {
+                               FileUtils.deleteDirectory(tmpZkDir);
+                       }
+                       catch (Exception e) {
+                               // ignore
+                       }
+               }
+       }
+
+       @Override
+       public void createTestTopic(String topic, int numberOfPartitions, int 
replicationFactor) {
+               // create topic with one client
+               Properties topicConfig = new Properties();
+               LOG.info("Creating topic {}", topic);
+
+               ZkClient creator = createZkClient();
+
+               AdminUtils.createTopic(creator, topic, numberOfPartitions, 
replicationFactor, topicConfig);
+               creator.close();
+
+               // validate that the topic has been created
+               final long deadline = System.currentTimeMillis() + 30000;
+               do {
+                       try {
+                               Thread.sleep(100);
+                       }
+                       catch (InterruptedException e) {
+                               // restore interrupted state
+                       }
+                       List<KafkaTopicPartitionLeader> partitions = 
FlinkKafkaConsumer08.getPartitionsForTopic(Collections.singletonList(topic), 
standardProps);
+                       if (partitions != null && partitions.size() > 0) {
+                               return;
+                       }
+               }
+               while (System.currentTimeMillis() < deadline);
+               fail ("Test topic could not be created");
+       }
+
+       @Override
+       public void deleteTestTopic(String topic) {
+               LOG.info("Deleting topic {}", topic);
+
+               ZkClient zk = createZkClient();
+               AdminUtils.deleteTopic(zk, topic);
+               zk.close();
+       }
+
+       private ZkClient createZkClient() {
+               return new ZkClient(standardCC.zkConnect(), 
standardCC.zkSessionTimeoutMs(),
+                               standardCC.zkConnectionTimeoutMs(), new 
ZooKeeperStringSerializer());
+       }
+
+       /**
+        * Only for the 0.8 server we need access to the zk client.
+        */
+       public CuratorFramework createCuratorClient() {
+               RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 10);
+               CuratorFramework curatorClient = 
CuratorFrameworkFactory.newClient(standardProps.getProperty("zookeeper.connect"),
 retryPolicy);
+               curatorClient.start();
+               return curatorClient;
+       }
+
+       /**
+        * Copied from 
com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
+        */
+       protected static KafkaServer getKafkaServer(int brokerId, File 
tmpFolder,
+                                                                               
                String kafkaHost,
+                                                                               
                String zookeeperConnectionString) throws Exception {
+               LOG.info("Starting broker with id {}", brokerId);
+               Properties kafkaProperties = new Properties();
+
+               // properties have to be Strings
+               kafkaProperties.put("advertised.host.name", kafkaHost);
+               kafkaProperties.put("broker.id", Integer.toString(brokerId));
+               kafkaProperties.put("log.dir", tmpFolder.toString());
+               kafkaProperties.put("zookeeper.connect", 
zookeeperConnectionString);
+               kafkaProperties.put("message.max.bytes", String.valueOf(50 * 
1024 * 1024));
+               kafkaProperties.put("replica.fetch.max.bytes", 
String.valueOf(50 * 1024 * 1024));
+
+               // for CI stability, increase zookeeper session timeout
+               kafkaProperties.put("zookeeper.session.timeout.ms", "20000");
+
+               final int numTries = 5;
+
+               for (int i = 1; i <= numTries; i++) {
+                       int kafkaPort = NetUtils.getAvailablePort();
+                       kafkaProperties.put("port", 
Integer.toString(kafkaPort));
+                       KafkaConfig kafkaConfig = new 
KafkaConfig(kafkaProperties);
+
+                       try {
+                               KafkaServer server = new 
KafkaServer(kafkaConfig, new KafkaLocalSystemTime());
+                               server.startup();
+                               return server;
+                       }
+                       catch (KafkaException e) {
+                               if (e.getCause() instanceof BindException) {
+                                       // port conflict, retry...
+                                       LOG.info("Port conflict when starting 
Kafka Broker. Retrying...");
+                               }
+                               else {
+                                       throw e;
+                               }
+                       }
+               }
+
+               throw new Exception("Could not start Kafka after " + numTries + 
" retries due to port conflicts.");
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
new file mode 100644
index 0000000..c99e133
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl;
+import org.apache.flink.streaming.connectors.kafka.KafkaTestBase;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class ZookeeperOffsetHandlerTest extends KafkaTestBase {
+       
+       @Test
+       public void runOffsetManipulationinZooKeeperTest() {
+               try {
+                       final String topicName = 
"ZookeeperOffsetHandlerTest-Topic";
+                       final String groupId = 
"ZookeeperOffsetHandlerTest-Group";
+                       
+                       final long offset = (long) (Math.random() * 
Long.MAX_VALUE);
+
+                       CuratorFramework curatorFramework = 
((KafkaTestEnvironmentImpl)kafkaServer ).createCuratorClient();
+                       kafkaServer.createTestTopic(topicName, 3, 2);
+
+                       
ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorFramework, groupId, 
topicName, 0, offset);
+       
+                       long fetchedOffset = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, groupId, 
topicName, 0);
+
+                       curatorFramework.close();
+                       
+                       assertEquals(offset, fetchedOffset);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..6bdfb48
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties
@@ -0,0 +1,29 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/logback-test.xml
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..45b3b92
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} 
%X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml 
b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
new file mode 100644
index 0000000..b3c9749
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
@@ -0,0 +1,131 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-streaming-connectors-parent</artifactId>
+               <version>1.0-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-connector-kafka-0.9</artifactId>
+       <name>flink-connector-kafka-0.9</name>
+
+       <packaging>jar</packaging>
+
+       <!-- Allow users to pass custom connector versions -->
+       <properties>
+               <kafka.version>0.9.0.0</kafka.version>
+       </properties>
+
+       <dependencies>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-kafka-base</artifactId>
+                       <version>${project.version}</version>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.apache.kafka</groupId>
+                                       
<artifactId>kafka_${scala.binary.version}</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-kafka-base</artifactId>
+                       <version>${project.version}</version>
+                       <exclusions>
+                               <!-- exclude 0.8 dependencies -->
+                               <exclusion>
+                                       <groupId>org.apache.kafka</groupId>
+                                       
<artifactId>kafka_${scala.binary.version}</artifactId>
+                               </exclusion>
+                       </exclusions>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.kafka</groupId>
+                       <artifactId>kafka-clients</artifactId>
+                       <version>${kafka.version}</version>
+               </dependency>
+
+               <dependency>
+                       <!-- include 0.9 server for tests  -->
+                       <groupId>org.apache.kafka</groupId>
+                       <artifactId>kafka_${scala.binary.version}</artifactId>
+                       <version>${kafka.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-tests</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.google.guava</groupId>
+                       <artifactId>guava</artifactId>
+                       <version>${guava.version}</version>
+               </dependency>
+
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+       </dependencies>
+       
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-failsafe-plugin</artifactId>
+                               <configuration>
+                                       <!-- Enforce single fork execution due 
to heavy mini cluster use in the tests -->
+                                       <forkCount>1</forkCount>
+                               </configuration>
+                       </plugin>
+               </plugins>
+       </build>
+       
+</project>

Reply via email to