http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
new file mode 100644
index 0000000..6bad180
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.ExceptionProxy;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A fetcher that fetches data from Kafka brokers via the Kafka 0.9 consumer API.
+ * 
+ * @param <T> The type of elements produced by the fetcher.
+ */
+public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implements Runnable {
+
+       private static final Logger LOG = LoggerFactory.getLogger(Kafka09Fetcher.class);
+
+       // ------------------------------------------------------------------------
+
+       /** The schema to convert between Kafka's byte messages, and Flink's objects */
+       private final KeyedDeserializationSchema<T> deserializer;
+
+       /** The subtask's runtime context */
+       private final RuntimeContext runtimeContext;
+
+       /** The configuration for the Kafka consumer */
+       private final Properties kafkaProperties;
+
+       /** The maximum number of milliseconds to wait for a fetch batch */
+       private final long pollTimeout;
+
+       /** Flag whether to register Kafka metrics as Flink accumulators */
+       private final boolean forwardKafkaMetrics;
+
+       /** Mutex to guard against concurrent access to the non-threadsafe Kafka consumer */
+       private final Object consumerLock = new Object();
+
+       /** Reference to the Kafka consumer, once it is created */
+       private volatile KafkaConsumer<byte[], byte[]> consumer;
+
+       /** Reference to the proxy, forwarding exceptions from the fetch thread to the main thread */
+       private volatile ExceptionProxy errorHandler;
+
+       /** Flag to mark the main work loop as alive */
+       private volatile boolean running = true;
+
+       // ------------------------------------------------------------------------
+
+       public Kafka09Fetcher(
+                       SourceContext<T> sourceContext,
+                       List<KafkaTopicPartition> assignedPartitions,
+                       SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+                       SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+                       StreamingRuntimeContext runtimeContext,
+                       KeyedDeserializationSchema<T> deserializer,
+                       Properties kafkaProperties,
+                       long pollTimeout,
+                       boolean forwardKafkaMetrics) throws Exception
+       {
+               super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext);
+
+               this.deserializer = deserializer;
+               this.runtimeContext = runtimeContext;
+               this.kafkaProperties = kafkaProperties;
+               this.pollTimeout = pollTimeout;
+               this.forwardKafkaMetrics = forwardKafkaMetrics;
+
+               // if checkpointing is enabled, we are not automatically committing to Kafka.
+               kafkaProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+                               Boolean.toString(!runtimeContext.isCheckpointingEnabled()));
+       }
+
+       // ------------------------------------------------------------------------
+       //  Fetcher work methods
+       // ------------------------------------------------------------------------
+
+       @Override
+       public void runFetchLoop() throws Exception {
+               this.errorHandler = new ExceptionProxy(Thread.currentThread());
+
+               // rather than running the main fetch loop directly here, we spawn a dedicated thread
+               // this makes sure that no interrupt() call upon canceling reaches the Kafka consumer code
+               Thread runner = new Thread(this, "Kafka 0.9 Fetcher for " + runtimeContext.getTaskNameWithSubtasks());
+               runner.setDaemon(true);
+               runner.start();
+
+               try {
+                       runner.join();
+               } catch (InterruptedException e) {
+                       // may be the result of a wake-up after an exception. we ignore this here and only
+                       // restore the interruption state
+                       Thread.currentThread().interrupt();
+               }
+
+               // make sure we propagate any exception that occurred in the concurrent fetch thread,
+               // before leaving this method
+               this.errorHandler.checkAndThrowException();
+       }
+
+       @Override
+       public void cancel() {
+               // flag the main thread to exit
+               running = false;
+
+               // NOTE:
+               //   - We cannot interrupt the runner thread, because the Kafka consumer may
+               //     deadlock when the thread is interrupted while in certain methods
+               //   - We cannot call close() on the consumer, because it will actually throw
+               //     an exception if a concurrent call is in progress
+
+               // make sure the consumer finds out faster that we are shutting down 
+               if (consumer != null) {
+                       consumer.wakeup();
+               }
+       }
+
+       @Override
+       public void run() {
+               // This method initializes the KafkaConsumer and guarantees it is torn down properly.
+               // This is important, because the consumer has multi-threading issues,
+               // including concurrent 'close()' calls.
+
+               final KafkaConsumer<byte[], byte[]> consumer;
+               try {
+                       consumer = new KafkaConsumer<>(kafkaProperties);
+               }
+               catch (Throwable t) {
+                       running = false;
+                       errorHandler.reportError(t);
+                       return;
+               }
+
+               // from here on, the consumer will be closed properly
+               try {
+                       consumer.assign(convertKafkaPartitions(subscribedPartitions()));
+
+                       // register Kafka metrics to Flink accumulators
+                       if (forwardKafkaMetrics) {
+                               Map<MetricName, ? extends Metric> metrics = consumer.metrics();
+                               if (metrics == null) {
+                                       // MapR's Kafka implementation returns null here.
+                                       LOG.info("Consumer implementation does not support metrics");
+                               } else {
+                                       // we have metrics, register them where possible
+                                       for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) {
+                                               String name = "KafkaConsumer-" + metric.getKey().name();
+                                               DefaultKafkaMetricAccumulator kafkaAccumulator =
+                                                               DefaultKafkaMetricAccumulator.createFor(metric.getValue());
+
+                                               // best effort: we only add the accumulator if available.
+                                               if (kafkaAccumulator != null) {
+                                                       runtimeContext.addAccumulator(name, kafkaAccumulator);
+                                               }
+                                       }
+                               }
+                       }
+
+                       // seek the consumer to the initial offsets
+                       for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
+                               if (partition.isOffsetDefined()) {
+                                       consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
+                               }
+                       }
+
+                       // from now on, external operations may call the consumer
+                       this.consumer = consumer;
+
+                       // main fetch loop
+                       while (running) {
+                               // get the next batch of records
+                               final ConsumerRecords<byte[], byte[]> records;
+                               synchronized (consumerLock) {
+                                       try {
+                                               records = consumer.poll(pollTimeout);
+                                       }
+                                       catch (WakeupException we) {
+                                               if (running) {
+                                                       throw we;
+                                               } else {
+                                                       continue;
+                                               }
+                                       }
+                               }
+
+                               // get the records for each topic partition
+                               for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
+                                       
+                                       List<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(partition.getKafkaPartitionHandle());
+
+                                       for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
+                                               T value = deserializer.deserialize(
+                                                               record.key(), record.value(),
+                                                               record.topic(), record.partition(), record.offset());
+
+                                               if (deserializer.isEndOfStream(value)) {
+                                                       // end of stream signaled
+                                                       running = false;
+                                                       break;
+                                               }
+
+                                               // emit the actual record. this also updates the offset state atomically
+                                               // and deals with timestamps and watermark generation
+                                               emitRecord(value, partition, record.offset());
+                                       }
+                               }
+                       }
+                       // end main fetch loop
+               }
+               catch (Throwable t) {
+                       if (running) {
+                               running = false;
+                               errorHandler.reportError(t);
+                       } else {
+                               LOG.debug("Stopped ConsumerThread threw exception", t);
+                       }
+               }
+               finally {
+                       try {
+                               synchronized (consumerLock) {
+                                       consumer.close();
+                               }
+                       } catch (Throwable t) {
+                               LOG.warn("Error while closing Kafka 0.9 consumer", t);
+                       }
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  Kafka 0.9 specific fetcher behavior
+       // ------------------------------------------------------------------------
+
+       @Override
+       public TopicPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
+               return new TopicPartition(partition.getTopic(), partition.getPartition());
+       }
+
+       @Override
+       public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+               KafkaTopicPartitionState<TopicPartition>[] partitions = subscribedPartitions();
+               Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.length);
+
+               for (KafkaTopicPartitionState<TopicPartition> partition : partitions) {
+                       Long offset = offsets.get(partition.getKafkaTopicPartition());
+                       if (offset != null) {
+                               offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offset, ""));
+                       }
+               }
+
+               if (this.consumer != null) {
+                       synchronized (consumerLock) {
+                               this.consumer.commitSync(offsetsToCommit);
+                       }
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  Utilities
+       // ------------------------------------------------------------------------
+
+       public static List<TopicPartition> convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] partitions) {
+               ArrayList<TopicPartition> result = new ArrayList<>(partitions.length);
+               for (KafkaTopicPartitionState<TopicPartition> p : partitions) {
+                       result.add(p.getKafkaPartitionHandle());
+               }
+               return result;
+       }
+}
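
A side note on the shutdown protocol implemented above: poll() runs in a dedicated
daemon thread, and cancel() calls KafkaConsumer.wakeup() rather than interrupting
the thread, because the 0.9 consumer can deadlock when interrupted and close() is
not safe to call concurrently. A minimal self-contained sketch of that pattern,
assuming a plain Kafka 0.9 client; the class name, topic handling, and poll
timeout are illustrative and not part of this commit:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class WakeupShutdownSketch {

    private volatile boolean running = true;
    private volatile KafkaConsumer<byte[], byte[]> consumer;

    // runs in a dedicated (daemon) thread, like the runner thread above
    public void runPollLoop(Properties props, String topic) {
        consumer = new KafkaConsumer<>(props);
        try {
            consumer.subscribe(Collections.singletonList(topic));
            while (running) {
                try {
                    consumer.poll(100); // records would be handled here
                }
                catch (WakeupException e) {
                    // expected if shutdown() was called concurrently;
                    // a wakeup while still running is a real error
                    if (running) {
                        throw e;
                    }
                }
            }
        }
        finally {
            consumer.close();
        }
    }

    // called from any other thread; wakeup() is the one consumer method
    // that may safely be invoked concurrently
    public void shutdown() {
        running = false;
        if (consumer != null) {
            consumer.wakeup();
        }
    }
}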

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index 82e1dce..afb0056 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -27,11 +27,6 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
        // ------------------------------------------------------------------------
 
        @Test(timeout = 60000)
-       public void testCheckpointing() throws Exception {
-               runCheckpointingTest();
-       }
-
-       @Test(timeout = 60000)
        public void testFailOnNoBroker() throws Exception {
                runFailOnNoBrokerTest();
        }
@@ -41,15 +36,15 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
                runSimpleConcurrentProducerConsumerTopology();
        }
 
-       @Test(timeout = 60000)
-       public void testPunctuatedExplicitWMConsumer() throws Exception {
-               runExplicitPunctuatedWMgeneratingConsumerTest(false);
-       }
+//     @Test(timeout = 60000)
+//     public void testPunctuatedExplicitWMConsumer() throws Exception {
+//             runExplicitPunctuatedWMgeneratingConsumerTest(false);
+//     }
 
-       @Test(timeout = 60000)
-       public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception {
-               runExplicitPunctuatedWMgeneratingConsumerTest(true);
-       }
+//     @Test(timeout = 60000)
+//     public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception {
+//             runExplicitPunctuatedWMgeneratingConsumerTest(true);
+//     }
 
        @Test(timeout = 60000)
        public void testKeyValueSupport() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
index a2c4f73..b80a231 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -22,19 +22,23 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.util.TestLogger;
+
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.PartitionInfo;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.Properties;
 import java.util.concurrent.Future;
 
@@ -60,7 +64,7 @@ public class KafkaProducerTest extends TestLogger {
                        
                        // partition setup
                        when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
-                                       Arrays.asList(new PartitionInfo("mock_topic", 42, null, null, null)));
+                                       Collections.singletonList(new PartitionInfo("mock_topic", 42, null, null, null)));
 
                        // failure when trying to send an element
                        when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class)))
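
The stubbing pattern in this test is worth spelling out: the KafkaProducer mock
advertises a single partition through partitionsFor(), and its send() transmits
nothing but reports a failure through the supplied Callback. A condensed
plain-Mockito sketch of that setup (the real test additionally uses PowerMock to
inject the mock into the producer under test; class and method names here are
illustrative):

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Collections;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;

import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class FailingProducerMockSketch {

    @SuppressWarnings("unchecked")
    public static KafkaProducer<byte[], byte[]> failingProducer() {
        KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);

        // one fake partition is enough for the partitioner to work
        when(producer.partitionsFor(anyString())).thenReturn(
                Collections.singletonList(new PartitionInfo("mock_topic", 42, null, null, null)));

        // send() never sends; it reports a failure through the callback
        when(producer.send(any(ProducerRecord.class), any(Callback.class))).thenAnswer(
                new Answer<Object>() {
                    @Override
                    public Object answer(InvocationOnMock invocation) {
                        Callback callback = (Callback) invocation.getArguments()[1];
                        callback.onCompletion(null, new Exception("Test error"));
                        return null;
                    }
                });
        return producer;
    }
}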

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java
index 74b35af..c1b21b7 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.junit.Test;
 
+@SuppressWarnings("serial")
 public class KafkaShortRetention09ITCase extends KafkaShortRetentionTestBase {
 
        @Test(timeout=60000)

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
index 6bdfb48..fbeb110 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
@@ -25,5 +25,6 @@ log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
 log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
-
-
+log4j.logger.org.apache.zookeeper=OFF, testlogger
+log4j.logger.state.change.logger=OFF, testlogger
+log4j.logger.kafka=OFF, testlogger

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index d9e813f..0ca8fd5 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -18,427 +18,291 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.commons.collections.map.LinkedMap;
-import org.apache.flink.api.common.ExecutionConfig;
+
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.functions.TimestampAssigner;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionState;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.SerializedValue;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
-
-import static java.util.Objects.requireNonNull;
-import static org.apache.flink.streaming.connectors.kafka.util.KafkaUtils.checkArgument;
-
-public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
-               implements CheckpointListener, CheckpointedAsynchronously<HashMap<KafkaTopicPartition, Long>>, ResultTypeQueryable<T>, Triggerable {
-
-       // ------------------------------------------------------------------------
 
-       private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
+/**
+ * Base class of all Flink Kafka Consumer data sources.
+ * This implements the common behavior across all Kafka versions.
+ * 
+ * <p>The Kafka version specific behavior is defined mainly in the specific subclasses of the
+ * {@link AbstractFetcher}.
+ * 
+ * @param <T> The type of records produced by this data source
+ */
+public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements 
+               CheckpointListener,
+               CheckpointedAsynchronously<HashMap<KafkaTopicPartition, Long>>,
+               ResultTypeQueryable<T>
+{
        private static final long serialVersionUID = -6272159445203409112L;
 
-       /** Magic number to define an unset offset. Negative offsets are not used by Kafka (invalid),
-        * and we pick a number that is probably (hopefully) not used by Kafka as a magic number for anything else. */
-       public static final long OFFSET_NOT_SET = -915623761776L;
-
+       protected static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);
+       
+       /** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks */
        public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
 
-
-       /** The schema to convert between Kafka#s byte messages, and Flink's objects */
+       // ------------------------------------------------------------------------
+       //  configuration state, set on the client relevant for all subtasks
+       // ------------------------------------------------------------------------
+       
+       /** The schema to convert between Kafka's byte messages, and Flink's objects */
        protected final KeyedDeserializationSchema<T> deserializer;
 
-       // ------  Runtime State  -------
+       /** The set of topic partitions that the source will read */
+       protected List<KafkaTopicPartition> allSubscribedPartitions;
+       
+       /** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
+        * to exploit per-partition timestamp characteristics.
+        * The assigner is kept in serialized form, to deserialize it into multiple copies */
+       private SerializedValue<AssignerWithPeriodicWatermarks<T>> periodicWatermarkAssigner;
+       
+       /** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
+        * to exploit per-partition timestamp characteristics.
+        * The assigner is kept in serialized form, to deserialize it into multiple copies */
+       private SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedWatermarkAssigner;
 
+       // ------------------------------------------------------------------------
+       //  runtime state (used individually by each parallel subtask)
+       // ------------------------------------------------------------------------
+       
        /** Data for pending but uncommitted checkpoints */
-       protected final LinkedMap pendingCheckpoints = new LinkedMap();
-
-       /**
-        * Information about the partitions being read by the local consumer. This contains:
-        * offsets of the last returned elements, and if a timestamp assigner is used, it
-        * also contains the maximum seen timestamp in the partition and if the partition
-        * still receives elements or it is inactive.
-        */
-       protected transient HashMap<KafkaTopicPartition, KafkaPartitionState> partitionState;
+       private final LinkedMap pendingCheckpoints = new LinkedMap();
 
+       /** The fetcher implements the connections to the Kafka brokers */
+       private transient volatile AbstractFetcher<T, ?> kafkaFetcher;
+       
        /** The offsets to restore to, if the consumer restores state from a checkpoint */
-       protected transient HashMap<KafkaTopicPartition, Long> restoreToOffset;
-
+       private transient volatile HashMap<KafkaTopicPartition, Long> restoreToOffset;
+       
        /** Flag indicating whether the consumer is still running **/
-       protected volatile boolean running = true;
+       private volatile boolean running = true;
 
        // ------------------------------------------------------------------------
-       //                                                      WATERMARK EMISSION
-       // ------------------------------------------------------------------------
 
        /**
-        * The user-specified methods to extract the timestamps from the records in Kafka, and
-        * to decide when to emit watermarks.
-        */
-       private AssignerWithPunctuatedWatermarks<T> punctuatedWatermarkAssigner;
-
-       /**
-        * The user-specified methods to extract the timestamps from the records in Kafka, and
-        * to decide when to emit watermarks.
-        */
-       private AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner;
-
-       private StreamingRuntimeContext runtime = null;
-
-       private SourceContext<T> srcContext = null;
-
-       /**
-        * The interval between consecutive periodic watermark emissions,
-        * as configured via the {@link ExecutionConfig#getAutoWatermarkInterval()}.
-        */
-       private long watermarkInterval = -1;
-
-       /** The last emitted watermark. */
-       private long lastEmittedWatermark = Long.MIN_VALUE;
-
-       // ------------------------------------------------------------------------
-
-       /**
-        * Creates a new Flink Kafka Consumer, using the given type of fetcher and offset handler.
-        *
-        * <p>To determine which kink of fetcher and offset handler to use, please refer to the docs
-        * at the beginning of this class.</p>
+        * Base constructor.
         *
         * @param deserializer
         *           The deserializer to turn raw byte messages into Java/Scala objects.
-        * @param props
-        *           The properties that are used to configure both the fetcher and the offset handler.
         */
-       public FlinkKafkaConsumerBase(KeyedDeserializationSchema<T> deserializer, Properties props) {
-               this.deserializer = requireNonNull(deserializer, "valueDeserializer");
+       public FlinkKafkaConsumerBase(KeyedDeserializationSchema<T> deserializer) {
+               this.deserializer = checkNotNull(deserializer, "valueDeserializer");
        }
 
        /**
-        * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks in a punctuated manner. Bare in mind
-        * that the source can either have an {@link AssignerWithPunctuatedWatermarks} or an
-        * {@link AssignerWithPeriodicWatermarks}, not both.
+        * This method must be called from the subclasses, to set the list of all subscribed partitions
+        * that this consumer will fetch from (across all subtasks).
+        * 
+        * @param allSubscribedPartitions The list of all partitions that all subtasks together should fetch from.
         */
-       public FlinkKafkaConsumerBase<T> setPunctuatedWatermarkEmitter(AssignerWithPunctuatedWatermarks<T> assigner) {
-               checkEmitterDuringInit();
-               this.punctuatedWatermarkAssigner = assigner;
-               return this;
+       protected void setSubscribedPartitions(List<KafkaTopicPartition> allSubscribedPartitions) {
+               checkNotNull(allSubscribedPartitions);
+               this.allSubscribedPartitions = Collections.unmodifiableList(allSubscribedPartitions);
        }
 
+       // ------------------------------------------------------------------------
+       //  Configuration
+       // ------------------------------------------------------------------------
+       
        /**
-        * Specifies an {@link AssignerWithPeriodicWatermarks} to emit watermarks periodically. Bare in mind that the
-        * source can either have an {@link AssignerWithPunctuatedWatermarks} or an
-        * {@link AssignerWithPeriodicWatermarks}, not both.
+        * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks in a punctuated manner.
+        * The watermark extractor will run per Kafka partition, watermarks will be merged across partitions
+        * in the same way as in the Flink runtime, when streams are merged.
+        * 
+        * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions,
+        * the streams from the partitions are unioned in a "first come first serve" fashion. Per-partition
+        * characteristics are usually lost that way. For example, if the timestamps are strictly ascending
+        * per Kafka partition, they will not be strictly ascending in the resulting Flink DataStream, if the
+        * parallel source subtask reads more than one partition.
+        * 
+        * <p>Running timestamp extractors / watermark generators directly inside the Kafka source, per Kafka
+        * partition, allows users to let them exploit the per-partition characteristics.
+        * 
+        * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an
+        * {@link AssignerWithPeriodicWatermarks}, not both at the same time.
+        * 
+        * @param assigner The timestamp assigner / watermark generator to use.
+        * @return The consumer object, to allow function chaining.
         */
-       public FlinkKafkaConsumerBase<T> setPeriodicWatermarkEmitter(AssignerWithPeriodicWatermarks<T> assigner) {
-               checkEmitterDuringInit();
-               this.periodicWatermarkAssigner = assigner;
-               return this;
-       }
-
-       /**
-        * Processes the element after having been read from Kafka and deserialized, and updates the
-        * last read offset for the specifies partition. These two actions should be performed in
-        * an atomic way in order to guarantee exactly once semantics.
-        * @param sourceContext
-        *           The context the task operates in.
-        * @param partDescriptor
-        *            A descriptor containing the topic and the id of the partition.
-        * @param value
-        *           The element to process.
-        * @param offset
-        *           The offset of the element in the partition.
-        * */
-       public void processElement(SourceContext<T> sourceContext, KafkaTopicPartition partDescriptor, T value, long offset) {
-               if (punctuatedWatermarkAssigner == null && periodicWatermarkAssigner == null) {
-                       // the case where no watermark emitter is specified.
-                       sourceContext.collect(value);
-               } else {
-
-                       if (srcContext == null) {
-                               srcContext = sourceContext;
-                       }
-
-                       long extractedTimestamp = extractTimestampAndEmitElement(partDescriptor, value);
-
-                       // depending on the specified watermark emitter, either send a punctuated watermark,
-                       // or set the timer for the first periodic watermark. In the periodic case, we set the timer
-                       // only for the first watermark, as it is the trigger() that will set the subsequent ones.
-
-                       if (punctuatedWatermarkAssigner != null) {
-                               final Watermark nextWatermark = punctuatedWatermarkAssigner
-                                       .checkAndGetNextWatermark(value, extractedTimestamp);
-                               if (nextWatermark != null) {
-                                       emitWatermarkIfMarkingProgress(sourceContext);
-                               }
-                       } else if(periodicWatermarkAssigner != null && runtime == null) {
-                               runtime = (StreamingRuntimeContext) getRuntimeContext();
-                               watermarkInterval = runtime.getExecutionConfig().getAutoWatermarkInterval();
-                               if (watermarkInterval > 0) {
-                                       runtime.registerTimer(System.currentTimeMillis() + watermarkInterval, this);
-                               }
-                       }
+       public FlinkKafkaConsumerBase<T> setPunctuatedWatermarkEmitter(AssignerWithPunctuatedWatermarks<T> assigner) {
+               checkNotNull(assigner);
+               
+               if (this.periodicWatermarkAssigner != null) {
+                       throw new IllegalStateException("A periodic watermark emitter has already been set.");
+               }
+               try {
+                       this.punctuatedWatermarkAssigner = new SerializedValue<>(assigner);
+                       return this;
+               } catch (Exception e) {
+                       throw new IllegalArgumentException("The given assigner is not serializable", e);
                }
-               updateOffsetForPartition(partDescriptor, offset);
        }
 
        /**
-        * Extract the timestamp from the element based on the user-specified extractor,
-        * emit the element with the new timestamp, and update the partition monitoring info (if necessary).
-        * In more detail, upon reception of an element with a timestamp greater than the greatest timestamp
-        * seen so far in that partition, this method updates the maximum timestamp seen for that partition,
-        * and marks the partition as {@code active}, i.e. it still receives fresh data.
-        * @param partDescriptor the partition the new element belongs to.
-        * @param value the element to be forwarded.
-        * @return the timestamp of the new element.
+        * Specifies an {@link AssignerWithPeriodicWatermarks} to emit watermarks in a periodic manner.
+        * The watermark extractor will run per Kafka partition, watermarks will be merged across partitions
+        * in the same way as in the Flink runtime, when streams are merged.
+        *
+        * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions,
+        * the streams from the partitions are unioned in a "first come first serve" fashion. Per-partition
+        * characteristics are usually lost that way. For example, if the timestamps are strictly ascending
+        * per Kafka partition, they will not be strictly ascending in the resulting Flink DataStream, if the
+        * parallel source subtask reads more than one partition.
+        *
+        * <p>Running timestamp extractors / watermark generators directly inside the Kafka source, per Kafka
+        * partition, allows users to let them exploit the per-partition characteristics.
+        *
+        * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an
+        * {@link AssignerWithPeriodicWatermarks}, not both at the same time.
+        *
+        * @param assigner The timestamp assigner / watermark generator to use.
+        * @return The consumer object, to allow function chaining.
         */
-       private long extractTimestampAndEmitElement(KafkaTopicPartition partDescriptor, T value) {
-               long extractedTimestamp = getTimestampAssigner().extractTimestamp(value, Long.MIN_VALUE);
-               srcContext.collectWithTimestamp(value, extractedTimestamp);
-               updateMaximumTimestampForPartition(partDescriptor, extractedTimestamp);
-               return extractedTimestamp;
-       }
-
-       /**
-        * Upon reception of an element with a timestamp greater than the greatest timestamp seen so far in the partition,
-        * this method updates the maximum timestamp seen for that partition to {@code timestamp}, and marks the partition
-        * as {@code active}, i.e. it still receives fresh data. If the partition is not known to the system, then a new
-        * {@link KafkaPartitionState} is created and is associated to the new partition for future monitoring.
-        * @param partDescriptor
-        *            A descriptor containing the topic and the id of the partition.
-        * @param timestamp
-        *           The timestamp to set the minimum to, if smaller than the already existing one.
-        * @return {@code true} if the minimum was updated successfully to {@code timestamp}, {@code false}
-        *           if the previous value is smaller than the provided timestamp
-        * */
-       private boolean updateMaximumTimestampForPartition(KafkaTopicPartition partDescriptor, long timestamp) {
-               KafkaPartitionState info = getOrInitializeInfo(partDescriptor);
-
-               if(timestamp > info.getMaxTimestamp()) {
-
-                       // the flag is set to false as soon as the current partition's max timestamp is sent as a watermark.
-                       // if then, and for that partition, only late elements arrive, then the max timestamp will stay the
-                       // same, and it will keep the overall system from progressing.
-                       // To avoid this, we only mark a partition as active on non-late elements.
-
-                       info.setActive(true);
-                       info.setMaxTimestamp(timestamp);
-                       return  true;
+       public FlinkKafkaConsumerBase<T> setPeriodicWatermarkEmitter(AssignerWithPeriodicWatermarks<T> assigner) {
+               checkNotNull(assigner);
+               
+               if (this.punctuatedWatermarkAssigner != null) {
+                       throw new IllegalStateException("A punctuated watermark emitter has already been set.");
+               }
+               try {
+                       this.periodicWatermarkAssigner = new SerializedValue<>(assigner);
+                       return this;
+               } catch (Exception e) {
+                       throw new IllegalArgumentException("The given assigner is not serializable", e);
                }
-               return false;
        }
 
-       /**
-        * Updates the last read offset for the partition specified by the {@code partDescriptor} to {@code offset}.
-        * If it is the first time we see the partition, then a new {@link KafkaPartitionState} is created to monitor
-        * this specific partition.
-        * @param partDescriptor the partition whose info to update.
-        * @param offset the last read offset of the partition.
-        */
-       public void updateOffsetForPartition(KafkaTopicPartition partDescriptor, long offset) {
-               KafkaPartitionState info = getOrInitializeInfo(partDescriptor);
-               info.setOffset(offset);
-       }
+       // ------------------------------------------------------------------------
+       //  Work methods
+       // ------------------------------------------------------------------------
 
        @Override
-       public void trigger(long timestamp) throws Exception {
-               if(this.srcContext == null) {
-                       // if the trigger is called before any elements, then we
-                       // just set the next timer to fire when it should and we
-                       // ignore the triggering as this would produce no results.
-                       setNextWatermarkTimer();
-                       return;
+       public void run(SourceContext<T> sourceContext) throws Exception {
+               if (allSubscribedPartitions == null) {
+                       throw new Exception("The partitions were not set for the consumer");
                }
-
-               // this is valid because this method is only called when watermarks
-               // are set to be emitted periodically.
-               final Watermark nextWatermark = periodicWatermarkAssigner.getCurrentWatermark();
-               if(nextWatermark != null) {
-                       emitWatermarkIfMarkingProgress(srcContext);
-               }
-               setNextWatermarkTimer();
-       }
-
-       /**
-        * Emits a new watermark, with timestamp equal to the minimum across all the maximum timestamps
-        * seen per local partition (across all topics). The new watermark is emitted if and only if
-        * it signals progress in event-time, i.e. if its timestamp is greater than the timestamp of
-        * the last emitted watermark. In addition, this method marks as inactive the partition whose
-        * timestamp was emitted as watermark, i.e. the one with the minimum across the maximum timestamps
-        * of the local partitions. This is done to avoid not making progress because
-        * a partition stopped receiving data. The partition is going to be marked as {@code active}
-        * as soon as the <i>next non-late</i> element arrives.
-        *
-        * @return {@code true} if the Watermark was successfully emitted, {@code false} otherwise.
-        */
-       private boolean emitWatermarkIfMarkingProgress(SourceFunction.SourceContext<T> sourceContext) {
-               Tuple2<KafkaTopicPartition, Long> globalMinTs = getMinTimestampAcrossAllTopics();
-               if(globalMinTs.f0 != null ) {
-                       synchronized (sourceContext.getCheckpointLock()) {
-                               long minTs = globalMinTs.f1;
-                               if(minTs > lastEmittedWatermark) {
-                                       lastEmittedWatermark = minTs;
-                                       Watermark toEmit = new Watermark(minTs);
-                                       sourceContext.emitWatermark(toEmit);
-                                       return true;
-                               }
+               
+               // figure out which partitions this subtask should process
+               final List<KafkaTopicPartition> thisSubtaskPartitions = assignPartitions(allSubscribedPartitions,
+                               getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getIndexOfThisSubtask());
+               
+               // we need only do work, if we actually have partitions assigned
+               if (!thisSubtaskPartitions.isEmpty()) {
+
+                       // (1) create the fetcher that will communicate with the Kafka brokers
+                       final AbstractFetcher<T, ?> fetcher = createFetcher(
+                                       sourceContext, thisSubtaskPartitions, 
+                                       periodicWatermarkAssigner, punctuatedWatermarkAssigner,
+                                       (StreamingRuntimeContext) getRuntimeContext());
+
+                       // (2) set the fetcher to the restored checkpoint offsets
+                       if (restoreToOffset != null) {
+                               fetcher.restoreOffsets(restoreToOffset);
                        }
-               }
-               return false;
-       }
 
 
-       /**
-        * Kafka sources with timestamp extractors are expected to keep the maximum timestamp seen per
-        * partition they are reading from. This is to mark the per-partition event-time progress.
-        *
-        * This method iterates this list, and returns the minimum timestamp across these per-partition
-        * max timestamps, and across all topics. In addition to this information, it also returns the topic and
-        * the partition within the topic the timestamp belongs to.
-        */
-       private Tuple2<KafkaTopicPartition, Long> getMinTimestampAcrossAllTopics() {
-               Tuple2<KafkaTopicPartition, Long> minTimestamp = new Tuple2<>(null, Long.MAX_VALUE);
-               for(Map.Entry<KafkaTopicPartition, KafkaPartitionState> entries: partitionState.entrySet()) {
-                       KafkaTopicPartition part = entries.getKey();
-                       KafkaPartitionState info = entries.getValue();
-
-                       if(partitionIsActive(part) && info.getMaxTimestamp() < minTimestamp.f1) {
-                               minTimestamp.f0 = part;
-                               minTimestamp.f1 = info.getMaxTimestamp();
+                       // publish the reference, for snapshot-, commit-, and cancel calls
+                       // IMPORTANT: We can only do that now, because only now will calls to
+                       //            the fetcher's 'snapshotCurrentState()' method return at least
+                       //            the restored offsets
+                       this.kafkaFetcher = fetcher;
+                       if (!running) {
+                               return;
                        }
+                       
+                       // (3) run the fetcher's main work method
+                       fetcher.runFetchLoop();
                }
-
-               if(minTimestamp.f0 != null) {
-                       // it means that we have a winner and we have to set its flag to
-                       // inactive, until its next non-late element.
-                       KafkaTopicPartition partitionDescriptor = minTimestamp.f0;
-                       setActiveFlagForPartition(partitionDescriptor, false);
-               }
-
-               return minTimestamp;
-       }
-
-       /**
-        * Sets the {@code active} flag for a given partition of a topic to {@code isActive}.
-        * This flag signals if the partition is still receiving data and it is used to avoid the case
-        * where a partition stops receiving data, so its max seen timestamp does not advance, and it
-        * holds back the progress of the watermark for all partitions. Note that if the partition is
-        * not known to the system, then a new {@link KafkaPartitionState} is created and is associated
-        * to the new partition for future monitoring.
-        *
-        * @param partDescriptor
-        *                              A descriptor containing the topic and the id of the partition.
-        * @param isActive
-        *                              The value {@code true} or {@code false} to set the flag to.
-        */
-       private void setActiveFlagForPartition(KafkaTopicPartition partDescriptor, boolean isActive) {
-               KafkaPartitionState info = getOrInitializeInfo(partDescriptor);
-               info.setActive(isActive);
-       }
-
-       /**
-        * Gets the statistics for a given partition specified by the {@code partition} argument.
-        * If it is the first time we see this partition, a new {@link KafkaPartitionState} data structure
-        * is initialized to monitor it from now on. This method never throws a {@link NullPointerException}.
-        * @param partition the partition to be fetched.
-        * @return the gathered statistics for that partition.
-        * */
-       private KafkaPartitionState getOrInitializeInfo(KafkaTopicPartition partition) {
-               KafkaPartitionState info = partitionState.get(partition);
-               if(info == null) {
-                       info = new KafkaPartitionState(partition.getPartition(), FlinkKafkaConsumerBase.OFFSET_NOT_SET);
-                       partitionState.put(partition, info);
+               else {
+                       // this source never completes, so emit a Long.MAX_VALUE watermark
+                       // to not block watermark forwarding
+                       sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
+
+                       // wait until this is canceled
+                       final Object waitLock = new Object();
+                       while (running) {
+                               try {
+                                       //noinspection SynchronizationOnLocalVariableOrMethodParameter
+                                       synchronized (waitLock) {
+                                               waitLock.wait();
+                                       }
+                               }
+                               catch (InterruptedException e) {
+                                       if (!running) {
+                                               // restore the interrupted state, and fall through the loop
+                                               Thread.currentThread().interrupt();
+                                       }
+                               }
+                       }
                }
-               return info;
        }
 
-       /**
-        * Checks if a partition of a topic is still active, i.e. if it still 
receives data.
-        * @param partDescriptor
-        *          A descriptor containing the topic and the id of the 
partition.
-        * */
-       private boolean partitionIsActive(KafkaTopicPartition partDescriptor) {
-               KafkaPartitionState info = partitionState.get(partDescriptor);
-               if(info == null) {
-                       throw new RuntimeException("Unknown Partition: Topic=" 
+ partDescriptor.getTopic() +
-                               " Partition=" + partDescriptor.getPartition());
+       @Override
+       public void cancel() {
+               // set ourselves as not running
+               running = false;
+               
+               // abort the fetcher, if there is one
+               if (kafkaFetcher != null) {
+                       kafkaFetcher.cancel();
                }
-               return info.isActive();
-       }
 
-       private TimestampAssigner<T> getTimestampAssigner() {
-               checkEmitterStateAfterInit();
-               return periodicWatermarkAssigner != null ? 
periodicWatermarkAssigner : punctuatedWatermarkAssigner;
-       }
-
-       private void setNextWatermarkTimer() {
-               long timeToNextWatermark = System.currentTimeMillis() + 
watermarkInterval;
-               runtime.registerTimer(timeToNextWatermark, this);
-       }
-
-       private void checkEmitterDuringInit() {
-               if(periodicWatermarkAssigner != null) {
-                       throw new RuntimeException("A periodic watermark 
emitter has already been provided.");
-               } else if(punctuatedWatermarkAssigner != null) {
-                       throw new RuntimeException("A punctuated watermark 
emitter has already been provided.");
-               }
+               // there will be an interrupt() call to the main thread anyway
        }
 
-       private void checkEmitterStateAfterInit() {
-               if(periodicWatermarkAssigner == null && 
punctuatedWatermarkAssigner == null) {
-                       throw new RuntimeException("The timestamp assigner has 
not been initialized.");
-               } else if(periodicWatermarkAssigner != null && 
punctuatedWatermarkAssigner != null) {
-                       throw new RuntimeException("The source can either have 
an assigner with punctuated " +
-                               "watermarks or one with periodic watermarks, 
not both.");
+       @Override
+       public void close() throws Exception {
+               // pretty much the same logic as cancelling
+               try {
+                       cancel();
+               } finally {
+                       super.close();
                }
        }
-
+       
        // 
------------------------------------------------------------------------
        //  Checkpoint and restore
        // 
------------------------------------------------------------------------
-
-       HashMap<KafkaTopicPartition, KafkaPartitionState> 
restoreInfoFromCheckpoint() {
-               HashMap<KafkaTopicPartition, KafkaPartitionState> partInfo = 
new HashMap<>(restoreToOffset.size());
-               for(Map.Entry<KafkaTopicPartition, Long> offsets: 
restoreToOffset.entrySet()) {
-                       KafkaTopicPartition key = offsets.getKey();
-                       partInfo.put(key, new 
KafkaPartitionState(key.getPartition(), offsets.getValue()));
-               }
-               return partInfo;
-       }
-
+       
        @Override
        public HashMap<KafkaTopicPartition, Long> snapshotState(long 
checkpointId, long checkpointTimestamp) throws Exception {
-               if (partitionState == null) {
-                       LOG.debug("snapshotState() requested on not yet opened 
source; returning null.");
-                       return null;
-               }
                if (!running) {
                        LOG.debug("snapshotState() called on closed source");
                        return null;
                }
-
-               HashMap<KafkaTopicPartition, Long> currentOffsets = new 
HashMap<>();
-               for (Map.Entry<KafkaTopicPartition, KafkaPartitionState> entry: 
partitionState.entrySet()) {
-                       currentOffsets.put(entry.getKey(), 
entry.getValue().getOffset());
+               
+               final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
+               if (fetcher == null) {
+                       // the fetcher has not yet been initialized, which 
means we need to return the
+                       // originally restored offsets
+                       return restoreToOffset;
                }
 
+               HashMap<KafkaTopicPartition, Long> currentOffsets = 
fetcher.snapshotCurrentState();
+
                if (LOG.isDebugEnabled()) {
                        LOG.debug("Snapshotting state. Offsets: {}, checkpoint 
id: {}, timestamp: {}",
                                        
KafkaTopicPartition.toString(currentOffsets), checkpointId, 
checkpointTimestamp);
@@ -447,7 +311,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
                // the map cannot be asynchronously updated, because only one 
checkpoint call can happen
                // on this function at a time: either snapshotState() or 
notifyCheckpointComplete()
                pendingCheckpoints.put(checkpointId, currentOffsets);
-                       
+               
+               // truncate the map, to prevent infinite growth
                while (pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) 
{
                        pendingCheckpoints.remove(0);
                }
@@ -457,51 +322,49 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
 
        @Override
        public void restoreState(HashMap<KafkaTopicPartition, Long> 
restoredOffsets) {
-               LOG.info("Setting restore state in Kafka");
+               LOG.info("Setting restore state in the FlinkKafkaConsumer");
                restoreToOffset = restoredOffsets;
        }
 
        @Override
        public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
-               if (partitionState == null) {
-                       LOG.debug("notifyCheckpointComplete() called on 
uninitialized source");
-                       return;
-               }
                if (!running) {
                        LOG.debug("notifyCheckpointComplete() called on closed 
source");
                        return;
                }
+
+               final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
+               if (fetcher == null) {
+                       LOG.debug("notifyCheckpointComplete() called on 
uninitialized source");
+                       return;
+               }
                
                // only one commit operation must be in progress
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("Committing offsets externally for checkpoint 
{}", checkpointId);
+                       LOG.debug("Committing offsets to Kafka/ZooKeeper for 
checkpoint " + checkpointId);
                }
 
                try {
-                       HashMap<KafkaTopicPartition, Long> checkpointOffsets;
-       
-                       // the map may be asynchronously updates when 
snapshotting state, so we synchronize
-                       synchronized (pendingCheckpoints) {
-                               final int posInMap = 
pendingCheckpoints.indexOf(checkpointId);
-                               if (posInMap == -1) {
-                                       LOG.warn("Received confirmation for 
unknown checkpoint id {}", checkpointId);
-                                       return;
-                               }
+                       final int posInMap = 
pendingCheckpoints.indexOf(checkpointId);
+                       if (posInMap == -1) {
+                               LOG.warn("Received confirmation for unknown 
checkpoint id {}", checkpointId);
+                               return;
+                       }
 
-                               //noinspection unchecked
-                               checkpointOffsets = 
(HashMap<KafkaTopicPartition, Long>) pendingCheckpoints.remove(posInMap);
+                       @SuppressWarnings("unchecked")
+                       HashMap<KafkaTopicPartition, Long> checkpointOffsets = 
+                                       (HashMap<KafkaTopicPartition, Long>) 
pendingCheckpoints.remove(posInMap);
 
-                               
-                               // remove older checkpoints in map
-                               for (int i = 0; i < posInMap; i++) {
-                                       pendingCheckpoints.remove(0);
-                               }
+                       // remove older checkpoints in map
+                       for (int i = 0; i < posInMap; i++) {
+                               pendingCheckpoints.remove(0);
                        }
+
                        if (checkpointOffsets == null || 
checkpointOffsets.size() == 0) {
                                LOG.debug("Checkpoint state was empty.");
                                return;
                        }
-                       commitOffsets(checkpointOffsets);
+                       fetcher.commitSpecificOffsetsToKafka(checkpointOffsets);
                }
                catch (Exception e) {
                        if (running) {
@@ -511,33 +374,77 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
                }
        }
 
-       protected abstract void commitOffsets(HashMap<KafkaTopicPartition, 
Long> checkpointOffsets) throws Exception;
-
-
+       // 
------------------------------------------------------------------------
+       //  Kafka Consumer specific methods
+       // 
------------------------------------------------------------------------
+       
+       /**
+        * Creates the fetcher that connects to the Kafka brokers, pulls data,
+        * deserializes the data, and emits it into the data streams.
+        * 
+        * @param sourceContext The source context to emit data to.
+        * @param thisSubtaskPartitions The set of partitions that this subtask 
should handle.
+        * @param watermarksPeriodic Optional, a serialized timestamp extractor 
/ periodic watermark generator.
+        * @param watermarksPunctuated Optional, a serialized timestamp 
extractor / punctuated watermark generator.
+        * @param runtimeContext The task's runtime context.
+        * 
+        * @return The instantiated fetcher
+        * 
+        * @throws Exception The method should forward exceptions
+        */
+       protected abstract AbstractFetcher<T, ?> createFetcher(
+                       SourceContext<T> sourceContext,
+                       List<KafkaTopicPartition> thisSubtaskPartitions,
+                       SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
+                       SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
+                       StreamingRuntimeContext runtimeContext) throws 
Exception;
+       
+       // 
------------------------------------------------------------------------
+       //  ResultTypeQueryable methods 
+       // 
------------------------------------------------------------------------
+       
        @Override
        public TypeInformation<T> getProducedType() {
                return deserializer.getProducedType();
        }
 
-       protected static <T> List<T> assignPartitions(List<T> partitions, int 
numConsumers, int consumerIndex) {
-               checkArgument(numConsumers > 0);
-               checkArgument(consumerIndex < numConsumers);
-
-               List<T> partitionsToSub = new ArrayList<>();
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
 
-               for (int i = 0; i < partitions.size(); i++) {
+       /**
+        * Selects which of the given partitions should be handled by a 
specific consumer,
+        * given a certain number of consumers.
+        * 
+        * @param allPartitions The partitions to select from
+        * @param numConsumers The number of consumers
+        * @param consumerIndex The index of the specific consumer
+        * 
+        * @return The sublist of partitions to be handled by that consumer.
+        */
+       protected static List<KafkaTopicPartition> assignPartitions(
+                       List<KafkaTopicPartition> allPartitions,
+                       int numConsumers, int consumerIndex)
+       {
+               final List<KafkaTopicPartition> thisSubtaskPartitions = new 
ArrayList<>(
+                               allPartitions.size() / numConsumers + 1);
+
+               for (int i = 0; i < allPartitions.size(); i++) {
                        if (i % numConsumers == consumerIndex) {
-                               partitionsToSub.add(partitions.get(i));
+                               thisSubtaskPartitions.add(allPartitions.get(i));
                        }
                }
-               return partitionsToSub;
+               
+               return thisSubtaskPartitions;
        }
-
+       
        /**
-        * Method to log partition information.
+        * Logs the partition information at INFO level.
+        * 
+        * @param logger The logger to log to.
         * @param partitionInfos List of subscribed partitions
         */
-       public static void logPartitionInfo(List<KafkaTopicPartition> 
partitionInfos) {
+       protected static void logPartitionInfo(Logger logger, 
List<KafkaTopicPartition> partitionInfos) {
                Map<String, Integer> countPerTopic = new HashMap<>();
                for (KafkaTopicPartition partition : partitionInfos) {
                        Integer count = countPerTopic.get(partition.getTopic());
@@ -548,12 +455,13 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
                        }
                        countPerTopic.put(partition.getTopic(), count);
                }
-               StringBuilder sb = new StringBuilder();
+               StringBuilder sb = new StringBuilder(
+                               "Consumer is going to read the following topics 
(with number of partitions): ");
+               
                for (Map.Entry<String, Integer> e : countPerTopic.entrySet()) {
                        sb.append(e.getKey()).append(" 
(").append(e.getValue()).append("), ");
                }
-               LOG.info("Consumer is going to read the following topics (with 
number of partitions): {}", sb.toString());
+               
+               logger.info(sb.toString());
        }
-
-
 }
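
A minimal sketch of the round-robin distribution implemented by assignPartitions
above, assuming the public KafkaTopicPartition(String, int) constructor shown
later in this commit and a call site inside a consumer subclass (the method is
protected); the snippet is illustrative and not part of the commit:

    import java.util.Arrays;
    import java.util.List;

    // Illustration only: 5 partitions split across 2 consumer subtasks.
    List<KafkaTopicPartition> all = Arrays.asList(
            new KafkaTopicPartition("events", 0),
            new KafkaTopicPartition("events", 1),
            new KafkaTopicPartition("events", 2),
            new KafkaTopicPartition("events", 3),
            new KafkaTopicPartition("events", 4));

    List<KafkaTopicPartition> subtask0 = assignPartitions(all, 2, 0); // partitions 0, 2, 4
    List<KafkaTopicPartition> subtask1 = assignPartitions(all, 2, 1); // partitions 1, 3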

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
new file mode 100644
index 0000000..594aa66
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all fetchers, which implement the connections to Kafka 
brokers and
+ * pull records from Kafka partitions.
+ * 
+ * <p>This fetcher base class implements the logic around emitting records and 
tracking offsets,
+ * as well as around the optional timestamp assignment and watermark 
generation. 
+ * 
+ * @param <T> The type of elements deserialized from Kafka's byte records, and 
emitted into
+ *            the Flink data streams.
+ * @param <KPH> The type of topic/partition identifier used by Kafka in the 
specific version.
+ */
+public abstract class AbstractFetcher<T, KPH> {
+       
+       private static final int NO_TIMESTAMPS_WATERMARKS = 0;
+       private static final int PERIODIC_WATERMARKS = 1;
+       private static final int PUNCTUATED_WATERMARKS = 2;
+       
+       // 
------------------------------------------------------------------------
+       
+       /** The source context to emit records and watermarks to */
+       private final SourceContext<T> sourceContext;
+
+       /** The lock that guarantees that record emission and state updates are 
atomic,
+        * from the view of taking a checkpoint */
+       private final Object checkpointLock;
+
+       /** All partitions (and their state) that this fetcher is subscribed to 
*/
+       private final KafkaTopicPartitionState<KPH>[] allPartitions;
+
+       /** The mode describing whether the fetcher also generates timestamps 
and watermarks */
+       private final int timestampWatermarkMode;
+       
+       /** Only relevant for punctuated watermarks: The current cross 
partition watermark */
+       private volatile long maxWatermarkSoFar = Long.MIN_VALUE;
+
+       // 
------------------------------------------------------------------------
+       
+       protected AbstractFetcher(
+                       SourceContext<T> sourceContext,
+                       List<KafkaTopicPartition> assignedPartitions,
+                       SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
+                       SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
+                       StreamingRuntimeContext runtimeContext) throws Exception
+       {
+               this.sourceContext = checkNotNull(sourceContext);
+               this.checkpointLock = sourceContext.getCheckpointLock();
+               
+               // figure out which watermark mode we will be using
+               
+               if (watermarksPeriodic == null) {
+                       if (watermarksPunctuated == null) {
+                               // simple case, no watermarks involved
+                               timestampWatermarkMode = 
NO_TIMESTAMPS_WATERMARKS;
+                       } else {
+                               timestampWatermarkMode = PUNCTUATED_WATERMARKS;
+                       }
+               } else {
+                       if (watermarksPunctuated == null) {
+                               timestampWatermarkMode = PERIODIC_WATERMARKS;
+                       } else {
+                               throw new IllegalArgumentException("Cannot have 
both periodic and punctuated watermarks");
+                       }
+               }
+               
+               // create our partition state according to the 
timestamp/watermark mode 
+               this.allPartitions = initializePartitions(
+                               assignedPartitions,
+                               timestampWatermarkMode,
+                               watermarksPeriodic, watermarksPunctuated,
+                               runtimeContext.getUserCodeClassLoader());
+               
+               // if we have periodic watermarks, kick off the interval 
scheduler
+               if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
+                       KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] 
parts = 
+                                       
(KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions;
+                       
+                       PeriodicWatermarkEmitter periodicEmitter = 
+                                       new PeriodicWatermarkEmitter(parts, 
sourceContext, runtimeContext);
+                       periodicEmitter.start();
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Properties
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Gets all partitions (with partition state) that this fetcher is 
subscribed to.
+        *
+        * @return All subscribed partitions.
+        */
+       protected final KafkaTopicPartitionState<KPH>[] subscribedPartitions() {
+               return allPartitions;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Core fetcher work methods
+       // 
------------------------------------------------------------------------
+
+       public abstract void runFetchLoop() throws Exception;
+       
+       public abstract void cancel();
+
+       // 
------------------------------------------------------------------------
+       //  Kafka version specifics
+       // 
------------------------------------------------------------------------
+       
+       /**
+        * Creates the Kafka version specific representation of the given
+        * topic partition.
+        * 
+        * @param partition The Flink representation of the Kafka topic 
partition.
+        * @return The specific Kafka representation of the Kafka topic 
partition.
+        */
+       public abstract KPH createKafkaPartitionHandle(KafkaTopicPartition 
partition);
+
+       /**
+        * Commits the given partition offsets to the Kafka brokers (or to 
ZooKeeper for
+        * older Kafka versions).
+        * 
+        * @param offsets The offsets to commit to Kafka.
+        * @throws Exception This method forwards exceptions.
+        */
+       public abstract void 
commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws 
Exception;
+       
+       // 
------------------------------------------------------------------------
+       //  snapshot and restore the state
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Takes a snapshot of the partition offsets.
+        * 
+        * <p>Important: This method must be called under the checkpoint lock.
+        * 
+        * @return A map from partition to current offset.
+        */
+       public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
+               // this method assumes that the checkpoint lock is held
+               assert Thread.holdsLock(checkpointLock);
+
+               HashMap<KafkaTopicPartition, Long> state = new 
HashMap<>(allPartitions.length);
+               for (KafkaTopicPartitionState<?> partition : 
subscribedPartitions()) {
+                       if (partition.isOffsetDefined()) {
+                               state.put(partition.getKafkaTopicPartition(), 
partition.getOffset());
+                       }
+               }
+               return state;
+       }
+
+       /**
+        * Restores the partition offsets.
+        * 
+        * @param snapshotState The offsets for the partitions 
+        */
+       public void restoreOffsets(HashMap<KafkaTopicPartition, Long> 
snapshotState) {
+               for (KafkaTopicPartitionState<?> partition : allPartitions) {
+                       Long offset = 
snapshotState.get(partition.getKafkaTopicPartition());
+                       if (offset != null) {
+                               partition.setOffset(offset);
+                       }
+               }
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  emitting records
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Emits a record and updates the partition's offset state, atomically with
+        * respect to checkpoints.
+        * 
+        * <p>Implementation Note: This method is kept brief to be JIT inlining friendly.
+        * That makes the fast path efficient, while the extended paths are called as
+        * separate methods.
+        * 
+        * @param record The record to emit
+        * @param partitionState The state of the Kafka partition from which 
the record was fetched
+        * @param offset The offset from which the record was fetched
+        */
+       protected final void emitRecord(T record, KafkaTopicPartitionState<KPH> 
partitionState, long offset) {
+               if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
+                       // fast path logic, in case there are no watermarks
+
+                       // emit the record, using the checkpoint lock to 
guarantee
+                       // atomicity of record emission and offset state update
+                       synchronized (checkpointLock) {
+                               sourceContext.collect(record);
+                               partitionState.setOffset(offset);
+                       }
+               }
+               else if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
+                       emitRecordWithTimestampAndPeriodicWatermark(record, 
partitionState, offset);
+               }
+               else {
+                       emitRecordWithTimestampAndPunctuatedWatermark(record, 
partitionState, offset);
+               }
+       }
+
+       /**
+        * Record emission for the case where the timestamp is attached by an assigner
+        * that is also a periodic watermark generator.
+        */
+       private void emitRecordWithTimestampAndPeriodicWatermark(
+                       T record, KafkaTopicPartitionState<KPH> partitionState, 
long offset)
+       {
+               @SuppressWarnings("unchecked")
+               final KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> 
withWatermarksState =
+                               
(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>) partitionState;
+
+               // extract timestamp - this accesses/modifies the per-partition 
state inside the
+               // watermark generator instance, so we need to lock the access 
on the
+               // partition state. concurrent access can happen from the 
periodic emitter
+               final long timestamp;
+               //noinspection SynchronizationOnLocalVariableOrMethodParameter
+               synchronized (withWatermarksState) {
+                       timestamp = 
withWatermarksState.getTimestampForRecord(record);
+               }
+
+               // emit the record with timestamp, using the usual checkpoint 
lock to guarantee
+               // atomicity of record emission and offset state update 
+               synchronized (checkpointLock) {
+                       sourceContext.collectWithTimestamp(record, timestamp);
+                       partitionState.setOffset(offset);
+               }
+       }
+
+       /**
+        * Record emission for the case where the timestamp is attached by an assigner
+        * that is also a punctuated watermark generator.
+        */
+       private void emitRecordWithTimestampAndPunctuatedWatermark(
+                       T record, KafkaTopicPartitionState<KPH> partitionState, 
long offset)
+       {
+               @SuppressWarnings("unchecked")
+               final KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> 
withWatermarksState =
+                               
(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) partitionState;
+
+               // only one thread ever works on accessing timestamps and 
watermarks
+               // from the punctuated extractor
+               final long timestamp = 
withWatermarksState.getTimestampForRecord(record);
+               final Watermark newWatermark = 
withWatermarksState.checkAndGetNewWatermark(record, timestamp);
+                       
+               // emit the record with timestamp, using the usual checkpoint 
lock to guarantee
+               // atomicity of record emission and offset state update 
+               synchronized (checkpointLock) {
+                       sourceContext.collectWithTimestamp(record, timestamp);
+                       partitionState.setOffset(offset);
+               }
+               
+               // if we also have a new per-partition watermark, check if that 
is also a
+               // new cross-partition watermark
+               if (newWatermark != null) {
+                       updateMinPunctuatedWatermark(newWatermark);
+               }
+       }
+
+       /**
+        * Checks whether a new per-partition watermark is also a new cross-partition watermark.
+        */
+       private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
+               if (nextWatermark.getTimestamp() > maxWatermarkSoFar) {
+                       long newMin = Long.MAX_VALUE;
+                       
+                       for (KafkaTopicPartitionState<?> state : allPartitions) 
{
+                               @SuppressWarnings("unchecked")
+                               final 
KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> withWatermarksState =
+                                               
(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>) state;
+                               
+                               newMin = Math.min(newMin, 
withWatermarksState.getCurrentPartitionWatermark());
+                       }
+                       
+                       // double-check locking pattern
+                       if (newMin > maxWatermarkSoFar) {
+                               synchronized (checkpointLock) {
+                                       if (newMin > maxWatermarkSoFar) {
+                                               maxWatermarkSoFar = newMin;
+                                               sourceContext.emitWatermark(new 
Watermark(newMin));
+                                       }
+                               }
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Utility method that takes the topic partitions and creates the topic 
partition state
+        * holders. If a watermark generator per partition exists, this will 
also initialize those.
+        */
+       private KafkaTopicPartitionState<KPH>[] initializePartitions(
+                       List<KafkaTopicPartition> assignedPartitions,
+                       int timestampWatermarkMode,
+                       SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
+                       SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
+                       ClassLoader userCodeClassLoader)
+               throws IOException, ClassNotFoundException
+       {
+               @SuppressWarnings("unchecked")
+               KafkaTopicPartitionState<KPH>[] partitions =
+                               (KafkaTopicPartitionState<KPH>[]) new 
KafkaTopicPartitionState<?>[assignedPartitions.size()];
+
+               int pos = 0;
+               for (KafkaTopicPartition partition : assignedPartitions) {
+                       // create the kafka version specific partition handle
+                       KPH kafkaHandle = createKafkaPartitionHandle(partition);
+                       
+                       // create the partition state
+                       KafkaTopicPartitionState<KPH> partitionState;
+                       switch (timestampWatermarkMode) {
+                               case NO_TIMESTAMPS_WATERMARKS:
+                                       partitionState = new 
KafkaTopicPartitionState<>(partition, kafkaHandle);
+                                       break;
+                               case PERIODIC_WATERMARKS: {
+                                       AssignerWithPeriodicWatermarks<T> 
assignerInstance =
+                                                       
watermarksPeriodic.deserializeValue(userCodeClassLoader);
+                                       partitionState = new 
KafkaTopicPartitionStateWithPeriodicWatermarks<>(
+                                                       partition, kafkaHandle, 
assignerInstance);
+                                       break;
+                               }
+                                       
+                               case PUNCTUATED_WATERMARKS: {
+                                       AssignerWithPunctuatedWatermarks<T> 
assignerInstance =
+                                                       
watermarksPunctuated.deserializeValue(userCodeClassLoader);
+                                       partitionState = new 
KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
+                                                       partition, kafkaHandle, 
assignerInstance);
+                                       break;
+                               }
+                               default:
+                                       // cannot happen, add this as a guard 
for the future
+                                       throw new RuntimeException();
+                       }
+
+                       partitions[pos++] = partitionState;
+               }
+               
+               return partitions;
+       }
+       
+       // 
------------------------------------------------------------------------
+       
+       /**
+        * The periodic watermark emitter. At its configured interval, it checks all
+        * partitions for the current event time watermark, and possibly emits the
+        * next watermark.
+        */
+       private static class PeriodicWatermarkEmitter implements Triggerable {
+
+               private final KafkaTopicPartitionStateWithPeriodicWatermarks<?, 
?>[] allPartitions;
+               
+               private final SourceContext<?> emitter;
+               
+               private final StreamingRuntimeContext triggerContext;
+
+               private final long interval;
+               
+               private long lastWatermarkTimestamp;
+               
+               //-------------------------------------------------
+
+               PeriodicWatermarkEmitter(
+                               
KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions,
+                               SourceContext<?> emitter,
+                               StreamingRuntimeContext runtimeContext)
+               {
+                       this.allPartitions = checkNotNull(allPartitions);
+                       this.emitter = checkNotNull(emitter);
+                       this.triggerContext = checkNotNull(runtimeContext);
+                       this.interval = 
runtimeContext.getExecutionConfig().getAutoWatermarkInterval();
+                       this.lastWatermarkTimestamp = Long.MIN_VALUE;
+               }
+
+               //-------------------------------------------------
+               
+               public void start() {
+                       triggerContext.registerTimer(System.currentTimeMillis() 
+ interval, this);
+               }
+               
+               @Override
+               public void trigger(long timestamp) throws Exception {
+                       // sanity check
+                       assert Thread.holdsLock(emitter.getCheckpointLock());
+                       
+                       long minAcrossAll = Long.MAX_VALUE;
+                       for (KafkaTopicPartitionStateWithPeriodicWatermarks<?, 
?> state : allPartitions) {
+                               
+                               // we access the current watermark for the 
periodic assigners under the state
+                               // lock, to prevent concurrent modification to 
any internal variables
+                               final long curr;
+                               //noinspection 
SynchronizationOnLocalVariableOrMethodParameter
+                               synchronized (state) {
+                                       curr = 
state.getCurrentWatermarkTimestamp();
+                               }
+                               
+                               minAcrossAll = Math.min(minAcrossAll, curr);
+                       }
+                       
+                       // emit next watermark, if there is one
+                       if (minAcrossAll > lastWatermarkTimestamp) {
+                               lastWatermarkTimestamp = minAcrossAll;
+                               emitter.emitWatermark(new 
Watermark(minAcrossAll));
+                       }
+                       
+                       // schedule the next watermark
+                       triggerContext.registerTimer(System.currentTimeMillis() 
+ interval, this);
+               }
+       }
+}
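
To make the locking contract documented above concrete: snapshotCurrentState()
asserts that the checkpoint lock is held, so a caller on the checkpointing path
would look roughly like the hedged sketch below. The names fetcher,
sourceContext, pendingCheckpoints and checkpointId are stand-ins for the
consumer's fields, not verbatim code from this commit:

    // Sketch only: snapshotting must happen under the source's checkpoint lock,
    // so that record emission and offset updates cannot interleave with it.
    synchronized (sourceContext.getCheckpointLock()) {
        HashMap<KafkaTopicPartition, Long> offsets = fetcher.snapshotCurrentState();
        pendingCheckpoints.put(checkpointId, offsets);
    }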

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
new file mode 100644
index 0000000..9a0e4e3
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A proxy that transfers an exception from one thread (e.g. a Kafka consumer
+ * thread) to another thread (the task's main thread), optionally interrupting
+ * the target thread so that it reacts to the error promptly.
+ */
+public class ExceptionProxy {
+       
+       /** The thread that should be interrupted when an exception occurs */
+       private final Thread toInterrupt;
+       
+       /** The exception to throw */ 
+       private final AtomicReference<Throwable> exception;
+
+       /**
+        * Creates an exception proxy that interrupts the given thread on the first
+        * reported exception.
+        * 
+        * @param toInterrupt The thread to interrupt upon an exception. May be 
null.
+        */
+       public ExceptionProxy(@Nullable Thread toInterrupt) {
+               this.toInterrupt = toInterrupt;
+               this.exception = new AtomicReference<>();
+       }
+       
+       // 
------------------------------------------------------------------------
+       
+       /**
+        * Records the given exception and interrupts the target thread,
+        * if no other exception has been recorded so far.
+        * 
+        * @param t The exception that occurred
+        */
+       public void reportError(Throwable t) {
+               // set the exception, if it is the first
+               if (exception.compareAndSet(null, t) && toInterrupt != null) {
+                       toInterrupt.interrupt();
+               }
+       }
+       
+       public void checkAndThrowException() throws Exception {
+               Throwable t = exception.get();
+               if (t != null) {
+                       if (t instanceof Exception) {
+                               throw (Exception) t;
+                       }
+                       else if (t instanceof Error) {
+                               throw (Error) t;
+                       }
+                       else {
+                               throw new Exception(t);
+                       }
+               }
+       }
+}
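
A hedged usage sketch of the class above: a worker thread reports failures into
the proxy, and the main thread is interrupted and rethrows. runFetchLoop() is a
hypothetical workload here, and the calling method is assumed to declare
throws Exception:

    // Names are illustrative only; the proxy's API is as defined above.
    final ExceptionProxy errorProxy = new ExceptionProxy(Thread.currentThread());

    Thread fetcherThread = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                runFetchLoop(); // hypothetical workload
            } catch (Throwable t) {
                errorProxy.reportError(t); // first error wins, interrupts main thread
            }
        }
    });
    fetcherThread.start();

    // main thread, e.g. once per loop iteration or after an interrupt:
    errorProxy.checkAndThrowException();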

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionState.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionState.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionState.java
deleted file mode 100644
index 11a392a..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionState.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import java.io.Serializable;
-
-public class KafkaPartitionState implements Serializable {
-
-       private static final long serialVersionUID = 722083576322742328L;
-
-       private final int partitionID;
-       private long offset;
-
-       private long maxTimestamp = Long.MIN_VALUE;
-       private boolean isActive = false;
-
-       public KafkaPartitionState(int id, long offset) {
-               this.partitionID = id;
-               this.offset = offset;
-       }
-
-       public void setOffset(long offset) {
-               this.offset = offset;
-       }
-
-       public void setActive(boolean isActive) {
-               this.isActive = isActive;
-       }
-
-       public void setMaxTimestamp(long timestamp) {
-               maxTimestamp = timestamp;
-       }
-
-       public int getPartition() {
-               return partitionID;
-       }
-
-       public boolean isActive() {
-               return isActive;
-       }
-
-       public long getMaxTimestamp() {
-               return maxTimestamp;
-       }
-
-       public long getOffset() {
-               return offset;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
index aea14cf..c68fe28 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
@@ -24,14 +24,20 @@ import java.util.Map;
 
 import static java.util.Objects.requireNonNull;
 
-
 /**
- * A serializable representation of a kafka topic and a partition.
- * Used as an operator state for the Kafka consumer
+ * Flink's description of a partition in a Kafka topic.
+ * Serializable, and common across all Kafka consumer subclasses (0.8, 0.9, 
...)
+ * 
+ * <p>Note: This class must not change in its structure, because it would 
change the
+ * serialization format and make previous savepoints unreadable.
  */
-public class KafkaTopicPartition implements Serializable {
+public final class KafkaTopicPartition implements Serializable {
 
+       /** THIS SERIAL VERSION UID MUST NOT CHANGE, BECAUSE IT WOULD BREAK
+        * READING OLD SERIALIZED INSTANCES FROM SAVEPOINTS */
        private static final long serialVersionUID = 722083576322742325L;
+       
+       // 
------------------------------------------------------------------------
 
        private final String topic;
        private final int partition;
@@ -43,6 +49,8 @@ public class KafkaTopicPartition implements Serializable {
                this.cachedHash = 31 * topic.hashCode() + partition;
        }
 
+       // 
------------------------------------------------------------------------
+       
        public String getTopic() {
                return topic;
        }
@@ -51,6 +59,8 @@ public class KafkaTopicPartition implements Serializable {
                return partition;
        }
 
+       // 
------------------------------------------------------------------------
+       
        @Override
        public String toString() {
                return "KafkaTopicPartition{" +
@@ -64,25 +74,23 @@ public class KafkaTopicPartition implements Serializable {
                if (this == o) {
                        return true;
                }
-               if (!(o instanceof KafkaTopicPartition)) {
-                       return false;
+               else if (o instanceof KafkaTopicPartition) {
+                       KafkaTopicPartition that = (KafkaTopicPartition) o;
+                       return this.partition == that.partition && 
this.topic.equals(that.topic);
                }
-
-               KafkaTopicPartition that = (KafkaTopicPartition) o;
-
-               if (partition != that.partition) {
+               else {
                        return false;
                }
-               return topic.equals(that.topic);
        }
 
        @Override
        public int hashCode() {
                return cachedHash;
        }
-
-
-       // ------------------- Utilities -------------------------------------
+       
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
 
        public static String toString(Map<KafkaTopicPartition, Long> map) {
                StringBuilder sb = new StringBuilder();
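
The equals/hashCode rewrite above keeps the semantics unchanged: identity is by
topic and partition, with the hash precomputed in the constructor. A hedged
illustration, assuming the public two-argument constructor this class exposes:

    // Illustration only: equality is by (topic, partition); the hash is cached.
    KafkaTopicPartition a = new KafkaTopicPartition("events", 1);
    KafkaTopicPartition b = new KafkaTopicPartition("events", 1);

    assert a.equals(b);                  // same topic and partition
    assert a.hashCode() == b.hashCode(); // 31 * "events".hashCode() + 1, precomputed
    assert !a.equals(new KafkaTopicPartition("events", 2));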

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
new file mode 100644
index 0000000..36612a4
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+/**
+ * The state that the Flink Kafka Consumer holds for each Kafka partition.
+ * Includes the Kafka descriptor for partitions.
+ * 
+ * <p>This class describes the most basic state (only the offset), subclasses
+ * define more elaborate state, containing current watermarks and timestamp
+ * extractors.
+ * 
+ * @param <KPH> The type of the Kafka partition descriptor, which varies 
across Kafka versions.
+ */
+public class KafkaTopicPartitionState<KPH> {
+
+       /** Magic number to define an unset offset. Negative offsets are not 
used by Kafka (invalid),
+        * and we pick a number that is probably (hopefully) not used by Kafka 
as a magic number for anything else. */
+       public static final long OFFSET_NOT_SET = -915623761776L;
+       
+       // 
------------------------------------------------------------------------
+
+       /** The Flink description of a Kafka partition */
+       private final KafkaTopicPartition partition;
+
+       /** The Kafka description of a Kafka partition (varies across different 
Kafka versions) */
+       private final KPH kafkaPartitionHandle;
+       
+       /** The offset within the Kafka partition that we already processed */
+       private volatile long offset;
+
+       // 
------------------------------------------------------------------------
+       
+       public KafkaTopicPartitionState(KafkaTopicPartition partition, KPH 
kafkaPartitionHandle) {
+               this.partition = partition;
+               this.kafkaPartitionHandle = kafkaPartitionHandle;
+               this.offset = OFFSET_NOT_SET;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Gets Flink's descriptor for the Kafka Partition.
+        * @return The Flink partition descriptor.
+        */
+       public final KafkaTopicPartition getKafkaTopicPartition() {
+               return partition;
+       }
+
+       /**
+        * Gets Kafka's descriptor for the Kafka Partition.
+        * @return The Kafka partition descriptor.
+        */
+       public final KPH getKafkaPartitionHandle() {
+               return kafkaPartitionHandle;
+       }
+
+       public final String getTopic() {
+               return partition.getTopic();
+       }
+
+       public final int getPartition() {
+               return partition.getPartition();
+       }
+
+       /**
+        * The current offset in the partition. This refers to the offset of the last
+        * element that we retrieved and emitted successfully. It is the offset that
+        * should be stored in a checkpoint.
+        */
+       public final long getOffset() {
+               return offset;
+       }
+
+       public final void setOffset(long offset) {
+               this.offset = offset;
+       }
+       
+       public final boolean isOffsetDefined() {
+               return offset != OFFSET_NOT_SET;
+       }
+       
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public String toString() {
+               return "Partition: " + partition + ", KafkaPartitionHandle=" + 
kafkaPartitionHandle
+                               + ", offset=" + (isOffsetDefined() ? 
String.valueOf(offset) : "(not set)");
+       }
+}
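
To make the offset contract above concrete, a hedged sketch follows; the
Integer handle is a stand-in for a version-specific partition type such as the
Kafka 0.9 client's TopicPartition:

    // Sketch only: the state starts undefined and becomes defined once the
    // first record for the partition has been emitted.
    KafkaTopicPartitionState<Integer> state = new KafkaTopicPartitionState<>(
            new KafkaTopicPartition("events", 3), 3);

    assert !state.isOffsetDefined();   // still OFFSET_NOT_SET
    state.setOffset(42L);              // offset of the last emitted record
    assert state.isOffsetDefined() && state.getOffset() == 42L;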
