http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
new file mode 100644
index 0000000..1168b27
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A serializable representation of a kafka topic and a partition.
+ * Used as an operator state for the Kafka consumer
+ */
+public class KafkaTopicPartition implements Serializable {
+
+       private static final long serialVersionUID = 722083576322742325L;
+
+       private final String topic;
+       private final int partition;
+       private final int cachedHash;
+
+       public KafkaTopicPartition(String topic, int partition) {
+               this.topic = checkNotNull(topic);
+               this.partition = partition;
+               this.cachedHash = 31 * topic.hashCode() + partition;
+       }
+
+       public String getTopic() {
+               return topic;
+       }
+
+       public int getPartition() {
+               return partition;
+       }
+
+       @Override
+       public String toString() {
+               return "KafkaTopicPartition{" +
+                               "topic='" + topic + '\'' +
+                               ", partition=" + partition +
+                               '}';
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (!(o instanceof KafkaTopicPartition)) {
+                       return false;
+               }
+
+               KafkaTopicPartition that = (KafkaTopicPartition) o;
+
+               if (partition != that.partition) {
+                       return false;
+               }
+               return topic.equals(that.topic);
+       }
+
+       @Override
+       public int hashCode() {
+               return cachedHash;
+       }
+
+
+       // ------------------- Utilities -------------------------------------
+
+       /**
+        * Returns a unique list of topics from the topic partition map
+        *
+        * @param topicPartitionMap A map of KafkaTopicPartition's
+        * @return A unique list of topics from the input map
+        */
+       public static List<String> getTopics(Map<KafkaTopicPartition, ?> 
topicPartitionMap) {
+               HashSet<String> uniqueTopics = new HashSet<>();
+               for (KafkaTopicPartition ktp: topicPartitionMap.keySet()) {
+                       uniqueTopics.add(ktp.getTopic());
+               }
+               return new ArrayList<>(uniqueTopics);
+       }
+
+       public static String toString(Map<KafkaTopicPartition, Long> map) {
+               StringBuilder sb = new StringBuilder();
+               for (Map.Entry<KafkaTopicPartition, Long> p: map.entrySet()) {
+                       KafkaTopicPartition ktp = p.getKey();
+                       
sb.append(ktp.getTopic()).append(":").append(ktp.getPartition()).append("=").append(p.getValue()).append(",
 ");
+               }
+               return sb.toString();
+       }
+
+       /**
+        * Checks whether this partition is contained in the map with 
KafkaTopicPartitionLeaders
+        *
+        * @param map The map of KafkaTopicPartitionLeaders
+        * @return true if the element is contained.
+        */
+       public boolean isContained(Map<KafkaTopicPartitionLeader, ?> map) {
+               for(Map.Entry<KafkaTopicPartitionLeader, ?> entry : 
map.entrySet()) {
+                       if(entry.getKey().getTopicPartition().equals(this)) {
+                               return true;
+                       }
+               }
+               return false;
+       }
+
+       public static List<KafkaTopicPartition> 
convertToPartitionInfo(List<KafkaTopicPartitionLeader> partitionInfos) {
+               List<KafkaTopicPartition> ret = new 
ArrayList<>(partitionInfos.size());
+               for(KafkaTopicPartitionLeader ktpl: partitionInfos) {
+                       ret.add(ktpl.getTopicPartition());
+               }
+               return ret;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
new file mode 100644
index 0000000..8dd9a52
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.kafka.common.Node;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Serializable Topic Partition info with leader Node information.
+ * This class is used at runtime.
+ */
+public class KafkaTopicPartitionLeader implements Serializable {
+
+       private static final long serialVersionUID = 9145855900303748582L;
+
+       private final int leaderId;
+       private final int leaderPort;
+       private final String leaderHost;
+       private final KafkaTopicPartition topicPartition;
+       private final int cachedHash;
+
+       public KafkaTopicPartitionLeader(KafkaTopicPartition topicPartition, 
Node leader) {
+               this.topicPartition = topicPartition;
+               if (leader == null) {
+                       this.leaderId = -1;
+                       this.leaderHost = null;
+                       this.leaderPort = -1;
+               } else {
+                       this.leaderId = leader.id();
+                       this.leaderPort = leader.port();
+                       this.leaderHost = leader.host();
+               }
+               int cachedHash = (leader == null) ? 14 : leader.hashCode();
+               this.cachedHash = 31 * cachedHash + topicPartition.hashCode();
+       }
+
+       public KafkaTopicPartition getTopicPartition() {
+               return topicPartition;
+       }
+
+       public Node getLeader() {
+               if (this.leaderId == -1) {
+                       return null;
+               } else {
+                       return new Node(leaderId, leaderHost, leaderPort);
+               }
+       }
+
+       public static Object toString(List<KafkaTopicPartitionLeader> 
partitions) {
+               StringBuilder sb = new StringBuilder();
+               for (KafkaTopicPartitionLeader p: partitions) {
+                       
sb.append(p.getTopicPartition().getTopic()).append(":").append(p.getTopicPartition().getPartition()).append(",
 ");
+               }
+               return sb.toString();
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (!(o instanceof KafkaTopicPartitionLeader)) {
+                       return false;
+               }
+
+               KafkaTopicPartitionLeader that = (KafkaTopicPartitionLeader) o;
+
+               if (!topicPartition.equals(that.topicPartition)) {
+                       return false;
+               }
+               return leaderId == that.leaderId && leaderPort == 
that.leaderPort && leaderHost.equals(that.leaderHost);
+       }
+
+       @Override
+       public int hashCode() {
+               return cachedHash;
+       }
+
+       @Override
+       public String toString() {
+               return "KafkaTopicPartitionLeader{" +
+                               "leaderId=" + leaderId +
+                               ", leaderPort=" + leaderPort +
+                               ", leaderHost='" + leaderHost + '\'' +
+                               ", topic=" + topicPartition.getTopic() +
+                               ", partition=" + topicPartition.getPartition() +
+                               '}';
+       }
+
+
+       /**
+        * Replaces an existing KafkaTopicPartition ignoring the leader in the 
given map.
+        *
+        * @param newKey new topicpartition
+        * @param newValue new offset
+        * @param map map to do the search in
+        * @return oldValue the old value (offset)
+        */
+       public static Long replaceIgnoringLeader(KafkaTopicPartitionLeader 
newKey, Long newValue, Map<KafkaTopicPartitionLeader, Long> map) {
+               for(Map.Entry<KafkaTopicPartitionLeader, Long> entry: 
map.entrySet()) {
+                       
if(entry.getKey().getTopicPartition().equals(newKey.getTopicPartition())) {
+                               Long oldValue = map.remove(entry.getKey());
+                               if(map.put(newKey, newValue) != null) {
+                                       throw new IllegalStateException("Key 
was not removed before");
+                               }
+                               return oldValue;
+                       }
+               }
+               return null;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java
new file mode 100644
index 0000000..001b6cb
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+
+import java.nio.charset.Charset;
+
+/**
+ * Simple ZooKeeper serializer for Strings.
+ */
+public class ZooKeeperStringSerializer implements ZkSerializer {
+
+       private static final Charset CHARSET = Charset.forName("UTF-8");
+       
+       @Override
+       public byte[] serialize(Object data) {
+               if (data instanceof String) {
+                       return ((String) data).getBytes(CHARSET);
+               }
+               else {
+                       throw new 
IllegalArgumentException("ZooKeeperStringSerializer can only serialize 
strings.");
+               }
+       }
+
+       @Override
+       public Object deserialize(byte[] bytes) {
+               if (bytes == null) {
+                       return null;
+               }
+               else {
+                       return new String(bytes, CHARSET);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/AvgKafkaMetricAccumulator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/AvgKafkaMetricAccumulator.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/AvgKafkaMetricAccumulator.java
new file mode 100644
index 0000000..a038711
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/AvgKafkaMetricAccumulator.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.internals.metrics;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.SampledStat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.util.List;
+
+public class AvgKafkaMetricAccumulator extends DefaultKafkaMetricAccumulator {
+       private static final Logger LOG = 
LoggerFactory.getLogger(AvgKafkaMetricAccumulator.class);
+
+       /** The last sum/count before the serialization  **/
+       private AvgSumCount lastSumCount;
+
+       public AvgKafkaMetricAccumulator(KafkaMetric kafkaMetric) {
+               super(kafkaMetric);
+       }
+
+       @Override
+       public void merge(Accumulator<Void, Double> other) {
+               if(!(other instanceof AvgKafkaMetricAccumulator)) {
+                       throw new RuntimeException("Trying to merge 
incompatible accumulators: "+this+" with "+other);
+               }
+               AvgKafkaMetricAccumulator otherMetric = 
(AvgKafkaMetricAccumulator) other;
+
+               AvgSumCount thisAvg;
+               if(this.lastSumCount == null) {
+                       Measurable thisMeasurable = 
DefaultKafkaMetricAccumulator.getMeasurableFromKafkaMetric(this.kafkaMetric);
+                       if (!(thisMeasurable instanceof Avg)) {
+                               throw new RuntimeException("Must be of type 
Avg");
+                       }
+                       thisAvg = getAvgSumCount((Avg) thisMeasurable);
+               } else {
+                       thisAvg = this.lastSumCount;
+               }
+
+               AvgSumCount otherAvg;
+               if(otherMetric.lastSumCount == null) {
+                       Measurable otherMeasurable = 
DefaultKafkaMetricAccumulator.getMeasurableFromKafkaMetric(otherMetric.kafkaMetric);
+                       if(!(otherMeasurable instanceof Avg) ) {
+                               throw new RuntimeException("Must be of type 
Avg");
+                       }
+                       otherAvg = getAvgSumCount((Avg) otherMeasurable);
+               } else {
+                       otherAvg = otherMetric.lastSumCount;
+               }
+
+               thisAvg.count += otherAvg.count;
+               thisAvg.sum += otherAvg.sum;
+               this.mergedValue = thisAvg.sum / thisAvg.count;
+       }
+
+       @Override
+       public Accumulator<Void, Double> clone() {
+               AvgKafkaMetricAccumulator clone = new 
AvgKafkaMetricAccumulator(kafkaMetric);
+               clone.lastSumCount = this.lastSumCount;
+               clone.isMerged = this.isMerged;
+               clone.mergedValue = this.mergedValue;
+               return clone;
+       }
+
+       // ------------ Utilities
+
+       private static class AvgSumCount implements Serializable {
+               double sum;
+               long count;
+
+               @Override
+               public String toString() {
+                       return "AvgSumCount{" +
+                                       "sum=" + sum +
+                                       ", count=" + count +
+                                       ", avg="+(sum/count)+"}";
+               }
+       }
+
+       /**
+        * Extracts sum and count from Avg using reflection
+        *
+        * @param avg Avg SampledStat from Kafka
+        * @return A KV pair with the average's sum and count
+        */
+       private static AvgSumCount getAvgSumCount(Avg avg) {
+               try {
+                       Field samplesField = 
SampledStat.class.getDeclaredField("samples");
+                       Field sampleValue = 
Class.forName("org.apache.kafka.common.metrics.stats.SampledStat$Sample").getDeclaredField("value");
+                       Field sampleEventCount = 
Class.forName("org.apache.kafka.common.metrics.stats.SampledStat$Sample").getDeclaredField("eventCount");
+                       samplesField.setAccessible(true);
+                       sampleValue.setAccessible(true);
+                       sampleEventCount.setAccessible(true);
+                       List samples = (List) samplesField.get(avg);
+                       AvgSumCount res = new AvgSumCount();
+                       for(int i = 0; i < samples.size(); i++) {
+                               res.sum += 
(double)sampleValue.get(samples.get(i));
+                               res.count += 
(long)sampleEventCount.get(samples.get(i));
+                       }
+                       return res;
+               } catch(Throwable t) {
+                       throw new RuntimeException("Unable to extract sum and 
count from Avg using reflection. " +
+                                       "You can turn off the metrics from 
Flink's Kafka connector if this issue persists.", t);
+               }
+       }
+
+       private void writeObject(ObjectOutputStream out) throws IOException {
+               Measurable thisMeasurable = 
DefaultKafkaMetricAccumulator.getMeasurableFromKafkaMetric(this.kafkaMetric);
+               if(!(thisMeasurable instanceof Avg) ) {
+                       throw new RuntimeException("Must be of type Avg");
+               }
+               this.lastSumCount = getAvgSumCount((Avg) thisMeasurable);
+               out.defaultWriteObject();
+       }
+
+       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+               in.defaultReadObject();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/DefaultKafkaMetricAccumulator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/DefaultKafkaMetricAccumulator.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/DefaultKafkaMetricAccumulator.java
new file mode 100644
index 0000000..06b7930
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/DefaultKafkaMetricAccumulator.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals.metrics;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Field;
+
+public class DefaultKafkaMetricAccumulator implements Accumulator<Void, 
Double>, Serializable {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DefaultKafkaMetricAccumulator.class);
+
+       protected boolean isMerged = false;
+       protected double mergedValue;
+       protected transient KafkaMetric kafkaMetric;
+
+
+       public static DefaultKafkaMetricAccumulator createFor(Metric metric) {
+               if(!(metric instanceof KafkaMetric)) {
+                       return null;
+               }
+               KafkaMetric kafkaMetric = (KafkaMetric) metric;
+               Measurable measurable = 
getMeasurableFromKafkaMetric(kafkaMetric);
+               if(measurable == null) {
+                       return null;
+               }
+               if (measurable instanceof Max) {
+                       return new MaxKafkaMetricAccumulator(kafkaMetric);
+               } else if (measurable instanceof Min) {
+                       return new MinKafkaMetricAccumulator(kafkaMetric);
+               } else if (measurable instanceof Avg) {
+                       return new AvgKafkaMetricAccumulator(kafkaMetric);
+               } else {
+                       // fallback accumulator. works for Rate, Total, Count.
+                       return new DefaultKafkaMetricAccumulator(kafkaMetric);
+               }
+       }
+
+       /**
+        * This utility method is using reflection to get the Measurable from 
the KafkaMetric.
+        * Since Kafka 0.9, Kafka is exposing the Measurable properly, but 
Kafka 0.8.2 does not yet expose it.
+        *
+        * @param kafkaMetric the metric to extract the field form
+        * @return Measurable type (or null in case of an error)
+        */
+       protected static Measurable getMeasurableFromKafkaMetric(KafkaMetric 
kafkaMetric) {
+               try {
+                       Field measurableField = 
kafkaMetric.getClass().getDeclaredField("measurable");
+                       measurableField.setAccessible(true);
+                       return (Measurable) measurableField.get(kafkaMetric);
+               } catch (Throwable e) {
+                       LOG.warn("Unable to initialize Kafka metric: " + 
kafkaMetric, e);
+                       return null;
+               }
+       }
+
+
+       DefaultKafkaMetricAccumulator(KafkaMetric kafkaMetric) {
+               this.kafkaMetric = kafkaMetric;
+       }
+
+       @Override
+       public void add(Void value) {
+               // noop
+       }
+
+       @Override
+       public Double getLocalValue() {
+               if(isMerged && kafkaMetric == null) {
+                       return mergedValue;
+               }
+               return kafkaMetric.value();
+       }
+
+       @Override
+       public void resetLocal() {
+               // noop
+       }
+
+       @Override
+       public void merge(Accumulator<Void, Double> other) {
+               if(!(other instanceof DefaultKafkaMetricAccumulator)) {
+                       throw new RuntimeException("Trying to merge 
incompatible accumulators");
+               }
+               DefaultKafkaMetricAccumulator otherMetric = 
(DefaultKafkaMetricAccumulator) other;
+               if(this.isMerged) {
+                       if(otherMetric.isMerged) {
+                               this.mergedValue += otherMetric.mergedValue;
+                       } else {
+                               this.mergedValue += otherMetric.getLocalValue();
+                       }
+               } else {
+                       this.isMerged = true;
+                       if(otherMetric.isMerged) {
+                               this.mergedValue = this.getLocalValue() + 
otherMetric.mergedValue;
+                       } else {
+                               this.mergedValue = this.getLocalValue() + 
otherMetric.getLocalValue();
+                       }
+
+               }
+       }
+
+       @Override
+       public Accumulator<Void, Double> clone() {
+               DefaultKafkaMetricAccumulator clone = new 
DefaultKafkaMetricAccumulator(this.kafkaMetric);
+               clone.isMerged = this.isMerged;
+               clone.mergedValue = this.mergedValue;
+               return clone;
+       }
+
+       @Override
+       public String toString() {
+               if(isMerged) {
+                       return Double.toString(mergedValue);
+               }
+               if(kafkaMetric == null) {
+                       return "null";
+               }
+               return Double.toString(kafkaMetric.value());
+       }
+
+       // -------- custom serialization methods
+       private void writeObject(ObjectOutputStream out) throws IOException {
+               this.isMerged = true;
+               this.mergedValue = kafkaMetric.value();
+               out.defaultWriteObject();
+       }
+
+       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+               in.defaultReadObject();
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MaxKafkaMetricAccumulator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MaxKafkaMetricAccumulator.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MaxKafkaMetricAccumulator.java
new file mode 100644
index 0000000..c1770ff
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MaxKafkaMetricAccumulator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.internals.metrics;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.kafka.common.metrics.KafkaMetric;
+
+
+public class MaxKafkaMetricAccumulator extends DefaultKafkaMetricAccumulator {
+       public MaxKafkaMetricAccumulator(KafkaMetric kafkaMetric) {
+               super(kafkaMetric);
+       }
+
+       @Override
+       public void merge(Accumulator<Void, Double> other) {
+               if(!(other instanceof MaxKafkaMetricAccumulator)) {
+                       throw new RuntimeException("Trying to merge 
incompatible accumulators");
+               }
+               MaxKafkaMetricAccumulator otherMetric = 
(MaxKafkaMetricAccumulator) other;
+               if(this.isMerged) {
+                       if(otherMetric.isMerged) {
+                               this.mergedValue = Math.max(this.mergedValue, 
otherMetric.mergedValue);
+                       } else {
+                               this.mergedValue = Math.max(this.mergedValue, 
otherMetric.getLocalValue());
+                       }
+               } else {
+                       this.isMerged = true;
+                       if(otherMetric.isMerged) {
+                               this.mergedValue = 
Math.max(this.getLocalValue(), otherMetric.mergedValue);
+                       } else {
+                               this.mergedValue = 
Math.max(this.getLocalValue(), otherMetric.getLocalValue());
+                       }
+               }
+       }
+
+       @Override
+       public Accumulator<Void, Double> clone() {
+               MaxKafkaMetricAccumulator clone = new 
MaxKafkaMetricAccumulator(this.kafkaMetric);
+               clone.isMerged = this.isMerged;
+               clone.mergedValue = this.mergedValue;
+               return clone;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MinKafkaMetricAccumulator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MinKafkaMetricAccumulator.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MinKafkaMetricAccumulator.java
new file mode 100644
index 0000000..4794893
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MinKafkaMetricAccumulator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.internals.metrics;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.kafka.common.metrics.KafkaMetric;
+
+public class MinKafkaMetricAccumulator extends DefaultKafkaMetricAccumulator {
+
+       public MinKafkaMetricAccumulator(KafkaMetric kafkaMetric) {
+               super(kafkaMetric);
+       }
+
+       @Override
+       public void merge(Accumulator<Void, Double> other) {
+               if(!(other instanceof MinKafkaMetricAccumulator)) {
+                       throw new RuntimeException("Trying to merge 
incompatible accumulators");
+               }
+               MinKafkaMetricAccumulator otherMetric = 
(MinKafkaMetricAccumulator) other;
+               if(this.isMerged) {
+                       if(otherMetric.isMerged) {
+                               this.mergedValue = Math.min(this.mergedValue, 
otherMetric.mergedValue);
+                       } else {
+                               this.mergedValue = Math.min(this.mergedValue, 
otherMetric.getLocalValue());
+                       }
+               } else {
+                       this.isMerged = true;
+                       if(otherMetric.isMerged) {
+                               this.mergedValue = 
Math.min(this.getLocalValue(), otherMetric.mergedValue);
+                       } else {
+                               this.mergedValue = 
Math.min(this.getLocalValue(), otherMetric.getLocalValue());
+                       }
+               }
+       }
+
+       @Override
+       public Accumulator<Void, Double> clone() {
+               MinKafkaMetricAccumulator clone = new 
MinKafkaMetricAccumulator(this.kafkaMetric);
+               clone.isMerged = this.isMerged;
+               clone.mergedValue = this.mergedValue;
+               return clone;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
new file mode 100644
index 0000000..d9dcfc1
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+import java.io.Serializable;
+
+/**
+ * A partitioner ensuring that each internal Flink partition ends up in one 
Kafka partition.
+ *
+ * Note, one Kafka partition can contain multiple Flink partitions.
+ *
+ * Cases:
+ *     # More Flink partitions than kafka partitions
+ * <pre>
+ *             Flink Sinks:            Kafka Partitions
+ *                     1       ----------------&gt;    1
+ *                     2   --------------/
+ *                     3   -------------/
+ *                     4       ------------/
+ * </pre>
+ * Some (or all) kafka partitions contain the output of more than one flink 
partition
+ *
+ *# Fewer Flink partitions than Kafka
+ * <pre>
+ *             Flink Sinks:            Kafka Partitions
+ *                     1       ----------------&gt;    1
+ *                     2       ----------------&gt;    2
+ *                                                                             
3
+ *                                                                             
4
+ *                                                                             
5
+ * </pre>
+ *
+ *  Not all Kafka partitions contain data
+ *  To avoid such an unbalanced partitioning, use a round-robin kafka 
partitioner. (note that this will
+ *  cause a lot of network connections between all the Flink instances and all 
the Kafka brokers
+ *
+ *
+ */
+public class FixedPartitioner<T> extends KafkaPartitioner<T> implements 
Serializable {
+       private static final long serialVersionUID = 1627268846962918126L;
+
+       int targetPartition = -1;
+
+       @Override
+       public void open(int parallelInstanceId, int parallelInstances, int[] 
partitions) {
+               int p = 0;
+               for (int i = 0; i < parallelInstances; i++) {
+                       if (i == parallelInstanceId) {
+                               targetPartition = partitions[p];
+                               return;
+                       }
+                       if (++p == partitions.length) {
+                               p = 0;
+                       }
+               }
+       }
+
+       @Override
+       public int partition(T next, byte[] serializedKey, byte[] 
serializedValue, int numPartitions) {
+               if (targetPartition == -1) {
+                       throw new RuntimeException("The partitioner has not 
been initialized properly");
+               }
+               return targetPartition;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
new file mode 100644
index 0000000..038f414
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+
+import java.io.Serializable;
+
+/**
+ * It contains a open() method which is called on each parallel instance.
+ * Partitioners must be serializable!
+ */
+public abstract class KafkaPartitioner<T> implements Serializable {
+
+       private static final long serialVersionUID = -1974260817778593473L;
+
+       /**
+        * Initializer for the Partitioner.
+        * @param parallelInstanceId 0-indexed id of the parallel instance in 
Flink
+        * @param parallelInstances the total number of parallel instances
+        * @param partitions an array describing the partition IDs of the 
available Kafka partitions.
+        */
+       public void open(int parallelInstanceId, int parallelInstances, int[] 
partitions) {
+               // overwrite this method if needed.
+       }
+
+       public abstract int partition(T next, byte[] serializedKey, byte[] 
serializedValue, int numPartitions);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
new file mode 100644
index 0000000..01e72ca
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * The deserialization schema describes how to turn the byte key / value 
messages delivered by certain
+ * data sources (for example Apache Kafka) into data types (Java/Scala 
objects) that are
+ * processed by Flink.
+ * 
+ * @param <T> The type created by the keyed deserialization schema.
+ */
+public interface KeyedDeserializationSchema<T> extends Serializable, 
ResultTypeQueryable<T> {
+
+       /**
+        * Deserializes the byte message.
+        *
+        * @param messageKey the key as a byte array (null if no key has been 
set)
+        * @param message The message, as a byte array. (null if the message 
was empty or deleted)
+        * @param partition The partition the message has originated from
+        * @param offset the offset of the message in the original source (for 
example the Kafka offset)  @return The deserialized message as an object.
+        */
+       T deserialize(byte[] messageKey, byte[] message, String topic, int 
partition, long offset) throws IOException;
+
+       /**
+        * Method to decide whether the element signals the end of the stream. 
If
+        * true is returned the element won't be emitted.
+        * 
+        * @param nextElement The element to test for the end-of-stream signal.
+        * @return True, if the element signals end of stream, false otherwise.
+        */
+       boolean isEndOfStream(T nextElement);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
new file mode 100644
index 0000000..4b9dba2
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.io.IOException;
+
+/**
+ * A simple wrapper for using the DeserializationSchema with the 
KeyedDeserializationSchema
+ * interface
+ * @param <T> The type created by the deserialization schema.
+ */
+public class KeyedDeserializationSchemaWrapper<T> implements 
KeyedDeserializationSchema<T> {
+
+       private static final long serialVersionUID = 2651665280744549932L;
+
+       private final DeserializationSchema<T> deserializationSchema;
+
+       public KeyedDeserializationSchemaWrapper(DeserializationSchema<T> 
deserializationSchema) {
+               this.deserializationSchema = deserializationSchema;
+       }
+       @Override
+       public T deserialize(byte[] messageKey, byte[] message, String topic, 
int partition, long offset) throws IOException {
+               return deserializationSchema.deserialize(message);
+       }
+
+       @Override
+       public boolean isEndOfStream(T nextElement) {
+               return deserializationSchema.isEndOfStream(nextElement);
+       }
+
+       @Override
+       public TypeInformation<T> getProducedType() {
+               return deserializationSchema.getProducedType();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
new file mode 100644
index 0000000..be3e87e
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.Serializable;
+
+/**
+ * The serialization schema describes how to turn a data object into a 
different serialized
+ * representation. Most data sinks (for example Apache Kafka) require the data 
to be handed
+ * to them in a specific format (for example as byte strings).
+ * 
+ * @param <T> The type to be serialized.
+ */
+public interface KeyedSerializationSchema<T> extends Serializable {
+
+       /**
+        * Serializes the key of the incoming element to a byte array
+        * This method might return null if no key is available.
+        *
+        * @param element The incoming element to be serialized
+        * @return the key of the element as a byte array
+        */
+       byte[] serializeKey(T element);
+
+
+       /**
+        * Serializes the value of the incoming element to a byte array
+        * 
+        * @param element The incoming element to be serialized
+        * @return the value of the element as a byte array
+        */
+       byte[] serializeValue(T element);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
new file mode 100644
index 0000000..a1a8fc0
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+/**
+ * A simple wrapper for using the SerializationSchema with the 
KeyedDeserializationSchema
+ * interface
+ * @param <T> The type to serialize
+ */
+public class KeyedSerializationSchemaWrapper<T> implements 
KeyedSerializationSchema<T> {
+
+       private static final long serialVersionUID = 1351665280744549933L;
+
+       private final SerializationSchema<T> serializationSchema;
+
+       public KeyedSerializationSchemaWrapper(SerializationSchema<T> 
serializationSchema) {
+               this.serializationSchema = serializationSchema;
+       }
+
+       @Override
+       public byte[] serializeKey(T element) {
+               return null;
+       }
+
+       @Override
+       public byte[] serializeValue(T element) {
+               return serializationSchema.serialize(element);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
new file mode 100644
index 0000000..a35c01e
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+
+import java.io.IOException;
+
+/**
+ * A serialization and deserialization schema for Key Value Pairs that uses 
Flink's serialization stack to
+ * transform typed from and to byte arrays.
+ * 
+ * @param <K> The key type to be serialized.
+ * @param <V> The value type to be serialized.
+ */
+public class TypeInformationKeyValueSerializationSchema<K, V> implements 
KeyedDeserializationSchema<Tuple2<K, V>>, KeyedSerializationSchema<Tuple2<K,V>> 
{
+
+       private static final long serialVersionUID = -5359448468131559102L;
+
+       /** The serializer for the key */
+       private final TypeSerializer<K> keySerializer;
+
+       /** The serializer for the value */
+       private final TypeSerializer<V> valueSerializer;
+
+       /** reusable output serialization buffers */
+       private transient DataOutputSerializer keyOutputSerializer;
+       private transient DataOutputSerializer valueOutputSerializer;
+
+       /** The type information, to be returned by {@link #getProducedType()}. 
It is
+        * transient, because it is not serializable. Note that this means that 
the type information
+        * is not available at runtime, but only prior to the first 
serialization / deserialization */
+       private final transient TypeInformation<Tuple2<K, V>> typeInfo;
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a new de-/serialization schema for the given types.
+        *
+        * @param keyTypeInfo The type information for the key type 
de-/serialized by this schema.
+        * @param valueTypeInfo The type information for the value type 
de-/serialized by this schema.
+        * @param ec The execution config, which is used to parametrize the 
type serializers.
+        */
+       public TypeInformationKeyValueSerializationSchema(TypeInformation<K> 
keyTypeInfo, TypeInformation<V> valueTypeInfo, ExecutionConfig ec) {
+               this.typeInfo = new TupleTypeInfo<>(keyTypeInfo, valueTypeInfo);
+               this.keySerializer = keyTypeInfo.createSerializer(ec);
+               this.valueSerializer = valueTypeInfo.createSerializer(ec);
+       }
+
+       public TypeInformationKeyValueSerializationSchema(Class<K> keyClass, 
Class<V> valueClass, ExecutionConfig config) {
+               //noinspection unchecked
+               this( (TypeInformation<K>) 
TypeExtractor.createTypeInfo(keyClass), (TypeInformation<V>) 
TypeExtractor.createTypeInfo(valueClass), config);
+       }
+
+       // 
------------------------------------------------------------------------
+
+
+       @Override
+       public Tuple2<K, V> deserialize(byte[] messageKey, byte[] message, 
String topic, int partition, long offset) throws IOException {
+               K key = null;
+               if(messageKey != null) {
+                       key = keySerializer.deserialize(new 
ByteArrayInputView(messageKey));
+               }
+               V value = null;
+               if(message != null) {
+                       value = valueSerializer.deserialize(new 
ByteArrayInputView(message));
+               }
+               return new Tuple2<>(key, value);
+       }
+
+       /**
+        * This schema never considers an element to signal end-of-stream, so 
this method returns always false.
+        * @param nextElement The element to test for the end-of-stream signal.
+        * @return Returns false.
+        */
+       @Override
+       public boolean isEndOfStream(Tuple2<K,V> nextElement) {
+               return false;
+       }
+
+
+       @Override
+       public byte[] serializeKey(Tuple2<K, V> element) {
+               if(element.f0 == null) {
+                       return null;
+               } else {
+                       // key is not null. serialize it:
+                       if (keyOutputSerializer == null) {
+                               keyOutputSerializer = new 
DataOutputSerializer(16);
+                       }
+                       try {
+                               keySerializer.serialize(element.f0, 
keyOutputSerializer);
+                       }
+                       catch (IOException e) {
+                               throw new RuntimeException("Unable to serialize 
record", e);
+                       }
+                       // check if key byte array size changed
+                       byte[] res = keyOutputSerializer.getByteArray();
+                       if (res.length != keyOutputSerializer.length()) {
+                               byte[] n = new 
byte[keyOutputSerializer.length()];
+                               System.arraycopy(res, 0, n, 0, 
keyOutputSerializer.length());
+                               res = n;
+                       }
+                       keyOutputSerializer.clear();
+                       return res;
+               }
+       }
+
+       @Override
+       public byte[] serializeValue(Tuple2<K, V> element) {
+               // if the value is null, its serialized value is null as well.
+               if(element.f1 == null) {
+                       return null;
+               }
+
+               if (valueOutputSerializer == null) {
+                       valueOutputSerializer = new DataOutputSerializer(16);
+               }
+
+               try {
+                       valueSerializer.serialize(element.f1, 
valueOutputSerializer);
+               }
+               catch (IOException e) {
+                       throw new RuntimeException("Unable to serialize 
record", e);
+               }
+
+               byte[] res = valueOutputSerializer.getByteArray();
+               if (res.length != valueOutputSerializer.length()) {
+                       byte[] n = new byte[valueOutputSerializer.length()];
+                       System.arraycopy(res, 0, n, 0, 
valueOutputSerializer.length());
+                       res = n;
+               }
+               valueOutputSerializer.clear();
+               return res;
+       }
+
+
+       @Override
+       public TypeInformation<Tuple2<K,V>> getProducedType() {
+               if (typeInfo != null) {
+                       return typeInfo;
+               }
+               else {
+                       throw new IllegalStateException(
+                                       "The type information is not available 
after this class has been serialized and distributed.");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
new file mode 100644
index 0000000..e86d51a
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
+import org.apache.kafka.common.Node;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+
+
+/**
+ * Tests that the partition assignment is deterministic and stable.
+ */
+public class KafkaConsumerPartitionAssignmentTest {
+
+       private final Node fake = new Node(1337, "localhost", 1337);
+
+       @Test
+       public void testPartitionsEqualConsumers() {
+               try {
+                       List<KafkaTopicPartitionLeader> inPartitions = new 
ArrayList<>();
+                       inPartitions.add(new KafkaTopicPartitionLeader(new 
KafkaTopicPartition("test-topic", 4), fake));
+                       inPartitions.add(new KafkaTopicPartitionLeader(new 
KafkaTopicPartition("test-topic", 52), fake));
+                       inPartitions.add(new KafkaTopicPartitionLeader(new 
KafkaTopicPartition("test-topic", 17), fake));
+                       inPartitions.add(new KafkaTopicPartitionLeader(new 
KafkaTopicPartition("test-topic", 1), fake));
+
+                       for (int i = 0; i < inPartitions.size(); i++) {
+                               List<KafkaTopicPartitionLeader> parts = 
FlinkKafkaConsumerBase.assignPartitions(
+                                               inPartitions, 
inPartitions.size(), i);
+
+                               assertNotNull(parts);
+                               assertEquals(1, parts.size());
+                               assertTrue(contains(inPartitions, 
parts.get(0).getTopicPartition().getPartition()));
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       private boolean contains(List<KafkaTopicPartitionLeader> inPartitions, 
int partition) {
+               for (KafkaTopicPartitionLeader ktp: inPartitions) {
+                       if (ktp.getTopicPartition().getPartition() == 
partition) {
+                               return true;
+                       }
+               }
+               return false;
+       }
+
+       @Test
+       public void testMultiplePartitionsPerConsumers() {
+               try {
+                       final int[] partitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 
31, 127, 14};
+
+                       final List<KafkaTopicPartitionLeader> partitions = new 
ArrayList<>();
+                       final Set<KafkaTopicPartitionLeader> allPartitions = 
new HashSet<>();
+
+                       for (int p : partitionIDs) {
+                               KafkaTopicPartitionLeader part = new 
KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", p), fake);
+                               partitions.add(part);
+                               allPartitions.add(part);
+                       }
+
+                       final int numConsumers = 3;
+                       final int minPartitionsPerConsumer = partitions.size() 
/ numConsumers;
+                       final int maxPartitionsPerConsumer = partitions.size() 
/ numConsumers + 1;
+
+                       for (int i = 0; i < numConsumers; i++) {
+                               List<KafkaTopicPartitionLeader> parts = 
FlinkKafkaConsumerBase.assignPartitions(partitions, numConsumers, i);
+
+                               assertNotNull(parts);
+                               assertTrue(parts.size() >= 
minPartitionsPerConsumer);
+                               assertTrue(parts.size() <= 
maxPartitionsPerConsumer);
+
+                               for (KafkaTopicPartitionLeader p : parts) {
+                                       // check that the element was actually 
contained
+                                       assertTrue(allPartitions.remove(p));
+                               }
+                       }
+
+                       // all partitions must have been assigned
+                       assertTrue(allPartitions.isEmpty());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testPartitionsFewerThanConsumers() {
+               try {
+                       List<KafkaTopicPartitionLeader> inPartitions = new 
ArrayList<>();
+                       inPartitions.add(new KafkaTopicPartitionLeader(new 
KafkaTopicPartition("test-topic", 4), fake));
+                       inPartitions.add(new KafkaTopicPartitionLeader(new 
KafkaTopicPartition("test-topic", 52), fake));
+                       inPartitions.add(new KafkaTopicPartitionLeader(new 
KafkaTopicPartition("test-topic", 17), fake));
+                       inPartitions.add(new KafkaTopicPartitionLeader(new 
KafkaTopicPartition("test-topic", 1), fake));
+
+                       final Set<KafkaTopicPartitionLeader> allPartitions = 
new HashSet<>();
+                       allPartitions.addAll(inPartitions);
+
+                       final int numConsumers = 2 * inPartitions.size() + 3;
+
+                       for (int i = 0; i < numConsumers; i++) {
+                               List<KafkaTopicPartitionLeader> parts = 
FlinkKafkaConsumerBase.assignPartitions(inPartitions, numConsumers, i);
+
+                               assertNotNull(parts);
+                               assertTrue(parts.size() <= 1);
+
+                               for (KafkaTopicPartitionLeader p : parts) {
+                                       // check that the element was actually 
contained
+                                       assertTrue(allPartitions.remove(p));
+                               }
+                       }
+
+                       // all partitions must have been assigned
+                       assertTrue(allPartitions.isEmpty());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testAssignEmptyPartitions() {
+               try {
+                       List<KafkaTopicPartitionLeader> ep = new ArrayList<>();
+                       List<KafkaTopicPartitionLeader> parts1 = 
FlinkKafkaConsumerBase.assignPartitions(ep, 4, 2);
+                       assertNotNull(parts1);
+                       assertTrue(parts1.isEmpty());
+
+                       List<KafkaTopicPartitionLeader> parts2 = 
FlinkKafkaConsumerBase.assignPartitions(ep, 1, 0);
+                       assertNotNull(parts2);
+                       assertTrue(parts2.isEmpty());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testGrowingPartitionsRemainsStable() {
+               try {
+                       final int[] newPartitionIDs = {4, 52, 17, 1, 2, 3, 89, 
42, 31, 127, 14};
+                       List<KafkaTopicPartitionLeader> newPartitions = new 
ArrayList<>();
+
+                       for (int p : newPartitionIDs) {
+                               KafkaTopicPartitionLeader part = new 
KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", p), fake);
+                               newPartitions.add(part);
+                       }
+
+                       List<KafkaTopicPartitionLeader> initialPartitions = 
newPartitions.subList(0, 7);
+
+                       final Set<KafkaTopicPartitionLeader> allNewPartitions = 
new HashSet<>(newPartitions);
+                       final Set<KafkaTopicPartitionLeader> 
allInitialPartitions = new HashSet<>(initialPartitions);
+
+                       final int numConsumers = 3;
+                       final int minInitialPartitionsPerConsumer = 
initialPartitions.size() / numConsumers;
+                       final int maxInitialPartitionsPerConsumer = 
initialPartitions.size() / numConsumers + 1;
+                       final int minNewPartitionsPerConsumer = 
newPartitions.size() / numConsumers;
+                       final int maxNewPartitionsPerConsumer = 
newPartitions.size() / numConsumers + 1;
+
+                       List<KafkaTopicPartitionLeader> parts1 = 
FlinkKafkaConsumerBase.assignPartitions(
+                                       initialPartitions, numConsumers, 0);
+                       List<KafkaTopicPartitionLeader> parts2 = 
FlinkKafkaConsumerBase.assignPartitions(
+                                       initialPartitions, numConsumers, 1);
+                       List<KafkaTopicPartitionLeader> parts3 = 
FlinkKafkaConsumerBase.assignPartitions(
+                                       initialPartitions, numConsumers, 2);
+
+                       assertNotNull(parts1);
+                       assertNotNull(parts2);
+                       assertNotNull(parts3);
+
+                       assertTrue(parts1.size() >= 
minInitialPartitionsPerConsumer);
+                       assertTrue(parts1.size() <= 
maxInitialPartitionsPerConsumer);
+                       assertTrue(parts2.size() >= 
minInitialPartitionsPerConsumer);
+                       assertTrue(parts2.size() <= 
maxInitialPartitionsPerConsumer);
+                       assertTrue(parts3.size() >= 
minInitialPartitionsPerConsumer);
+                       assertTrue(parts3.size() <= 
maxInitialPartitionsPerConsumer);
+
+                       for (KafkaTopicPartitionLeader p : parts1) {
+                               // check that the element was actually contained
+                               assertTrue(allInitialPartitions.remove(p));
+                       }
+                       for (KafkaTopicPartitionLeader p : parts2) {
+                               // check that the element was actually contained
+                               assertTrue(allInitialPartitions.remove(p));
+                       }
+                       for (KafkaTopicPartitionLeader p : parts3) {
+                               // check that the element was actually contained
+                               assertTrue(allInitialPartitions.remove(p));
+                       }
+
+                       // all partitions must have been assigned
+                       assertTrue(allInitialPartitions.isEmpty());
+
+                       // grow the set of partitions and distribute anew
+
+                       List<KafkaTopicPartitionLeader> parts1new = 
FlinkKafkaConsumerBase.assignPartitions(
+                                       newPartitions, numConsumers, 0);
+                       List<KafkaTopicPartitionLeader> parts2new = 
FlinkKafkaConsumerBase.assignPartitions(
+                                       newPartitions, numConsumers, 1);
+                       List<KafkaTopicPartitionLeader> parts3new = 
FlinkKafkaConsumerBase.assignPartitions(
+                                       newPartitions, numConsumers, 2);
+
+                       // new partitions must include all old partitions
+
+                       assertTrue(parts1new.size() > parts1.size());
+                       assertTrue(parts2new.size() > parts2.size());
+                       assertTrue(parts3new.size() > parts3.size());
+
+                       assertTrue(parts1new.containsAll(parts1));
+                       assertTrue(parts2new.containsAll(parts2));
+                       assertTrue(parts3new.containsAll(parts3));
+
+                       assertTrue(parts1new.size() >= 
minNewPartitionsPerConsumer);
+                       assertTrue(parts1new.size() <= 
maxNewPartitionsPerConsumer);
+                       assertTrue(parts2new.size() >= 
minNewPartitionsPerConsumer);
+                       assertTrue(parts2new.size() <= 
maxNewPartitionsPerConsumer);
+                       assertTrue(parts3new.size() >= 
minNewPartitionsPerConsumer);
+                       assertTrue(parts3new.size() <= 
maxNewPartitionsPerConsumer);
+
+                       for (KafkaTopicPartitionLeader p : parts1new) {
+                               // check that the element was actually contained
+                               assertTrue(allNewPartitions.remove(p));
+                       }
+                       for (KafkaTopicPartitionLeader p : parts2new) {
+                               // check that the element was actually contained
+                               assertTrue(allNewPartitions.remove(p));
+                       }
+                       for (KafkaTopicPartitionLeader p : parts3new) {
+                               // check that the element was actually contained
+                               assertTrue(allNewPartitions.remove(p));
+                       }
+
+                       // all partitions must have been assigned
+                       assertTrue(allNewPartitions.isEmpty());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+}

Reply via email to