[FLINK-5048] [kafka consumer] Change thread model of FlinkKafkaConsumer to 
better handle shutdown/interrupt situations

Prior to this commit, the FlinkKafkaConsumer (0.9 / 0.10) spawned a separate 
thread that operated Kafka's consumer.
That thread was shielded from interrupts, because the Kafka Consumer did not 
handle thread interrupts well.
Since that thread was also the thread that emitted records, it would block in 
the network stack (backpressure) or in chained operators.
The latter case led to situations where cancellations became very slow unless 
that thread was interrupted (which it could not be).

This commit changes the thread model:
  - A spawned consumer thread polls a batch of records from the KafkaConsumer 
    and pushes the batch of records into a blocking queue (size one)
  - The main thread of the task pulls the record batches from the blocking 
    queue and emits the records.
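
For illustration only, a minimal sketch of this hand-over pattern using a
capacity-one BlockingQueue (the actual implementation is the custom Handover
class introduced by this commit, which adds error reporting, wakeup, and
close semantics; the class and record types below are simplified
assumptions, not part of the commit):

    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class SizeOneHandoverSketch {

        public static void main(String[] args) throws Exception {
            // capacity 1: the producer blocks until the consumer has taken
            // the previous batch (this is the backpressure point)
            final BlockingQueue<List<String>> handover = new ArrayBlockingQueue<>(1);

            // stands in for the thread that polls the KafkaConsumer
            Thread consumerThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        for (int i = 0; i < 3; i++) {
                            // blocks while the previous batch is still pending
                            handover.put(Collections.singletonList("record-" + i));
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }, "consumer-thread");
            consumerThread.setDaemon(true);
            consumerThread.start();

            // the main task thread pulls batches and emits the records
            for (int i = 0; i < 3; i++) {
                for (String record : handover.take()) {
                    System.out.println(record);
                }
            }
        }
    }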


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a66e7ad1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a66e7ad1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a66e7ad1

Branch: refs/heads/master
Commit: a66e7ad14e41fa07737f447d68920ad5cc4ed6d3
Parents: fa1864c
Author: Stephan Ewen <[email protected]>
Authored: Thu Nov 10 11:13:43 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Wed Nov 16 19:08:07 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/util/ExceptionUtils.java   |  20 +
 .../kafka/internal/Kafka010Fetcher.java         |   7 +-
 .../internal/KafkaConsumerCallBridge010.java    |  40 ++
 .../connectors/kafka/Kafka010FetcherTest.java   | 172 ++++++++-
 .../kafka/KafkaShortRetention010ITCase.java     |  34 --
 .../connectors/kafka/internal/Handover.java     | 214 ++++++++++
 .../kafka/internal/Kafka09Fetcher.java          | 274 +++----------
 .../kafka/internal/KafkaConsumerCallBridge.java |  41 ++
 .../kafka/internal/KafkaConsumerThread.java     | 332 ++++++++++++++++
 .../connectors/kafka/Kafka09FetcherTest.java    | 164 +++++++-
 .../kafka/KafkaShortRetention09ITCase.java      |  34 --
 .../connectors/kafka/internal/HandoverTest.java | 387 +++++++++++++++++++
 .../kafka/KafkaShortRetentionTestBase.java      |   1 +
 13 files changed, 1422 insertions(+), 298 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index cc7f56f..32bc1d2 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -100,6 +100,26 @@ public final class ExceptionUtils {
        }
 
        /**
+        * Throws the given {@code Throwable} in scenarios where the signatures 
allow throwing
+        * an Exception. Errors and Exceptions are thrown directly, other 
"exotic"
+        * subclasses of Throwable are wrapped in an Exception.
+        *
+        * @param t The throwable to be thrown.
+        * @param parentMessage The message for the parent Exception, if one is 
needed.
+        */
+       public static void rethrowException(Throwable t, String parentMessage) 
throws Exception {
+               if (t instanceof Error) {
+                       throw (Error) t;
+               }
+               else if (t instanceof Exception) {
+                       throw (Exception) t;
+               }
+               else {
+                       throw new Exception(parentMessage, t);
+               }
+       }
+
+       /**
         * Tries to throw the given {@code Throwable} in scenarios where the 
signatures allow only IOExceptions
         * (and RuntimeException and Error). Throws this exception directly, if 
it is an IOException,
         * a RuntimeException, or an Error. Otherwise does nothing.
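
As a usage illustration (not part of this commit), a caller with a broad
"throws Exception" signature can use the new method to propagate a Throwable
collected from another thread; the class name and message below are
assumptions:

    import org.apache.flink.util.ExceptionUtils;

    public class RethrowExample {

        public static void propagate(Throwable fromWorkerThread) throws Exception {
            if (fromWorkerThread != null) {
                // Errors and Exceptions are rethrown as-is; any other Throwable
                // is wrapped in a new Exception carrying the given message
                ExceptionUtils.rethrowException(fromWorkerThread, "Error in worker thread");
            }
        }
    }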

http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
index 024cd38..71dd29a 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
@@ -29,7 +29,6 @@ import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.List;
@@ -91,11 +90,11 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
 
        /**
         * This method needs to be overridden because Kafka broke binary 
compatibility between 0.9 and 0.10,
-        * changing the List in the signature to a Collection.
+        * changing binary signatures.
         */
        @Override
-       protected void assignPartitionsToConsumer(KafkaConsumer<?, ?> consumer, 
List<TopicPartition> topicPartitions) {
-               consumer.assign(topicPartitions);
+       protected KafkaConsumerCallBridge010 createCallBridge() {
+               return new KafkaConsumerCallBridge010();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
new file mode 100644
index 0000000..a81b098
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.List;
+
+/**
+ * The ConsumerCallBridge simply calls the {@link 
KafkaConsumer#assign(java.util.Collection)} method.
+ * 
+ * This indirection is necessary, because Kafka broke binary compatibility 
between 0.9 and 0.10,
+ * changing {@code assign(List)} to {@code assign(Collection)}.
+ * 
+ * Because of that, we need two versions whose compiled code goes against 
different method signatures.
+ */
+public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge {
+
+       @Override
+       public void assignPartitions(KafkaConsumer<?, ?> consumer, 
List<TopicPartition> topicPartitions) throws Exception {
+               consumer.assign(topicPartitions);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index 037d25b..6ee0429 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -20,16 +20,20 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internal.Handover;
 import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
+import 
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -45,6 +49,7 @@ import org.mockito.stubbing.Answer;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -54,6 +59,7 @@ import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -69,7 +75,7 @@ import static org.powermock.api.mockito.PowerMockito.whenNew;
  * Unit tests for the {@link Kafka010Fetcher}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest(Kafka010Fetcher.class)
+@PrepareForTest(KafkaConsumerThread.class)
 public class Kafka010FetcherTest {
 
     @Test
@@ -125,7 +131,7 @@ public class Kafka010FetcherTest {
                 getClass().getClassLoader(),
                 false, /* checkpointing */
                 "taskname-with-subtask",
-                mock(MetricGroup.class),
+                new UnregisteredMetricsGroup(),
                 schema,
                 new Properties(),
                 0L,
@@ -174,9 +180,13 @@ public class Kafka010FetcherTest {
         fetcherRunner.join();
 
         // check that there were no errors in the fetcher
-        final Throwable caughtError = error.get();
-        if (caughtError != null) {
-            throw new Exception("Exception in the fetcher", caughtError);
+        final Throwable fetcherError = error.get();
+        if (fetcherError != null && !(fetcherError instanceof 
Handover.ClosedException)) {
+            throw new Exception("Exception in the fetcher", fetcherError);
+        }
+        final Throwable committerError = commitError.get();
+        if (committerError != null) {
+            throw new Exception("Exception in the committer", committerError);
         }
     }
 
@@ -258,7 +268,7 @@ public class Kafka010FetcherTest {
                 getClass().getClassLoader(),
                 false, /* checkpointing */
                 "taskname-with-subtask",
-                mock(MetricGroup.class),
+                new UnregisteredMetricsGroup(),
                 schema,
                 new Properties(),
                 0L,
@@ -321,8 +331,154 @@ public class Kafka010FetcherTest {
 
         // check that there were no errors in the fetcher
         final Throwable caughtError = error.get();
-        if (caughtError != null) {
+        if (caughtError != null && !(caughtError instanceof 
Handover.ClosedException)) {
             throw new Exception("Exception in the fetcher", caughtError);
         }
     }
+
+    @Test
+    public void testCancellationWhenEmitBlocks() throws Exception {
+
+        // ----- some test data -----
+
+        final String topic = "test-topic";
+        final int partition = 3;
+        final byte[] payload = new byte[] {1, 2, 3, 4};
+
+        final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
+                new ConsumerRecord<byte[], byte[]>(topic, partition, 15, 
payload, payload),
+                new ConsumerRecord<byte[], byte[]>(topic, partition, 16, 
payload, payload),
+                new ConsumerRecord<byte[], byte[]>(topic, partition, 17, 
payload, payload));
+
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = 
new HashMap<>();
+        data.put(new TopicPartition(topic, partition), records);
+
+        final ConsumerRecords<byte[], byte[]> consumerRecords = new 
ConsumerRecords<>(data);
+
+        // ----- the test consumer -----
+
+        final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+        when(mockConsumer.poll(anyLong())).thenAnswer(new 
Answer<ConsumerRecords<?, ?>>() {
+            @Override
+            public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
+                return consumerRecords;
+            }
+        });
+
+        
whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+        // ----- build a fetcher -----
+
+        BlockingSourceContext<String> sourceContext = new 
BlockingSourceContext<>();
+        List<KafkaTopicPartition> topics = Collections.singletonList(new 
KafkaTopicPartition(topic, partition));
+        KeyedDeserializationSchema<String> schema = new 
KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+        final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
+                sourceContext,
+                topics,
+                null, /* periodic watermark extractor */
+                null, /* punctuated watermark extractor */
+                new TestProcessingTimeService(),
+                10, /* watermark interval */
+                this.getClass().getClassLoader(),
+                true, /* checkpointing */
+                "task_name",
+                new UnregisteredMetricsGroup(),
+                schema,
+                new Properties(),
+                0L,
+                false);
+
+
+        // ----- run the fetcher -----
+
+        final AtomicReference<Throwable> error = new AtomicReference<>();
+        final Thread fetcherRunner = new Thread("fetcher runner") {
+
+            @Override
+            public void run() {
+                try {
+                    fetcher.runFetchLoop();
+                } catch (Throwable t) {
+                    error.set(t);
+                }
+            }
+        };
+        fetcherRunner.start();
+
+        // wait until the thread started to emit records to the source context
+        sourceContext.waitTillHasBlocker();
+
+        // now we try to cancel the fetcher, including the interruption 
usually done on the task thread
+        // once it has finished, there must be no more thread blocked on the 
source context
+        fetcher.cancel();
+        fetcherRunner.interrupt();
+        fetcherRunner.join();
+
+        assertFalse("fetcher threads did not properly finish", 
sourceContext.isStillBlocking());
+    }
+
+    // ------------------------------------------------------------------------
+    //  test utilities
+    // ------------------------------------------------------------------------
+
+    private static final class BlockingSourceContext<T> implements 
SourceContext<T> {
+
+        private final ReentrantLock lock = new ReentrantLock();
+        private final OneShotLatch inBlocking = new OneShotLatch();
+
+        @Override
+        public void collect(T element) {
+            block();
+        }
+
+        @Override
+        public void collectWithTimestamp(T element, long timestamp) {
+            block();
+        }
+
+        @Override
+        public void emitWatermark(Watermark mark) {
+            block();
+        }
+
+        @Override
+        public Object getCheckpointLock() {
+            return new Object();
+        }
+
+        @Override
+        public void close() {}
+
+        public void waitTillHasBlocker() throws InterruptedException {
+            inBlocking.await();
+        }
+
+        public boolean isStillBlocking() {
+            return lock.isLocked();
+        }
+
+        @SuppressWarnings({"InfiniteLoopStatement", 
"SynchronizationOnLocalVariableOrMethodParameter"})
+        private void block() {
+            lock.lock();
+            try {
+                inBlocking.trigger();
+
+                // put this thread to sleep indefinitely
+                final Object o = new Object();
+                while (true) {
+                    synchronized (o) {
+                        o.wait();
+                    }
+                }
+            }
+            catch (InterruptedException e) {
+                // exit cleanly, simply reset the interruption flag
+                Thread.currentThread().interrupt();
+            }
+            finally {
+                lock.unlock();
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention010ITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention010ITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention010ITCase.java
deleted file mode 100644
index 1d36198..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention010ITCase.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class KafkaShortRetention010ITCase extends KafkaShortRetentionTestBase {
-
-       @Test(timeout=60000)
-       public void testAutoOffsetReset() throws Exception {
-               runAutoOffsetResetTest();
-       }
-
-       @Test(timeout=60000)
-       public void testAutoOffsetResetNone() throws Exception {
-               runFailOnAutoOffsetResetNone();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java
new file mode 100644
index 0000000..e6e3c51
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.ThreadSafe;
+import java.io.Closeable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The Handover is a utility to hand over data (a buffer of records) and 
exceptions from a
+ * <i>producer</i> thread to a <i>consumer</i> thread. It effectively behaves 
like a
+ * "size one blocking queue", with some extras around exception reporting, 
closing, and
+ * waking up threads without {@link Thread#interrupt() interrupting} them.
+ * 
+ * <p>This class is used in the Flink Kafka Consumer to hand over data and 
exceptions between
+ * the thread that runs the KafkaConsumer class and the main thread.
+ * 
+ * <p>The Handover has the notion of "waking up" the producer thread with a 
{@link WakeupException}
+ * rather than a thread interrupt.
+ * 
+ * <p>The Handover can also be "closed", signalling from one thread to the 
other
+ * that it has terminated.
+ */
+@ThreadSafe
+public final class Handover implements Closeable {
+
+       private final Object lock = new Object();
+
+       private ConsumerRecords<byte[], byte[]> next;
+       private Throwable error;
+       private boolean wakeupProducer;
+
+       /**
+        * Polls the next element from the Handover, possibly blocking until 
the next element is
+        * available. This method behaves similarly to polling from a blocking 
queue.
+        * 
+        * <p>If an exception was handed in by the producer ({@link 
#reportError(Throwable)}), then
+        * that exception is thrown rather than an element being returned.
+        * 
+        * @return The next element (buffer of records, never null).
+        * 
+        * @throws ClosedException Thrown if the Handover was {@link #close() 
closed}.
+        * @throws Exception Rethrows exceptions from the {@link 
#reportError(Throwable)} method.
+        */
+       @Nonnull
+       public ConsumerRecords<byte[], byte[]> pollNext() throws Exception {
+               synchronized (lock) {
+                       while (next == null && error == null) {
+                               lock.wait();
+                       }
+
+                       ConsumerRecords<byte[], byte[]> n = next;
+                       if (n != null) {
+                               next = null;
+                               lock.notifyAll();
+                               return n;
+                       }
+                       else {
+                               ExceptionUtils.rethrowException(error, 
error.getMessage());
+
+                               // this statement cannot be reached since the 
above method always throws an exception
+                               // this is only here to silence the compiler 
and any warnings
+                               return ConsumerRecords.empty(); 
+                       }
+               }
+       }
+
+       /**
+        * Hands over an element from the producer. If the Handover already has 
an element that was
+        * not yet picked up by the consumer thread, this call blocks until the 
consumer picks up that
+        * previous element.
+        * 
+        * <p>This behavior is similar to a "size one" blocking queue.
+        * 
+        * @param element The next element to hand over.
+        * 
+        * @throws InterruptedException
+        *                 Thrown, if the thread is interrupted while blocking 
for the Handover to be empty.
+        * @throws WakeupException
+        *                 Thrown, if the {@link #wakeupProducer()} method is 
called while blocking for
+        *                 the Handover to be empty.
+        * @throws ClosedException
+        *                 Thrown if the Handover was closed or concurrently 
being closed.
+        */
+       public void produce(final ConsumerRecords<byte[], byte[]> element)
+                       throws InterruptedException, WakeupException, 
ClosedException {
+
+               checkNotNull(element);
+
+               synchronized (lock) {
+                       while (next != null && !wakeupProducer) {
+                               lock.wait();
+                       }
+
+                       wakeupProducer = false;
+
+                       // if there is still an element, we must have been 
woken up
+                       if (next != null) {
+                               throw new WakeupException();
+                       }
+                       // if there is no error, then this is open and can 
accept this element
+                       else if (error == null) {
+                               next = element;
+                               lock.notifyAll();
+                       }
+                       // an error marks this as closed for the producer
+                       else {
+                               throw new ClosedException();
+                       }
+               }
+       }
+
+       /**
+        * Reports an exception. The consumer will throw the given exception 
immediately, if
+        * it is currently blocked in the {@link #pollNext()} method, or the 
next time it
+        * calls that method.
+        * 
+        * <p>After this method has been called, no call to either {@link 
#produce(ConsumerRecords)}
+        * or {@link #pollNext()} will ever return normally any more, but will 
always return
+        * exceptionally.
+        * 
+        * <p>If another exception was already reported, this method does 
nothing.
+        * 
+        * <p>For the producer, the Handover will appear as if it was {@link 
#close() closed}.
+        * 
+        * @param t The exception to report.
+        */
+       public void reportError(Throwable t) {
+               checkNotNull(t);
+
+               synchronized (lock) {
+                       // do not override the initial exception
+                       if (error == null) {
+                               error = t;
+                       }
+                       next = null;
+                       lock.notifyAll();
+               }
+       }
+
+       /**
+        * Closes the handover. Both the {@link #produce(ConsumerRecords)} 
method and the
+        * {@link #pollNext()} will throw a {@link ClosedException} on any 
currently blocking and
+        * future invocations.
+        * 
+        * <p>If an exception was previously reported via the {@link 
#reportError(Throwable)} method,
+        * that exception will not be overridden. The consumer thread will 
throw that exception upon
+        * calling {@link #pollNext()}, rather than the {@code ClosedException}.
+        */
+       @Override
+       public void close() {
+               synchronized (lock) {
+                       next = null;
+                       wakeupProducer = false;
+
+                       if (error == null) {
+                               error = new ClosedException();
+                       }
+                       lock.notifyAll();
+               }
+       }
+
+       /**
+        * Wakes the producer thread up. If the producer thread is currently 
blocked in
+        * the {@link #produce(ConsumerRecords)} method, it will exit the 
method throwing
+        * a {@link WakeupException}.
+        */
+       public void wakeupProducer() {
+               synchronized (lock) {
+                       wakeupProducer = true;
+                       lock.notifyAll();
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * An exception thrown by the Handover in the {@link #pollNext()} or
+        * {@link #produce(ConsumerRecords)} method, after the Handover was 
closed via
+        * {@link #close()}.
+        */
+       public static final class ClosedException extends Exception {
+               private static final long serialVersionUID = 1L;
+       }
+
+       /**
+        * A special exception thrown by the Handover in the {@link 
#produce(ConsumerRecords)}
+        * method when the producer is woken up from a blocking call via {@link 
#wakeupProducer()}.
+        */
+       public static final class WakeupException extends Exception {
+               private static final long serialVersionUID = 1L;
+       }
+}
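
A simplified sketch of how the two threads are meant to interact with this
Handover, mirroring (with assumptions: partition setup, shutdown logic, and
record emission elided) the fetcher and consumer-thread code in this commit:

    import org.apache.flink.streaming.connectors.kafka.internal.Handover;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class HandoverUsageSketch {

        public static void run(final KafkaConsumer<byte[], byte[]> kafkaConsumer) throws Exception {
            final Handover handover = new Handover();

            // producer side (the KafkaConsumerThread): poll Kafka, hand batches over
            Thread producer = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        while (true) {
                            ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(100L);
                            // blocks until the previous batch was taken
                            handover.produce(records);
                        }
                    }
                    catch (Handover.WakeupException | Handover.ClosedException | InterruptedException e) {
                        // woken up, closed, or interrupted: leave the poll loop
                    }
                    catch (Throwable t) {
                        // surfaces in the task thread's pollNext()
                        handover.reportError(t);
                    }
                }
            }, "kafka-consumer-sketch");
            producer.setDaemon(true);
            producer.start();

            // consumer side (the task thread): pull batches and emit the records
            try {
                while (true) {
                    // rethrows any error reported by the producer thread
                    ConsumerRecords<byte[], byte[]> batch = handover.pollNext();
                    // ... deserialize and emit 'batch' here ...
                }
            }
            finally {
                // unblocks both threads with a ClosedException
                handover.close();
            }
        }
    }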

http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index acdcb61..d495327 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -23,10 +23,8 @@ import 
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.ExceptionProxy;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
-import 
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
@@ -34,30 +32,23 @@ import org.apache.flink.util.SerializedValue;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.consumer.OffsetCommitCallback;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * A fetcher that fetches data from Kafka brokers via the Kafka 0.9 consumer 
API.
  * 
  * @param <T> The type of elements produced by the fetcher.
  */
-public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> 
implements Runnable {
+public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(Kafka09Fetcher.class);
 
@@ -66,36 +57,15 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> implem
        /** The schema to convert between Kafka's byte messages, and Flink's 
objects */
        private final KeyedDeserializationSchema<T> deserializer;
 
-       /** The configuration for the Kafka consumer */
-       private final Properties kafkaProperties;
+       /** The handover of data and exceptions between the consumer thread and 
the task thread */
+       private final Handover handover;
 
-       /** The maximum number of milliseconds to wait for a fetch batch */
-       private final long pollTimeout;
-
-       /** The next offsets that the main thread should commit */
-       private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> 
nextOffsetsToCommit;
-       
-       /** The callback invoked by Kafka once an offset commit is complete */
-       private final OffsetCommitCallback offsetCommitCallback;
-
-       /** Reference to the Kafka consumer, once it is created */
-       private volatile KafkaConsumer<byte[], byte[]> consumer;
-       
-       /** Reference to the proxy, forwarding exceptions from the fetch thread 
to the main thread */
-       private volatile ExceptionProxy errorHandler;
+       /** The thread that runs the actual KafkaConsumer and hands the record 
batches to this fetcher */
+       private final KafkaConsumerThread consumerThread;
 
        /** Flag to mark the main work loop as alive */
        private volatile boolean running = true;
 
-       /** Flag tracking whether the latest commit request has completed */
-       private volatile boolean commitInProgress;
-
-       /** For Debug output **/
-       private String taskNameWithSubtasks;
-
-       /** We get this from the outside to publish metrics. **/
-       private MetricGroup metricGroup;
-
        // 
------------------------------------------------------------------------
 
        public Kafka09Fetcher(
@@ -125,16 +95,26 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> implem
                                useMetrics);
 
                this.deserializer = deserializer;
-               this.kafkaProperties = kafkaProperties;
-               this.pollTimeout = pollTimeout;
-               this.nextOffsetsToCommit = new AtomicReference<>();
-               this.offsetCommitCallback = new CommitCallback();
-               this.taskNameWithSubtasks = taskNameWithSubtasks;
-               this.metricGroup = metricGroup;
+               this.handover = new Handover();
+
+               final MetricGroup kafkaMetricGroup = 
metricGroup.addGroup("KafkaConsumer");
+               addOffsetStateGauge(kafkaMetricGroup);
 
                // if checkpointing is enabled, we are not automatically 
committing to Kafka.
-               
kafkaProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+               kafkaProperties.setProperty(
+                               ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
                                Boolean.toString(!enableCheckpointing));
+               
+               this.consumerThread = new KafkaConsumerThread(
+                               LOG,
+                               handover,
+                               kafkaProperties,
+                               subscribedPartitions(),
+                               kafkaMetricGroup,
+                               createCallBridge(),
+                               getFetcherName() + " for " + 
taskNameWithSubtasks,
+                               pollTimeout,
+                               useMetrics);
        }
 
        // 
------------------------------------------------------------------------
@@ -143,133 +123,26 @@ public class Kafka09Fetcher<T> extends 
AbstractFetcher<T, TopicPartition> implem
 
        @Override
        public void runFetchLoop() throws Exception {
-               this.errorHandler = new ExceptionProxy(Thread.currentThread());
-
-               // rather than running the main fetch loop directly here, we 
spawn a dedicated thread
-               // this makes sure that no interrupt() call upon canceling 
reaches the Kafka consumer code
-               Thread runner = new Thread(this, getFetcherName() + " for " + 
taskNameWithSubtasks);
-               runner.setDaemon(true);
-               runner.start();
-
                try {
-                       runner.join();
-               } catch (InterruptedException e) {
-                       // may be the result of a wake-up after an exception. 
we ignore this here and only
-                       // restore the interruption state
-                       Thread.currentThread().interrupt();
-               }
-
-               // make sure we propagate any exception that occurred in the 
concurrent fetch thread,
-               // before leaving this method
-               this.errorHandler.checkAndThrowException();
-       }
-
-       @Override
-       public void cancel() {
-               // flag the main thread to exit
-               running = false;
-
-               // NOTE:
-               //   - We cannot interrupt the runner thread, because the Kafka 
consumer may
-               //     deadlock when the thread is interrupted while in certain 
methods
-               //   - We cannot call close() on the consumer, because it will 
actually throw
-               //     an exception if a concurrent call is in progress
-
-               // make sure the consumer finds out faster that we are shutting 
down 
-               if (consumer != null) {
-                       consumer.wakeup();
-               }
-       }
-
-       @Override
-       public void run() {
-               // This method initializes the KafkaConsumer and guarantees it 
is torn down properly.
-               // This is important, because the consumer has multi-threading 
issues,
-               // including concurrent 'close()' calls.
-
-               final KafkaConsumer<byte[], byte[]> consumer;
-               try {
-                       consumer = new KafkaConsumer<>(kafkaProperties);
-               }
-               catch (Throwable t) {
-                       running = false;
-                       errorHandler.reportError(t);
-                       return;
-               }
-
-               // from here on, the consumer will be closed properly
-               try {
-                       assignPartitionsToConsumer(consumer, 
convertKafkaPartitions(subscribedPartitions()));
-
-                       if (useMetrics) {
-                               final MetricGroup kafkaMetricGroup = 
metricGroup.addGroup("KafkaConsumer");
-                               addOffsetStateGauge(kafkaMetricGroup);
-                               // register Kafka metrics to Flink
-                               Map<MetricName, ? extends Metric> metrics = 
consumer.metrics();
-                               if (metrics == null) {
-                                       // MapR's Kafka implementation returns 
null here.
-                                       LOG.info("Consumer implementation does 
not support metrics");
-                               } else {
-                                       // we have Kafka metrics, register them
-                                       for (Map.Entry<MetricName, ? extends 
Metric> metric: metrics.entrySet()) {
-                                               
kafkaMetricGroup.gauge(metric.getKey().name(), new 
KafkaMetricWrapper(metric.getValue()));
-                                       }
-                               }
-                       }
-
-                       // seek the consumer to the initial offsets
-                       for (KafkaTopicPartitionState<TopicPartition> partition 
: subscribedPartitions()) {
-                               if (partition.isOffsetDefined()) {
-                                       LOG.info("Partition {} has restored 
initial offsets {} from checkpoint / savepoint; seeking the consumer " +
-                                               "to position {}", 
partition.getKafkaPartitionHandle(), partition.getOffset(), 
partition.getOffset() + 1);
-
-                                       
consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
-                               } else {
-                                       // for partitions that do not have 
offsets restored from a checkpoint/savepoint,
-                                       // we need to define our internal 
offset state for them using the initial offsets retrieved from Kafka
-                                       // by the KafkaConsumer, so that they 
are correctly checkpointed and committed on the next checkpoint
-
-                                       long fetchedOffset = 
consumer.position(partition.getKafkaPartitionHandle());
-
-                                       LOG.info("Partition {} has no initial 
offset; the consumer has position {}, so the initial offset " +
-                                               "will be set to {}", 
partition.getKafkaPartitionHandle(), fetchedOffset, fetchedOffset - 1);
-
-                                       // the fetched offset represents the 
next record to process, so we need to subtract it by 1
-                                       partition.setOffset(fetchedOffset - 1);
-                               }
-                       }
+                       final Handover handover = this.handover;
 
-                       // from now on, external operations may call the 
consumer
-                       this.consumer = consumer;
+                       // kick off the actual Kafka consumer
+                       consumerThread.start();
 
-                       // main fetch loop
                        while (running) {
-
-                               // check if there is something to commit
-                               final Map<TopicPartition, OffsetAndMetadata> 
toCommit = nextOffsetsToCommit.getAndSet(null);
-                               if (toCommit != null && !commitInProgress) {
-                                       // reset the work-to-be committed, so 
we don't repeatedly commit the same
-                                       // also record that a commit is already 
in progress
-                                       commitInProgress = true;
-                                       consumer.commitAsync(toCommit, 
offsetCommitCallback);
-                               }
-
-                               // get the next batch of records
-                               final ConsumerRecords<byte[], byte[]> records;
-                               try {
-                                       records = consumer.poll(pollTimeout);
-                               }
-                               catch (WakeupException we) {
-                                       continue;
-                               }
+                               // this blocks until we get the next records
+                               // it automatically re-throws exceptions 
encountered in the consumer thread
+                               final ConsumerRecords<byte[], byte[]> records = 
handover.pollNext();
 
                                // get the records for each topic partition
                                for (KafkaTopicPartitionState<TopicPartition> 
partition : subscribedPartitions()) {
-                                       
-                                       List<ConsumerRecord<byte[], byte[]>> 
partitionRecords = records.records(partition.getKafkaPartitionHandle());
+
+                                       List<ConsumerRecord<byte[], byte[]>> 
partitionRecords =
+                                                       
records.records(partition.getKafkaPartitionHandle());
 
                                        for (ConsumerRecord<byte[], byte[]> 
record : partitionRecords) {
-                                               T value = 
deserializer.deserialize(
+
+                                               final T value = 
deserializer.deserialize(
                                                                record.key(), 
record.value(),
                                                                record.topic(), 
record.partition(), record.offset());
 
@@ -279,32 +152,37 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> implem
                                                        break;
                                                }
 
-                                               // emit the actual record. this 
also update offset state atomically
+                                               // emit the actual record. this 
also updates offset state atomically
                                                // and deals with timestamps 
and watermark generation
                                                emitRecord(value, partition, 
record.offset(), record);
                                        }
                                }
                        }
-                       // end main fetch loop
-               }
-               catch (Throwable t) {
-                       if (running) {
-                               running = false;
-                               errorHandler.reportError(t);
-                       } else {
-                               LOG.debug("Stopped ConsumerThread threw 
exception", t);
-                       }
                }
                finally {
-                       try {
-                               consumer.close();
-                       }
-                       catch (Throwable t) {
-                               LOG.warn("Error while closing Kafka 0.9 
consumer", t);
-                       }
+                       // this signals the consumer thread that no more work 
is to be done
+                       consumerThread.shutdown();
+               }
+
+               // on a clean exit, wait for the runner thread
+               try {
+                       consumerThread.join();
+               }
+               catch (InterruptedException e) {
+                       // may be the result of a wake-up interruption after an 
exception.
+                       // we ignore this here and only restore the 
interruption state
+                       Thread.currentThread().interrupt();
                }
        }
 
+       @Override
+       public void cancel() {
+               // flag the main thread to exit. A thread interrupt will come 
anyway.
+               running = false;
+               handover.close();
+               consumerThread.shutdown();
+       }
+
        // 
------------------------------------------------------------------------
        //  The below methods are overridden in the 0.10 fetcher, which 
otherwise
        //   reuses most of the 0.9 fetcher behavior
@@ -320,14 +198,17 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> implem
                emitRecord(record, partition, offset);
        }
 
-       protected void assignPartitionsToConsumer(KafkaConsumer<?, ?> consumer, 
List<TopicPartition> topicPartitions) {
-               consumer.assign(topicPartitions);
-       }
-
+       /**
+        * Gets the name of this fetcher, for thread naming and logging 
purposes.
+        */
        protected String getFetcherName() {
                return "Kafka 0.9 Fetcher";
        }
 
+       protected KafkaConsumerCallBridge createCallBridge() {
+               return new KafkaConsumerCallBridge();
+       }
+
        // 
------------------------------------------------------------------------
        //  Implement Methods of the AbstractFetcher
        // 
------------------------------------------------------------------------
@@ -355,37 +236,6 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> implem
                }
 
                // record the work to be committed by the main consumer thread 
and make sure the consumer notices that
-               if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
-                       LOG.warn("Committing offsets to Kafka takes longer than 
the checkpoint interval. " +
-                                       "Skipping commit of previous offsets 
because newer complete checkpoint offsets are available. " +
-                                       "This does not compromise Flink's 
checkpoint integrity.");
-               }
-               if (consumer != null) {
-                       consumer.wakeup();
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Utilities
-       // 
------------------------------------------------------------------------
-
-       public static List<TopicPartition> 
convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] partitions) {
-               ArrayList<TopicPartition> result = new 
ArrayList<>(partitions.length);
-               for (KafkaTopicPartitionState<TopicPartition> p : partitions) {
-                       result.add(p.getKafkaPartitionHandle());
-               }
-               return result;
-       }
-
-       private class CommitCallback implements OffsetCommitCallback {
-
-               @Override
-               public void onComplete(Map<TopicPartition, OffsetAndMetadata> 
offsets, Exception ex) {
-                       commitInProgress = false;
-
-                       if (ex != null) {
-                               LOG.warn("Committing offsets to Kafka failed. 
This does not compromise Flink's checkpoints.", ex);
-                       }
-               }
+               consumerThread.setOffsetsToCommit(offsetsToCommit);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
new file mode 100644
index 0000000..c17aae6
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.List;
+
+/**
+ * The ConsumerCallBridge simply calls methods on the {@link KafkaConsumer}.
+ * 
+ * This indirection is necessary, because Kafka broke binary compatibility 
between 0.9 and 0.10,
+ * for example changing {@code assign(List)} to {@code assign(Collection)}.
+ * 
+ * Because of that, we need two versions whose compiled code goes against 
different method signatures.
+ * Even though the source of subclasses may look identical, the byte code will 
be different, because they
+ * are compiled against different dependencies.
+ */
+public class KafkaConsumerCallBridge {
+
+       public void assignPartitions(KafkaConsumer<?, ?> consumer, 
List<TopicPartition> topicPartitions) throws Exception {
+               consumer.assign(topicPartitions);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
new file mode 100644
index 0000000..9cfa840
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.metrics.MetricGroup;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import 
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The thread that runs the {@link KafkaConsumer}, connecting to the brokers and polling records.
+ * The thread pushes the data into a {@link Handover} to be picked up by the fetcher that will
+ * deserialize and emit the records.
+ *
+ * <p><b>IMPORTANT:</b> This thread must not be interrupted when attempting to shut it down.
+ * The Kafka consumer code was found to not always handle interrupts well, and to even
+ * deadlock in certain situations.
+ *
+ * <p>Implementation Note: This code is written to be reusable in later versions of the KafkaConsumer.
+ * Because Kafka is not maintaining binary compatibility, we use a "call bridge" as an indirection
+ * to the KafkaConsumer calls that change signature.
+ */
+public class KafkaConsumerThread extends Thread {
+
+       /** Logger for this consumer */
+       private final Logger log;
+
+       /** The handover of data and exceptions between the consumer thread and the task thread */
+       private final Handover handover;
+
+       /** The next offsets that the main thread should commit */
+       private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit;
+
+       /** The configuration for the Kafka consumer */
+       private final Properties kafkaProperties;
+
+       /** The partitions that this consumer reads from */
+       private final KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions;
+
+       /** We get this from the outside to publish metrics. */
+       private final MetricGroup kafkaMetricGroup;
+
+       /** The indirections on KafkaConsumer methods, for cases where KafkaConsumer compatibility is broken */
+       private final KafkaConsumerCallBridge consumerCallBridge;
+
+       /** The maximum number of milliseconds to wait for a fetch batch */
+       private final long pollTimeout;
+
+       /** Flag whether to add Kafka's metrics to the Flink metrics */
+       private final boolean useMetrics;
+
+       /** Reference to the Kafka consumer, once it is created */
+       private volatile KafkaConsumer<byte[], byte[]> consumer;
+
+       /** Flag to mark the main work loop as alive */
+       private volatile boolean running;
+
+       /** Flag tracking whether the latest commit request has completed */
+       private volatile boolean commitInProgress;
+
+
+       public KafkaConsumerThread(
+                       Logger log,
+                       Handover handover,
+                       Properties kafkaProperties,
+                       KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions,
+                       MetricGroup kafkaMetricGroup,
+                       KafkaConsumerCallBridge consumerCallBridge,
+                       String threadName,
+                       long pollTimeout,
+                       boolean useMetrics) {
+
+               super(threadName);
+               setDaemon(true);
+
+               this.log = checkNotNull(log);
+               this.handover = checkNotNull(handover);
+               this.kafkaProperties = checkNotNull(kafkaProperties);
+               this.subscribedPartitions = checkNotNull(subscribedPartitions);
+               this.kafkaMetricGroup = checkNotNull(kafkaMetricGroup);
+               this.consumerCallBridge = checkNotNull(consumerCallBridge);
+               this.pollTimeout = pollTimeout;
+               this.useMetrics = useMetrics;
+
+               this.nextOffsetsToCommit = new AtomicReference<>();
+               this.running = true;
+       }
+
+       // ------------------------------------------------------------------------
+
+       @Override
+       public void run() {
+               // early exit check
+               if (!running) {
+                       return;
+               }
+
+               // this is the means to talk to FlinkKafkaConsumer's main thread
+               final Handover handover = this.handover;
+
+               // This method initializes the KafkaConsumer and guarantees it is torn down properly.
+               // This is important, because the consumer has multi-threading issues,
+               // including concurrent 'close()' calls.
+               final KafkaConsumer<byte[], byte[]> consumer;
+               try {
+                       consumer = new KafkaConsumer<>(kafkaProperties);
+               }
+               catch (Throwable t) {
+                       handover.reportError(t);
+                       return;
+               }
+
+               // from here on, the consumer is guaranteed to be closed properly
+               try {
+                       // The callback invoked by Kafka once an offset commit is complete
+                       final OffsetCommitCallback offsetCommitCallback = new CommitCallback();
+
+                       // tell the consumer which partitions to work with
+                       consumerCallBridge.assignPartitions(consumer, convertKafkaPartitions(subscribedPartitions));
+
+                       // register Kafka's very own metrics in Flink's metric reporters
+                       if (useMetrics) {
+                               // register Kafka metrics to Flink
+                               Map<MetricName, ? extends Metric> metrics = consumer.metrics();
+                               if (metrics == null) {
+                                       // MapR's Kafka implementation returns null here.
+                                       log.info("Consumer implementation does not support metrics");
+                               } else {
+                                       // we have Kafka metrics, register them
+                                       for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
+                                               kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
+                                       }
+                               }
+                       }
+
+                       // early exit check
+                       if (!running) {
+                               return;
+                       }
+
+                       // seek the consumer to the initial offsets
+                       for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions) {
+                               if (partition.isOffsetDefined()) {
+                                       log.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; " +
+                                                       "seeking the consumer to position {}",
+                                                       partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1);
+
+                                       consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
+                               }
+                               else {
+                                       // for partitions that do not have offsets restored from a checkpoint/savepoint,
+                                       // we need to define our internal offset state for them using the initial offsets retrieved from Kafka
+                                       // by the KafkaConsumer, so that they are correctly checkpointed and committed on the next checkpoint
+
+                                       long fetchedOffset = consumer.position(partition.getKafkaPartitionHandle());
+
+                                       log.info("Partition {} has no initial offset; the consumer has position {}, " +
+                                                       "so the initial offset will be set to {}",
+                                                       partition.getKafkaPartitionHandle(), fetchedOffset, fetchedOffset - 1);
+
+                                       // the fetched offset represents the next record to process, so we need to subtract 1
+                                       partition.setOffset(fetchedOffset - 1);
+                               }
+                       }
+
+                       // from now on, external operations may call the consumer
+                       this.consumer = consumer;
+
+                       // the latest bulk of records. May carry across the loop if the thread is woken up
+                       // from blocking on the handover
+                       ConsumerRecords<byte[], byte[]> records = null;
+
+                       // main fetch loop
+                       while (running) {
+
+                               // check if there is something to commit
+                               if (!commitInProgress) {
+                                       // get and reset the work-to-be-committed, so we don't repeatedly commit the same offsets
+                                       final Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null);
+
+                                       if (toCommit != null) {
+                                               log.debug("Sending async offset commit request to Kafka broker");
+
+                                               // also record that a commit is already in progress
+                                               // the order here matters! first set the flag, then send the commit command.
+                                               commitInProgress = true;
+                                               consumer.commitAsync(toCommit, offsetCommitCallback);
+                                       }
+                               }
+
+                               // get the next batch of records, unless the previous batch is still waiting to be handed over
+                               if (records == null) {
+                                       try {
+                                               records = consumer.poll(pollTimeout);
+                                       }
+                                       catch (WakeupException we) {
+                                               continue;
+                                       }
+                               }
+
+                               try {
+                                       handover.produce(records);
+                                       records = null;
+                               }
+                               catch (Handover.WakeupException e) {
+                                       // fall through the loop
+                               }
+                       }
+                       // end main fetch loop
+               }
+               catch (Throwable t) {
+                       // let the main thread know and exit
+                       // it may be that this exception comes because the main thread closed the handover, in
+                       // which case the below reporting is irrelevant, but does not hurt either
+                       handover.reportError(t);
+               }
+               finally {
+                       // make sure the handover is closed if it is not already closed or has an error
+                       handover.close();
+
+                       // make sure the KafkaConsumer is closed
+                       try {
+                               consumer.close();
+                       }
+                       catch (Throwable t) {
+                               log.warn("Error while closing Kafka consumer", t);
+                       }
+               }
+       }
+
+       /**
+        * Shuts this thread down, waking up the thread gracefully if blocked (without Thread.interrupt() calls).
+        */
+       public void shutdown() {
+               running = false;
+
+               // We cannot call close() on the KafkaConsumer, because it will actually throw
+               // an exception if a concurrent call is in progress
+
+               // this wakes up the consumer if it is blocked handing over records
+               handover.wakeupProducer();
+
+               // this wakes up the consumer if it is blocked in a Kafka poll
+               if (consumer != null) {
+                       consumer.wakeup();
+               }
+       }
+
+       /**
+        * Tells this thread to commit a set of offsets. This method does not block; the committing
+        * operation will happen asynchronously.
+        *
+        * <p>Only one commit operation may be pending at any time. If the committing takes longer than
+        * the frequency with which this method is called, then some commits may be skipped due to being
+        * superseded by newer ones.
+        *
+        * @param offsetsToCommit The offsets to commit
+        */
+       public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
+               // record the work to be committed by the consumer thread and make sure the consumer notices it
+               if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
+                       log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
+                                       "Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
+                                       "This does not compromise Flink's checkpoint integrity.");
+               }
+
+               // if the consumer is blocked in a poll() or handover operation, wake it up to commit soon
+               handover.wakeupProducer();
+               if (consumer != null) {
+                       consumer.wakeup();
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  Utilities
+       // ------------------------------------------------------------------------
+
+       private static List<TopicPartition> convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] partitions) {
+               ArrayList<TopicPartition> result = new ArrayList<>(partitions.length);
+               for (KafkaTopicPartitionState<TopicPartition> p : partitions) {
+                       result.add(p.getKafkaPartitionHandle());
+               }
+               return result;
+       }
+
+       // ------------------------------------------------------------------------
+
+       private class CommitCallback implements OffsetCommitCallback {
+
+               @Override
+               public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception ex) {
+                       commitInProgress = false;
+
+                       if (ex != null) {
+                               log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
+                       }
+               }
+       }
+}
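A note on the +1 / -1 arithmetic in the seek logic above: Flink's checkpointed offset denotes the last record that was processed, while Kafka's position() denotes the next record to fetch. A small worked example with hypothetical values ('tp' stands for a partition handle as used above):

    // restore case: the checkpoint says offset 41 was the last record emitted,
    // so consumption must resume at record 42
    long checkpointedOffset = 41;
    consumer.seek(tp, checkpointedOffset + 1);

    // fresh-start case: Kafka reports position 100 (the next record to fetch),
    // so Flink's internal state must record 99 as "last processed"
    long fetchedOffset = consumer.position(tp);   // e.g. 100
    partition.setOffset(fetchedOffset - 1);       // stores 99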

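For orientation, the consuming side of the Handover lives in the fetcher's main thread. A minimal sketch of that side, under the assumption that pollNext() blocks until a batch is available and that emitRecord(...) is a hypothetical stand-in for the fetcher's deserialize-and-emit step:

    try {
        while (running) {
            // blocks until the consumer thread hands over the next batch;
            // rethrows errors reported by the consumer thread
            final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

            for (ConsumerRecord<byte[], byte[]> record : records) {
                emitRecord(record);
            }
        }
    }
    catch (Handover.ClosedException e) {
        // orderly shutdown: cancellation closed the handover
    }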
http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index 1162599..7a82365 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -20,15 +20,19 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internal.Handover;
 import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -44,6 +48,7 @@ import org.mockito.stubbing.Answer;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -53,6 +58,7 @@ import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -68,7 +74,7 @@ import static org.powermock.api.mockito.PowerMockito.whenNew;
  * Unit tests for the {@link Kafka09Fetcher}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest(Kafka09Fetcher.class)
+@PrepareForTest(KafkaConsumerThread.class)
 public class Kafka09FetcherTest {
 
        @Test
@@ -124,7 +130,7 @@ public class Kafka09FetcherTest {
                                this.getClass().getClassLoader(),
                                true, /* checkpointing */
                                "task_name",
-                               mock(MetricGroup.class),
+                               new UnregisteredMetricsGroup(),
                                schema,
                                new Properties(),
                                0L,
@@ -174,7 +180,7 @@ public class Kafka09FetcherTest {
 
                // check that there were no errors in the fetcher
                final Throwable fetcherError = error.get();
-               if (fetcherError != null) {
+               if (fetcherError != null && !(fetcherError instanceof Handover.ClosedException)) {
                        throw new Exception("Exception in the fetcher", fetcherError);
                }
                final Throwable committerError = commitError.get();
@@ -260,7 +266,7 @@ public class Kafka09FetcherTest {
                                this.getClass().getClassLoader(),
                                true, /* checkpointing */
                                "task_name",
-                               mock(MetricGroup.class),
+                               new UnregisteredMetricsGroup(),
                                schema,
                                new Properties(),
                                0L,
@@ -323,8 +329,154 @@ public class Kafka09FetcherTest {
 
                // check that there were no errors in the fetcher
                final Throwable caughtError = error.get();
-               if (caughtError != null) {
+               if (caughtError != null && !(caughtError instanceof Handover.ClosedException)) {
                        throw new Exception("Exception in the fetcher", caughtError);
                }
        }
+
+       @Test
+       public void testCancellationWhenEmitBlocks() throws Exception {
+
+               // ----- some test data -----
+
+               final String topic = "test-topic";
+               final int partition = 3;
+               final byte[] payload = new byte[] {1, 2, 3, 4};
+
+               final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
+                               new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload),
+                               new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload),
+                               new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload));
+
+               final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
+               data.put(new TopicPartition(topic, partition), records);
+
+               final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
+
+               // ----- the test consumer -----
+
+               final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+               when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+                       @Override
+                       public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
+                               return consumerRecords;
+                       }
+               });
+
+               whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+               // ----- build a fetcher -----
+
+               BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>();
+               List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition));
+               KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+               final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+                               sourceContext,
+                               topics,
+                               null, /* periodic watermark extractor */
+                               null, /* punctuated watermark extractor */
+                               new TestProcessingTimeService(),
+                               10, /* watermark interval */
+                               this.getClass().getClassLoader(),
+                               true, /* checkpointing */
+                               "task_name",
+                               new UnregisteredMetricsGroup(),
+                               schema,
+                               new Properties(),
+                               0L,
+                               false);
+
+
+               // ----- run the fetcher -----
+
+               final AtomicReference<Throwable> error = new AtomicReference<>();
+               final Thread fetcherRunner = new Thread("fetcher runner") {
+
+                       @Override
+                       public void run() {
+                               try {
+                                       fetcher.runFetchLoop();
+                               } catch (Throwable t) {
+                                       error.set(t);
+                               }
+                       }
+               };
+               fetcherRunner.start();
+
+               // wait until the thread started to emit records to the source context
+               sourceContext.waitTillHasBlocker();
+
+               // now we try to cancel the fetcher, including the interruption usually done on the task thread
+               // once it has finished, there must be no more thread blocked on the source context
+               fetcher.cancel();
+               fetcherRunner.interrupt();
+               fetcherRunner.join();
+
+               assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
+       }
+
+       // ------------------------------------------------------------------------
+       //  test utilities
+       // ------------------------------------------------------------------------
+
+       private static final class BlockingSourceContext<T> implements SourceContext<T> {
+
+               private final ReentrantLock lock = new ReentrantLock();
+               private final OneShotLatch inBlocking = new OneShotLatch();
+
+               @Override
+               public void collect(T element) {
+                       block();
+               }
+
+               @Override
+               public void collectWithTimestamp(T element, long timestamp) {
+                       block();
+               }
+
+               @Override
+               public void emitWatermark(Watermark mark) {
+                       block();
+               }
+
+               @Override
+               public Object getCheckpointLock() {
+                       return new Object();
+               }
+
+               @Override
+               public void close() {}
+
+               public void waitTillHasBlocker() throws InterruptedException {
+                       inBlocking.await();
+               }
+
+               public boolean isStillBlocking() {
+                       return lock.isLocked();
+               }
+
+               @SuppressWarnings({"InfiniteLoopStatement", "SynchronizationOnLocalVariableOrMethodParameter"})
+               private void block() {
+                       lock.lock();
+                       try {
+                               inBlocking.trigger();
+
+                               // put this thread to sleep indefinitely
+                               final Object o = new Object();
+                               while (true) {
+                                       synchronized (o) {
+                                               o.wait();
+                                       }
+                               }
+                       }
+                       catch (InterruptedException e) {
+                               // exit cleanly, simply reset the interruption flag
+                               Thread.currentThread().interrupt();
+                       }
+                       finally {
+                               lock.unlock();
+                       }
+               }
+       }
 }
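The BlockingSourceContext above also works as a general pattern for testing interruptible emitters: holding a ReentrantLock for the duration of the blocking call makes lock.isLocked() a reliable "is the thread still stuck?" probe, because the finally block only runs once the interrupt has unwound the wait. A condensed restatement of the same idea, with hypothetical names (not part of the commit):

    final ReentrantLock emitLock = new ReentrantLock();

    Runnable blockingEmit = () -> {
        emitLock.lock();
        try {
            Object mutex = new Object();
            synchronized (mutex) {
                mutex.wait();                       // parks until interrupt()
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();     // restore the flag, exit cleanly
        } finally {
            emitLock.unlock();                      // reached only after the thread unwinds
        }
    };

    // after cancel() + interrupt() + join(), emitLock.isLocked() must return false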

http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java
deleted file mode 100644
index c1b21b7..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class KafkaShortRetention09ITCase extends KafkaShortRetentionTestBase {
-
-       @Test(timeout=60000)
-       public void testAutoOffsetReset() throws Exception {
-               runAutoOffsetResetTest();
-       }
-
-       @Test(timeout=60000)
-       public void testAutoOffsetResetNone() throws Exception {
-               runFailOnAutoOffsetResetNone();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
new file mode 100644
index 0000000..25040eb
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.streaming.connectors.kafka.internal.Handover.WakeupException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for the {@link Handover} between the Kafka consumer thread and the fetcher's main thread.
+ */
+public class HandoverTest {
+
+       // ------------------------------------------------------------------------
+       //  test producer / consumer
+       // ------------------------------------------------------------------------
+
+       @Test
+       public void testWithVariableProducer() throws Exception {
+               runProducerConsumerTest(500, 2, 0);
+       }
+
+       @Test
+       public void testWithVariableConsumer() throws Exception {
+               runProducerConsumerTest(500, 0, 2);
+       }
+
+       @Test
+       public void testWithVariableBoth() throws Exception {
+               runProducerConsumerTest(500, 2, 2);
+       }
+
+       // ------------------------------------------------------------------------
+       //  test error propagation
+       // ------------------------------------------------------------------------
+
+       @Test
+       public void testPublishErrorOnEmptyHandover() throws Exception {
+               final Handover handover = new Handover();
+
+               Exception error = new Exception();
+               handover.reportError(error);
+
+               try {
+                       handover.pollNext();
+                       fail("should throw an exception");
+               }
+               catch (Exception e) {
+                       assertEquals(error, e);
+               }
+       }
+
+       @Test
+       public void testPublishErrorOnFullHandover() throws Exception {
+               final Handover handover = new Handover();
+               handover.produce(createTestRecords());
+
+               IOException error = new IOException();
+               handover.reportError(error);
+
+               try {
+                       handover.pollNext();
+                       fail("should throw an exception");
+               }
+               catch (Exception e) {
+                       assertEquals(error, e);
+               }
+       }
+
+       @Test
+       public void testExceptionMarksClosedOnEmpty() throws Exception {
+               final Handover handover = new Handover();
+
+               IllegalStateException error = new IllegalStateException();
+               handover.reportError(error);
+
+               try {
+                       handover.produce(createTestRecords());
+                       fail("should throw an exception");
+               }
+               catch (Handover.ClosedException e) {
+                       // expected
+               }
+       }
+
+       @Test
+       public void testExceptionMarksClosedOnFull() throws Exception {
+               final Handover handover = new Handover();
+               handover.produce(createTestRecords());
+
+               LinkageError error = new LinkageError();
+               handover.reportError(error);
+
+               try {
+                       handover.produce(createTestRecords());
+                       fail("should throw an exception");
+               }
+               catch (Handover.ClosedException e) {
+                       // expected
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  test closing behavior
+       // ------------------------------------------------------------------------
+
+       @Test
+       public void testCloseEmptyForConsumer() throws Exception {
+               final Handover handover = new Handover();
+               handover.close();
+
+               try {
+                       handover.pollNext();
+                       fail("should throw an exception");
+               }
+               catch (Handover.ClosedException e) {
+                       // expected
+               }
+       }
+
+       @Test
+       public void testCloseFullForConsumer() throws Exception {
+               final Handover handover = new Handover();
+               handover.produce(createTestRecords());
+               handover.close();
+
+               try {
+                       handover.pollNext();
+                       fail("should throw an exception");
+               }
+               catch (Handover.ClosedException e) {
+                       // expected
+               }
+       }
+
+       @Test
+       public void testCloseEmptyForProducer() throws Exception {
+               final Handover handover = new Handover();
+               handover.close();
+
+               try {
+                       handover.produce(createTestRecords());
+                       fail("should throw an exception");
+               }
+               catch (Handover.ClosedException e) {
+                       // expected
+               }
+       }
+
+       @Test
+       public void testCloseFullForProducer() throws Exception {
+               final Handover handover = new Handover();
+               handover.produce(createTestRecords());
+               handover.close();
+
+               try {
+                       handover.produce(createTestRecords());
+                       fail("should throw an exception");
+               }
+               catch (Handover.ClosedException e) {
+                       // expected
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  test wake up behavior
+       // ------------------------------------------------------------------------
+
+       @Test
+       public void testWakeupDoesNotWakeWhenEmpty() throws Exception {
+               Handover handover = new Handover();
+               handover.wakeupProducer();
+
+               // produce into a woken but empty handover
+               try {
+                       handover.produce(createTestRecords());
+               }
+               catch (Handover.WakeupException e) {
+                       fail();
+               }
+
+               // handover now has records, next time we wakeup and produce it needs
+               // to throw an exception
+               handover.wakeupProducer();
+               try {
+                       handover.produce(createTestRecords());
+                       fail("should throw an exception");
+               }
+               catch (Handover.WakeupException e) {
+                       // expected
+               }
+
+               // empty the handover
+               assertNotNull(handover.pollNext());
+               
+               // producing into an empty handover should work
+               try {
+                       handover.produce(createTestRecords());
+               }
+               catch (Handover.WakeupException e) {
+                       fail();
+               }
+       }
+
+       @Test
+       public void testWakeupWakesOnlyOnce() throws Exception {
+               // create a full handover
+               final Handover handover = new Handover();
+               handover.produce(createTestRecords());
+
+               handover.wakeupProducer();
+
+               try {
+                       handover.produce(createTestRecords());
+                       fail();
+               } catch (WakeupException e) {
+                       // expected
+               }
+
+               CheckedThread producer = new CheckedThread() {
+                       @Override
+                       public void go() throws Exception {
+                               handover.produce(createTestRecords());
+                       }
+               };
+               producer.start();
+
+               // the producer must go blocking
+               producer.waitUntilThreadHoldsLock(10000);
+
+               // release the thread by consuming something
+               assertNotNull(handover.pollNext());
+               producer.sync();
+       }
+
+       // ------------------------------------------------------------------------
+       //  utilities
+       // ------------------------------------------------------------------------
+
+       private void runProducerConsumerTest(int numRecords, int maxProducerDelay, int maxConsumerDelay) throws Exception {
+               // generate test data
+               @SuppressWarnings({"unchecked", "rawtypes"})
+               final ConsumerRecords<byte[], byte[]>[] data = new ConsumerRecords[numRecords];
+               for (int i = 0; i < numRecords; i++) {
+                       data[i] = createTestRecords();
+               }
+
+               final Handover handover = new Handover();
+
+               ProducerThread producer = new ProducerThread(handover, data, maxProducerDelay);
+               ConsumerThread consumer = new ConsumerThread(handover, data, maxConsumerDelay);
+
+               consumer.start();
+               producer.start();
+
+               // sync first on the consumer, so it propagates assertion errors
+               consumer.sync();
+               producer.sync();
+       }
+
+       @SuppressWarnings("unchecked")
+       private static ConsumerRecords<byte[], byte[]> createTestRecords() {
+               return mock(ConsumerRecords.class);
+       }
+
+       // ------------------------------------------------------------------------
+
+       private static abstract class CheckedThread extends Thread {
+
+               private volatile Throwable error;
+
+               public abstract void go() throws Exception;
+
+               @Override
+               public void run() {
+                       try {
+                               go();
+                       }
+                       catch (Throwable t) {
+                               error = t;
+                       }
+               }
+
+               public void sync() throws Exception {
+                       join();
+                       if (error != null) {
+                               ExceptionUtils.rethrowException(error, error.getMessage());
+                       }
+               }
+
+               public void waitUntilThreadHoldsLock(long timeoutMillis) throws InterruptedException, TimeoutException {
+                       final long deadline = System.nanoTime() + timeoutMillis * 1_000_000;
+
+                       while (!isBlockedOrWaiting() && (System.nanoTime() < deadline)) {
+                               Thread.sleep(1);
+                       }
+
+                       if (!isBlockedOrWaiting()) {
+                               throw new TimeoutException();
+                       }
+               }
+
+               private boolean isBlockedOrWaiting() {
+                       State state = getState();
+                       return state == State.BLOCKED || state == State.WAITING || state == State.TIMED_WAITING;
+               }
+       }
+
+       private static class ProducerThread extends CheckedThread {
+
+               private final Random rnd = new Random();
+               private final Handover handover;
+               private final ConsumerRecords<byte[], byte[]>[] data;
+               private final int maxDelay;
+
+               private ProducerThread(Handover handover, ConsumerRecords<byte[], byte[]>[] data, int maxDelay) {
+                       this.handover = handover;
+                       this.data = data;
+                       this.maxDelay = maxDelay;
+               }
+
+               @Override
+               public void go() throws Exception {
+                       for (ConsumerRecords<byte[], byte[]> rec : data) {
+                               handover.produce(rec);
+
+                               if (maxDelay > 0) {
+                                       int delay = rnd.nextInt(maxDelay);
+                                       Thread.sleep(delay);
+                               }
+                       }
+               }
+       }
+
+       private static class ConsumerThread extends CheckedThread {
+
+               private final Random rnd = new Random();
+               private final Handover handover;
+               private final ConsumerRecords<byte[], byte[]>[] data;
+               private final int maxDelay;
+
+               private ConsumerThread(Handover handover, ConsumerRecords<byte[], byte[]>[] data, int maxDelay) {
+                       this.handover = handover;
+                       this.data = data;
+                       this.maxDelay = maxDelay;
+               }
+
+               @Override
+               public void go() throws Exception {
+                       for (ConsumerRecords<byte[], byte[]> rec : data) {
+                               ConsumerRecords<byte[], byte[]> next = handover.pollNext();
+
+                               assertEquals(rec, next);
+
+                               if (maxDelay > 0) {
+                                       int delay = rnd.nextInt(maxDelay);
+                                       Thread.sleep(delay);
+                               }
+                       }
+               }
+       }
+}
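Taken together, these tests pin down the Handover's contract. As a reading aid, a summary of what the tests assert, written as annotated calls (not code from the commit):

    Handover handover = new Handover();

    // producer side (consumer thread):
    //  - produce() blocks while the single slot is occupied
    //  - throws Handover.WakeupException if wakeupProducer() was called while
    //    the slot is occupied (a wakeup on an empty slot is a no-op)
    //  - throws Handover.ClosedException after close() or reportError(...)
    handover.produce(records);

    // consumer side (task thread):
    //  - pollNext() blocks until a batch is available
    //  - rethrows a Throwable passed to reportError(...)
    //  - throws Handover.ClosedException once the handover is closed
    ConsumerRecords<byte[], byte[]> batch = handover.pollNext();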

http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 5c03b78..dccf698 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -122,6 +122,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
         *
         */
        private static boolean stopProducer = false;
+
        public void runAutoOffsetResetTest() throws Exception {
                final String topic = "auto-offset-reset-test";
 
