http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
new file mode 100644
index 0000000..1168b27
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A serializable representation of a Kafka topic and a partition.
+ * Used as an operator state for the Kafka consumer.
+ */
+public class KafkaTopicPartition implements Serializable {
+
+	private static final long serialVersionUID = 722083576322742325L;
+
+	private final String topic;
+	private final int partition;
+	private final int cachedHash;
+
+	public KafkaTopicPartition(String topic, int partition) {
+		this.topic = checkNotNull(topic);
+		this.partition = partition;
+		this.cachedHash = 31 * topic.hashCode() + partition;
+	}
+
+	public String getTopic() {
+		return topic;
+	}
+
+	public int getPartition() {
+		return partition;
+	}
+
+	@Override
+	public String toString() {
+		return "KafkaTopicPartition{" +
+				"topic='" + topic + '\'' +
+				", partition=" + partition +
+				'}';
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (!(o instanceof KafkaTopicPartition)) {
+			return false;
+		}
+
+		KafkaTopicPartition that = (KafkaTopicPartition) o;
+
+		if (partition != that.partition) {
+			return false;
+		}
+		return topic.equals(that.topic);
+	}
+
+	@Override
+	public int hashCode() {
+		return cachedHash;
+	}
+
+
+	// ------------------- Utilities -------------------------------------
+
+	/**
+	 * Returns a unique list of topics from the topic partition map.
+	 *
+	 * @param topicPartitionMap A map keyed by KafkaTopicPartition
+	 * @return A unique list of topics from the input map
+	 */
+	public static List<String> getTopics(Map<KafkaTopicPartition, ?> topicPartitionMap) {
+		HashSet<String> uniqueTopics = new HashSet<>();
+		for (KafkaTopicPartition ktp: topicPartitionMap.keySet()) {
+			uniqueTopics.add(ktp.getTopic());
+		}
+		return new ArrayList<>(uniqueTopics);
+	}
+
+	public static String toString(Map<KafkaTopicPartition, Long> map) {
+		StringBuilder sb = new StringBuilder();
+		for (Map.Entry<KafkaTopicPartition, Long> p: map.entrySet()) {
+			KafkaTopicPartition ktp = p.getKey();
+			sb.append(ktp.getTopic()).append(":").append(ktp.getPartition()).append("=").append(p.getValue()).append(", ");
+		}
+		return sb.toString();
+	}
+
+	/**
+	 * Checks whether this partition is contained in the given map of KafkaTopicPartitionLeaders.
+	 *
+	 * @param map The map of KafkaTopicPartitionLeaders
+	 * @return true if the element is contained.
+	 */
+	public boolean isContained(Map<KafkaTopicPartitionLeader, ?> map) {
+		for (Map.Entry<KafkaTopicPartitionLeader, ?> entry : map.entrySet()) {
+			if (entry.getKey().getTopicPartition().equals(this)) {
+				return true;
+			}
+		}
+		return false;
+	}
+
+	public static List<KafkaTopicPartition> convertToPartitionInfo(List<KafkaTopicPartitionLeader> partitionInfos) {
+		List<KafkaTopicPartition> ret = new ArrayList<>(partitionInfos.size());
+		for (KafkaTopicPartitionLeader ktpl: partitionInfos) {
+			ret.add(ktpl.getTopicPartition());
+		}
+		return ret;
+	}
+}
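For readers skimming the diff, here is a minimal usage sketch of the class above. The topic names and offsets are made up for illustration; only KafkaTopicPartition and its static helpers come from this commit.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

public class KafkaTopicPartitionExample {
	public static void main(String[] args) {
		// A hypothetical offset map, as the consumer would keep it as operator state.
		Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
		offsets.put(new KafkaTopicPartition("orders", 0), 42L);
		offsets.put(new KafkaTopicPartition("orders", 1), 17L);
		offsets.put(new KafkaTopicPartition("clicks", 0), 7L);

		// getTopics() de-duplicates topic names: [orders, clicks] (order unspecified, it is backed by a HashSet)
		List<String> topics = KafkaTopicPartition.getTopics(offsets);
		System.out.println(topics);

		// toString(Map) renders "topic:partition=offset" pairs, e.g. "orders:0=42, orders:1=17, clicks:0=7, "
		System.out.println(KafkaTopicPartition.toString(offsets));

		// equals()/hashCode() are value-based, so a fresh instance works for lookups
		System.out.println(offsets.containsKey(new KafkaTopicPartition("orders", 1))); // true
	}
}

Because equals() and hashCode() are value-based (with the hash precomputed in the constructor), fresh instances can be used directly as map keys, which is exactly what the consumer's offset state relies on.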
http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java new file mode 100644 index 0000000..8dd9a52 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import org.apache.kafka.common.Node; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +/** + * Serializable Topic Partition info with leader Node information. + * This class is used at runtime. + */ +public class KafkaTopicPartitionLeader implements Serializable { + + private static final long serialVersionUID = 9145855900303748582L; + + private final int leaderId; + private final int leaderPort; + private final String leaderHost; + private final KafkaTopicPartition topicPartition; + private final int cachedHash; + + public KafkaTopicPartitionLeader(KafkaTopicPartition topicPartition, Node leader) { + this.topicPartition = topicPartition; + if (leader == null) { + this.leaderId = -1; + this.leaderHost = null; + this.leaderPort = -1; + } else { + this.leaderId = leader.id(); + this.leaderPort = leader.port(); + this.leaderHost = leader.host(); + } + int cachedHash = (leader == null) ? 
14 : leader.hashCode();
+		this.cachedHash = 31 * cachedHash + topicPartition.hashCode();
+	}
+
+	public KafkaTopicPartition getTopicPartition() {
+		return topicPartition;
+	}
+
+	public Node getLeader() {
+		if (this.leaderId == -1) {
+			return null;
+		} else {
+			return new Node(leaderId, leaderHost, leaderPort);
+		}
+	}
+
+	public static String toString(List<KafkaTopicPartitionLeader> partitions) {
+		StringBuilder sb = new StringBuilder();
+		for (KafkaTopicPartitionLeader p: partitions) {
+			sb.append(p.getTopicPartition().getTopic()).append(":").append(p.getTopicPartition().getPartition()).append(", ");
+		}
+		return sb.toString();
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (!(o instanceof KafkaTopicPartitionLeader)) {
+			return false;
+		}
+
+		KafkaTopicPartitionLeader that = (KafkaTopicPartitionLeader) o;
+
+		if (!topicPartition.equals(that.topicPartition)) {
+			return false;
+		}
+		// leaderHost may be null when the leader node was unknown at construction time
+		return leaderId == that.leaderId && leaderPort == that.leaderPort
+				&& (leaderHost == null ? that.leaderHost == null : leaderHost.equals(that.leaderHost));
+	}
+
+	@Override
+	public int hashCode() {
+		return cachedHash;
+	}
+
+	@Override
+	public String toString() {
+		return "KafkaTopicPartitionLeader{" +
+				"leaderId=" + leaderId +
+				", leaderPort=" + leaderPort +
+				", leaderHost='" + leaderHost + '\'' +
+				", topic=" + topicPartition.getTopic() +
+				", partition=" + topicPartition.getPartition() +
+				'}';
+	}
+
+
+	/**
+	 * Replaces the entry for an existing KafkaTopicPartition in the given map, ignoring the leader.
+	 *
+	 * @param newKey the new topic partition (with leader)
+	 * @param newValue the new offset
+	 * @param map the map to search in
+	 * @return the old value (offset), or null if the partition was not present
+	 */
+	public static Long replaceIgnoringLeader(KafkaTopicPartitionLeader newKey, Long newValue, Map<KafkaTopicPartitionLeader, Long> map) {
+		for (Map.Entry<KafkaTopicPartitionLeader, Long> entry: map.entrySet()) {
+			if (entry.getKey().getTopicPartition().equals(newKey.getTopicPartition())) {
+				Long oldValue = map.remove(entry.getKey());
+				if (map.put(newKey, newValue) != null) {
+					throw new IllegalStateException("Key was not removed before");
+				}
+				return oldValue;
+			}
+		}
+		return null;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java
new file mode 100644
index 0000000..001b6cb
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import org.I0Itec.zkclient.serialize.ZkSerializer; + +import java.nio.charset.Charset; + +/** + * Simple ZooKeeper serializer for Strings. + */ +public class ZooKeeperStringSerializer implements ZkSerializer { + + private static final Charset CHARSET = Charset.forName("UTF-8"); + + @Override + public byte[] serialize(Object data) { + if (data instanceof String) { + return ((String) data).getBytes(CHARSET); + } + else { + throw new IllegalArgumentException("ZooKeeperStringSerializer can only serialize strings."); + } + } + + @Override + public Object deserialize(byte[] bytes) { + if (bytes == null) { + return null; + } + else { + return new String(bytes, CHARSET); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/AvgKafkaMetricAccumulator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/AvgKafkaMetricAccumulator.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/AvgKafkaMetricAccumulator.java new file mode 100644 index 0000000..a038711 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/AvgKafkaMetricAccumulator.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.kafka.internals.metrics; + +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.Measurable; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.SampledStat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.lang.reflect.Field; +import java.util.List; + +public class AvgKafkaMetricAccumulator extends DefaultKafkaMetricAccumulator { + private static final Logger LOG = LoggerFactory.getLogger(AvgKafkaMetricAccumulator.class); + + /** The last sum/count before the serialization **/ + private AvgSumCount lastSumCount; + + public AvgKafkaMetricAccumulator(KafkaMetric kafkaMetric) { + super(kafkaMetric); + } + + @Override + public void merge(Accumulator<Void, Double> other) { + if(!(other instanceof AvgKafkaMetricAccumulator)) { + throw new RuntimeException("Trying to merge incompatible accumulators: "+this+" with "+other); + } + AvgKafkaMetricAccumulator otherMetric = (AvgKafkaMetricAccumulator) other; + + AvgSumCount thisAvg; + if(this.lastSumCount == null) { + Measurable thisMeasurable = DefaultKafkaMetricAccumulator.getMeasurableFromKafkaMetric(this.kafkaMetric); + if (!(thisMeasurable instanceof Avg)) { + throw new RuntimeException("Must be of type Avg"); + } + thisAvg = getAvgSumCount((Avg) thisMeasurable); + } else { + thisAvg = this.lastSumCount; + } + + AvgSumCount otherAvg; + if(otherMetric.lastSumCount == null) { + Measurable otherMeasurable = DefaultKafkaMetricAccumulator.getMeasurableFromKafkaMetric(otherMetric.kafkaMetric); + if(!(otherMeasurable instanceof Avg) ) { + throw new RuntimeException("Must be of type Avg"); + } + otherAvg = getAvgSumCount((Avg) otherMeasurable); + } else { + otherAvg = otherMetric.lastSumCount; + } + + thisAvg.count += otherAvg.count; + thisAvg.sum += otherAvg.sum; + this.mergedValue = thisAvg.sum / thisAvg.count; + } + + @Override + public Accumulator<Void, Double> clone() { + AvgKafkaMetricAccumulator clone = new AvgKafkaMetricAccumulator(kafkaMetric); + clone.lastSumCount = this.lastSumCount; + clone.isMerged = this.isMerged; + clone.mergedValue = this.mergedValue; + return clone; + } + + // ------------ Utilities + + private static class AvgSumCount implements Serializable { + double sum; + long count; + + @Override + public String toString() { + return "AvgSumCount{" + + "sum=" + sum + + ", count=" + count + + ", avg="+(sum/count)+"}"; + } + } + + /** + * Extracts sum and count from Avg using reflection + * + * @param avg Avg SampledStat from Kafka + * @return A KV pair with the average's sum and count + */ + private static AvgSumCount getAvgSumCount(Avg avg) { + try { + Field samplesField = SampledStat.class.getDeclaredField("samples"); + Field sampleValue = Class.forName("org.apache.kafka.common.metrics.stats.SampledStat$Sample").getDeclaredField("value"); + Field sampleEventCount = Class.forName("org.apache.kafka.common.metrics.stats.SampledStat$Sample").getDeclaredField("eventCount"); + samplesField.setAccessible(true); + sampleValue.setAccessible(true); + sampleEventCount.setAccessible(true); + List samples = (List) samplesField.get(avg); + AvgSumCount res = new AvgSumCount(); + for(int i = 0; i < samples.size(); i++) { + res.sum += (double)sampleValue.get(samples.get(i)); + res.count += 
(long)sampleEventCount.get(samples.get(i)); + } + return res; + } catch(Throwable t) { + throw new RuntimeException("Unable to extract sum and count from Avg using reflection. " + + "You can turn off the metrics from Flink's Kafka connector if this issue persists.", t); + } + } + + private void writeObject(ObjectOutputStream out) throws IOException { + Measurable thisMeasurable = DefaultKafkaMetricAccumulator.getMeasurableFromKafkaMetric(this.kafkaMetric); + if(!(thisMeasurable instanceof Avg) ) { + throw new RuntimeException("Must be of type Avg"); + } + this.lastSumCount = getAvgSumCount((Avg) thisMeasurable); + out.defaultWriteObject(); + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/DefaultKafkaMetricAccumulator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/DefaultKafkaMetricAccumulator.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/DefaultKafkaMetricAccumulator.java new file mode 100644 index 0000000..06b7930 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/DefaultKafkaMetricAccumulator.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.internals.metrics; + +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.Measurable; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Min; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.lang.reflect.Field; + +public class DefaultKafkaMetricAccumulator implements Accumulator<Void, Double>, Serializable { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultKafkaMetricAccumulator.class); + + protected boolean isMerged = false; + protected double mergedValue; + protected transient KafkaMetric kafkaMetric; + + + public static DefaultKafkaMetricAccumulator createFor(Metric metric) { + if(!(metric instanceof KafkaMetric)) { + return null; + } + KafkaMetric kafkaMetric = (KafkaMetric) metric; + Measurable measurable = getMeasurableFromKafkaMetric(kafkaMetric); + if(measurable == null) { + return null; + } + if (measurable instanceof Max) { + return new MaxKafkaMetricAccumulator(kafkaMetric); + } else if (measurable instanceof Min) { + return new MinKafkaMetricAccumulator(kafkaMetric); + } else if (measurable instanceof Avg) { + return new AvgKafkaMetricAccumulator(kafkaMetric); + } else { + // fallback accumulator. works for Rate, Total, Count. + return new DefaultKafkaMetricAccumulator(kafkaMetric); + } + } + + /** + * This utility method is using reflection to get the Measurable from the KafkaMetric. + * Since Kafka 0.9, Kafka is exposing the Measurable properly, but Kafka 0.8.2 does not yet expose it. 
+ * + * @param kafkaMetric the metric to extract the field form + * @return Measurable type (or null in case of an error) + */ + protected static Measurable getMeasurableFromKafkaMetric(KafkaMetric kafkaMetric) { + try { + Field measurableField = kafkaMetric.getClass().getDeclaredField("measurable"); + measurableField.setAccessible(true); + return (Measurable) measurableField.get(kafkaMetric); + } catch (Throwable e) { + LOG.warn("Unable to initialize Kafka metric: " + kafkaMetric, e); + return null; + } + } + + + DefaultKafkaMetricAccumulator(KafkaMetric kafkaMetric) { + this.kafkaMetric = kafkaMetric; + } + + @Override + public void add(Void value) { + // noop + } + + @Override + public Double getLocalValue() { + if(isMerged && kafkaMetric == null) { + return mergedValue; + } + return kafkaMetric.value(); + } + + @Override + public void resetLocal() { + // noop + } + + @Override + public void merge(Accumulator<Void, Double> other) { + if(!(other instanceof DefaultKafkaMetricAccumulator)) { + throw new RuntimeException("Trying to merge incompatible accumulators"); + } + DefaultKafkaMetricAccumulator otherMetric = (DefaultKafkaMetricAccumulator) other; + if(this.isMerged) { + if(otherMetric.isMerged) { + this.mergedValue += otherMetric.mergedValue; + } else { + this.mergedValue += otherMetric.getLocalValue(); + } + } else { + this.isMerged = true; + if(otherMetric.isMerged) { + this.mergedValue = this.getLocalValue() + otherMetric.mergedValue; + } else { + this.mergedValue = this.getLocalValue() + otherMetric.getLocalValue(); + } + + } + } + + @Override + public Accumulator<Void, Double> clone() { + DefaultKafkaMetricAccumulator clone = new DefaultKafkaMetricAccumulator(this.kafkaMetric); + clone.isMerged = this.isMerged; + clone.mergedValue = this.mergedValue; + return clone; + } + + @Override + public String toString() { + if(isMerged) { + return Double.toString(mergedValue); + } + if(kafkaMetric == null) { + return "null"; + } + return Double.toString(kafkaMetric.value()); + } + + // -------- custom serialization methods + private void writeObject(ObjectOutputStream out) throws IOException { + this.isMerged = true; + this.mergedValue = kafkaMetric.value(); + out.defaultWriteObject(); + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MaxKafkaMetricAccumulator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MaxKafkaMetricAccumulator.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MaxKafkaMetricAccumulator.java new file mode 100644 index 0000000..c1770ff --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MaxKafkaMetricAccumulator.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka.internals.metrics; + +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.kafka.common.metrics.KafkaMetric; + + +public class MaxKafkaMetricAccumulator extends DefaultKafkaMetricAccumulator { + public MaxKafkaMetricAccumulator(KafkaMetric kafkaMetric) { + super(kafkaMetric); + } + + @Override + public void merge(Accumulator<Void, Double> other) { + if(!(other instanceof MaxKafkaMetricAccumulator)) { + throw new RuntimeException("Trying to merge incompatible accumulators"); + } + MaxKafkaMetricAccumulator otherMetric = (MaxKafkaMetricAccumulator) other; + if(this.isMerged) { + if(otherMetric.isMerged) { + this.mergedValue = Math.max(this.mergedValue, otherMetric.mergedValue); + } else { + this.mergedValue = Math.max(this.mergedValue, otherMetric.getLocalValue()); + } + } else { + this.isMerged = true; + if(otherMetric.isMerged) { + this.mergedValue = Math.max(this.getLocalValue(), otherMetric.mergedValue); + } else { + this.mergedValue = Math.max(this.getLocalValue(), otherMetric.getLocalValue()); + } + } + } + + @Override + public Accumulator<Void, Double> clone() { + MaxKafkaMetricAccumulator clone = new MaxKafkaMetricAccumulator(this.kafkaMetric); + clone.isMerged = this.isMerged; + clone.mergedValue = this.mergedValue; + return clone; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MinKafkaMetricAccumulator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MinKafkaMetricAccumulator.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MinKafkaMetricAccumulator.java new file mode 100644 index 0000000..4794893 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MinKafkaMetricAccumulator.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka.internals.metrics; + +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.kafka.common.metrics.KafkaMetric; + +public class MinKafkaMetricAccumulator extends DefaultKafkaMetricAccumulator { + + public MinKafkaMetricAccumulator(KafkaMetric kafkaMetric) { + super(kafkaMetric); + } + + @Override + public void merge(Accumulator<Void, Double> other) { + if(!(other instanceof MinKafkaMetricAccumulator)) { + throw new RuntimeException("Trying to merge incompatible accumulators"); + } + MinKafkaMetricAccumulator otherMetric = (MinKafkaMetricAccumulator) other; + if(this.isMerged) { + if(otherMetric.isMerged) { + this.mergedValue = Math.min(this.mergedValue, otherMetric.mergedValue); + } else { + this.mergedValue = Math.min(this.mergedValue, otherMetric.getLocalValue()); + } + } else { + this.isMerged = true; + if(otherMetric.isMerged) { + this.mergedValue = Math.min(this.getLocalValue(), otherMetric.mergedValue); + } else { + this.mergedValue = Math.min(this.getLocalValue(), otherMetric.getLocalValue()); + } + } + } + + @Override + public Accumulator<Void, Double> clone() { + MinKafkaMetricAccumulator clone = new MinKafkaMetricAccumulator(this.kafkaMetric); + clone.isMerged = this.isMerged; + clone.mergedValue = this.mergedValue; + return clone; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java new file mode 100644 index 0000000..d9dcfc1 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka.partitioner; + +import java.io.Serializable; + +/** + * A partitioner ensuring that each internal Flink partition ends up in one Kafka partition. + * + * Note, one Kafka partition can contain multiple Flink partitions. 
+ *
+ * Cases:
+ *
+ * 1) More Flink partitions than Kafka partitions:
+ * <pre>
+ * 		Flink Sinks:		Kafka Partitions
+ * 		1	---------------->	1
+ * 		2	--------------/
+ * 		3	-------------/
+ * 		4	------------/
+ * </pre>
+ * Some (or all) Kafka partitions contain the output of more than one Flink partition.
+ *
+ * 2) Fewer Flink partitions than Kafka partitions:
+ * <pre>
+ * 		Flink Sinks:		Kafka Partitions
+ * 		1	---------------->	1
+ * 		2	---------------->	2
+ * 							3
+ * 							4
+ * 							5
+ * </pre>
+ * In this case, not all Kafka partitions contain data.
+ * To avoid such an unbalanced partitioning, use a round-robin Kafka partitioner (note that this will
+ * cause a lot of network connections between all the Flink instances and all the Kafka brokers).
+ */
+public class FixedPartitioner<T> extends KafkaPartitioner<T> implements Serializable {
+	private static final long serialVersionUID = 1627268846962918126L;
+
+	int targetPartition = -1;
+
+	@Override
+	public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
+		int p = 0;
+		for (int i = 0; i < parallelInstances; i++) {
+			if (i == parallelInstanceId) {
+				targetPartition = partitions[p];
+				return;
+			}
+			if (++p == partitions.length) {
+				p = 0;
+			}
+		}
+	}
+
+	@Override
+	public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
+		if (targetPartition == -1) {
+			throw new RuntimeException("The partitioner has not been initialized properly");
+		}
+		return targetPartition;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
new file mode 100644
index 0000000..038f414
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+
+import java.io.Serializable;
+
+/**
+ * A partitioner for Kafka producers. It contains an open() method which is called on each parallel instance.
+ * Partitioners must be serializable!
+ */
+public abstract class KafkaPartitioner<T> implements Serializable {
+
+	private static final long serialVersionUID = -1974260817778593473L;
+
+	/**
+	 * Initializer for the partitioner.
+	 *
+	 * @param parallelInstanceId 0-indexed id of the parallel instance in Flink
+	 * @param parallelInstances the total number of parallel instances
+	 * @param partitions an array describing the partition IDs of the available Kafka partitions.
+	 */
+	public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
+		// overwrite this method if needed.
+	}
+
+	public abstract int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
new file mode 100644
index 0000000..01e72ca
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * The deserialization schema describes how to turn the byte key/value messages delivered by certain
+ * data sources (for example Apache Kafka) into data types (Java/Scala objects) that are
+ * processed by Flink.
+ *
+ * @param <T> The type created by the keyed deserialization schema.
+ */
+public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+
+	/**
+	 * Deserializes the byte message.
+	 *
+	 * @param messageKey the key as a byte array (null if no key has been set)
+	 * @param message The message, as a byte array (null if the message was empty or deleted)
+	 * @param topic The topic the message has originated from
+	 * @param partition The partition the message has originated from
+	 * @param offset the offset of the message in the original source (for example the Kafka offset)
+	 * @return The deserialized message as an object.
+	 */
+	T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
+
+	/**
+	 * Method to decide whether the element signals the end of the stream. If
+	 * true is returned the element won't be emitted.
+	 *
+	 * @param nextElement The element to test for the end-of-stream signal.
+	 * @return True, if the element signals end of stream, false otherwise.
+ */ + boolean isEndOfStream(T nextElement); +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java new file mode 100644 index 0000000..4b9dba2 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.util.serialization; + +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.io.IOException; + +/** + * A simple wrapper for using the DeserializationSchema with the KeyedDeserializationSchema + * interface + * @param <T> The type created by the deserialization schema. 
+ */ +public class KeyedDeserializationSchemaWrapper<T> implements KeyedDeserializationSchema<T> { + + private static final long serialVersionUID = 2651665280744549932L; + + private final DeserializationSchema<T> deserializationSchema; + + public KeyedDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) { + this.deserializationSchema = deserializationSchema; + } + @Override + public T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { + return deserializationSchema.deserialize(message); + } + + @Override + public boolean isEndOfStream(T nextElement) { + return deserializationSchema.isEndOfStream(nextElement); + } + + @Override + public TypeInformation<T> getProducedType() { + return deserializationSchema.getProducedType(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java new file mode 100644 index 0000000..be3e87e --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.util.serialization; + +import java.io.Serializable; + +/** + * The serialization schema describes how to turn a data object into a different serialized + * representation. Most data sinks (for example Apache Kafka) require the data to be handed + * to them in a specific format (for example as byte strings). + * + * @param <T> The type to be serialized. + */ +public interface KeyedSerializationSchema<T> extends Serializable { + + /** + * Serializes the key of the incoming element to a byte array + * This method might return null if no key is available. 
+ * + * @param element The incoming element to be serialized + * @return the key of the element as a byte array + */ + byte[] serializeKey(T element); + + + /** + * Serializes the value of the incoming element to a byte array + * + * @param element The incoming element to be serialized + * @return the value of the element as a byte array + */ + byte[] serializeValue(T element); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java new file mode 100644 index 0000000..a1a8fc0 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.streaming.util.serialization;
+
+/**
+ * A simple wrapper for using the SerializationSchema with the KeyedSerializationSchema
+ * interface.
+ * @param <T> The type to serialize
+ */
+public class KeyedSerializationSchemaWrapper<T> implements KeyedSerializationSchema<T> {
+
+	private static final long serialVersionUID = 1351665280744549933L;
+
+	private final SerializationSchema<T> serializationSchema;
+
+	public KeyedSerializationSchemaWrapper(SerializationSchema<T> serializationSchema) {
+		this.serializationSchema = serializationSchema;
+	}
+
+	@Override
+	public byte[] serializeKey(T element) {
+		// the wrapped schema has no notion of a key, so records are written without one
+		return null;
+	}
+
+	@Override
+	public byte[] serializeValue(T element) {
+		return serializationSchema.serialize(element);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
new file mode 100644
index 0000000..a35c01e
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+
+import java.io.IOException;
+
+/**
+ * A serialization and deserialization schema for key/value pairs that uses Flink's serialization stack to
+ * transform types from and to byte arrays.
+ *
+ * @param <K> The key type to be serialized.
+ * @param <V> The value type to be serialized.
+ */ +public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDeserializationSchema<Tuple2<K, V>>, KeyedSerializationSchema<Tuple2<K,V>> { + + private static final long serialVersionUID = -5359448468131559102L; + + /** The serializer for the key */ + private final TypeSerializer<K> keySerializer; + + /** The serializer for the value */ + private final TypeSerializer<V> valueSerializer; + + /** reusable output serialization buffers */ + private transient DataOutputSerializer keyOutputSerializer; + private transient DataOutputSerializer valueOutputSerializer; + + /** The type information, to be returned by {@link #getProducedType()}. It is + * transient, because it is not serializable. Note that this means that the type information + * is not available at runtime, but only prior to the first serialization / deserialization */ + private final transient TypeInformation<Tuple2<K, V>> typeInfo; + + // ------------------------------------------------------------------------ + + /** + * Creates a new de-/serialization schema for the given types. + * + * @param keyTypeInfo The type information for the key type de-/serialized by this schema. + * @param valueTypeInfo The type information for the value type de-/serialized by this schema. + * @param ec The execution config, which is used to parametrize the type serializers. + */ + public TypeInformationKeyValueSerializationSchema(TypeInformation<K> keyTypeInfo, TypeInformation<V> valueTypeInfo, ExecutionConfig ec) { + this.typeInfo = new TupleTypeInfo<>(keyTypeInfo, valueTypeInfo); + this.keySerializer = keyTypeInfo.createSerializer(ec); + this.valueSerializer = valueTypeInfo.createSerializer(ec); + } + + public TypeInformationKeyValueSerializationSchema(Class<K> keyClass, Class<V> valueClass, ExecutionConfig config) { + //noinspection unchecked + this( (TypeInformation<K>) TypeExtractor.createTypeInfo(keyClass), (TypeInformation<V>) TypeExtractor.createTypeInfo(valueClass), config); + } + + // ------------------------------------------------------------------------ + + + @Override + public Tuple2<K, V> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { + K key = null; + if(messageKey != null) { + key = keySerializer.deserialize(new ByteArrayInputView(messageKey)); + } + V value = null; + if(message != null) { + value = valueSerializer.deserialize(new ByteArrayInputView(message)); + } + return new Tuple2<>(key, value); + } + + /** + * This schema never considers an element to signal end-of-stream, so this method returns always false. + * @param nextElement The element to test for the end-of-stream signal. + * @return Returns false. + */ + @Override + public boolean isEndOfStream(Tuple2<K,V> nextElement) { + return false; + } + + + @Override + public byte[] serializeKey(Tuple2<K, V> element) { + if(element.f0 == null) { + return null; + } else { + // key is not null. 
serialize it: + if (keyOutputSerializer == null) { + keyOutputSerializer = new DataOutputSerializer(16); + } + try { + keySerializer.serialize(element.f0, keyOutputSerializer); + } + catch (IOException e) { + throw new RuntimeException("Unable to serialize record", e); + } + // check if key byte array size changed + byte[] res = keyOutputSerializer.getByteArray(); + if (res.length != keyOutputSerializer.length()) { + byte[] n = new byte[keyOutputSerializer.length()]; + System.arraycopy(res, 0, n, 0, keyOutputSerializer.length()); + res = n; + } + keyOutputSerializer.clear(); + return res; + } + } + + @Override + public byte[] serializeValue(Tuple2<K, V> element) { + // if the value is null, its serialized value is null as well. + if(element.f1 == null) { + return null; + } + + if (valueOutputSerializer == null) { + valueOutputSerializer = new DataOutputSerializer(16); + } + + try { + valueSerializer.serialize(element.f1, valueOutputSerializer); + } + catch (IOException e) { + throw new RuntimeException("Unable to serialize record", e); + } + + byte[] res = valueOutputSerializer.getByteArray(); + if (res.length != valueOutputSerializer.length()) { + byte[] n = new byte[valueOutputSerializer.length()]; + System.arraycopy(res, 0, n, 0, valueOutputSerializer.length()); + res = n; + } + valueOutputSerializer.clear(); + return res; + } + + + @Override + public TypeInformation<Tuple2<K,V>> getProducedType() { + if (typeInfo != null) { + return typeInfo; + } + else { + throw new IllegalStateException( + "The type information is not available after this class has been serialized and distributed."); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java new file mode 100644 index 0000000..e86d51a --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + + +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader; +import org.apache.kafka.common.Node; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.*; + + +/** + * Tests that the partition assignment is deterministic and stable. + */ +public class KafkaConsumerPartitionAssignmentTest { + + private final Node fake = new Node(1337, "localhost", 1337); + + @Test + public void testPartitionsEqualConsumers() { + try { + List<KafkaTopicPartitionLeader> inPartitions = new ArrayList<>(); + inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 4), fake)); + inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 52), fake)); + inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 17), fake)); + inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 1), fake)); + + for (int i = 0; i < inPartitions.size(); i++) { + List<KafkaTopicPartitionLeader> parts = FlinkKafkaConsumerBase.assignPartitions( + inPartitions, inPartitions.size(), i); + + assertNotNull(parts); + assertEquals(1, parts.size()); + assertTrue(contains(inPartitions, parts.get(0).getTopicPartition().getPartition())); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + private boolean contains(List<KafkaTopicPartitionLeader> inPartitions, int partition) { + for (KafkaTopicPartitionLeader ktp: inPartitions) { + if (ktp.getTopicPartition().getPartition() == partition) { + return true; + } + } + return false; + } + + @Test + public void testMultiplePartitionsPerConsumers() { + try { + final int[] partitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14}; + + final List<KafkaTopicPartitionLeader> partitions = new ArrayList<>(); + final Set<KafkaTopicPartitionLeader> allPartitions = new HashSet<>(); + + for (int p : partitionIDs) { + KafkaTopicPartitionLeader part = new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", p), fake); + partitions.add(part); + allPartitions.add(part); + } + + final int numConsumers = 3; + final int minPartitionsPerConsumer = partitions.size() / numConsumers; + final int maxPartitionsPerConsumer = partitions.size() / numConsumers + 1; + + for (int i = 0; i < numConsumers; i++) { + List<KafkaTopicPartitionLeader> parts = FlinkKafkaConsumerBase.assignPartitions(partitions, numConsumers, i); + + assertNotNull(parts); + assertTrue(parts.size() >= minPartitionsPerConsumer); + assertTrue(parts.size() <= maxPartitionsPerConsumer); + + for (KafkaTopicPartitionLeader p : parts) { + // check that the element was actually contained + assertTrue(allPartitions.remove(p)); + } + } + + // all partitions must have been assigned + assertTrue(allPartitions.isEmpty()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testPartitionsFewerThanConsumers() { + try { + List<KafkaTopicPartitionLeader> inPartitions = new ArrayList<>(); + inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 4), fake)); + inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 52), fake)); + inPartitions.add(new KafkaTopicPartitionLeader(new 
+
+	@Test
+	public void testMultiplePartitionsPerConsumers() {
+		try {
+			final int[] partitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
+
+			final List<KafkaTopicPartitionLeader> partitions = new ArrayList<>();
+			final Set<KafkaTopicPartitionLeader> allPartitions = new HashSet<>();
+
+			for (int p : partitionIDs) {
+				KafkaTopicPartitionLeader part = new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", p), fake);
+				partitions.add(part);
+				allPartitions.add(part);
+			}
+
+			final int numConsumers = 3;
+			final int minPartitionsPerConsumer = partitions.size() / numConsumers;
+			final int maxPartitionsPerConsumer = partitions.size() / numConsumers + 1;
+
+			for (int i = 0; i < numConsumers; i++) {
+				List<KafkaTopicPartitionLeader> parts = FlinkKafkaConsumerBase.assignPartitions(partitions, numConsumers, i);
+
+				assertNotNull(parts);
+				assertTrue(parts.size() >= minPartitionsPerConsumer);
+				assertTrue(parts.size() <= maxPartitionsPerConsumer);
+
+				for (KafkaTopicPartitionLeader p : parts) {
+					// check that the element was actually contained
+					assertTrue(allPartitions.remove(p));
+				}
+			}
+
+			// all partitions must have been assigned
+			assertTrue(allPartitions.isEmpty());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testPartitionsFewerThanConsumers() {
+		try {
+			List<KafkaTopicPartitionLeader> inPartitions = new ArrayList<>();
+			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 4), fake));
+			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 52), fake));
+			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 17), fake));
+			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 1), fake));
+
+			final Set<KafkaTopicPartitionLeader> allPartitions = new HashSet<>();
+			allPartitions.addAll(inPartitions);
+
+			final int numConsumers = 2 * inPartitions.size() + 3;
+
+			for (int i = 0; i < numConsumers; i++) {
+				List<KafkaTopicPartitionLeader> parts = FlinkKafkaConsumerBase.assignPartitions(inPartitions, numConsumers, i);
+
+				assertNotNull(parts);
+				assertTrue(parts.size() <= 1);
+
+				for (KafkaTopicPartitionLeader p : parts) {
+					// check that the element was actually contained
+					assertTrue(allPartitions.remove(p));
+				}
+			}
+
+			// all partitions must have been assigned
+			assertTrue(allPartitions.isEmpty());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testAssignEmptyPartitions() {
+		try {
+			List<KafkaTopicPartitionLeader> ep = new ArrayList<>();
+			List<KafkaTopicPartitionLeader> parts1 = FlinkKafkaConsumerBase.assignPartitions(ep, 4, 2);
+			assertNotNull(parts1);
+			assertTrue(parts1.isEmpty());
+
+			List<KafkaTopicPartitionLeader> parts2 = FlinkKafkaConsumerBase.assignPartitions(ep, 1, 0);
+			assertNotNull(parts2);
+			assertTrue(parts2.isEmpty());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
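[Editorial note, not part of the patch] The growth test that follows can insist on parts1new.containsAll(parts1) because, under an index-modulo scheme like the sketch above, appending partitions never changes the owner of an existing index. A tiny worked example matching the test's numbers (7 partitions growing to 11, 3 consumers); the class and method names are illustrative:

import java.util.ArrayList;
import java.util.List;

public class StabilityDemo {

	// indices owned by one consumer under the hypothetical index-modulo scheme
	static List<Integer> ownedIndices(int numPartitions, int numConsumers, int consumerIndex) {
		List<Integer> owned = new ArrayList<>();
		for (int i = 0; i < numPartitions; i++) {
			if (i % numConsumers == consumerIndex) {
				owned.add(i);
			}
		}
		return owned;
	}

	public static void main(String[] args) {
		System.out.println(ownedIndices(7, 3, 0));    // [0, 3, 6]
		System.out.println(ownedIndices(11, 3, 0));   // [0, 3, 6, 9] -- a superset of the above
	}
}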
+
+	@Test
+	public void testGrowingPartitionsRemainsStable() {
+		try {
+			final int[] newPartitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
+			List<KafkaTopicPartitionLeader> newPartitions = new ArrayList<>();
+
+			for (int p : newPartitionIDs) {
+				KafkaTopicPartitionLeader part = new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", p), fake);
+				newPartitions.add(part);
+			}
+
+			List<KafkaTopicPartitionLeader> initialPartitions = newPartitions.subList(0, 7);
+
+			final Set<KafkaTopicPartitionLeader> allNewPartitions = new HashSet<>(newPartitions);
+			final Set<KafkaTopicPartitionLeader> allInitialPartitions = new HashSet<>(initialPartitions);
+
+			final int numConsumers = 3;
+			final int minInitialPartitionsPerConsumer = initialPartitions.size() / numConsumers;
+			final int maxInitialPartitionsPerConsumer = initialPartitions.size() / numConsumers + 1;
+			final int minNewPartitionsPerConsumer = newPartitions.size() / numConsumers;
+			final int maxNewPartitionsPerConsumer = newPartitions.size() / numConsumers + 1;
+
+			List<KafkaTopicPartitionLeader> parts1 = FlinkKafkaConsumerBase.assignPartitions(
+					initialPartitions, numConsumers, 0);
+			List<KafkaTopicPartitionLeader> parts2 = FlinkKafkaConsumerBase.assignPartitions(
+					initialPartitions, numConsumers, 1);
+			List<KafkaTopicPartitionLeader> parts3 = FlinkKafkaConsumerBase.assignPartitions(
+					initialPartitions, numConsumers, 2);
+
+			assertNotNull(parts1);
+			assertNotNull(parts2);
+			assertNotNull(parts3);
+
+			assertTrue(parts1.size() >= minInitialPartitionsPerConsumer);
+			assertTrue(parts1.size() <= maxInitialPartitionsPerConsumer);
+			assertTrue(parts2.size() >= minInitialPartitionsPerConsumer);
+			assertTrue(parts2.size() <= maxInitialPartitionsPerConsumer);
+			assertTrue(parts3.size() >= minInitialPartitionsPerConsumer);
+			assertTrue(parts3.size() <= maxInitialPartitionsPerConsumer);
+
+			for (KafkaTopicPartitionLeader p : parts1) {
+				// check that the element was actually contained
+				assertTrue(allInitialPartitions.remove(p));
+			}
+			for (KafkaTopicPartitionLeader p : parts2) {
+				// check that the element was actually contained
+				assertTrue(allInitialPartitions.remove(p));
+			}
+			for (KafkaTopicPartitionLeader p : parts3) {
+				// check that the element was actually contained
+				assertTrue(allInitialPartitions.remove(p));
+			}
+
+			// all partitions must have been assigned
+			assertTrue(allInitialPartitions.isEmpty());
+
+			// grow the set of partitions and distribute anew
+
+			List<KafkaTopicPartitionLeader> parts1new = FlinkKafkaConsumerBase.assignPartitions(
+					newPartitions, numConsumers, 0);
+			List<KafkaTopicPartitionLeader> parts2new = FlinkKafkaConsumerBase.assignPartitions(
+					newPartitions, numConsumers, 1);
+			List<KafkaTopicPartitionLeader> parts3new = FlinkKafkaConsumerBase.assignPartitions(
+					newPartitions, numConsumers, 2);
+
+			// new partitions must include all old partitions
+
+			assertTrue(parts1new.size() > parts1.size());
+			assertTrue(parts2new.size() > parts2.size());
+			assertTrue(parts3new.size() > parts3.size());
+
+			assertTrue(parts1new.containsAll(parts1));
+			assertTrue(parts2new.containsAll(parts2));
+			assertTrue(parts3new.containsAll(parts3));
+
+			assertTrue(parts1new.size() >= minNewPartitionsPerConsumer);
+			assertTrue(parts1new.size() <= maxNewPartitionsPerConsumer);
+			assertTrue(parts2new.size() >= minNewPartitionsPerConsumer);
+			assertTrue(parts2new.size() <= maxNewPartitionsPerConsumer);
+			assertTrue(parts3new.size() >= minNewPartitionsPerConsumer);
+			assertTrue(parts3new.size() <= maxNewPartitionsPerConsumer);
+
+			for (KafkaTopicPartitionLeader p : parts1new) {
+				// check that the element was actually contained
+				assertTrue(allNewPartitions.remove(p));
+			}
+			for (KafkaTopicPartitionLeader p : parts2new) {
+				// check that the element was actually contained
+				assertTrue(allNewPartitions.remove(p));
+			}
+			for (KafkaTopicPartitionLeader p : parts3new) {
+				// check that the element was actually contained
+				assertTrue(allNewPartitions.remove(p));
+			}
+
+			// all partitions must have been assigned
+			assertTrue(allNewPartitions.isEmpty());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+}
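[Editorial note, not part of the patch] For context, a hedged sketch of how parallel consumer subtasks would each take a non-overlapping share via the method under test. The demo class, topic name, and loop are illustrative; only the types, the Node constructor, and the assignPartitions signature come from the test above, and the sketch assumes assignPartitions is callable from this package, as the test itself does:

package org.apache.flink.streaming.connectors.kafka;

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
import org.apache.kafka.common.Node;

public class AssignmentDemo {

	public static void main(String[] args) {
		Node fake = new Node(1337, "localhost", 1337);
		List<KafkaTopicPartitionLeader> all = new ArrayList<>();
		for (int p = 0; p < 5; p++) {
			all.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("demo-topic", p), fake));
		}

		int parallelism = 3;
		for (int subtask = 0; subtask < parallelism; subtask++) {
			// in a real job, subtask index and parallelism come from the runtime context
			List<KafkaTopicPartitionLeader> share =
					FlinkKafkaConsumerBase.assignPartitions(all, parallelism, subtask);
			System.out.println("subtask " + subtask + " -> " + share);
		}
	}
}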
