http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
new file mode 100644
index 0000000..c17aae6
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.List;
+
+/**
+ * The ConsumerCallBridge simply calls methods on the {@link KafkaConsumer}.
+ * 
+ * This indirection is necessary, because Kafka broke binary compatibility 
between 0.9 and 0.10,
+ * for example changing {@code assign(List)} to {@code assign(Collection)}.
+ * 
+ * Because of that, we need two versions whose compiled code goes against 
different method signatures.
+ * Even though the source of subclasses may look identical, the byte code will 
be different, because they
+ * are compiled against different dependencies.
+ */
+public class KafkaConsumerCallBridge {
+
+       /**
+        * Assigns the given partitions to the consumer by forwarding the call to
+        * {@code consumer.assign(topicPartitions)}.
+        *
+        * @param consumer The Kafka consumer that the partitions are assigned to
+        * @param topicPartitions The partitions that the consumer should work with
+        * @throws Exception Declared in the signature; this implementation only
+        *                   forwards whatever {@code consumer.assign} throws
+        */
+       public void assignPartitions(KafkaConsumer<?, ?> consumer, 
List<TopicPartition> topicPartitions) throws Exception {
+               consumer.assign(topicPartitions);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
new file mode 100644
index 0000000..9cfa840
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.metrics.MetricGroup;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import 
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The thread that runs the {@link KafkaConsumer}, connecting to the brokers 
and polling records.
+ * The thread pushes the data into a {@link Handover} to be picked up by the 
fetcher that will
+ * deserialize and emit the records.
+ * 
+ * <p><b>IMPORTANT:</b> This thread must not be interrupted when attempting to 
shut it down.
+ * The Kafka consumer code was found to not always handle interrupts well, and 
to even
+ * deadlock in certain situations.
+ * 
+ * <p>Implementation Note: This code is written to be reusable in later 
versions of the KafkaConsumer.
+ * Because Kafka is not maintaining binary compatibility, we use a "call 
bridge" as an indirection
+ * to the KafkaConsumer calls that change signature.
+ */
+public class KafkaConsumerThread extends Thread {
+
+       /** Logger for this consumer */
+       private final Logger log;
+
+       /** The handover of data and exceptions between the consumer thread and 
the task thread */
+       private final Handover handover;
+
+       /** The next offsets that the main thread should commit */
+       private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> 
nextOffsetsToCommit;
+
+       /** The configuration for the Kafka consumer */
+       private final Properties kafkaProperties;
+
+       /** The partitions that this consumer reads from */ 
+       private final KafkaTopicPartitionState<TopicPartition>[] 
subscribedPartitions;
+
+       /** We get this from the outside to publish metrics. **/
+       private final MetricGroup kafkaMetricGroup;
+
+       /** The indirections on KafkaConsumer methods, for cases where 
KafkaConsumer compatibility is broken */
+       private final KafkaConsumerCallBridge consumerCallBridge;
+
+       /** The maximum number of milliseconds to wait for a fetch batch */
+       private final long pollTimeout;
+
+       /** Flag whether to add Kafka's metrics to the Flink metrics */
+       private final boolean useMetrics;
+
+       /** Reference to the Kafka consumer, once it is created */
+       private volatile KafkaConsumer<byte[], byte[]> consumer;
+
+       /** Flag to mark the main work loop as alive */
+       private volatile boolean running;
+
+       /** Flag tracking whether the latest commit request has completed */
+       private volatile boolean commitInProgress;
+
+
+       /**
+        * Creates the consumer thread without starting it. The thread gets the name
+        * {@code threadName} and is marked as a daemon thread. The KafkaConsumer is
+        * not created here; that happens inside {@link #run()}.
+        *
+        * <p>All reference arguments except {@code threadName} are validated with
+        * {@code checkNotNull}.
+        *
+        * @param log The logger that this thread writes to
+        * @param handover The handover of data and exceptions to the task thread
+        * @param kafkaProperties The configuration for the Kafka consumer
+        * @param subscribedPartitions The partitions that this consumer reads from
+        * @param kafkaMetricGroup The metric group used to publish Kafka's metrics
+        * @param consumerCallBridge The indirection for KafkaConsumer calls whose
+        *                           signature differs between Kafka versions
+        * @param threadName The name of this thread
+        * @param pollTimeout The maximum number of milliseconds to wait for a fetch batch
+        * @param useMetrics Flag whether to add Kafka's metrics to the Flink metrics
+        */
+       public KafkaConsumerThread(
+                       Logger log,
+                       Handover handover,
+                       Properties kafkaProperties,
+                       KafkaTopicPartitionState<TopicPartition>[] 
subscribedPartitions,
+                       MetricGroup kafkaMetricGroup,
+                       KafkaConsumerCallBridge consumerCallBridge,
+                       String threadName,
+                       long pollTimeout,
+                       boolean useMetrics) {
+
+               super(threadName);
+               setDaemon(true);
+
+               this.log = checkNotNull(log);
+               this.handover = checkNotNull(handover);
+               this.kafkaProperties = checkNotNull(kafkaProperties);
+               this.subscribedPartitions = checkNotNull(subscribedPartitions);
+               this.kafkaMetricGroup = checkNotNull(kafkaMetricGroup);
+               this.consumerCallBridge = checkNotNull(consumerCallBridge);
+               this.pollTimeout = pollTimeout;
+               this.useMetrics = useMetrics;
+
+               this.nextOffsetsToCommit = new AtomicReference<>();
+               this.running = true;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Main work method of this thread. It creates the KafkaConsumer, assigns the
+        * subscribed partitions, optionally registers Kafka's metrics, establishes the
+        * initial offsets, and then runs the fetch loop: send a pending offset commit
+        * (if none is in progress), poll records, and hand them over.
+        *
+        * <p>Failures are not thrown but reported through the handover. Once the
+        * consumer has been created, both the handover and the consumer are closed
+        * when this method exits, regardless of how it exits.
+        */
+       @Override
+       public void run() {
+               // early exit check
+               if (!running) {
+                       return;
+               }
+
+               // this is the means to talk to FlinkKafkaConsumer's main thread
+               final Handover handover = this.handover;
+
+               // This method initializes the KafkaConsumer and guarantees it 
is torn down properly.
+               // This is important, because the consumer has multi-threading 
issues,
+               // including concurrent 'close()' calls.
+               final KafkaConsumer<byte[], byte[]> consumer;
+               try {
+                       consumer = new KafkaConsumer<>(kafkaProperties);
+               }
+               catch (Throwable t) {
+                       handover.reportError(t);
+                       return;
+               }
+
+               // from here on, the consumer is guaranteed to be closed 
properly
+               try {
+                       // The callback invoked by Kafka once an offset commit 
is complete
+                       final OffsetCommitCallback offsetCommitCallback = new 
CommitCallback();
+
+                       // tell the consumer which partitions to work with
+                       consumerCallBridge.assignPartitions(consumer, 
convertKafkaPartitions(subscribedPartitions));
+
+                       // register Kafka's very own metrics in Flink's metric 
reporters
+                       if (useMetrics) {
+                               // register Kafka metrics to Flink
+                               Map<MetricName, ? extends Metric> metrics = 
consumer.metrics();
+                               if (metrics == null) {
+                                       // MapR's Kafka implementation returns 
null here.
+                                       log.info("Consumer implementation does 
not support metrics");
+                               } else {
+                                       // we have Kafka metrics, register them
+                                       for (Map.Entry<MetricName, ? extends 
Metric> metric: metrics.entrySet()) {
+                                               
kafkaMetricGroup.gauge(metric.getKey().name(), new 
KafkaMetricWrapper(metric.getValue()));
+                                       }
+                               }
+                       }
+
+                       // early exit check
+                       if (!running) {
+                               return;
+                       }
+
+                       // seek the consumer to the initial offsets
+                       for (KafkaTopicPartitionState<TopicPartition> partition 
: subscribedPartitions) {
+                               if (partition.isOffsetDefined()) {
+                                       log.info("Partition {} has restored 
initial offsets {} from checkpoint / savepoint; " +
+                                                       "seeking the consumer 
to position {}",
+                                                       
partition.getKafkaPartitionHandle(), partition.getOffset(), 
partition.getOffset() + 1);
+
+                                       
consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
+                               }
+                               else {
+                                       // for partitions that do not have 
offsets restored from a checkpoint/savepoint,
+                                       // we need to define our internal 
offset state for them using the initial offsets retrieved from Kafka
+                                       // by the KafkaConsumer, so that they 
are correctly checkpointed and committed on the next checkpoint
+
+                                       long fetchedOffset = 
consumer.position(partition.getKafkaPartitionHandle());
+
+                                       log.info("Partition {} has no initial 
offset; the consumer has position {}, " +
+                                                       "so the initial offset 
will be set to {}",
+                                                       
partition.getKafkaPartitionHandle(), fetchedOffset, fetchedOffset - 1);
+
+                                       // the fetched offset represents the 
next record to process, so we need to subtract it by 1
+                                       partition.setOffset(fetchedOffset - 1);
+                               }
+                       }
+
+                       // from now on, external operations may call the 
consumer
+                       this.consumer = consumer;
+
+                       // the latest bulk of records. may carry across the 
loop if the thread is woken up
+                       // from blocking on the handover
+                       ConsumerRecords<byte[], byte[]> records = null;
+
+                       // main fetch loop
+                       while (running) {
+
+                               // check if there is something to commit
+                               if (!commitInProgress) {
+                                       // get and reset the work-to-be 
committed, so we don't repeatedly commit the same
+                                       final Map<TopicPartition, 
OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null);
+
+                                       if (toCommit != null) {
+                                               log.debug("Sending async offset 
commit request to Kafka broker");
+
+                                               // also record that a commit is 
already in progress
+                                               // the order here matters! 
first set the flag, then send the commit command.
+                                               commitInProgress = true;
+                                               consumer.commitAsync(toCommit, 
offsetCommitCallback);
+                                       }
+                               }
+
+                               // get the next batch of records, unless we did 
not manage to hand the old batch over
+                               if (records == null) {
+                                       try {
+                                               records = 
consumer.poll(pollTimeout);
+                                       }
+                                       catch (WakeupException we) {
+                                               // woken up via consumer.wakeup(), which shutdown() and
+                                               // setOffsetsToCommit() call; go back to the loop head to
+                                               // re-check the running flag and pending commits
+                                               continue;
+                                       }
+                               }
+
+                               try {
+                                       handover.produce(records);
+                                       records = null;
+                               }
+                               catch (Handover.WakeupException e) {
+                                       // fall through the loop
+                               }
+                       }
+                       // end main fetch loop
+               }
+               catch (Throwable t) {
+                       // let the main thread know and exit
+                       // it may be that this exception comes because the main 
thread closed the handover, in
+                       // which case the below reporting is irrelevant, but 
does not hurt either
+                       handover.reportError(t);
+               }
+               finally {
+                       // make sure the handover is closed if it is not 
already closed or has an error
+                       handover.close();
+
+                       // make sure the KafkaConsumer is closed
+                       try {
+                               consumer.close();
+                       }
+                       catch (Throwable t) {
+                               log.warn("Error while closing Kafka consumer", 
t);
+                       }
+               }
+       }
+
+       /**
+        * Shuts this thread down, waking up the thread gracefully if blocked 
(without Thread.interrupt() calls).
+        *
+        * <p>The {@code running} flag is cleared before the wake-up calls are made, so
+        * that the fetch loop in {@link #run()} sees the flag once it has been woken up.
+        * This method only signals the thread; it does not wait for it to terminate.
+        */
+       public void shutdown() {
+               running = false;
+
+               // We cannot call close() on the KafkaConsumer, because it will 
actually throw
+               // an exception if a concurrent call is in progress
+
+               // this wakes up the consumer if it is blocked handing over 
records
+               handover.wakeupProducer();
+
+               // this wakes up the consumer if it is blocked in a kafka poll 
+               if (consumer != null) {
+                       consumer.wakeup();
+               }
+       }
+
+       /**
+        * Tells this thread to commit a set of offsets. This method does not 
block, the committing
+        * operation will happen asynchronously.
+        * 
+        * <p>Only one commit operation may be pending at any time. If the 
committing takes longer than
+        * the frequency with which this method is called, then some commits 
may be skipped due to being
+        * superseded by newer ones.
+        * 
+        * <p>The offsets are only stored here; the fetch loop in {@link #run()} picks
+        * them up and sends them to Kafka via {@code commitAsync}.
+        * 
+        * @param offsetsToCommit The offsets to commit
+        */
+       public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> 
offsetsToCommit) {
+               // record the work to be committed by the main consumer thread 
and make sure the consumer notices that
+               if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
+                       log.warn("Committing offsets to Kafka takes longer than 
the checkpoint interval. " +
+                                       "Skipping commit of previous offsets 
because newer complete checkpoint offsets are available. " +
+                                       "This does not compromise Flink's 
checkpoint integrity.");
+               }
+
+               // if the consumer is blocked in a poll() or handover 
operation, wake it up to commit soon
+               handover.wakeupProducer();
+               if (consumer != null) {
+                       consumer.wakeup();
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Collects the Kafka partition handle of every given partition state into a
+        * new list, in the order of the input array.
+        *
+        * @param partitions The partition states to take the handles from
+        * @return A list with one {@link TopicPartition} per element of the array
+        */
+       private static List<TopicPartition> 
convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] partitions) {
+               ArrayList<TopicPartition> result = new 
ArrayList<>(partitions.length);
+               for (KafkaTopicPartitionState<TopicPartition> p : partitions) {
+                       result.add(p.getKafkaPartitionHandle());
+               }
+               return result;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * The callback handed to {@code commitAsync} by the fetch loop. On completion
+        * it clears the {@code commitInProgress} flag, which allows the fetch loop to
+        * send the next commit, and logs a warning if the commit failed.
+        */
+       private class CommitCallback implements OffsetCommitCallback {
+
+               @Override
+               public void onComplete(Map<TopicPartition, OffsetAndMetadata> 
offsets, Exception ex) {
+                       commitInProgress = false;
+
+                       if (ex != null) {
+                               log.warn("Committing offsets to Kafka failed. 
This does not compromise Flink's checkpoints.", ex);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
new file mode 100644
index 0000000..6bdfb48
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/log4j.properties
@@ -0,0 +1,29 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
new file mode 100644
index 0000000..7a82365
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internal.Handover;
+import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
+import 
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Unit tests for the {@link Kafka09Fetcher}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaConsumerThread.class)
+public class Kafka09FetcherTest {
+
+       /**
+        * Checks that {@code commitInternalOffsetsToKafka} returns in time while the
+        * consumer is blocked inside {@code poll()}. The mocked consumer blocks in
+        * poll() until wakeup() is called; the commit call is issued from a separate
+        * thread, which must finish within 30 seconds.
+        */
+       @Test
+       public void testCommitDoesNotBlock() throws Exception {
+
+               // test data
+               final KafkaTopicPartition testPartition = new 
KafkaTopicPartition("test", 42);
+               final Map<KafkaTopicPartition, Long> testCommitData = new 
HashMap<>();
+               testCommitData.put(testPartition, 11L);
+
+               // to synchronize when the consumer is in its blocking method
+               final OneShotLatch sync = new OneShotLatch();
+
+               // ----- the mock consumer with blocking poll calls ----
+               final MultiShotLatch blockerLatch = new MultiShotLatch();
+               
+               KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+               when(mockConsumer.poll(anyLong())).thenAnswer(new 
Answer<ConsumerRecords<?, ?>>() {
+                       
+                       @Override
+                       public ConsumerRecords<?, ?> answer(InvocationOnMock 
invocation) throws InterruptedException {
+                               sync.trigger();
+                               blockerLatch.await();
+                               return ConsumerRecords.empty();
+                       }
+               });
+
+               doAnswer(new Answer<Void>() {
+                       @Override
+                       public Void answer(InvocationOnMock invocation) {
+                               blockerLatch.trigger();
+                               return null;
+                       }
+               }).when(mockConsumer).wakeup();
+
+               // make sure the fetcher creates the mock consumer
+               
whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+               // ----- create the test fetcher -----
+
+               @SuppressWarnings("unchecked")
+               SourceContext<String> sourceContext = mock(SourceContext.class);
+               List<KafkaTopicPartition> topics = 
Collections.singletonList(new KafkaTopicPartition("test", 42));
+               KeyedDeserializationSchema<String> schema = new 
KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+               final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+                               sourceContext,
+                               topics,
+                               null, /* periodic watermark extractor */
+                               null, /* punctuated watermark extractor */
+                               new TestProcessingTimeService(),
+                               10, /* watermark interval */
+                               this.getClass().getClassLoader(),
+                               true, /* checkpointing */
+                               "task_name",
+                               new UnregisteredMetricsGroup(),
+                               schema,
+                               new Properties(),
+                               0L,
+                               false);
+
+               // ----- run the fetcher -----
+
+               final AtomicReference<Throwable> error = new 
AtomicReference<>();
+               final Thread fetcherRunner = new Thread("fetcher runner") {
+
+                       @Override
+                       public void run() {
+                               try {
+                                       fetcher.runFetchLoop();
+                               } catch (Throwable t) {
+                                       error.set(t);
+                               }
+                       }
+               };
+               fetcherRunner.start();
+
+               // wait until the fetcher has reached the method of interest
+               sync.await();
+
+               // ----- trigger the offset commit -----
+               
+               final AtomicReference<Throwable> commitError = new 
AtomicReference<>();
+               final Thread committer = new Thread("committer runner") {
+                       @Override
+                       public void run() {
+                               try {
+                                       
fetcher.commitInternalOffsetsToKafka(testCommitData);
+                               } catch (Throwable t) {
+                                       commitError.set(t);
+                               }
+                       }
+               };
+               committer.start();
+
+               // ----- ensure that the committer finishes in time  -----
+               committer.join(30000);
+               assertFalse("The committer did not finish in time", 
committer.isAlive());
+
+               // ----- test done, wait till the fetcher is done for a clean 
shutdown -----
+               fetcher.cancel();
+               fetcherRunner.join();
+
+               // check that there were no errors in the fetcher
+               final Throwable fetcherError = error.get();
+               if (fetcherError != null && !(fetcherError instanceof 
Handover.ClosedException)) {
+                       throw new Exception("Exception in the fetcher", 
fetcherError);
+               }
+               final Throwable committerError = commitError.get();
+               if (committerError != null) {
+                       throw new Exception("Exception in the committer", 
committerError);
+               }
+       }
+
+       /**
+        * Checks that offsets handed to {@code commitInternalOffsetsToKafka} arrive at the
+        * consumer's {@code commitAsync} call while the fetch loop is running.
+        *
+        * <p>Two commits are triggered one after the other. Each one is captured by the mocked
+        * consumer and must carry the given offset plus one (11 -> 12 and 19 -> 20 for
+        * partition "test"/42).
+        *
+        * @throws Exception if the fetcher thread failed with anything but a
+        *                   {@code Handover.ClosedException}
+        */
+       @Test
+       public void ensureOffsetsGetCommitted() throws Exception {
+
+               // test data
+               final KafkaTopicPartition testPartition1 = new KafkaTopicPartition("test", 42);
+               final KafkaTopicPartition testPartition2 = new KafkaTopicPartition("another", 99);
+
+               final Map<KafkaTopicPartition, Long> testCommitData1 = new HashMap<>();
+               testCommitData1.put(testPartition1, 11L);
+               testCommitData1.put(testPartition2, 18L);
+
+               final Map<KafkaTopicPartition, Long> testCommitData2 = new HashMap<>();
+               testCommitData2.put(testPartition1, 19L);
+               testCommitData2.put(testPartition2, 28L);
+
+               // every map passed to commitAsync() is parked here for the test thread to inspect
+               final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitStore = new LinkedBlockingQueue<>();
+
+
+               // ----- the mock consumer with poll(), wakeup(), and commit(A)sync calls ----
+
+               final MultiShotLatch blockerLatch = new MultiShotLatch();
+
+               KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+
+               // poll() blocks until wakeup() triggers the latch, then returns no records
+               when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+                       @Override
+                       public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
+                               blockerLatch.await();
+                               return ConsumerRecords.empty();
+                       }
+               });
+
+               doAnswer(new Answer<Void>() {
+                       @Override
+                       public Void answer(InvocationOnMock invocation) {
+                               blockerLatch.trigger();
+                               return null;
+                       }
+               }).when(mockConsumer).wakeup();
+
+               // commitAsync() records the offsets and immediately reports success to the callback
+               doAnswer(new Answer<Void>() {
+                       @Override
+                       public Void answer(InvocationOnMock invocation) {
+                               @SuppressWarnings("unchecked")
+                               Map<TopicPartition, OffsetAndMetadata> offsets =
+                                               (Map<TopicPartition, OffsetAndMetadata>) invocation.getArguments()[0];
+
+                               OffsetCommitCallback callback = (OffsetCommitCallback) invocation.getArguments()[1];
+
+                               commitStore.add(offsets);
+                               callback.onComplete(offsets, null);
+
+                               return null;
+                       }
+               }).when(mockConsumer).commitAsync(
+                               Mockito.<Map<TopicPartition, OffsetAndMetadata>>any(), any(OffsetCommitCallback.class));
+
+               // make sure the fetcher creates the mock consumer
+               whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+               // ----- create the test fetcher -----
+
+               @SuppressWarnings("unchecked")
+               SourceContext<String> sourceContext = mock(SourceContext.class);
+               List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
+               KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+               final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+                               sourceContext,
+                               topics,
+                               null, /* periodic watermark extractor */
+                               null, /* punctuated watermark extractor */
+                               new TestProcessingTimeService(),
+                               10, /* watermark interval */
+                               this.getClass().getClassLoader(),
+                               true, /* checkpointing */
+                               "task_name",
+                               new UnregisteredMetricsGroup(),
+                               schema,
+                               new Properties(),
+                               0L,
+                               false);
+
+
+               // ----- run the fetcher -----
+
+               final AtomicReference<Throwable> error = new AtomicReference<>();
+               final Thread fetcherRunner = new Thread("fetcher runner") {
+
+                       @Override
+                       public void run() {
+                               try {
+                                       fetcher.runFetchLoop();
+                               } catch (Throwable t) {
+                                       error.set(t);
+                               }
+                       }
+               };
+               fetcherRunner.start();
+
+               // ----- trigger the first offset commit -----
+
+               fetcher.commitInternalOffsetsToKafka(testCommitData1);
+               Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
+
+               // NOTE(review): the fetcher is created with partition "test"/42 only (see 'topics'),
+               // so an entry for "another" is presumably never part of the commit - TODO confirm.
+               // Its expectation is nevertheless kept consistent with the plus-one rule (18 -> 19).
+               for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
+                       TopicPartition partition = entry.getKey();
+                       if (partition.topic().equals("test")) {
+                               assertEquals(42, partition.partition());
+                               assertEquals(12L, entry.getValue().offset());
+                       }
+                       else if (partition.topic().equals("another")) {
+                               assertEquals(99, partition.partition());
+                               assertEquals(19L, entry.getValue().offset());
+                       }
+               }
+
+               // ----- trigger the second offset commit -----
+
+               fetcher.commitInternalOffsetsToKafka(testCommitData2);
+               Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
+
+               // same rule as above: committed offset = given offset + 1 (19 -> 20, 28 -> 29)
+               for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
+                       TopicPartition partition = entry.getKey();
+                       if (partition.topic().equals("test")) {
+                               assertEquals(42, partition.partition());
+                               assertEquals(20L, entry.getValue().offset());
+                       }
+                       else if (partition.topic().equals("another")) {
+                               assertEquals(99, partition.partition());
+                               assertEquals(29L, entry.getValue().offset());
+                       }
+               }
+
+               // ----- test done, wait till the fetcher is done for a clean shutdown -----
+               fetcher.cancel();
+               fetcherRunner.join();
+
+               // check that there were no errors in the fetcher
+               final Throwable caughtError = error.get();
+               if (caughtError != null && !(caughtError instanceof Handover.ClosedException)) {
+                       throw new Exception("Exception in the fetcher", caughtError);
+               }
+       }
+
+       /**
+        * Checks that cancelling the fetcher (plus interrupting its thread) releases a thread
+        * that is stuck emitting a record into the source context.
+        *
+        * <p>The mocked consumer returns the same three records on every {@code poll()}, and the
+        * {@code BlockingSourceContext} blocks inside every emit call. After cancel, interrupt
+        * and join, no thread may still be blocked in the source context.
+        */
+       @Test
+       public void testCancellationWhenEmitBlocks() throws Exception {
+
+               // ----- some test data -----
+
+               final String topic = "test-topic";
+               final int partition = 3;
+               final byte[] payload = new byte[] {1, 2, 3, 4};
+
+               final List<ConsumerRecord<byte[], byte[]>> records = 
Arrays.asList(
+                               new ConsumerRecord<byte[], byte[]>(topic, 
partition, 15, payload, payload),
+                               new ConsumerRecord<byte[], byte[]>(topic, 
partition, 16, payload, payload),
+                               new ConsumerRecord<byte[], byte[]>(topic, 
partition, 17, payload, payload));
+
+               final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
data = new HashMap<>();
+               data.put(new TopicPartition(topic, partition), records);
+
+               final ConsumerRecords<byte[], byte[]> consumerRecords = new 
ConsumerRecords<>(data);
+
+               // ----- the test consumer -----
+
+               // every poll() hands out the same non-empty batch
+               final KafkaConsumer<?, ?> mockConsumer = 
mock(KafkaConsumer.class);
+               when(mockConsumer.poll(anyLong())).thenAnswer(new 
Answer<ConsumerRecords<?, ?>>() {
+                       @Override
+                       public ConsumerRecords<?, ?> answer(InvocationOnMock 
invocation) {
+                               return consumerRecords;
+                       }
+               });
+
+               // any 'new KafkaConsumer(...)' is redirected to the mock
+               
whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+               // ----- build a fetcher -----
+
+               BlockingSourceContext<String> sourceContext = new 
BlockingSourceContext<>();
+               List<KafkaTopicPartition> topics = 
Collections.singletonList(new KafkaTopicPartition(topic, partition));
+               KeyedDeserializationSchema<String> schema = new 
KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+               final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+                               sourceContext,
+                               topics,
+                               null, /* periodic watermark extractor */
+                               null, /* punctuated watermark extractor */
+                               new TestProcessingTimeService(),
+                               10, /* watermark interval */
+                               this.getClass().getClassLoader(),
+                               true, /* checkpointing */
+                               "task_name",
+                               new UnregisteredMetricsGroup(),
+                               schema,
+                               new Properties(),
+                               0L,
+                               false);
+
+
+               // ----- run the fetcher -----
+
+               // NOTE(review): 'error' is filled by the runner thread but never inspected below
+               final AtomicReference<Throwable> error = new 
AtomicReference<>();
+               final Thread fetcherRunner = new Thread("fetcher runner") {
+
+                       @Override
+                       public void run() {
+                               try {
+                                       fetcher.runFetchLoop();
+                               } catch (Throwable t) {
+                                       error.set(t);
+                               }
+                       }
+               };
+               fetcherRunner.start();
+
+               // wait until the thread started to emit records to the source 
context
+               sourceContext.waitTillHasBlocker();
+
+               // now we try to cancel the fetcher, including the interruption 
usually done on the task thread
+               // once it has finished, there must be no more thread blocked 
on the source context
+               fetcher.cancel();
+               fetcherRunner.interrupt();
+               fetcherRunner.join();
+
+               // the source context's lock is held only while a thread is blocked inside it
+               assertFalse("fetcher threads did not properly finish", 
sourceContext.isStillBlocking());
+       }
+
+       // 
------------------------------------------------------------------------
+       //  test utilities
+       // 
------------------------------------------------------------------------
+
+       /**
+        * A {@link SourceContext} whose emitting methods never return on their own: every call
+        * to {@code collect}, {@code collectWithTimestamp} or {@code emitWatermark} parks the
+        * calling thread until that thread is interrupted.
+        *
+        * <p>Tests use {@link #waitTillHasBlocker()} to wait for the first parked thread and
+        * {@link #isStillBlocking()} to check whether a thread is still parked.
+        */
+       private static final class BlockingSourceContext<T> implements SourceContext<T> {
+
+               /** Held by a thread for as long as it is inside {@link #block()}. */
+               private final ReentrantLock lock = new ReentrantLock();
+
+               /** Triggered as soon as a thread has entered {@link #block()}. */
+               private final OneShotLatch inBlocking = new OneShotLatch();
+
+               /**
+                * The checkpoint lock. One stable instance, so that all callers of
+                * {@link #getCheckpointLock()} synchronize on the same monitor.
+                */
+               private final Object checkpointLock = new Object();
+
+               @Override
+               public void collect(T element) {
+                       block();
+               }
+
+               @Override
+               public void collectWithTimestamp(T element, long timestamp) {
+                       block();
+               }
+
+               @Override
+               public void emitWatermark(Watermark mark) {
+                       block();
+               }
+
+               @Override
+               public Object getCheckpointLock() {
+                       return checkpointLock;
+               }
+
+               @Override
+               public void close() {}
+
+               /**
+                * Blocks until some thread has started blocking in one of the emit methods.
+                *
+                * @throws InterruptedException if the waiting thread is interrupted
+                */
+               public void waitTillHasBlocker() throws InterruptedException {
+                       inBlocking.await();
+               }
+
+               /**
+                * @return {@code true} while a thread holds the internal lock, i.e. is still
+                *         inside {@link #block()}
+                */
+               public boolean isStillBlocking() {
+                       return lock.isLocked();
+               }
+
+               /**
+                * Parks the calling thread until it is interrupted. The internal lock is held for
+                * the whole time and released on exit; the interruption flag is restored.
+                */
+               @SuppressWarnings({"InfiniteLoopStatement", "SynchronizationOnLocalVariableOrMethodParameter"})
+               private void block() {
+                       lock.lock();
+                       try {
+                               inBlocking.trigger();
+
+                               // put this thread to sleep indefinitely; the loop guards against
+                               // spurious wake-ups, only an interrupt gets us out
+                               final Object o = new Object();
+                               while (true) {
+                                       synchronized (o) {
+                                               o.wait();
+                                       }
+                               }
+                       }
+                       catch (InterruptedException e) {
+                               // exit cleanly, simply reset the interruption flag
+                               Thread.currentThread().interrupt();
+                       }
+                       finally {
+                               lock.unlock();
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
new file mode 100644
index 0000000..d18e2a9
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.Test;
+
+/**
+ * Integration test suite for the Kafka 0.9 consumer. Each test only delegates to the
+ * matching {@code run...} method inherited from {@link KafkaConsumerTestBase}.
+ */
+public class Kafka09ITCase extends KafkaConsumerTestBase {
+
+       /** Upper bound, in milliseconds, applied to every test of this suite. */
+       private static final long TIMEOUT_MILLIS = 60000L;
+
+       // ------------------------------------------------------------------------
+       //  Suite of Tests
+       // ------------------------------------------------------------------------
+
+       @Test(timeout = TIMEOUT_MILLIS)
+       public void testFailOnNoBroker() throws Exception {
+               runFailOnNoBrokerTest();
+       }
+
+       @Test(timeout = TIMEOUT_MILLIS)
+       public void testConcurrentProducerConsumerTopology() throws Exception {
+               runSimpleConcurrentProducerConsumerTopology();
+       }
+
+       @Test(timeout = TIMEOUT_MILLIS)
+       public void testKeyValueSupport() throws Exception {
+               runKeyValueTest();
+       }
+
+       // --- canceling / failures ---
+
+       @Test(timeout = TIMEOUT_MILLIS)
+       public void testCancelingEmptyTopic() throws Exception {
+               runCancelingOnEmptyInputTest();
+       }
+
+       @Test(timeout = TIMEOUT_MILLIS)
+       public void testCancelingFullTopic() throws Exception {
+               runCancelingOnFullInputTest();
+       }
+
+       @Test(timeout = TIMEOUT_MILLIS)
+       public void testFailOnDeploy() throws Exception {
+               runFailOnDeployTest();
+       }
+
+       // --- source to partition mappings and exactly once ---
+
+       @Test(timeout = TIMEOUT_MILLIS)
+       public void testOneToOneSources() throws Exception {
+               runOneToOneExactlyOnceTest();
+       }
+
+       @Test(timeout = TIMEOUT_MILLIS)
+       public void testOneSourceMultiplePartitions() throws Exception {
+               runOneSourceMultiplePartitionsExactlyOnceTest();
+       }
+
+       @Test(timeout = TIMEOUT_MILLIS)
+       public void testMultipleSourcesOnePartition() throws Exception {
+               runMultipleSourcesOnePartitionExactlyOnceTest();
+       }
+
+       // --- broker failure ---
+
+       @Test(timeout = TIMEOUT_MILLIS)
+       public void testBrokerFailure() throws Exception {
+               runBrokerFailureTest();
+       }
+
+       // --- special executions ---
+
+       @Test(timeout = TIMEOUT_MILLIS)
+       public void testBigRecordJob() throws Exception {
+               runBigRecordTestTopology();
+       }
+
+       @Test(timeout = TIMEOUT_MILLIS)
+       public void testMultipleTopics() throws Exception {
+               runProduceConsumeMultipleTopics();
+       }
+
+       @Test(timeout = TIMEOUT_MILLIS)
+       public void testAllDeletes() throws Exception {
+               runAllDeletesTest();
+       }
+
+       @Test(timeout = TIMEOUT_MILLIS)
+       public void testEndOfStream() throws Exception {
+               runEndOfStreamTest();
+       }
+
+       @Test(timeout = TIMEOUT_MILLIS)
+       public void testMetrics() throws Throwable {
+               runMetricsTest();
+       }
+
+       // --- offset committing ---
+
+       @Test(timeout = TIMEOUT_MILLIS)
+       public void testCommitOffsetsToKafka() throws Exception {
+               runCommitOffsetsToKafka();
+       }
+
+       @Test(timeout = TIMEOUT_MILLIS)
+       public void testStartFromKafkaCommitOffsets() throws Exception {
+               runStartFromKafkaCommitOffsets();
+       }
+
+       @Test(timeout = TIMEOUT_MILLIS)
+       public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+               runAutoOffsetRetrievalAndCommitToKafka();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
new file mode 100644
index 0000000..45f70ac
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.table.Row;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import 
org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Runs the tests of {@link KafkaTableSinkTestBase} against {@link Kafka09JsonTableSink}.
+ */
+public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
+
+       /**
+        * Builds a {@link Kafka09JsonTableSink} whose producer factory method is overridden to
+        * hand back the given {@code kafkaProducer} instead of creating a new one.
+        */
+       @Override
+       protected KafkaTableSink createTableSink(
+                       String topic,
+                       Properties properties,
+                       KafkaPartitioner<Row> partitioner,
+                       final FlinkKafkaProducerBase<Row> kafkaProducer) {
+
+               final KafkaTableSink sinkUnderTest = new Kafka09JsonTableSink(topic, properties, partitioner) {
+                       @Override
+                       protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+                                       String topic,
+                                       Properties properties,
+                                       SerializationSchema<Row> serializationSchema,
+                                       KafkaPartitioner<Row> partitioner) {
+                               // always the producer supplied by the test base
+                               return kafkaProducer;
+                       }
+               };
+               return sinkUnderTest;
+       }
+
+       /** Supplies a JSON row serialization schema built from {@code FIELD_NAMES}. */
+       @Override
+       @SuppressWarnings("unchecked")
+       protected SerializationSchema<Row> getSerializationSchema() {
+               final SerializationSchema<Row> jsonSchema = new JsonRowSerializationSchema(FIELD_NAMES);
+               return jsonSchema;
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
new file mode 100644
index 0000000..4a75f50
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.Properties;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import 
org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+
+public class Kafka09JsonTableSourceTest extends KafkaTableSourceTestBase {
+
+       @Override
+       protected KafkaTableSource createTableSource(String topic, Properties 
properties, String[] fieldNames, TypeInformation<?>[] typeInfo) {
+               return new Kafka09JsonTableSource(topic, properties, 
fieldNames, typeInfo);
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
+               return (Class) JsonRowDeserializationSchema.class;
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
+               return (Class) FlinkKafkaConsumer09.class;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
new file mode 100644
index 0000000..ae4f5b2
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+
+import org.junit.Test;
+
+/**
+ * Producer integration test for the Kafka 0.9 connector. The single test delegates to
+ * {@code runCustomPartitioningTest()}, which is not defined in this class and is
+ * presumably inherited from {@link KafkaProducerTestBase}.
+ */
+@SuppressWarnings("serial")
+public class Kafka09ProducerITCase extends KafkaProducerTestBase {
+
+       // NOTE(review): no timeout is set on this test, unlike the tests in Kafka09ITCase
+       @Test
+       public void testCustomPartitioning() {
+               runCustomPartitioningTest();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
new file mode 100644
index 0000000..e748537
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09SecuredRunITCase.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.test.util.SecureTestEnvironment;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Kafka Secure Connection (kerberos) IT test case.
+ *
+ * <p>The secure test environment is prepared and the clusters are started once before all
+ * tests; both are torn down again after the last test.
+ */
+public class Kafka09SecuredRunITCase extends KafkaConsumerTestBase {
+
+       protected static final Logger LOG = LoggerFactory.getLogger(Kafka09SecuredRunITCase.class);
+
+       /**
+        * Prepares the secure test environment, adds the secure settings to the Flink
+        * configuration and starts the clusters via {@code startClusters(true)}.
+        */
+       @BeforeClass
+       public static void prepare() throws IOException, ClassNotFoundException {
+               LOG.info("-------------------------------------------------------------------------");
+               LOG.info("    Starting Kafka09SecuredRunITCase ");
+               LOG.info("-------------------------------------------------------------------------");
+
+               SecureTestEnvironment.prepare(tempFolder);
+               SecureTestEnvironment.populateFlinkSecureConfigurations(getFlinkConfiguration());
+
+               startClusters(true);
+       }
+
+       /**
+        * Shuts the clusters down and cleans up the secure test environment. The cleanup runs
+        * even if the cluster shutdown throws, so that the secure environment is not left behind.
+        */
+       @AfterClass
+       public static void shutDownServices() {
+               try {
+                       shutdownClusters();
+               }
+               finally {
+                       SecureTestEnvironment.cleanup();
+               }
+       }
+
+
+       // timeout interval is large since in Travis, ZK connection timeout occurs frequently
+       // The timeout for the test case is 2 times timeout of ZK connection
+       @Test(timeout = 600000)
+       public void testMultipleTopics() throws Exception {
+               runProduceConsumeMultipleTopics();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
new file mode 100644
index 0000000..18b2aec
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import 
org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(FlinkKafkaProducerBase.class)
+public class KafkaProducerTest extends TestLogger {
+       
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testPropagateExceptions() {
+               try {
+                       // mock kafka producer
+                       KafkaProducer<?, ?> kafkaProducerMock = 
mock(KafkaProducer.class);
+                       
+                       // partition setup
+                       
when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
+                               // returning a unmodifiable list to mimic 
KafkaProducer#partitionsFor() behaviour
+                               Collections.singletonList(new 
PartitionInfo("mock_topic", 42, null, null, null)));
+
+                       // failure when trying to send an element
+                       when(kafkaProducerMock.send(any(ProducerRecord.class), 
any(Callback.class)))
+                               .thenAnswer(new 
Answer<Future<RecordMetadata>>() {
+                                       @Override
+                                       public Future<RecordMetadata> 
answer(InvocationOnMock invocation) throws Throwable {
+                                               Callback callback = (Callback) 
invocation.getArguments()[1];
+                                               callback.onCompletion(null, new 
Exception("Test error"));
+                                               return null;
+                                       }
+                               });
+                       
+                       // make sure the FlinkKafkaProducer instantiates our 
mock producer
+                       
whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);
+                       
+                       // (1) producer that propagates errors
+
+                       FlinkKafkaProducer09<String> producerPropagating = new 
FlinkKafkaProducer09<>(
+                                       "mock_topic", new SimpleStringSchema(), 
FakeStandardProducerConfig.get(), null);
+
+                       OneInputStreamOperatorTestHarness<String, Object> 
testHarness =
+                                       new 
OneInputStreamOperatorTestHarness<>(new StreamSink(producerPropagating));
+
+                       testHarness.open();
+
+                       try {
+                               testHarness.processElement(new 
StreamRecord<>("value"));
+                               testHarness.processElement(new 
StreamRecord<>("value"));
+                               fail("This should fail with an exception");
+                       }
+                       catch (Exception e) {
+                               assertNotNull(e.getCause());
+                               assertNotNull(e.getCause().getMessage());
+                               
assertTrue(e.getCause().getMessage().contains("Test error"));
+                       }
+
+                       // (2) producer that only logs errors
+
+                       FlinkKafkaProducer09<String> producerLogging = new 
FlinkKafkaProducer09<>(
+                                       "mock_topic", new SimpleStringSchema(), 
FakeStandardProducerConfig.get(), null);
+                       producerLogging.setLogFailuresOnly(true);
+
+                       testHarness = new 
OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging));
+
+                       testHarness.open();
+
+                       testHarness.processElement(new StreamRecord<>("value"));
+                       testHarness.processElement(new StreamRecord<>("value"));
+
+                       testHarness.close();
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
new file mode 100644
index 0000000..1802e0c
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import kafka.admin.AdminUtils;
+import kafka.common.KafkaException;
+import kafka.api.PartitionMetadata;
+import kafka.network.SocketServer;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.SystemTime$;
+import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import 
org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.util.NetUtils;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.Seq;
+
+import java.io.File;
+import java.net.BindException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * An implementation of the {@link KafkaTestEnvironment} for Kafka 0.9.
+ */
+public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
+
+       protected static final Logger LOG = 
LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
+       // temporary directories created by prepare() and deleted again by shutdown()
+       private File tmpZkDir;
+       private File tmpKafkaParent;
+       // one log directory per broker, indexed by broker id
+       private List<File> tmpKafkaDirs;
+       // running brokers; set by prepare(), entries replaced by restartBroker()
+       private List<KafkaServer> brokers;
+       private TestingServer zookeeper;
+       private String zookeeperConnectionString;
+       // broker addresses, each followed by a comma; filled in by prepare()
+       private String brokerConnectionString = "";
+       // client-side properties handed out by getStandardProperties()
+       private Properties standardProps;
+       // optional extra broker settings passed to prepare(); may be null
+       private Properties additionalServerProperties;
+       private boolean secureMode = false;
+       // ZooKeeper session/connection timeout in milliseconds, kept as a String because it
+       // is written into Properties objects.
+       // 6 seconds is default. Seems to be too small for travis. 30 seconds
+       private String zkTimeout = "30000";
+
+       /**
+        * Returns the broker connection string currently stored in this environment.
+        */
+       public String getBrokerConnectionString() {
+               return brokerConnectionString;
+       }
+
+       /**
+        * Returns the properties object built by {@code prepare(...)}. The internal instance
+        * is returned directly, not a copy.
+        */
+       @Override
+       public Properties getStandardProperties() {
+               return standardProps;
+       }
+
+       /**
+        * Returns the version string of this environment, the literal {@code "0.9"}.
+        */
+       @Override
+       public String getVersion() {
+               return "0.9";
+       }
+
+       /**
+        * Returns the internal list of brokers (not a copy).
+        */
+       @Override
+       public List<KafkaServer> getBrokers() {
+               return brokers;
+       }
+
+       /**
+        * Creates a {@code FlinkKafkaConsumer09} for the given topics, schema and properties.
+        */
+       @Override
+       public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, 
KeyedDeserializationSchema<T> readSchema, Properties props) {
+               return new FlinkKafkaConsumer09<>(topics, readSchema, props);
+       }
+
+       /**
+        * Wraps a {@code FlinkKafkaProducer09} with flush-on-checkpoint enabled into a
+        * {@code StreamSink}.
+        */
+       @Override
+       public <T> StreamSink<T> getProducerSink(
+                       String topic,
+                       KeyedSerializationSchema<T> serSchema,
+                       Properties props,
+                       KafkaPartitioner<T> partitioner) {
+               final FlinkKafkaProducer09<T> kafkaProducer =
+                       new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
+               kafkaProducer.setFlushOnCheckpoint(true);
+
+               return new StreamSink<>(kafkaProducer);
+       }
+
+       @Override
+       public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, 
String topic, KeyedSerializationSchema<T> serSchema, Properties props, 
KafkaPartitioner<T> partitioner) {
+               FlinkKafkaProducer09<T> prod = new 
FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
+               prod.setFlushOnCheckpoint(true);
+               return stream.addSink(prod);
+       }
+
+       /**
+        * Creates a new {@code KafkaOffsetHandlerImpl} configured with the given properties.
+        */
+       @Override
+       public KafkaOffsetHandler createOffsetHandler(Properties props) {
+               return new KafkaOffsetHandlerImpl(props);
+       }
+
+       /**
+        * Starts a new broker with the given id on that broker's existing log directory and
+        * stores it in the broker list at index {@code leaderId}.
+        */
+       @Override
+       public void restartBroker(int leaderId) throws Exception {
+               final File logDir = tmpKafkaDirs.get(leaderId);
+               final KafkaServer restarted = getKafkaServer(leaderId, logDir);
+               brokers.set(leaderId, restarted);
+       }
+
+       @Override
+       public int getLeaderToShutDown(String topic) throws Exception {
+               ZkUtils zkUtils = getZkUtils();
+               try {
+                       PartitionMetadata firstPart = null;
+                       do {
+                               if (firstPart != null) {
+                                       LOG.info("Unable to find leader. error 
code {}", firstPart.errorCode());
+                                       // not the first try. Sleep a bit
+                                       Thread.sleep(150);
+                               }
+
+                               Seq<PartitionMetadata> partitionMetadata = 
AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils).partitionsMetadata();
+                               firstPart = partitionMetadata.head();
+                       }
+                       while (firstPart.errorCode() != 0);
+
+                       return firstPart.leader().get().id();
+               } finally {
+                       zkUtils.close();
+               }
+       }
+
+       /**
+        * Returns the broker id taken from the given server's configuration.
+        */
+       @Override
+       public int getBrokerId(KafkaServer server) {
+               return server.config().brokerId();
+       }
+
+       /**
+        * Always returns {@code true} for this environment.
+        */
+       @Override
+       public boolean isSecureRunSupported() {
+               return true;
+       }
+
+       /**
+        * Starts an embedded ZooKeeper server and the requested number of Kafka brokers on
+        * freshly created temporary directories, then fills the standard properties with
+        * the resulting connection settings.
+        *
+        * <p>In secure mode exactly one broker is started regardless of
+        * {@code numKafkaServers}, and the ZooKeeper timeout is multiplied by 15. The factor
+        * is applied to the current field value, so it compounds if this method is invoked
+        * more than once on the same instance.
+        *
+        * @param numKafkaServers number of brokers to start (overridden to 1 in secure mode)
+        * @param additionalServerProperties extra broker properties, may be {@code null}
+        * @param secureMode whether the SASL_PLAINTEXT listener is used instead of PLAINTEXT
+        * @throws AssertionError if a temp directory cannot be created or startup fails
+        */
+       @Override
+       public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
+
+               //increase the timeout since in Travis ZK connection takes long time for secure connection.
+               if (secureMode) {
+                       //run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
+                       numKafkaServers = 1;
+                       zkTimeout = String.valueOf(Integer.parseInt(zkTimeout) * 15);
+               }
+
+               this.additionalServerProperties = additionalServerProperties;
+               this.secureMode = secureMode;
+               File tempDir = new File(System.getProperty("java.io.tmpdir"));
+
+               tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
+               assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
+
+               // '*' is not a legal file name character on Windows, so '-' is used as separator
+               tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
+               assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
+
+               tmpKafkaDirs = new ArrayList<>(numKafkaServers);
+               for (int i = 0; i < numKafkaServers; i++) {
+                       File tmpDir = new File(tmpKafkaParent, "server-" + i);
+                       assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
+                       tmpKafkaDirs.add(tmpDir);
+               }
+
+               zookeeper = null;
+               brokers = null;
+
+               // the listener whose bound port is handed out to clients
+               final SecurityProtocol clientProtocol =
+                       secureMode ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT;
+               // collected locally so that a repeated prepare() never appends to a stale value
+               final StringBuilder brokerList = new StringBuilder();
+
+               try {
+                       LOG.info("Starting Zookeeper");
+                       zookeeper = new TestingServer(-1, tmpZkDir);
+                       zookeeperConnectionString = zookeeper.getConnectString();
+                       LOG.info("zookeeperConnectionString: {}", zookeeperConnectionString);
+
+                       LOG.info("Starting KafkaServer");
+                       brokers = new ArrayList<>(numKafkaServers);
+
+                       for (int i = 0; i < numKafkaServers; i++) {
+                               KafkaServer broker = getKafkaServer(i, tmpKafkaDirs.get(i));
+                               brokers.add(broker);
+
+                               int boundPort = broker.socketServer().boundPort(clientProtocol);
+                               // every entry, including the last one, is followed by a comma
+                               brokerList.append(hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, boundPort)).append(',');
+                       }
+
+                       LOG.info("ZK and KafkaServer started.");
+               }
+               catch (Throwable t) {
+                       // keep the original failure as cause instead of only printing it
+                       LOG.error("Test setup failed", t);
+                       throw new AssertionError("Test setup failed: " + t.getMessage(), t);
+               }
+
+               brokerConnectionString = brokerList.toString();
+               LOG.info("brokerConnectionString --> {}", brokerConnectionString);
+
+               standardProps = new Properties();
+               standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
+               standardProps.setProperty("bootstrap.servers", brokerConnectionString);
+               standardProps.setProperty("group.id", "flink-tests");
+               standardProps.setProperty("enable.auto.commit", "false");
+               standardProps.setProperty("zookeeper.session.timeout.ms", zkTimeout);
+               standardProps.setProperty("zookeeper.connection.timeout.ms", zkTimeout);
+               // read from the beginning. (earliest is kafka 0.9 value)
+               standardProps.setProperty("auto.offset.reset", "earliest");
+               // make a lot of fetches (MESSAGES MUST BE SMALLER!)
+               standardProps.setProperty("max.partition.fetch.bytes", "256");
+
+       }
+
+       /**
+        * Stops all brokers and the ZooKeeper server and deletes the temporary directories.
+        * Components that were never started (for example because {@code prepare(...)}
+        * failed part-way) are skipped. Directory deletion is best effort: failures are
+        * logged but not propagated.
+        */
+       @Override
+       public void shutdown() {
+               // brokers is null if prepare() failed before the broker list was created
+               if (brokers != null) {
+                       for (KafkaServer broker : brokers) {
+                               if (broker != null) {
+                                       broker.shutdown();
+                               }
+                       }
+                       brokers.clear();
+               }
+
+               if (zookeeper != null) {
+                       try {
+                               zookeeper.stop();
+                               zookeeper.close();
+                       }
+                       catch (Exception e) {
+                               LOG.warn("ZK.stop() failed", e);
+                       }
+                       zookeeper = null;
+               }
+
+               // clean up the temp spaces
+
+               if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
+                       try {
+                               FileUtils.deleteDirectory(tmpKafkaParent);
+                       }
+                       catch (Exception e) {
+                               // best effort: a leftover temp directory must not fail the test
+                               LOG.warn("Could not delete temp directory {}", tmpKafkaParent, e);
+                       }
+               }
+               if (tmpZkDir != null && tmpZkDir.exists()) {
+                       try {
+                               FileUtils.deleteDirectory(tmpZkDir);
+                       }
+                       catch (Exception e) {
+                               // best effort: a leftover temp directory must not fail the test
+                               LOG.warn("Could not delete temp directory {}", tmpZkDir, e);
+                       }
+               }
+       }
+
+       /**
+        * Opens a new ZooKeeper client using the timeouts stored in the standard properties
+        * and wraps it in a {@code ZkUtils}. The caller is responsible for closing it.
+        */
+       public ZkUtils getZkUtils() {
+               LOG.info("In getZKUtils:: zookeeperConnectionString = {}", zookeeperConnectionString);
+
+               final int sessionTimeout =
+                       Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms"));
+               final int connectionTimeout =
+                       Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms"));
+
+               final ZkClient client = new ZkClient(
+                       zookeeperConnectionString, sessionTimeout, connectionTimeout, new ZooKeeperStringSerializer());
+               return ZkUtils.apply(client, false);
+       }
+
+       @Override
+       public void createTestTopic(String topic, int numberOfPartitions, int 
replicationFactor, Properties topicConfig) {
+               // create topic with one client
+               LOG.info("Creating topic {}", topic);
+
+               ZkUtils zkUtils = getZkUtils();
+               try {
+                       AdminUtils.createTopic(zkUtils, topic, 
numberOfPartitions, replicationFactor, topicConfig);
+               } finally {
+                       zkUtils.close();
+               }
+
+               LOG.info("Topic {} create request is successfully posted", 
topic);
+
+               // validate that the topic has been created
+               final long deadline = System.currentTimeMillis() + 
Integer.parseInt(zkTimeout);
+               do {
+                       try {
+                               if(secureMode) {
+                                       //increase wait time since in Travis ZK 
timeout occurs frequently
+                                       int wait = Integer.parseInt(zkTimeout) 
/ 100;
+                                       LOG.info("waiting for {} msecs before 
the topic {} can be checked", wait, topic);
+                                       Thread.sleep(wait);
+                               } else {
+                                       Thread.sleep(100);
+                               }
+
+                       } catch (InterruptedException e) {
+                               // restore interrupted state
+                       }
+                       // we could use AdminUtils.topicExists(zkUtils, topic) 
here, but it's results are
+                       // not always correct.
+
+                       LOG.info("Validating if the topic {} has been created 
or not", topic);
+
+                       // create a new ZK utils connection
+                       ZkUtils checkZKConn = getZkUtils();
+                       if(AdminUtils.topicExists(checkZKConn, topic)) {
+                               LOG.info("topic {} has been created 
successfully", topic);
+                               checkZKConn.close();
+                               return;
+                       }
+                       LOG.info("topic {} has not been created yet. Will check 
again...", topic);
+                       checkZKConn.close();
+               }
+               while (System.currentTimeMillis() < deadline);
+               fail("Test topic could not be created");
+       }
+
+       /**
+        * Requests deletion of the given topic through {@code AdminUtils.deleteTopic}, using
+        * a ZooKeeper connection that is closed again afterwards.
+        *
+        * @param topic name of the topic to delete
+        */
+       @Override
+       public void deleteTestTopic(String topic) {
+               ZkUtils zkUtils = getZkUtils();
+               try {
+                       LOG.info("Deleting topic {}", topic);
+
+                       AdminUtils.deleteTopic(zkUtils, topic);
+               } finally {
+                       zkUtils.close();
+               }
+       }
+
+       /**
+        * Starts a Kafka broker with the given id on the given log directory. A free port is
+        * picked for every attempt; when startup fails because of a {@link BindException}
+        * the attempt is repeated, up to five times in total.
+        *
+        * <p>Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
+        */
+       protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception {
+               final Properties brokerProps = new Properties();
+
+               // properties have to be Strings
+               brokerProps.put("advertised.host.name", KAFKA_HOST);
+               brokerProps.put("broker.id", Integer.toString(brokerId));
+               brokerProps.put("log.dir", tmpFolder.toString());
+               brokerProps.put("zookeeper.connect", zookeeperConnectionString);
+               final String maxBytes = String.valueOf(50 * 1024 * 1024);
+               brokerProps.put("message.max.bytes", maxBytes);
+               brokerProps.put("replica.fetch.max.bytes", maxBytes);
+
+               // for CI stability, increase zookeeper session timeout
+               brokerProps.put("zookeeper.session.timeout.ms", zkTimeout);
+               brokerProps.put("zookeeper.connection.timeout.ms", zkTimeout);
+               if (additionalServerProperties != null) {
+                       brokerProps.putAll(additionalServerProperties);
+               }
+
+               final int numTries = 5;
+               int attempt = 0;
+
+               while (attempt < numTries) {
+                       attempt++;
+
+                       final int kafkaPort = NetUtils.getAvailablePort();
+                       brokerProps.put("port", Integer.toString(kafkaPort));
+
+                       //to support secure kafka cluster
+                       if (secureMode) {
+                               LOG.info("Adding Kafka secure configurations");
+                               final String listener = "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort;
+                               brokerProps.put("listeners", listener);
+                               brokerProps.put("advertised.listeners", listener);
+                               brokerProps.putAll(getSecureProperties());
+                       }
+
+                       final KafkaConfig brokerConfig = new KafkaConfig(brokerProps);
+
+                       try {
+                               scala.Option<String> emptyOption = scala.Option.apply(null);
+                               KafkaServer started = new KafkaServer(brokerConfig, SystemTime$.MODULE$, emptyOption);
+                               started.startup();
+                               return started;
+                       }
+                       catch (KafkaException e) {
+                               if (!(e.getCause() instanceof BindException)) {
+                                       throw e;
+                               }
+                               // port conflict, retry...
+                               LOG.info("Port conflict when starting Kafka Broker. Retrying...");
+                       }
+               }
+
+               throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
+       }
+
+       /**
+        * Returns the additional settings used in secure mode: the SASL_PLAINTEXT protocol
+        * entries, the Kerberos service name and the enlarged timeouts. Outside secure mode
+        * an empty properties object is returned.
+        */
+       public Properties getSecureProperties() {
+               final Properties secureProps = new Properties();
+               if (!secureMode) {
+                       return secureProps;
+               }
+
+               secureProps.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
+               secureProps.put("security.protocol", "SASL_PLAINTEXT");
+               secureProps.put("sasl.kerberos.service.name", "kafka");
+
+               //add special timeout for Travis
+               secureProps.setProperty("zookeeper.session.timeout.ms", zkTimeout);
+               secureProps.setProperty("zookeeper.connection.timeout.ms", zkTimeout);
+               secureProps.setProperty("metadata.fetch.timeout.ms","120000");
+
+               return secureProps;
+       }
+
+       /**
+        * Offset handler that reads committed offsets through its own {@code KafkaConsumer}.
+        */
+       private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {
+
+               private final KafkaConsumer<byte[], byte[]> offsetClient;
+
+               public KafkaOffsetHandlerImpl(Properties props) {
+                       this.offsetClient = new KafkaConsumer<>(props);
+               }
+
+               /**
+                * Returns the committed offset of the given partition, or {@code null} if the
+                * consumer reports none.
+                */
+               @Override
+               public Long getCommittedOffset(String topicName, int partition) {
+                       final TopicPartition target = new TopicPartition(topicName, partition);
+                       final OffsetAndMetadata committed = offsetClient.committed(target);
+                       if (committed == null) {
+                               return null;
+                       }
+                       return committed.offset();
+               }
+
+               /** Closes the underlying consumer. */
+               @Override
+               public void close() {
+                       offsetClient.close();
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
new file mode 100644
index 0000000..25040eb
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import 
org.apache.flink.streaming.connectors.kafka.internal.Handover.WakeupException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for the {@link Handover} between the Kafka consumer thread and the fetcher's main thread.
+ */
+public class HandoverTest {
+
+       // 
------------------------------------------------------------------------
+       //  test produce / consumer
+       // 
------------------------------------------------------------------------
+
+       /** Hands over 500 batches with a randomly delayed producer and an undelayed consumer. */
+       @Test
+       public void testWithVariableProducer() throws Exception {
+               final int numRecords = 500;
+               final int maxProducerDelay = 2;
+               final int maxConsumerDelay = 0;
+               runProducerConsumerTest(numRecords, maxProducerDelay, maxConsumerDelay);
+       }
+
+       /** Hands over 500 batches with an undelayed producer and a randomly delayed consumer. */
+       @Test
+       public void testWithVariableConsumer() throws Exception {
+               final int numRecords = 500;
+               final int maxProducerDelay = 0;
+               final int maxConsumerDelay = 2;
+               runProducerConsumerTest(numRecords, maxProducerDelay, maxConsumerDelay);
+       }
+
+       /** Hands over 500 batches with both the producer and the consumer randomly delayed. */
+       @Test
+       public void testWithVariableBoth() throws Exception {
+               final int numRecords = 500;
+               final int maxProducerDelay = 2;
+               final int maxConsumerDelay = 2;
+               runProducerConsumerTest(numRecords, maxProducerDelay, maxConsumerDelay);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  test error propagation
+       // 
------------------------------------------------------------------------
+
+       /**
+        * An error reported on a handover that holds no records must be
+        * thrown by the next call to {@code pollNext()}.
+        */
+       @Test
+       public void testPublishErrorOnEmptyHandover() throws Exception {
+               final Handover handover = new Handover();
+               final Exception reported = new Exception();
+
+               handover.reportError(reported);
+
+               try {
+                       handover.pollNext();
+                       fail("should throw an exception");
+               }
+               catch (Exception thrown) {
+                       // the very same exception object has to come back out
+                       assertEquals(reported, thrown);
+               }
+       }
+
+       /**
+        * An error reported on a handover that already holds records must be
+        * thrown by the next call to {@code pollNext()}.
+        */
+       @Test
+       public void testPublishErrorOnFullHandover() throws Exception {
+               final Handover handover = new Handover();
+               handover.produce(createTestRecords());
+
+               final IOException reported = new IOException();
+               handover.reportError(reported);
+
+               try {
+                       handover.pollNext();
+                       fail("should throw an exception");
+               }
+               catch (Exception thrown) {
+                       // the very same exception object has to come back out
+                       assertEquals(reported, thrown);
+               }
+       }
+
+       /**
+        * After an error was reported on an empty handover, {@code produce()}
+        * is expected to fail with a {@link Handover.ClosedException}.
+        */
+       @Test
+       public void testExceptionMarksClosedOnEmpty() throws Exception {
+               final Handover handover = new Handover();
+               handover.reportError(new IllegalStateException());
+
+               try {
+                       handover.produce(createTestRecords());
+                       fail("should throw an exception");
+               }
+               catch (Handover.ClosedException expected) {
+                       // this is the outcome the test asks for
+               }
+       }
+
+       /**
+        * After an error was reported on a handover that holds records, {@code produce()}
+        * is expected to fail with a {@link Handover.ClosedException}.
+        */
+       @Test
+       public void testExceptionMarksClosedOnFull() throws Exception {
+               final Handover handover = new Handover();
+               handover.produce(createTestRecords());
+               handover.reportError(new LinkageError());
+
+               try {
+                       handover.produce(createTestRecords());
+                       fail("should throw an exception");
+               }
+               catch (Handover.ClosedException expected) {
+                       // this is the outcome the test asks for
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  test closing behavior
+       // 
------------------------------------------------------------------------
+
+       /** Polling from a closed handover that never held records must throw a {@link Handover.ClosedException}. */
+       @Test
+       public void testCloseEmptyForConsumer() throws Exception {
+               final Handover closedHandover = new Handover();
+               closedHandover.close();
+
+               try {
+                       closedHandover.pollNext();
+                       fail("should throw an exception");
+               }
+               catch (Handover.ClosedException expected) {
+                       // this is the outcome the test asks for
+               }
+       }
+
+       /** Polling from a handover that was closed while holding records must throw a {@link Handover.ClosedException}. */
+       @Test
+       public void testCloseFullForConsumer() throws Exception {
+               final Handover closedHandover = new Handover();
+               closedHandover.produce(createTestRecords());
+               closedHandover.close();
+
+               try {
+                       closedHandover.pollNext();
+                       fail("should throw an exception");
+               }
+               catch (Handover.ClosedException expected) {
+                       // this is the outcome the test asks for
+               }
+       }
+
+       /** Producing into a closed handover that never held records must throw a {@link Handover.ClosedException}. */
+       @Test
+       public void testCloseEmptyForProducer() throws Exception {
+               final Handover closedHandover = new Handover();
+               closedHandover.close();
+
+               try {
+                       closedHandover.produce(createTestRecords());
+                       fail("should throw an exception");
+               }
+               catch (Handover.ClosedException expected) {
+                       // this is the outcome the test asks for
+               }
+       }
+
+       /** Producing into a handover that was closed while holding records must throw a {@link Handover.ClosedException}. */
+       @Test
+       public void testCloseFullForProducer() throws Exception {
+               final Handover closedHandover = new Handover();
+               closedHandover.produce(createTestRecords());
+               closedHandover.close();
+
+               try {
+                       closedHandover.produce(createTestRecords());
+                       fail("should throw an exception");
+               }
+               catch (Handover.ClosedException expected) {
+                       // this is the outcome the test asks for
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  test wake up behavior
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Checks the producer wake-up signal against empty and occupied handovers: producing into
+        * an empty handover must not throw a {@link Handover.WakeupException}, whereas producing
+        * into an occupied handover after a wake-up must throw one.
+        */
+       @Test
+       public void testWakeupDoesNotWakeWhenEmpty() throws Exception {
+               final Handover handover = new Handover();
+
+               // step 1: wake-up signal on an empty handover, the following produce() has to succeed
+               handover.wakeupProducer();
+               try {
+                       handover.produce(createTestRecords());
+               }
+               catch (Handover.WakeupException unexpected) {
+                       fail();
+               }
+
+               // step 2: the handover holds records now, so wake-up followed by produce() has to throw
+               handover.wakeupProducer();
+               try {
+                       handover.produce(createTestRecords());
+                       fail("should throw an exception");
+               }
+               catch (Handover.WakeupException expected) {
+                       // this is the outcome the test asks for
+               }
+
+               // step 3: drain the handover
+               assertNotNull(handover.pollNext());
+
+               // step 4: producing into the emptied handover has to work again
+               try {
+                       handover.produce(createTestRecords());
+               }
+               catch (Handover.WakeupException unexpected) {
+                       fail();
+               }
+       }
+
+       /**
+        * A single wake-up signal must interrupt only one {@code produce()} call: the first call
+        * after the signal throws, the second one blocks until the handover is drained.
+        */
+       @Test
+       public void testWakeupWakesOnlyOnce() throws Exception {
+               // start from a handover that already holds one batch
+               final Handover handover = new Handover();
+               handover.produce(createTestRecords());
+
+               // the first produce() after the wake-up signal is expected to throw
+               handover.wakeupProducer();
+               try {
+                       handover.produce(createTestRecords());
+                       fail();
+               } catch (WakeupException expected) {
+                       // this is the outcome the test asks for
+               }
+
+               // a second produce() runs in its own thread, because it is expected to block
+               final CheckedThread blockedProducer = new CheckedThread() {
+                       @Override
+                       public void go() throws Exception {
+                               handover.produce(createTestRecords());
+                       }
+               };
+               blockedProducer.start();
+
+               // give the thread up to ten seconds to reach a blocked / waiting state
+               blockedProducer.waitUntilThreadHoldsLock(10000);
+
+               // consuming the pending batch releases the producer thread
+               assertNotNull(handover.pollNext());
+               blockedProducer.sync();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  utilities
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Pushes {@code numRecords} mocked batches through a fresh {@link Handover}, using one
+        * producer thread and one consumer thread, and waits for both threads to finish.
+        *
+        * @param numRecords number of batches to hand over
+        * @param maxProducerDelay exclusive upper bound (milliseconds) of the producer's random pause; no pause if not positive
+        * @param maxConsumerDelay exclusive upper bound (milliseconds) of the consumer's random pause; no pause if not positive
+        */
+       private void runProducerConsumerTest(int numRecords, int maxProducerDelay, int maxConsumerDelay) throws Exception {
+               // the batches that travel through the handover
+               @SuppressWarnings({"unchecked", "rawtypes"})
+               final ConsumerRecords<byte[], byte[]>[] batches = new ConsumerRecords[numRecords];
+               int next = 0;
+               while (next < numRecords) {
+                       batches[next] = createTestRecords();
+                       next++;
+               }
+
+               final Handover handover = new Handover();
+
+               final ProducerThread producerThread = new ProducerThread(handover, batches, maxProducerDelay);
+               final ConsumerThread consumerThread = new ConsumerThread(handover, batches, maxConsumerDelay);
+
+               consumerThread.start();
+               producerThread.start();
+
+               // the consumer is joined first so that its assertion errors are the ones reported
+               consumerThread.sync();
+               producerThread.sync();
+       }
+
+       @SuppressWarnings("unchecked")
+       private static ConsumerRecords<byte[], byte[]> createTestRecords() {
+               return mock(ConsumerRecords.class);
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private static abstract class CheckedThread extends Thread {
+
+               private volatile Throwable error;
+
+               public abstract void go() throws Exception;
+
+               @Override
+               public void run() {
+                       try {
+                               go();
+                       }
+                       catch (Throwable t) {
+                               error = t;
+                       }
+               }
+
+               public void sync() throws Exception {
+                       join();
+                       if (error != null) {
+                               ExceptionUtils.rethrowException(error, 
error.getMessage());
+                       }
+               }
+
+               public void waitUntilThreadHoldsLock(long timeoutMillis) throws 
InterruptedException, TimeoutException {
+                       final long deadline = System.nanoTime() + timeoutMillis 
* 1_000_000;
+                       
+                       while (!isBlockedOrWaiting() && (System.nanoTime() < 
deadline)) {
+                               Thread.sleep(1);
+                       }
+
+                       if (!isBlockedOrWaiting()) {
+                               throw new TimeoutException();
+                       }
+               }
+
+               private boolean isBlockedOrWaiting() {
+                       State state = getState();
+                       return state == State.BLOCKED || state == State.WAITING 
|| state == State.TIMED_WAITING;
+               }
+       }
+
+       /**
+        * Thread that puts the given batches into the handover in array order,
+        * optionally sleeping for a random time after each batch.
+        */
+       private static class ProducerThread extends CheckedThread {
+
+               private final Random rnd = new Random();
+               private final Handover handover;
+               private final ConsumerRecords<byte[], byte[]>[] data;
+               private final int maxDelay;
+
+               private ProducerThread(Handover handover, ConsumerRecords<byte[], byte[]>[] data, int maxDelay) {
+                       this.handover = handover;
+                       this.data = data;
+                       this.maxDelay = maxDelay;
+               }
+
+               @Override
+               public void go() throws Exception {
+                       for (int i = 0; i < data.length; i++) {
+                               handover.produce(data[i]);
+                               pause();
+                       }
+               }
+
+               /** Sleeps for a random number of milliseconds below maxDelay; does nothing if maxDelay is not positive. */
+               private void pause() throws InterruptedException {
+                       if (maxDelay <= 0) {
+                               return;
+                       }
+                       final int millis = rnd.nextInt(maxDelay);
+                       Thread.sleep(millis);
+               }
+       }
+
+       /**
+        * Thread that polls the handover once per expected batch and asserts that the batches
+        * arrive in array order, optionally sleeping for a random time after each batch.
+        */
+       private static class ConsumerThread extends CheckedThread {
+
+               private final Random rnd = new Random();
+               private final Handover handover;
+               private final ConsumerRecords<byte[], byte[]>[] data;
+               private final int maxDelay;
+
+               private ConsumerThread(Handover handover, ConsumerRecords<byte[], byte[]>[] data, int maxDelay) {
+                       this.handover = handover;
+                       this.data = data;
+                       this.maxDelay = maxDelay;
+               }
+
+               @Override
+               public void go() throws Exception {
+                       for (int i = 0; i < data.length; i++) {
+                               final ConsumerRecords<byte[], byte[]> received = handover.pollNext();
+
+                               // the handover has to deliver exactly the batch that is next in line
+                               assertEquals(data[i], received);
+                               pause();
+                       }
+               }
+
+               /** Sleeps for a random number of milliseconds below maxDelay; does nothing if maxDelay is not positive. */
+               private void pause() throws InterruptedException {
+                       if (maxDelay <= 0) {
+                               return;
+                       }
+                       final int millis = rnd.nextInt(maxDelay);
+                       Thread.sleep(millis);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..4ac1773
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
@@ -0,0 +1,32 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler,
+# and switch off the ZooKeeper, state-change and Kafka loggers
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+log4j.logger.org.apache.zookeeper=OFF, testlogger
+log4j.logger.state.change.logger=OFF, testlogger
+log4j.logger.kafka=OFF, testlogger
+
+log4j.logger.org.apache.directory=OFF, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..45b3b92
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} 
%X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file

Reply via email to