http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
new file mode 100644
index 0000000..c68fe28
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink's description of a partition in a Kafka topic.
+ * Serializable, and common across all Kafka consumer subclasses (0.8, 0.9, 
...)
+ * 
+ * <p>Note: This class must not change in its structure, because it would 
change the
+ * serialization format and make previous savepoints unreadable.
+ */
+public final class KafkaTopicPartition implements Serializable {
+
+       /** THIS SERIAL VERSION UID MUST NOT CHANGE, BECAUSE IT WOULD BREAK
+        * READING OLD SERIALIZED INSTANCES FROM SAVEPOINTS */
+       private static final long serialVersionUID = 722083576322742325L;
+       
+       // 
------------------------------------------------------------------------
+
+       private final String topic;
+       private final int partition;
+       private final int cachedHash;
+
+       public KafkaTopicPartition(String topic, int partition) {
+               this.topic = requireNonNull(topic);
+               this.partition = partition;
+               this.cachedHash = 31 * topic.hashCode() + partition;
+       }
+
+       // 
------------------------------------------------------------------------
+       
+       public String getTopic() {
+               return topic;
+       }
+
+       public int getPartition() {
+               return partition;
+       }
+
+       // 
------------------------------------------------------------------------
+       
+       @Override
+       public String toString() {
+               return "KafkaTopicPartition{" +
+                               "topic='" + topic + '\'' +
+                               ", partition=" + partition +
+                               '}';
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               else if (o instanceof KafkaTopicPartition) {
+                       KafkaTopicPartition that = (KafkaTopicPartition) o;
+                       return this.partition == that.partition && 
this.topic.equals(that.topic);
+               }
+               else {
+                       return false;
+               }
+       }
+
+       @Override
+       public int hashCode() {
+               return cachedHash;
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
+       public static String toString(Map<KafkaTopicPartition, Long> map) {
+               StringBuilder sb = new StringBuilder();
+               for (Map.Entry<KafkaTopicPartition, Long> p: map.entrySet()) {
+                       KafkaTopicPartition ktp = p.getKey();
+                       
sb.append(ktp.getTopic()).append(":").append(ktp.getPartition()).append("=").append(p.getValue()).append(",
 ");
+               }
+               return sb.toString();
+       }
+
+       public static String toString(List<KafkaTopicPartition> partitions) {
+               StringBuilder sb = new StringBuilder();
+               for (KafkaTopicPartition p: partitions) {
+                       
sb.append(p.getTopic()).append(":").append(p.getPartition()).append(", ");
+               }
+               return sb.toString();
+       }
+
+
+       public static List<KafkaTopicPartition> 
dropLeaderData(List<KafkaTopicPartitionLeader> partitionInfos) {
+               List<KafkaTopicPartition> ret = new 
ArrayList<>(partitionInfos.size());
+               for(KafkaTopicPartitionLeader ktpl: partitionInfos) {
+                       ret.add(ktpl.getTopicPartition());
+               }
+               return ret;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
new file mode 100644
index 0000000..1959a05
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.kafka.common.Node;
+
+import java.io.Serializable;
+
+/**
+ * Serializable Topic Partition info with leader Node information.
+ * This class is used at runtime.
+ */
+public class KafkaTopicPartitionLeader implements Serializable {
+
+       private static final long serialVersionUID = 9145855900303748582L;
+
+       private final int leaderId;
+       private final int leaderPort;
+       private final String leaderHost;
+       private final KafkaTopicPartition topicPartition;
+       private final int cachedHash;
+
+       public KafkaTopicPartitionLeader(KafkaTopicPartition topicPartition, 
Node leader) {
+               this.topicPartition = topicPartition;
+               if (leader == null) {
+                       this.leaderId = -1;
+                       this.leaderHost = null;
+                       this.leaderPort = -1;
+               } else {
+                       this.leaderId = leader.id();
+                       this.leaderPort = leader.port();
+                       this.leaderHost = leader.host();
+               }
+               int cachedHash = (leader == null) ? 14 : leader.hashCode();
+               this.cachedHash = 31 * cachedHash + topicPartition.hashCode();
+       }
+
+       public KafkaTopicPartition getTopicPartition() {
+               return topicPartition;
+       }
+
+       public Node getLeader() {
+               if (this.leaderId == -1) {
+                       return null;
+               } else {
+                       return new Node(leaderId, leaderHost, leaderPort);
+               }
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (!(o instanceof KafkaTopicPartitionLeader)) {
+                       return false;
+               }
+
+               KafkaTopicPartitionLeader that = (KafkaTopicPartitionLeader) o;
+
+               if (!topicPartition.equals(that.topicPartition)) {
+                       return false;
+               }
+               return leaderId == that.leaderId && leaderPort == 
that.leaderPort && leaderHost.equals(that.leaderHost);
+       }
+
+       @Override
+       public int hashCode() {
+               return cachedHash;
+       }
+
+       @Override
+       public String toString() {
+               return "KafkaTopicPartitionLeader{" +
+                               "leaderId=" + leaderId +
+                               ", leaderPort=" + leaderPort +
+                               ", leaderHost='" + leaderHost + '\'' +
+                               ", topic=" + topicPartition.getTopic() +
+                               ", partition=" + topicPartition.getPartition() +
+                               '}';
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
new file mode 100644
index 0000000..7cb5f46
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+/**
+ * The state that the Flink Kafka Consumer holds for each Kafka partition.
+ * Includes the Kafka descriptor for partitions.
+ * 
+ * <p>This class describes the most basic state (only the offset), subclasses
+ * define more elaborate state, containing current watermarks and timestamp
+ * extractors.
+ * 
+ * @param <KPH> The type of the Kafka partition descriptor, which varies 
across Kafka versions.
+ */
+public class KafkaTopicPartitionState<KPH> {
+
+       /** Magic number to define an unset offset. Negative offsets are not 
used by Kafka (invalid),
+        * and we pick a number that is probably (hopefully) not used by Kafka 
as a magic number for anything else. */
+       public static final long OFFSET_NOT_SET = -915623761776L;
+       
+       // 
------------------------------------------------------------------------
+
+       /** The Flink description of a Kafka partition */
+       private final KafkaTopicPartition partition;
+
+       /** The Kafka description of a Kafka partition (varies across different 
Kafka versions) */
+       private final KPH kafkaPartitionHandle;
+       
+       /** The offset within the Kafka partition that we already processed */
+       private volatile long offset;
+
+       /** The offset of the Kafka partition that has been committed */
+       private volatile long committedOffset;
+
+       // 
------------------------------------------------------------------------
+       
+       public KafkaTopicPartitionState(KafkaTopicPartition partition, KPH 
kafkaPartitionHandle) {
+               this.partition = partition;
+               this.kafkaPartitionHandle = kafkaPartitionHandle;
+               this.offset = OFFSET_NOT_SET;
+               this.committedOffset = OFFSET_NOT_SET;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Gets Flink's descriptor for the Kafka Partition.
+        * @return The Flink partition descriptor.
+        */
+       public final KafkaTopicPartition getKafkaTopicPartition() {
+               return partition;
+       }
+
+       /**
+        * Gets Kafka's descriptor for the Kafka Partition.
+        * @return The Kafka partition descriptor.
+        */
+       public final KPH getKafkaPartitionHandle() {
+               return kafkaPartitionHandle;
+       }
+
+       public final String getTopic() {
+               return partition.getTopic();
+       }
+
+       public final int getPartition() {
+               return partition.getPartition();
+       }
+
+       /**
+        * The current offset in the partition. This refers to the offset last 
element that
+        * we retrieved and emitted successfully. It is the offset that should 
be stored in
+        * a checkpoint.
+        */
+       public final long getOffset() {
+               return offset;
+       }
+
+       public final void setOffset(long offset) {
+               this.offset = offset;
+       }
+
+       public final boolean isOffsetDefined() {
+               return offset != OFFSET_NOT_SET;
+       }
+
+       public final void setCommittedOffset(long offset) {
+               this.committedOffset = offset;
+       }
+
+       public final long getCommittedOffset() {
+               return committedOffset;
+       }
+
+       
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public String toString() {
+               return "Partition: " + partition + ", KafkaPartitionHandle=" + 
kafkaPartitionHandle
+                               + ", offset=" + (isOffsetDefined() ? 
String.valueOf(offset) : "(not set)");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
new file mode 100644
index 0000000..efdc73f
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+/**
+ * A special version of the per-kafka-partition-state that additionally holds
+ * a periodic watermark generator (and timestamp extractor) per partition.
+ * 
+ * @param <T> The type of records handled by the watermark generator
+ * @param <KPH> The type of the Kafka partition descriptor, which varies 
across Kafka versions.
+ */
+public final class KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> 
extends KafkaTopicPartitionState<KPH> {
+       
+       /** The timestamp assigner and watermark generator for the partition */
+       private final AssignerWithPeriodicWatermarks<T> timestampsAndWatermarks;
+       
+       /** The last watermark timestamp generated by this partition */
+       private long partitionWatermark;
+
+       // 
------------------------------------------------------------------------
+       
+       public KafkaTopicPartitionStateWithPeriodicWatermarks(
+                       KafkaTopicPartition partition, KPH kafkaPartitionHandle,
+                       AssignerWithPeriodicWatermarks<T> 
timestampsAndWatermarks)
+       {
+               super(partition, kafkaPartitionHandle);
+               
+               this.timestampsAndWatermarks = timestampsAndWatermarks;
+               this.partitionWatermark = Long.MIN_VALUE;
+       }
+
+       // 
------------------------------------------------------------------------
+       
+       public long getTimestampForRecord(T record, long kafkaEventTimestamp) {
+               return timestampsAndWatermarks.extractTimestamp(record, 
kafkaEventTimestamp);
+       }
+       
+       public long getCurrentWatermarkTimestamp() {
+               Watermark wm = timestampsAndWatermarks.getCurrentWatermark();
+               if (wm != null) {
+                       partitionWatermark = Math.max(partitionWatermark, 
wm.getTimestamp());
+               }
+               return partitionWatermark;
+       }
+
+       // 
------------------------------------------------------------------------
+       
+       @Override
+       public String toString() {
+               return "KafkaTopicPartitionStateWithPeriodicWatermarks: 
partition=" + getKafkaTopicPartition()
+                               + ", offset=" + getOffset() + ", watermark=" + 
partitionWatermark;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
new file mode 100644
index 0000000..edf40ce
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import javax.annotation.Nullable;
+
+/**
+ * A special version of the per-kafka-partition-state that additionally holds
+ * a periodic watermark generator (and timestamp extractor) per partition.
+ * 
+ * <p>This class is not thread safe, but it gives volatile access to the 
current
+ * partition watermark ({@link #getCurrentPartitionWatermark()}).
+ * 
+ * @param <T> The type of records handled by the watermark generator
+ * @param <KPH> The type of the Kafka partition descriptor, which varies 
across Kafka versions
+ */
+public final class KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> 
extends KafkaTopicPartitionState<KPH> {
+       
+       /** The timestamp assigner and watermark generator for the partition */
+       private final AssignerWithPunctuatedWatermarks<T> 
timestampsAndWatermarks;
+       
+       /** The last watermark timestamp generated by this partition */
+       private volatile long partitionWatermark;
+
+       // 
------------------------------------------------------------------------
+       
+       public KafkaTopicPartitionStateWithPunctuatedWatermarks(
+                       KafkaTopicPartition partition, KPH kafkaPartitionHandle,
+                       AssignerWithPunctuatedWatermarks<T> 
timestampsAndWatermarks)
+       {
+               super(partition, kafkaPartitionHandle);
+               
+               this.timestampsAndWatermarks = timestampsAndWatermarks;
+               this.partitionWatermark = Long.MIN_VALUE;
+       }
+
+       // 
------------------------------------------------------------------------
+       
+       public long getTimestampForRecord(T record, long kafkaEventTimestamp) {
+               return timestampsAndWatermarks.extractTimestamp(record, 
kafkaEventTimestamp);
+       }
+
+       @Nullable
+       public Watermark checkAndGetNewWatermark(T record, long timestamp) {
+               Watermark mark = 
timestampsAndWatermarks.checkAndGetNextWatermark(record, timestamp);
+               if (mark != null && mark.getTimestamp() > partitionWatermark) {
+                       partitionWatermark = mark.getTimestamp();
+                       return mark;
+               }
+               else {
+                       return null;
+               }
+       }
+       
+       public long getCurrentPartitionWatermark() {
+               return partitionWatermark;
+       }
+
+       // 
------------------------------------------------------------------------
+       
+       @Override
+       public String toString() {
+               return "KafkaTopicPartitionStateWithPunctuatedWatermarks: 
partition=" + getKafkaTopicPartition()
+                               + ", offset=" + getOffset() + ", watermark=" + 
partitionWatermark;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java
new file mode 100644
index 0000000..7a41ade
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+public class TypeUtil {
+       private TypeUtil() {}
+
+       /**
+        * Creates TypeInformation array for an array of Classes.
+        * @param fieldTypes classes to extract type information from
+        * @return type information
+        */
+       public static TypeInformation<?>[] toTypeInfo(Class<?>[] fieldTypes) {
+               TypeInformation<?>[] typeInfos = new 
TypeInformation[fieldTypes.length];
+               for (int i = 0; i < fieldTypes.length; i++) {
+                       typeInfos[i] = TypeExtractor.getForClass(fieldTypes[i]);
+               }
+               return typeInfos;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
new file mode 100644
index 0000000..cedb696
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals.metrics;
+
+import org.apache.flink.metrics.Gauge;
+
+/**
+ * Gauge for getting the current value of a Kafka metric.
+ */
+public class KafkaMetricWrapper implements Gauge<Double> {
+       private final org.apache.kafka.common.Metric kafkaMetric;
+
+       public KafkaMetricWrapper(org.apache.kafka.common.Metric metric) {
+               this.kafkaMetric = metric;
+       }
+
+       @Override
+       public Double getValue() {
+               return kafkaMetric.value();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
new file mode 100644
index 0000000..9b848e0
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+import java.io.Serializable;
+
+/**
+ * A partitioner ensuring that each internal Flink partition ends up in one 
Kafka partition.
+ *
+ * Note, one Kafka partition can contain multiple Flink partitions.
+ *
+ * Cases:
+ *     # More Flink partitions than kafka partitions
+ * <pre>
+ *             Flink Sinks:            Kafka Partitions
+ *                     1       ----------------&gt;    1
+ *                     2   --------------/
+ *                     3   -------------/
+ *                     4       ------------/
+ * </pre>
+ * Some (or all) kafka partitions contain the output of more than one flink 
partition
+ *
+ *# Fewer Flink partitions than Kafka
+ * <pre>
+ *             Flink Sinks:            Kafka Partitions
+ *                     1       ----------------&gt;    1
+ *                     2       ----------------&gt;    2
+ *                                                                             
3
+ *                                                                             
4
+ *                                                                             
5
+ * </pre>
+ *
+ *  Not all Kafka partitions contain data
+ *  To avoid such an unbalanced partitioning, use a round-robin kafka 
partitioner. (note that this will
+ *  cause a lot of network connections between all the Flink instances and all 
the Kafka brokers
+ *
+ *
+ */
+public class FixedPartitioner<T> extends KafkaPartitioner<T> implements 
Serializable {
+       private static final long serialVersionUID = 1627268846962918126L;
+
+       private int targetPartition = -1;
+
+       @Override
+       public void open(int parallelInstanceId, int parallelInstances, int[] 
partitions) {
+               if (parallelInstanceId < 0 || parallelInstances <= 0 || 
partitions.length == 0) {
+                       throw new IllegalArgumentException();
+               }
+               
+               this.targetPartition = partitions[parallelInstanceId % 
partitions.length];
+       }
+
+       @Override
+       public int partition(T next, byte[] serializedKey, byte[] 
serializedValue, int numPartitions) {
+               if (targetPartition >= 0) {
+                       return targetPartition;
+               } else {
+                       throw new RuntimeException("The partitioner has not 
been initialized properly");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
new file mode 100644
index 0000000..37e2ef6
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+import java.io.Serializable;
+
+/**
+ * It contains a open() method which is called on each parallel instance.
+ * Partitioners must be serializable!
+ */
+public abstract class KafkaPartitioner<T> implements Serializable {
+
+       private static final long serialVersionUID = -1974260817778593473L;
+
+       /**
+        * Initializer for the Partitioner.
+        * @param parallelInstanceId 0-indexed id of the parallel instance in 
Flink
+        * @param parallelInstances the total number of parallel instances
+        * @param partitions an array describing the partition IDs of the 
available Kafka partitions.
+        */
+       public void open(int parallelInstanceId, int parallelInstances, int[] 
partitions) {
+               // overwrite this method if needed.
+       }
+
+       public abstract int partition(T next, byte[] serializedKey, byte[] 
serializedValue, int numPartitions);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
new file mode 100644
index 0000000..d170058
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.IOException;
+
+
+/**
+ * DeserializationSchema that deserializes a JSON String into an ObjectNode.
+ * <p>
+ * Fields can be accessed by calling objectNode.get(&lt;name>).as(&lt;type>)
+ */
+public class JSONDeserializationSchema extends 
AbstractDeserializationSchema<ObjectNode> {
+       private ObjectMapper mapper;
+
+       @Override
+       public ObjectNode deserialize(byte[] message) throws IOException {
+               if (mapper == null) {
+                       mapper = new ObjectMapper();
+               }
+               return mapper.readValue(message, ObjectNode.class);
+       }
+
+       @Override
+       public boolean isEndOfStream(ObjectNode nextElement) {
+               return false;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
new file mode 100644
index 0000000..261a111
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.io.IOException;
+
+import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
+
+/**
+ * DeserializationSchema that deserializes a JSON String into an ObjectNode.
+ * <p>
+ * Key fields can be accessed by calling 
objectNode.get("key").get(&lt;name>).as(&lt;type>)
+ * <p>
+ * Value fields can be accessed by calling 
objectNode.get("value").get(&lt;name>).as(&lt;type>)
+ * <p>
+ * Metadata fields can be accessed by calling 
objectNode.get("metadata").get(&lt;name>).as(&lt;type>) and include
+ * the "offset" (long), "topic" (String) and "partition" (int).
+ */
+public class JSONKeyValueDeserializationSchema implements 
KeyedDeserializationSchema<ObjectNode> {
+       private final boolean includeMetadata;
+       private ObjectMapper mapper;
+
+       public JSONKeyValueDeserializationSchema(boolean includeMetadata) {
+               this.includeMetadata = includeMetadata;
+       }
+
+       @Override
+       public ObjectNode deserialize(byte[] messageKey, byte[] message, String 
topic, int partition, long offset) throws IOException {
+               if (mapper == null) {
+                       mapper = new ObjectMapper();
+               }
+               ObjectNode node = mapper.createObjectNode();
+               node.set("key", mapper.readValue(messageKey, JsonNode.class));
+               node.set("value", mapper.readValue(message, JsonNode.class));
+               if (includeMetadata) {
+                       node.putObject("metadata")
+                               .put("offset", offset)
+                               .put("topic", topic)
+                               .put("partition", partition);
+               }
+               return node;
+       }
+
+       @Override
+       public boolean isEndOfStream(ObjectNode nextElement) {
+               return false;
+       }
+
+       @Override
+       public TypeInformation<ObjectNode> getProducedType() {
+               return getForClass(ObjectNode.class);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
new file mode 100644
index 0000000..4344810
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Deserialization schema from JSON to {@link Row}.
+ *
+ * <p>Deserializes the <code>byte[]</code> messages as a JSON object and reads
+ * the specified fields.
+ *
+ * <p>Failure during deserialization are forwarded as wrapped IOExceptions.
+ */
+public class JsonRowDeserializationSchema implements 
DeserializationSchema<Row> {
+
+       /** Field names to parse. Indices match fieldTypes indices. */
+       private final String[] fieldNames;
+
+       /** Types to parse fields as. Indices match fieldNames indices. */
+       private final TypeInformation<?>[] fieldTypes;
+
+       /** Object mapper for parsing the JSON. */
+       private final ObjectMapper objectMapper = new ObjectMapper();
+
+       /** Flag indicating whether to fail on a missing field. */
+       private boolean failOnMissingField;
+
+       /**
+        * Creates a JSON deserialization schema for the given fields and type 
classes.
+        *
+        * @param fieldNames Names of JSON fields to parse.
+        * @param fieldTypes Type classes to parse JSON fields as.
+        */
+       public JsonRowDeserializationSchema(String[] fieldNames, Class<?>[] 
fieldTypes) {
+               this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field 
names");
+
+               this.fieldTypes = new TypeInformation[fieldTypes.length];
+               for (int i = 0; i < fieldTypes.length; i++) {
+                       this.fieldTypes[i] = 
TypeExtractor.getForClass(fieldTypes[i]);
+               }
+
+               Preconditions.checkArgument(fieldNames.length == 
fieldTypes.length,
+                               "Number of provided field names and types does 
not match.");
+       }
+
+       /**
+        * Creates a JSON deserialization schema for the given fields and types.
+        *
+        * @param fieldNames Names of JSON fields to parse.
+        * @param fieldTypes Types to parse JSON fields as.
+        */
+       public JsonRowDeserializationSchema(String[] fieldNames, 
TypeInformation<?>[] fieldTypes) {
+               this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field 
names");
+               this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field 
types");
+
+               Preconditions.checkArgument(fieldNames.length == 
fieldTypes.length,
+                               "Number of provided field names and types does 
not match.");
+       }
+
+       @Override
+       public Row deserialize(byte[] message) throws IOException {
+               try {
+                       JsonNode root = objectMapper.readTree(message);
+
+                       Row row = new Row(fieldNames.length);
+                       for (int i = 0; i < fieldNames.length; i++) {
+                               JsonNode node = root.get(fieldNames[i]);
+
+                               if (node == null) {
+                                       if (failOnMissingField) {
+                                               throw new 
IllegalStateException("Failed to find field with name '"
+                                                               + fieldNames[i] 
+ "'.");
+                                       } else {
+                                               row.setField(i, null);
+                                       }
+                               } else {
+                                       // Read the value as specified type
+                                       Object value = 
objectMapper.treeToValue(node, fieldTypes[i].getTypeClass());
+                                       row.setField(i, value);
+                               }
+                       }
+
+                       return row;
+               } catch (Throwable t) {
+                       throw new IOException("Failed to deserialize JSON 
object.", t);
+               }
+       }
+
+       @Override
+       public boolean isEndOfStream(Row nextElement) {
+               return false;
+       }
+
+       @Override
+       public TypeInformation<Row> getProducedType() {
+               return new RowTypeInfo(fieldTypes);
+       }
+
+       /**
+        * Configures the failure behaviour if a JSON field is missing.
+        *
+        * <p>By default, a missing field is ignored and the field is set to 
null.
+        *
+        * @param failOnMissingField Flag indicating whether to fail or not on 
a missing field.
+        */
+       public void setFailOnMissingField(boolean failOnMissingField) {
+               this.failOnMissingField = failOnMissingField;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
new file mode 100644
index 0000000..077ff13
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.util.Preconditions;
+
+
+/**
+ * Serialization schema that serializes an object into a JSON bytes.
+ *
+ * <p>Serializes the input {@link Row} object into a JSON string and
+ * converts it into <code>byte[]</code>.
+ *
+ * <p>Result <code>byte[]</code> messages can be deserialized using
+ * {@link JsonRowDeserializationSchema}.
+ */
+public class JsonRowSerializationSchema implements SerializationSchema<Row> {
+       /** Fields names in the input Row object */
+       private final String[] fieldNames;
+       /** Object mapper that is used to create output JSON objects */
+       private static ObjectMapper mapper = new ObjectMapper();
+
+       /**
+        * Creates a JSON serialization schema for the given fields and types.
+        *
+        * @param fieldNames Names of JSON fields to parse.
+        */
+       public JsonRowSerializationSchema(String[] fieldNames) {
+               this.fieldNames = Preconditions.checkNotNull(fieldNames);
+       }
+
+       @Override
+       public byte[] serialize(Row row) {
+               if (row.productArity() != fieldNames.length) {
+                       throw new IllegalStateException(String.format(
+                               "Number of elements in the row %s is different 
from number of field names: %d", row, fieldNames.length));
+               }
+
+               ObjectNode objectNode = mapper.createObjectNode();
+
+               for (int i = 0; i < row.productArity(); i++) {
+                       JsonNode node = 
mapper.valueToTree(row.productElement(i));
+                       objectNode.set(fieldNames[i], node);
+               }
+
+               try {
+                       return mapper.writeValueAsBytes(objectNode);
+               } catch (Exception e) {
+                       throw new RuntimeException("Failed to serialize row", 
e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
new file mode 100644
index 0000000..01e72ca
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * The deserialization schema describes how to turn the byte key / value 
messages delivered by certain
+ * data sources (for example Apache Kafka) into data types (Java/Scala 
objects) that are
+ * processed by Flink.
+ * 
+ * @param <T> The type created by the keyed deserialization schema.
+ */
+public interface KeyedDeserializationSchema<T> extends Serializable, 
ResultTypeQueryable<T> {
+
+       /**
+        * Deserializes the byte message.
+        *
+        * @param messageKey the key as a byte array (null if no key has been 
set)
+        * @param message The message, as a byte array. (null if the message 
was empty or deleted)
+        * @param partition The partition the message has originated from
+        * @param offset the offset of the message in the original source (for 
example the Kafka offset)  @return The deserialized message as an object.
+        */
+       T deserialize(byte[] messageKey, byte[] message, String topic, int 
partition, long offset) throws IOException;
+
+       /**
+        * Method to decide whether the element signals the end of the stream. 
If
+        * true is returned the element won't be emitted.
+        * 
+        * @param nextElement The element to test for the end-of-stream signal.
+        * @return True, if the element signals end of stream, false otherwise.
+        */
+       boolean isEndOfStream(T nextElement);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
new file mode 100644
index 0000000..4b9dba2
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.io.IOException;
+
+/**
+ * A simple wrapper for using the DeserializationSchema with the 
KeyedDeserializationSchema
+ * interface
+ * @param <T> The type created by the deserialization schema.
+ */
+public class KeyedDeserializationSchemaWrapper<T> implements 
KeyedDeserializationSchema<T> {
+
+       private static final long serialVersionUID = 2651665280744549932L;
+
+       private final DeserializationSchema<T> deserializationSchema;
+
+       public KeyedDeserializationSchemaWrapper(DeserializationSchema<T> 
deserializationSchema) {
+               this.deserializationSchema = deserializationSchema;
+       }
+       @Override
+       public T deserialize(byte[] messageKey, byte[] message, String topic, 
int partition, long offset) throws IOException {
+               return deserializationSchema.deserialize(message);
+       }
+
+       @Override
+       public boolean isEndOfStream(T nextElement) {
+               return deserializationSchema.isEndOfStream(nextElement);
+       }
+
+       @Override
+       public TypeInformation<T> getProducedType() {
+               return deserializationSchema.getProducedType();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
new file mode 100644
index 0000000..701281e
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.Serializable;
+
+/**
+ * The serialization schema describes how to turn a data object into a 
different serialized
+ * representation. Most data sinks (for example Apache Kafka) require the data 
to be handed
+ * to them in a specific format (for example as byte strings).
+ * 
+ * @param <T> The type to be serialized.
+ */
+public interface KeyedSerializationSchema<T> extends Serializable {
+
+       /**
+        * Serializes the key of the incoming element to a byte array
+        * This method might return null if no key is available.
+        *
+        * @param element The incoming element to be serialized
+        * @return the key of the element as a byte array
+        */
+       byte[] serializeKey(T element);
+
+
+       /**
+        * Serializes the value of the incoming element to a byte array
+        * 
+        * @param element The incoming element to be serialized
+        * @return the value of the element as a byte array
+        */
+       byte[] serializeValue(T element);
+
+       /**
+        * Optional method to determine the target topic for the element
+        *
+        * @param element Incoming element to determine the target topic from
+        * @return null or the target topic
+        */
+       String getTargetTopic(T element);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
new file mode 100644
index 0000000..1b3e486
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+/**
+ * A simple wrapper for using the SerializationSchema with the 
KeyedDeserializationSchema
+ * interface
+ * @param <T> The type to serialize
+ */
+public class KeyedSerializationSchemaWrapper<T> implements 
KeyedSerializationSchema<T> {
+
+       private static final long serialVersionUID = 1351665280744549933L;
+
+       private final SerializationSchema<T> serializationSchema;
+
+       public KeyedSerializationSchemaWrapper(SerializationSchema<T> 
serializationSchema) {
+               this.serializationSchema = serializationSchema;
+       }
+
+       @Override
+       public byte[] serializeKey(T element) {
+               return null;
+       }
+
+       @Override
+       public byte[] serializeValue(T element) {
+               return serializationSchema.serialize(element);
+       }
+
+       @Override
+       public String getTargetTopic(T element) {
+               return null; // we are never overriding the topic
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
new file mode 100644
index 0000000..51bc8d1
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+
+import java.io.IOException;
+
+/**
+ * A serialization and deserialization schema for Key Value Pairs that uses 
Flink's serialization stack to
+ * transform typed from and to byte arrays.
+ * 
+ * @param <K> The key type to be serialized.
+ * @param <V> The value type to be serialized.
+ */
+public class TypeInformationKeyValueSerializationSchema<K, V> implements 
KeyedDeserializationSchema<Tuple2<K, V>>, KeyedSerializationSchema<Tuple2<K,V>> 
{
+
+       private static final long serialVersionUID = -5359448468131559102L;
+
+       /** The serializer for the key */
+       private final TypeSerializer<K> keySerializer;
+
+       /** The serializer for the value */
+       private final TypeSerializer<V> valueSerializer;
+
+       /** reusable input deserialization buffer */
+       private final DataInputDeserializer inputDeserializer;
+       
+       /** reusable output serialization buffer for the key */
+       private transient DataOutputSerializer keyOutputSerializer;
+
+       /** reusable output serialization buffer for the value */
+       private transient DataOutputSerializer valueOutputSerializer;
+       
+       
+       /** The type information, to be returned by {@link #getProducedType()}. 
It is
+        * transient, because it is not serializable. Note that this means that 
the type information
+        * is not available at runtime, but only prior to the first 
serialization / deserialization */
+       private final transient TypeInformation<Tuple2<K, V>> typeInfo;
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a new de-/serialization schema for the given types.
+        *
+        * @param keyTypeInfo The type information for the key type 
de-/serialized by this schema.
+        * @param valueTypeInfo The type information for the value type 
de-/serialized by this schema.
+        * @param ec The execution config, which is used to parametrize the 
type serializers.
+        */
+       public TypeInformationKeyValueSerializationSchema(TypeInformation<K> 
keyTypeInfo, TypeInformation<V> valueTypeInfo, ExecutionConfig ec) {
+               this.typeInfo = new TupleTypeInfo<>(keyTypeInfo, valueTypeInfo);
+               this.keySerializer = keyTypeInfo.createSerializer(ec);
+               this.valueSerializer = valueTypeInfo.createSerializer(ec);
+               this.inputDeserializer = new DataInputDeserializer();
+       }
+
+       /**
+        * Creates a new de-/serialization schema for the given types. This 
constructor accepts the types
+        * as classes and internally constructs the type information from the 
classes.
+        * 
+        * <p>If the types are parametrized and cannot be fully defined via 
classes, use the constructor
+        * that accepts {@link TypeInformation} instead.
+        * 
+        * @param keyClass The class of the key de-/serialized by this schema.
+        * @param valueClass The class of the value de-/serialized by this 
schema.
+        * @param config The execution config, which is used to parametrize the 
type serializers.
+        */
+       public TypeInformationKeyValueSerializationSchema(Class<K> keyClass, 
Class<V> valueClass, ExecutionConfig config) {
+               this(TypeExtractor.createTypeInfo(keyClass), 
TypeExtractor.createTypeInfo(valueClass), config);
+       }
+
+       // 
------------------------------------------------------------------------
+
+
+       @Override
+       public Tuple2<K, V> deserialize(byte[] messageKey, byte[] message, 
String topic, int partition, long offset) throws IOException {
+               K key = null;
+               V value = null;
+               
+               if (messageKey != null) {
+                       inputDeserializer.setBuffer(messageKey, 0, 
messageKey.length);
+                       key = keySerializer.deserialize(inputDeserializer);
+               }
+               if (message != null) {
+                       inputDeserializer.setBuffer(message, 0, message.length);
+                       value = valueSerializer.deserialize(inputDeserializer);
+               }
+               return new Tuple2<>(key, value);
+       }
+
+       /**
+        * This schema never considers an element to signal end-of-stream, so 
this method returns always false.
+        * @param nextElement The element to test for the end-of-stream signal.
+        * @return Returns false.
+        */
+       @Override
+       public boolean isEndOfStream(Tuple2<K,V> nextElement) {
+               return false;
+       }
+
+
+       @Override
+       public byte[] serializeKey(Tuple2<K, V> element) {
+               if (element.f0 == null) {
+                       return null;
+               } else {
+                       // key is not null. serialize it:
+                       if (keyOutputSerializer == null) {
+                               keyOutputSerializer = new 
DataOutputSerializer(16);
+                       }
+                       try {
+                               keySerializer.serialize(element.f0, 
keyOutputSerializer);
+                       }
+                       catch (IOException e) {
+                               throw new RuntimeException("Unable to serialize 
record", e);
+                       }
+                       // check if key byte array size changed
+                       byte[] res = keyOutputSerializer.getByteArray();
+                       if (res.length != keyOutputSerializer.length()) {
+                               byte[] n = new 
byte[keyOutputSerializer.length()];
+                               System.arraycopy(res, 0, n, 0, 
keyOutputSerializer.length());
+                               res = n;
+                       }
+                       keyOutputSerializer.clear();
+                       return res;
+               }
+       }
+
+       @Override
+       public byte[] serializeValue(Tuple2<K, V> element) {
+               // if the value is null, its serialized value is null as well.
+               if (element.f1 == null) {
+                       return null;
+               }
+
+               if (valueOutputSerializer == null) {
+                       valueOutputSerializer = new DataOutputSerializer(16);
+               }
+
+               try {
+                       valueSerializer.serialize(element.f1, 
valueOutputSerializer);
+               }
+               catch (IOException e) {
+                       throw new RuntimeException("Unable to serialize 
record", e);
+               }
+
+               byte[] res = valueOutputSerializer.getByteArray();
+               if (res.length != valueOutputSerializer.length()) {
+                       byte[] n = new byte[valueOutputSerializer.length()];
+                       System.arraycopy(res, 0, n, 0, 
valueOutputSerializer.length());
+                       res = n;
+               }
+               valueOutputSerializer.clear();
+               return res;
+       }
+
+       @Override
+       public String getTargetTopic(Tuple2<K, V> element) {
+               return null; // we are never overriding the topic
+       }
+
+
+       @Override
+       public TypeInformation<Tuple2<K,V>> getProducedType() {
+               if (typeInfo != null) {
+                       return typeInfo;
+               }
+               else {
+                       throw new IllegalStateException(
+                                       "The type information is not available 
after this class has been serialized and distributed.");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
new file mode 100644
index 0000000..b96ba30
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.commons.collections.map.LinkedMap;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.SerializedValue;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class FlinkKafkaConsumerBaseTest {
+
+       /**
+        * Tests that not both types of timestamp extractors / watermark 
generators can be used.
+        */
+       @Test
+       public void testEitherWatermarkExtractor() {
+               try {
+                       new 
DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks<Object>)
 null);
+                       fail();
+               } catch (NullPointerException ignored) {}
+
+               try {
+                       new 
DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks<Object>)
 null);
+                       fail();
+               } catch (NullPointerException ignored) {}
+               
+               @SuppressWarnings("unchecked")
+               final AssignerWithPeriodicWatermarks<String> periodicAssigner = 
mock(AssignerWithPeriodicWatermarks.class);
+               @SuppressWarnings("unchecked")
+               final AssignerWithPunctuatedWatermarks<String> 
punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class);
+               
+               DummyFlinkKafkaConsumer<String> c1 = new 
DummyFlinkKafkaConsumer<>();
+               c1.assignTimestampsAndWatermarks(periodicAssigner);
+               try {
+                       c1.assignTimestampsAndWatermarks(punctuatedAssigner);
+                       fail();
+               } catch (IllegalStateException ignored) {}
+
+               DummyFlinkKafkaConsumer<String> c2 = new 
DummyFlinkKafkaConsumer<>();
+               c2.assignTimestampsAndWatermarks(punctuatedAssigner);
+               try {
+                       c2.assignTimestampsAndWatermarks(periodicAssigner);
+                       fail();
+               } catch (IllegalStateException ignored) {}
+       }
+
+       /**
+        * Tests that no checkpoints happen when the fetcher is not running.
+        */
+       @Test
+       public void ignoreCheckpointWhenNotRunning() throws Exception {
+               @SuppressWarnings("unchecked")
+               final AbstractFetcher<String, ?> fetcher = 
mock(AbstractFetcher.class);
+
+               FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, 
new LinkedMap(), false);
+               OperatorStateStore operatorStateStore = 
mock(OperatorStateStore.class);
+               TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = 
new TestingListState<>();
+               
when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
+
+               consumer.snapshotState(new 
StateSnapshotContextSynchronousImpl(1, 1));
+
+               assertFalse(listState.get().iterator().hasNext());
+               consumer.notifyCheckpointComplete(66L);
+       }
+
+       /**
+        * Tests that no checkpoints happen when the fetcher is not running.
+        */
+       @Test
+       public void checkRestoredCheckpointWhenFetcherNotReady() throws 
Exception {
+               OperatorStateStore operatorStateStore = 
mock(OperatorStateStore.class);
+
+               TestingListState<Serializable> listState = new 
TestingListState<>();
+               listState.add(Tuple2.of(new KafkaTopicPartition("abc", 13), 
16768L));
+               listState.add(Tuple2.of(new KafkaTopicPartition("def", 7), 
987654321L));
+
+               FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new 
LinkedMap(), true);
+
+               
when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
+
+               StateInitializationContext initializationContext = 
mock(StateInitializationContext.class);
+
+               
when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
+               when(initializationContext.isRestored()).thenReturn(true);
+
+               consumer.initializeState(initializationContext);
+
+               consumer.snapshotState(new 
StateSnapshotContextSynchronousImpl(17, 17));
+
+               // ensure that the list was cleared and refilled. while this is 
an implementation detail, we use it here
+               // to figure out that snapshotState() actually did something.
+               Assert.assertTrue(listState.isClearCalled());
+
+               Set<Serializable> expected = new HashSet<>();
+
+               for (Serializable serializable : listState.get()) {
+                       expected.add(serializable);
+               }
+
+               int counter = 0;
+
+               for (Serializable serializable : listState.get()) {
+                       assertTrue(expected.contains(serializable));
+                       counter++;
+               }
+
+               assertEquals(expected.size(), counter);
+       }
+
+       /**
+        * Tests that no checkpoints happen when the fetcher is not running.
+        */
+       @Test
+       public void checkRestoredNullCheckpointWhenFetcherNotReady() throws 
Exception {
+               FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new 
LinkedMap(), true);
+
+               OperatorStateStore operatorStateStore = 
mock(OperatorStateStore.class);
+               TestingListState<Serializable> listState = new 
TestingListState<>();
+               
when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
+
+               StateInitializationContext initializationContext = 
mock(StateInitializationContext.class);
+
+               
when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
+               when(initializationContext.isRestored()).thenReturn(false);
+
+               consumer.initializeState(initializationContext);
+
+               consumer.snapshotState(new 
StateSnapshotContextSynchronousImpl(17, 17));
+
+               assertFalse(listState.get().iterator().hasNext());
+       }
+
+       /**
+        * Tests that on snapshots, states and offsets to commit to Kafka are 
correct
+        */
+       @Test
+       public void checkUseFetcherWhenNoCheckpoint() throws Exception {
+
+               FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new 
LinkedMap(), true);
+               List<KafkaTopicPartition> partitionList = new ArrayList<>(1);
+               partitionList.add(new KafkaTopicPartition("test", 0));
+               consumer.setSubscribedPartitions(partitionList);
+
+               OperatorStateStore operatorStateStore = 
mock(OperatorStateStore.class);
+               TestingListState<Serializable> listState = new 
TestingListState<>();
+               
when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
+
+               StateInitializationContext initializationContext = 
mock(StateInitializationContext.class);
+
+               
when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
+
+               // make the context signal that there is no restored state, 
then validate that
+               when(initializationContext.isRestored()).thenReturn(false);
+               consumer.initializeState(initializationContext);
+               consumer.run(mock(SourceFunction.SourceContext.class));
+       }
+
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testSnapshotState() throws Exception {
+
+               // 
--------------------------------------------------------------------
+               //   prepare fake states
+               // 
--------------------------------------------------------------------
+
+               final HashMap<KafkaTopicPartition, Long> state1 = new 
HashMap<>();
+               state1.put(new KafkaTopicPartition("abc", 13), 16768L);
+               state1.put(new KafkaTopicPartition("def", 7), 987654321L);
+
+               final HashMap<KafkaTopicPartition, Long> state2 = new 
HashMap<>();
+               state2.put(new KafkaTopicPartition("abc", 13), 16770L);
+               state2.put(new KafkaTopicPartition("def", 7), 987654329L);
+
+               final HashMap<KafkaTopicPartition, Long> state3 = new 
HashMap<>();
+               state3.put(new KafkaTopicPartition("abc", 13), 16780L);
+               state3.put(new KafkaTopicPartition("def", 7), 987654377L);
+
+               // 
--------------------------------------------------------------------
+               
+               final AbstractFetcher<String, ?> fetcher = 
mock(AbstractFetcher.class);
+               when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, 
state3);
+                       
+               final LinkedMap pendingOffsetsToCommit = new LinkedMap();
+       
+               FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, 
pendingOffsetsToCommit, true);
+               assertEquals(0, pendingOffsetsToCommit.size());
+
+               OperatorStateStore backend = mock(OperatorStateStore.class);
+
+               TestingListState<Serializable> listState = new 
TestingListState<>();
+
+               
when(backend.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
+
+               StateInitializationContext initializationContext = 
mock(StateInitializationContext.class);
+
+               
when(initializationContext.getOperatorStateStore()).thenReturn(backend);
+               when(initializationContext.isRestored()).thenReturn(false, 
true, true, true);
+
+               consumer.initializeState(initializationContext);
+
+               // checkpoint 1
+               consumer.snapshotState(new 
StateSnapshotContextSynchronousImpl(138, 138));
+
+               HashMap<KafkaTopicPartition, Long> snapshot1 = new HashMap<>();
+
+               for (Serializable serializable : listState.get()) {
+                       Tuple2<KafkaTopicPartition, Long> 
kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) 
serializable;
+                       snapshot1.put(kafkaTopicPartitionLongTuple2.f0, 
kafkaTopicPartitionLongTuple2.f1);
+               }
+
+               assertEquals(state1, snapshot1);
+               assertEquals(1, pendingOffsetsToCommit.size());
+               assertEquals(state1, pendingOffsetsToCommit.get(138L));
+
+               // checkpoint 2
+               consumer.snapshotState(new 
StateSnapshotContextSynchronousImpl(140, 140));
+
+               HashMap<KafkaTopicPartition, Long> snapshot2 = new HashMap<>();
+
+               for (Serializable serializable : listState.get()) {
+                       Tuple2<KafkaTopicPartition, Long> 
kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) 
serializable;
+                       snapshot2.put(kafkaTopicPartitionLongTuple2.f0, 
kafkaTopicPartitionLongTuple2.f1);
+               }
+
+               assertEquals(state2, snapshot2);
+               assertEquals(2, pendingOffsetsToCommit.size());
+               assertEquals(state2, pendingOffsetsToCommit.get(140L));
+               
+               // ack checkpoint 1
+               consumer.notifyCheckpointComplete(138L);
+               assertEquals(1, pendingOffsetsToCommit.size());
+               assertTrue(pendingOffsetsToCommit.containsKey(140L));
+
+               // checkpoint 3
+               consumer.snapshotState(new 
StateSnapshotContextSynchronousImpl(141, 141));
+
+               HashMap<KafkaTopicPartition, Long> snapshot3 = new HashMap<>();
+
+               for (Serializable serializable : listState.get()) {
+                       Tuple2<KafkaTopicPartition, Long> 
kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) 
serializable;
+                       snapshot3.put(kafkaTopicPartitionLongTuple2.f0, 
kafkaTopicPartitionLongTuple2.f1);
+               }
+
+               assertEquals(state3, snapshot3);
+               assertEquals(2, pendingOffsetsToCommit.size());
+               assertEquals(state3, pendingOffsetsToCommit.get(141L));
+               
+               // ack checkpoint 3, subsumes number 2
+               consumer.notifyCheckpointComplete(141L);
+               assertEquals(0, pendingOffsetsToCommit.size());
+
+
+               consumer.notifyCheckpointComplete(666); // invalid checkpoint
+               assertEquals(0, pendingOffsetsToCommit.size());
+
+               OperatorStateStore operatorStateStore = 
mock(OperatorStateStore.class);
+               listState = new TestingListState<>();
+               
when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
+
+               // create 500 snapshots
+               for (int i = 100; i < 600; i++) {
+                       consumer.snapshotState(new 
StateSnapshotContextSynchronousImpl(i, i));
+                       listState.clear();
+               }
+               
assertEquals(FlinkKafkaConsumerBase.MAX_NUM_PENDING_CHECKPOINTS, 
pendingOffsetsToCommit.size());
+
+               // commit only the second last
+               consumer.notifyCheckpointComplete(598);
+               assertEquals(1, pendingOffsetsToCommit.size());
+
+               // access invalid checkpoint
+               consumer.notifyCheckpointComplete(590);
+
+               // and the last
+               consumer.notifyCheckpointComplete(599);
+               assertEquals(0, pendingOffsetsToCommit.size());
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private static <T> FlinkKafkaConsumerBase<T> getConsumer(
+                       AbstractFetcher<T, ?> fetcher, LinkedMap 
pendingOffsetsToCommit, boolean running) throws Exception
+       {
+               FlinkKafkaConsumerBase<T> consumer = new 
DummyFlinkKafkaConsumer<>();
+
+               Field fetcherField = 
FlinkKafkaConsumerBase.class.getDeclaredField("kafkaFetcher");
+               fetcherField.setAccessible(true);
+               fetcherField.set(consumer, fetcher);
+
+               Field mapField = 
FlinkKafkaConsumerBase.class.getDeclaredField("pendingOffsetsToCommit");
+               mapField.setAccessible(true);
+               mapField.set(consumer, pendingOffsetsToCommit);
+
+               Field runningField = 
FlinkKafkaConsumerBase.class.getDeclaredField("running");
+               runningField.setAccessible(true);
+               runningField.set(consumer, running);
+
+               return consumer;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private static class DummyFlinkKafkaConsumer<T> extends 
FlinkKafkaConsumerBase<T> {
+               private static final long serialVersionUID = 1L;
+
+               @SuppressWarnings("unchecked")
+               public DummyFlinkKafkaConsumer() {
+                       super(Arrays.asList("dummy-topic"), 
(KeyedDeserializationSchema < T >) mock(KeyedDeserializationSchema.class));
+               }
+
+               @Override
+               protected AbstractFetcher<T, ?> createFetcher(SourceContext<T> 
sourceContext, List<KafkaTopicPartition> thisSubtaskPartitions, 
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, 
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, 
StreamingRuntimeContext runtimeContext) throws Exception {
+                       AbstractFetcher<T, ?> fetcher = 
mock(AbstractFetcher.class);
+                       doAnswer(new Answer() {
+                               @Override
+                               public Object answer(InvocationOnMock 
invocationOnMock) throws Throwable {
+                                       Assert.fail("Trying to restore offsets 
even though there was no restore state.");
+                                       return null;
+                               }
+                       }).when(fetcher).restoreOffsets(any(HashMap.class));
+                       return fetcher;
+               }
+
+               @Override
+               protected List<KafkaTopicPartition> 
getKafkaPartitions(List<String> topics) {
+                       return Collections.emptyList();
+               }
+
+               @Override
+               public RuntimeContext getRuntimeContext() {
+                       return mock(StreamingRuntimeContext.class);
+               }
+       }
+
+       private static final class TestingListState<T> implements ListState<T> {
+
+               private final List<T> list = new ArrayList<>();
+               private boolean clearCalled = false;
+
+               @Override
+               public void clear() {
+                       list.clear();
+                       clearCalled = true;
+               }
+
+               @Override
+               public Iterable<T> get() throws Exception {
+                       return list;
+               }
+
+               @Override
+               public void add(T value) throws Exception {
+                       list.add(value);
+               }
+
+               public List<T> getList() {
+                       return list;
+               }
+
+               public boolean isClearCalled() {
+                       return clearCalled;
+               }
+       }
+}

Reply via email to