http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java new file mode 100644 index 0000000..c68fe28 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Flink's description of a partition in a Kafka topic. + * Serializable, and common across all Kafka consumer subclasses (0.8, 0.9, ...) 
+ * + * <p>Note: This class must not change in its structure, because it would change the + * serialization format and make previous savepoints unreadable. + */ +public final class KafkaTopicPartition implements Serializable { + + /** THIS SERIAL VERSION UID MUST NOT CHANGE, BECAUSE IT WOULD BREAK + * READING OLD SERIALIZED INSTANCES FROM SAVEPOINTS */ + private static final long serialVersionUID = 722083576322742325L; + + // ------------------------------------------------------------------------ + + private final String topic; + private final int partition; + private final int cachedHash; + + public KafkaTopicPartition(String topic, int partition) { + this.topic = requireNonNull(topic); + this.partition = partition; + this.cachedHash = 31 * topic.hashCode() + partition; + } + + // ------------------------------------------------------------------------ + + public String getTopic() { + return topic; + } + + public int getPartition() { + return partition; + } + + // ------------------------------------------------------------------------ + + @Override + public String toString() { + return "KafkaTopicPartition{" + + "topic='" + topic + '\'' + + ", partition=" + partition + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + else if (o instanceof KafkaTopicPartition) { + KafkaTopicPartition that = (KafkaTopicPartition) o; + return this.partition == that.partition && this.topic.equals(that.topic); + } + else { + return false; + } + } + + @Override + public int hashCode() { + return cachedHash; + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + public static String toString(Map<KafkaTopicPartition, Long> map) { + StringBuilder sb = new StringBuilder(); + for (Map.Entry<KafkaTopicPartition, Long> p: map.entrySet()) { + KafkaTopicPartition ktp = p.getKey(); + 
sb.append(ktp.getTopic()).append(":").append(ktp.getPartition()).append("=").append(p.getValue()).append(", "); + } + return sb.toString(); + } + + public static String toString(List<KafkaTopicPartition> partitions) { + StringBuilder sb = new StringBuilder(); + for (KafkaTopicPartition p: partitions) { + sb.append(p.getTopic()).append(":").append(p.getPartition()).append(", "); + } + return sb.toString(); + } + + + public static List<KafkaTopicPartition> dropLeaderData(List<KafkaTopicPartitionLeader> partitionInfos) { + List<KafkaTopicPartition> ret = new ArrayList<>(partitionInfos.size()); + for(KafkaTopicPartitionLeader ktpl: partitionInfos) { + ret.add(ktpl.getTopicPartition()); + } + return ret; + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java new file mode 100644 index 0000000..1959a05 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import org.apache.kafka.common.Node; + +import java.io.Serializable; + +/** + * Serializable Topic Partition info with leader Node information. + * This class is used at runtime. 
+ */ +public class KafkaTopicPartitionLeader implements Serializable { + + private static final long serialVersionUID = 9145855900303748582L; + + private final int leaderId; + private final int leaderPort; + private final String leaderHost; + private final KafkaTopicPartition topicPartition; + private final int cachedHash; + + public KafkaTopicPartitionLeader(KafkaTopicPartition topicPartition, Node leader) { + this.topicPartition = topicPartition; + if (leader == null) { + this.leaderId = -1; + this.leaderHost = null; + this.leaderPort = -1; + } else { + this.leaderId = leader.id(); + this.leaderPort = leader.port(); + this.leaderHost = leader.host(); + } + int cachedHash = (leader == null) ? 14 : leader.hashCode(); + this.cachedHash = 31 * cachedHash + topicPartition.hashCode(); + } + + public KafkaTopicPartition getTopicPartition() { + return topicPartition; + } + + public Node getLeader() { + if (this.leaderId == -1) { + return null; + } else { + return new Node(leaderId, leaderHost, leaderPort); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof KafkaTopicPartitionLeader)) { + return false; + } + + KafkaTopicPartitionLeader that = (KafkaTopicPartitionLeader) o; + + if (!topicPartition.equals(that.topicPartition)) { + return false; + } + return leaderId == that.leaderId && leaderPort == that.leaderPort && leaderHost.equals(that.leaderHost); + } + + @Override + public int hashCode() { + return cachedHash; + } + + @Override + public String toString() { + return "KafkaTopicPartitionLeader{" + + "leaderId=" + leaderId + + ", leaderPort=" + leaderPort + + ", leaderHost='" + leaderHost + '\'' + + ", topic=" + topicPartition.getTopic() + + ", partition=" + topicPartition.getPartition() + + '}'; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java 
---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java new file mode 100644 index 0000000..7cb5f46 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +/** + * The state that the Flink Kafka Consumer holds for each Kafka partition. + * Includes the Kafka descriptor for partitions. + * + * <p>This class describes the most basic state (only the offset), subclasses + * define more elaborate state, containing current watermarks and timestamp + * extractors. + * + * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions. + */ +public class KafkaTopicPartitionState<KPH> { + + /** Magic number to define an unset offset. 
Negative offsets are not used by Kafka (invalid), + * and we pick a number that is probably (hopefully) not used by Kafka as a magic number for anything else. */ + public static final long OFFSET_NOT_SET = -915623761776L; + + // ------------------------------------------------------------------------ + + /** The Flink description of a Kafka partition */ + private final KafkaTopicPartition partition; + + /** The Kafka description of a Kafka partition (varies across different Kafka versions) */ + private final KPH kafkaPartitionHandle; + + /** The offset within the Kafka partition that we already processed */ + private volatile long offset; + + /** The offset of the Kafka partition that has been committed */ + private volatile long committedOffset; + + // ------------------------------------------------------------------------ + + public KafkaTopicPartitionState(KafkaTopicPartition partition, KPH kafkaPartitionHandle) { + this.partition = partition; + this.kafkaPartitionHandle = kafkaPartitionHandle; + this.offset = OFFSET_NOT_SET; + this.committedOffset = OFFSET_NOT_SET; + } + + // ------------------------------------------------------------------------ + + /** + * Gets Flink's descriptor for the Kafka Partition. + * @return The Flink partition descriptor. + */ + public final KafkaTopicPartition getKafkaTopicPartition() { + return partition; + } + + /** + * Gets Kafka's descriptor for the Kafka Partition. + * @return The Kafka partition descriptor. + */ + public final KPH getKafkaPartitionHandle() { + return kafkaPartitionHandle; + } + + public final String getTopic() { + return partition.getTopic(); + } + + public final int getPartition() { + return partition.getPartition(); + } + + /** + * The current offset in the partition. This refers to the offset last element that + * we retrieved and emitted successfully. It is the offset that should be stored in + * a checkpoint. 
+ */ + public final long getOffset() { + return offset; + } + + public final void setOffset(long offset) { + this.offset = offset; + } + + public final boolean isOffsetDefined() { + return offset != OFFSET_NOT_SET; + } + + public final void setCommittedOffset(long offset) { + this.committedOffset = offset; + } + + public final long getCommittedOffset() { + return committedOffset; + } + + + // ------------------------------------------------------------------------ + + @Override + public String toString() { + return "Partition: " + partition + ", KafkaPartitionHandle=" + kafkaPartitionHandle + + ", offset=" + (isOffsetDefined() ? String.valueOf(offset) : "(not set)"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java new file mode 100644 index 0000000..efdc73f --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.watermark.Watermark; + +/** + * A special version of the per-kafka-partition-state that additionally holds + * a periodic watermark generator (and timestamp extractor) per partition. + * + * @param <T> The type of records handled by the watermark generator + * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions. + */ +public final class KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> extends KafkaTopicPartitionState<KPH> { + + /** The timestamp assigner and watermark generator for the partition */ + private final AssignerWithPeriodicWatermarks<T> timestampsAndWatermarks; + + /** The last watermark timestamp generated by this partition */ + private long partitionWatermark; + + // ------------------------------------------------------------------------ + + public KafkaTopicPartitionStateWithPeriodicWatermarks( + KafkaTopicPartition partition, KPH kafkaPartitionHandle, + AssignerWithPeriodicWatermarks<T> timestampsAndWatermarks) + { + super(partition, kafkaPartitionHandle); + + this.timestampsAndWatermarks = timestampsAndWatermarks; + this.partitionWatermark = Long.MIN_VALUE; + } + + // ------------------------------------------------------------------------ + + public long getTimestampForRecord(T record, long kafkaEventTimestamp) { + return timestampsAndWatermarks.extractTimestamp(record, kafkaEventTimestamp); + } + + 
public long getCurrentWatermarkTimestamp() { + Watermark wm = timestampsAndWatermarks.getCurrentWatermark(); + if (wm != null) { + partitionWatermark = Math.max(partitionWatermark, wm.getTimestamp()); + } + return partitionWatermark; + } + + // ------------------------------------------------------------------------ + + @Override + public String toString() { + return "KafkaTopicPartitionStateWithPeriodicWatermarks: partition=" + getKafkaTopicPartition() + + ", offset=" + getOffset() + ", watermark=" + partitionWatermark; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java new file mode 100644 index 0000000..edf40ce --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.watermark.Watermark; + +import javax.annotation.Nullable; + +/** + * A special version of the per-kafka-partition-state that additionally holds + * a periodic watermark generator (and timestamp extractor) per partition. + * + * <p>This class is not thread safe, but it gives volatile access to the current + * partition watermark ({@link #getCurrentPartitionWatermark()}). + * + * @param <T> The type of records handled by the watermark generator + * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions + */ +public final class KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> extends KafkaTopicPartitionState<KPH> { + + /** The timestamp assigner and watermark generator for the partition */ + private final AssignerWithPunctuatedWatermarks<T> timestampsAndWatermarks; + + /** The last watermark timestamp generated by this partition */ + private volatile long partitionWatermark; + + // ------------------------------------------------------------------------ + + public KafkaTopicPartitionStateWithPunctuatedWatermarks( + KafkaTopicPartition partition, KPH kafkaPartitionHandle, + AssignerWithPunctuatedWatermarks<T> timestampsAndWatermarks) + { + super(partition, kafkaPartitionHandle); + + this.timestampsAndWatermarks = timestampsAndWatermarks; + this.partitionWatermark = Long.MIN_VALUE; + } + + // 
------------------------------------------------------------------------ + + public long getTimestampForRecord(T record, long kafkaEventTimestamp) { + return timestampsAndWatermarks.extractTimestamp(record, kafkaEventTimestamp); + } + + @Nullable + public Watermark checkAndGetNewWatermark(T record, long timestamp) { + Watermark mark = timestampsAndWatermarks.checkAndGetNextWatermark(record, timestamp); + if (mark != null && mark.getTimestamp() > partitionWatermark) { + partitionWatermark = mark.getTimestamp(); + return mark; + } + else { + return null; + } + } + + public long getCurrentPartitionWatermark() { + return partitionWatermark; + } + + // ------------------------------------------------------------------------ + + @Override + public String toString() { + return "KafkaTopicPartitionStateWithPunctuatedWatermarks: partition=" + getKafkaTopicPartition() + + ", offset=" + getOffset() + ", watermark=" + partitionWatermark; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java new file mode 100644 index 0000000..7a41ade --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/TypeUtil.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka.internals; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; + +public class TypeUtil { + private TypeUtil() {} + + /** + * Creates TypeInformation array for an array of Classes. + * @param fieldTypes classes to extract type information from + * @return type information + */ + public static TypeInformation<?>[] toTypeInfo(Class<?>[] fieldTypes) { + TypeInformation<?>[] typeInfos = new TypeInformation[fieldTypes.length]; + for (int i = 0; i < fieldTypes.length; i++) { + typeInfos[i] = TypeExtractor.getForClass(fieldTypes[i]); + } + return typeInfos; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java new file mode 100644 index 0000000..cedb696 --- /dev/null +++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricWrapper.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internals.metrics; + +import org.apache.flink.metrics.Gauge; + +/** + * Gauge for getting the current value of a Kafka metric. 
+ */ +public class KafkaMetricWrapper implements Gauge<Double> { + private final org.apache.kafka.common.Metric kafkaMetric; + + public KafkaMetricWrapper(org.apache.kafka.common.Metric metric) { + this.kafkaMetric = metric; + } + + @Override + public Double getValue() { + return kafkaMetric.value(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java new file mode 100644 index 0000000..9b848e0 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.kafka.partitioner; + +import java.io.Serializable; + +/** + * A partitioner ensuring that each internal Flink partition ends up in one Kafka partition. + * + * Note, one Kafka partition can contain multiple Flink partitions. + * + * Cases: + * # More Flink partitions than kafka partitions + * <pre> + * Flink Sinks: Kafka Partitions + * 1 ----------------> 1 + * 2 --------------/ + * 3 -------------/ + * 4 ------------/ + * </pre> + * Some (or all) kafka partitions contain the output of more than one flink partition + * + *# Fewer Flink partitions than Kafka + * <pre> + * Flink Sinks: Kafka Partitions + * 1 ----------------> 1 + * 2 ----------------> 2 + * 3 + * 4 + * 5 + * </pre> + * + * Not all Kafka partitions contain data + * To avoid such an unbalanced partitioning, use a round-robin kafka partitioner. (note that this will + * cause a lot of network connections between all the Flink instances and all the Kafka brokers + * + * + */ +public class FixedPartitioner<T> extends KafkaPartitioner<T> implements Serializable { + private static final long serialVersionUID = 1627268846962918126L; + + private int targetPartition = -1; + + @Override + public void open(int parallelInstanceId, int parallelInstances, int[] partitions) { + if (parallelInstanceId < 0 || parallelInstances <= 0 || partitions.length == 0) { + throw new IllegalArgumentException(); + } + + this.targetPartition = partitions[parallelInstanceId % partitions.length]; + } + + @Override + public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) { + if (targetPartition >= 0) { + return targetPartition; + } else { + throw new RuntimeException("The partitioner has not been initialized properly"); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java 
/**
 * Base class for partitioners that pick a partition number for each record.
 *
 * <p>It offers an {@link #open(int, int, int[])} hook which is called on each parallel
 * instance; the default implementation does nothing.
 * Partitioners must be serializable!
 *
 * @param <T> the type of the records to partition
 */
public abstract class KafkaPartitioner<T> implements Serializable {

	private static final long serialVersionUID = -1974260817778593473L;

	/**
	 * Selects the partition for a record.
	 *
	 * @param next the record
	 * @param serializedKey the serialized key of the record
	 * @param serializedValue the serialized value of the record
	 * @param numPartitions the number of partitions (presumably of the target topic; confirm against the caller)
	 * @return the partition selected for the record
	 */
	public abstract int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions);

	/**
	 * Initializer for the Partitioner.
	 *
	 * @param parallelInstanceId 0-indexed id of the parallel instance in Flink
	 * @param parallelInstances the total number of parallel instances
	 * @param partitions an array describing the partition IDs of the available Kafka partitions.
	 */
	public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
		// intentionally empty: subclasses override this hook when they need initialization
	}
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.util.serialization; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import java.io.IOException; + + +/** + * DeserializationSchema that deserializes a JSON String into an ObjectNode. + * <p> + * Fields can be accessed by calling objectNode.get(<name>).as(<type>) + */ +public class JSONDeserializationSchema extends AbstractDeserializationSchema<ObjectNode> { + private ObjectMapper mapper; + + @Override + public ObjectNode deserialize(byte[] message) throws IOException { + if (mapper == null) { + mapper = new ObjectMapper(); + } + return mapper.readValue(message, ObjectNode.class); + } + + @Override + public boolean isEndOfStream(ObjectNode nextElement) { + return false; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java new file mode 100644 index 0000000..261a111 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.util.serialization; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.io.IOException; + +import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass; + +/** + * DeserializationSchema that deserializes a JSON String into an ObjectNode. + * <p> + * Key fields can be accessed by calling objectNode.get("key").get(<name>).as(<type>) + * <p> + * Value fields can be accessed by calling objectNode.get("value").get(<name>).as(<type>) + * <p> + * Metadata fields can be accessed by calling objectNode.get("metadata").get(<name>).as(<type>) and include + * the "offset" (long), "topic" (String) and "partition" (int). 
+ */ +public class JSONKeyValueDeserializationSchema implements KeyedDeserializationSchema<ObjectNode> { + private final boolean includeMetadata; + private ObjectMapper mapper; + + public JSONKeyValueDeserializationSchema(boolean includeMetadata) { + this.includeMetadata = includeMetadata; + } + + @Override + public ObjectNode deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { + if (mapper == null) { + mapper = new ObjectMapper(); + } + ObjectNode node = mapper.createObjectNode(); + node.set("key", mapper.readValue(messageKey, JsonNode.class)); + node.set("value", mapper.readValue(message, JsonNode.class)); + if (includeMetadata) { + node.putObject("metadata") + .put("offset", offset) + .put("topic", topic) + .put("partition", partition); + } + return node; + } + + @Override + public boolean isEndOfStream(ObjectNode nextElement) { + return false; + } + + @Override + public TypeInformation<ObjectNode> getProducedType() { + return getForClass(ObjectNode.class); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java new file mode 100644 index 0000000..4344810 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util.serialization; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.typeutils.RowTypeInfo; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * Deserialization schema from JSON to {@link Row}. + * + * <p>Deserializes the <code>byte[]</code> messages as a JSON object and reads + * the specified fields. + * + * <p>Failure during deserialization are forwarded as wrapped IOExceptions. + */ +public class JsonRowDeserializationSchema implements DeserializationSchema<Row> { + + /** Field names to parse. Indices match fieldTypes indices. */ + private final String[] fieldNames; + + /** Types to parse fields as. Indices match fieldNames indices. */ + private final TypeInformation<?>[] fieldTypes; + + /** Object mapper for parsing the JSON. */ + private final ObjectMapper objectMapper = new ObjectMapper(); + + /** Flag indicating whether to fail on a missing field. 
*/ + private boolean failOnMissingField; + + /** + * Creates a JSON deserialization schema for the given fields and type classes. + * + * @param fieldNames Names of JSON fields to parse. + * @param fieldTypes Type classes to parse JSON fields as. + */ + public JsonRowDeserializationSchema(String[] fieldNames, Class<?>[] fieldTypes) { + this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names"); + + this.fieldTypes = new TypeInformation[fieldTypes.length]; + for (int i = 0; i < fieldTypes.length; i++) { + this.fieldTypes[i] = TypeExtractor.getForClass(fieldTypes[i]); + } + + Preconditions.checkArgument(fieldNames.length == fieldTypes.length, + "Number of provided field names and types does not match."); + } + + /** + * Creates a JSON deserialization schema for the given fields and types. + * + * @param fieldNames Names of JSON fields to parse. + * @param fieldTypes Types to parse JSON fields as. + */ + public JsonRowDeserializationSchema(String[] fieldNames, TypeInformation<?>[] fieldTypes) { + this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names"); + this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types"); + + Preconditions.checkArgument(fieldNames.length == fieldTypes.length, + "Number of provided field names and types does not match."); + } + + @Override + public Row deserialize(byte[] message) throws IOException { + try { + JsonNode root = objectMapper.readTree(message); + + Row row = new Row(fieldNames.length); + for (int i = 0; i < fieldNames.length; i++) { + JsonNode node = root.get(fieldNames[i]); + + if (node == null) { + if (failOnMissingField) { + throw new IllegalStateException("Failed to find field with name '" + + fieldNames[i] + "'."); + } else { + row.setField(i, null); + } + } else { + // Read the value as specified type + Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass()); + row.setField(i, value); + } + } + + return row; + } catch (Throwable t) { + throw new IOException("Failed 
to deserialize JSON object.", t); + } + } + + @Override + public boolean isEndOfStream(Row nextElement) { + return false; + } + + @Override + public TypeInformation<Row> getProducedType() { + return new RowTypeInfo(fieldTypes); + } + + /** + * Configures the failure behaviour if a JSON field is missing. + * + * <p>By default, a missing field is ignored and the field is set to null. + * + * @param failOnMissingField Flag indicating whether to fail or not on a missing field. + */ + public void setFailOnMissingField(boolean failOnMissingField) { + this.failOnMissingField = failOnMissingField; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java new file mode 100644 index 0000000..077ff13 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.util.serialization; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.api.table.Row; +import org.apache.flink.util.Preconditions; + + +/** + * Serialization schema that serializes an object into a JSON bytes. + * + * <p>Serializes the input {@link Row} object into a JSON string and + * converts it into <code>byte[]</code>. + * + * <p>Result <code>byte[]</code> messages can be deserialized using + * {@link JsonRowDeserializationSchema}. + */ +public class JsonRowSerializationSchema implements SerializationSchema<Row> { + /** Fields names in the input Row object */ + private final String[] fieldNames; + /** Object mapper that is used to create output JSON objects */ + private static ObjectMapper mapper = new ObjectMapper(); + + /** + * Creates a JSON serialization schema for the given fields and types. + * + * @param fieldNames Names of JSON fields to parse. 
+ */ + public JsonRowSerializationSchema(String[] fieldNames) { + this.fieldNames = Preconditions.checkNotNull(fieldNames); + } + + @Override + public byte[] serialize(Row row) { + if (row.productArity() != fieldNames.length) { + throw new IllegalStateException(String.format( + "Number of elements in the row %s is different from number of field names: %d", row, fieldNames.length)); + } + + ObjectNode objectNode = mapper.createObjectNode(); + + for (int i = 0; i < row.productArity(); i++) { + JsonNode node = mapper.valueToTree(row.productElement(i)); + objectNode.set(fieldNames[i], node); + } + + try { + return mapper.writeValueAsBytes(objectNode); + } catch (Exception e) { + throw new RuntimeException("Failed to serialize row", e); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java new file mode 100644 index 0000000..01e72ca --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util.serialization; + +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; + +import java.io.IOException; +import java.io.Serializable; + +/** + * The deserialization schema describes how to turn the byte key / value messages delivered by certain + * data sources (for example Apache Kafka) into data types (Java/Scala objects) that are + * processed by Flink. + * + * @param <T> The type created by the keyed deserialization schema. + */ +public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> { + + /** + * Deserializes the byte message. + * + * @param messageKey the key as a byte array (null if no key has been set) + * @param message The message, as a byte array. (null if the message was empty or deleted) + * @param partition The partition the message has originated from + * @param offset the offset of the message in the original source (for example the Kafka offset) @return The deserialized message as an object. + */ + T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException; + + /** + * Method to decide whether the element signals the end of the stream. If + * true is returned the element won't be emitted. + * + * @param nextElement The element to test for the end-of-stream signal. + * @return True, if the element signals end of stream, false otherwise. 
+ */ + boolean isEndOfStream(T nextElement); +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java new file mode 100644 index 0000000..4b9dba2 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.util.serialization; + +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.io.IOException; + +/** + * A simple wrapper for using the DeserializationSchema with the KeyedDeserializationSchema + * interface + * @param <T> The type created by the deserialization schema. 
+ */ +public class KeyedDeserializationSchemaWrapper<T> implements KeyedDeserializationSchema<T> { + + private static final long serialVersionUID = 2651665280744549932L; + + private final DeserializationSchema<T> deserializationSchema; + + public KeyedDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) { + this.deserializationSchema = deserializationSchema; + } + @Override + public T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { + return deserializationSchema.deserialize(message); + } + + @Override + public boolean isEndOfStream(T nextElement) { + return deserializationSchema.isEndOfStream(nextElement); + } + + @Override + public TypeInformation<T> getProducedType() { + return deserializationSchema.getProducedType(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java new file mode 100644 index 0000000..701281e --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * The serialization schema describes how to turn a data object into a different serialized
 * representation. Most data sinks (for example Apache Kafka) require the data to be handed
 * to them in a specific format (for example as byte strings).
 *
 * <p>This keyed variant produces the key and the value of an element as two separate byte
 * arrays and may additionally name a target topic per element.
 *
 * @param <T> The type to be serialized.
 */
public interface KeyedSerializationSchema<T> extends Serializable {

    /**
     * Serializes the key of the incoming element to a byte array.
     *
     * <p>This method might return null if no key is available.
     *
     * @param element The incoming element to be serialized
     * @return the key of the element as a byte array
     */
    byte[] serializeKey(T element);

    /**
     * Serializes the value of the incoming element to a byte array.
     *
     * @param element The incoming element to be serialized
     * @return the value of the element as a byte array
     */
    byte[] serializeValue(T element);

    /**
     * Optional method to determine the target topic for the element.
     *
     * @param element Incoming element to determine the target topic from
     * @return null or the target topic
     */
    String getTargetTopic(T element);
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.util.serialization; + +/** + * A simple wrapper for using the SerializationSchema with the KeyedDeserializationSchema + * interface + * @param <T> The type to serialize + */ +public class KeyedSerializationSchemaWrapper<T> implements KeyedSerializationSchema<T> { + + private static final long serialVersionUID = 1351665280744549933L; + + private final SerializationSchema<T> serializationSchema; + + public KeyedSerializationSchemaWrapper(SerializationSchema<T> serializationSchema) { + this.serializationSchema = serializationSchema; + } + + @Override + public byte[] serializeKey(T element) { + return null; + } + + @Override + public byte[] serializeValue(T element) { + return serializationSchema.serialize(element); + } + + @Override + public String getTargetTopic(T element) { + return null; // we are never overriding the topic + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java new file mode 100644 index 0000000..51bc8d1 --- /dev/null +++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util.serialization; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.runtime.util.DataInputDeserializer; +import org.apache.flink.runtime.util.DataOutputSerializer; + +import java.io.IOException; + +/** + * A serialization and deserialization schema for Key Value Pairs that uses Flink's serialization stack to + * transform typed from and to byte arrays. + * + * @param <K> The key type to be serialized. + * @param <V> The value type to be serialized. 
+ */ +public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDeserializationSchema<Tuple2<K, V>>, KeyedSerializationSchema<Tuple2<K,V>> { + + private static final long serialVersionUID = -5359448468131559102L; + + /** The serializer for the key */ + private final TypeSerializer<K> keySerializer; + + /** The serializer for the value */ + private final TypeSerializer<V> valueSerializer; + + /** reusable input deserialization buffer */ + private final DataInputDeserializer inputDeserializer; + + /** reusable output serialization buffer for the key */ + private transient DataOutputSerializer keyOutputSerializer; + + /** reusable output serialization buffer for the value */ + private transient DataOutputSerializer valueOutputSerializer; + + + /** The type information, to be returned by {@link #getProducedType()}. It is + * transient, because it is not serializable. Note that this means that the type information + * is not available at runtime, but only prior to the first serialization / deserialization */ + private final transient TypeInformation<Tuple2<K, V>> typeInfo; + + // ------------------------------------------------------------------------ + + /** + * Creates a new de-/serialization schema for the given types. + * + * @param keyTypeInfo The type information for the key type de-/serialized by this schema. + * @param valueTypeInfo The type information for the value type de-/serialized by this schema. + * @param ec The execution config, which is used to parametrize the type serializers. + */ + public TypeInformationKeyValueSerializationSchema(TypeInformation<K> keyTypeInfo, TypeInformation<V> valueTypeInfo, ExecutionConfig ec) { + this.typeInfo = new TupleTypeInfo<>(keyTypeInfo, valueTypeInfo); + this.keySerializer = keyTypeInfo.createSerializer(ec); + this.valueSerializer = valueTypeInfo.createSerializer(ec); + this.inputDeserializer = new DataInputDeserializer(); + } + + /** + * Creates a new de-/serialization schema for the given types. 
This constructor accepts the types + * as classes and internally constructs the type information from the classes. + * + * <p>If the types are parametrized and cannot be fully defined via classes, use the constructor + * that accepts {@link TypeInformation} instead. + * + * @param keyClass The class of the key de-/serialized by this schema. + * @param valueClass The class of the value de-/serialized by this schema. + * @param config The execution config, which is used to parametrize the type serializers. + */ + public TypeInformationKeyValueSerializationSchema(Class<K> keyClass, Class<V> valueClass, ExecutionConfig config) { + this(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass), config); + } + + // ------------------------------------------------------------------------ + + + @Override + public Tuple2<K, V> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { + K key = null; + V value = null; + + if (messageKey != null) { + inputDeserializer.setBuffer(messageKey, 0, messageKey.length); + key = keySerializer.deserialize(inputDeserializer); + } + if (message != null) { + inputDeserializer.setBuffer(message, 0, message.length); + value = valueSerializer.deserialize(inputDeserializer); + } + return new Tuple2<>(key, value); + } + + /** + * This schema never considers an element to signal end-of-stream, so this method returns always false. + * @param nextElement The element to test for the end-of-stream signal. + * @return Returns false. + */ + @Override + public boolean isEndOfStream(Tuple2<K,V> nextElement) { + return false; + } + + + @Override + public byte[] serializeKey(Tuple2<K, V> element) { + if (element.f0 == null) { + return null; + } else { + // key is not null. 
serialize it: + if (keyOutputSerializer == null) { + keyOutputSerializer = new DataOutputSerializer(16); + } + try { + keySerializer.serialize(element.f0, keyOutputSerializer); + } + catch (IOException e) { + throw new RuntimeException("Unable to serialize record", e); + } + // check if key byte array size changed + byte[] res = keyOutputSerializer.getByteArray(); + if (res.length != keyOutputSerializer.length()) { + byte[] n = new byte[keyOutputSerializer.length()]; + System.arraycopy(res, 0, n, 0, keyOutputSerializer.length()); + res = n; + } + keyOutputSerializer.clear(); + return res; + } + } + + @Override + public byte[] serializeValue(Tuple2<K, V> element) { + // if the value is null, its serialized value is null as well. + if (element.f1 == null) { + return null; + } + + if (valueOutputSerializer == null) { + valueOutputSerializer = new DataOutputSerializer(16); + } + + try { + valueSerializer.serialize(element.f1, valueOutputSerializer); + } + catch (IOException e) { + throw new RuntimeException("Unable to serialize record", e); + } + + byte[] res = valueOutputSerializer.getByteArray(); + if (res.length != valueOutputSerializer.length()) { + byte[] n = new byte[valueOutputSerializer.length()]; + System.arraycopy(res, 0, n, 0, valueOutputSerializer.length()); + res = n; + } + valueOutputSerializer.clear(); + return res; + } + + @Override + public String getTargetTopic(Tuple2<K, V> element) { + return null; // we are never overriding the topic + } + + + @Override + public TypeInformation<Tuple2<K,V>> getProducedType() { + if (typeInfo != null) { + return typeInfo; + } + else { + throw new IllegalStateException( + "The type information is not available after this class has been serialized and distributed."); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java 
---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java new file mode 100644 index 0000000..b96ba30 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.commons.collections.map.LinkedMap; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.util.SerializedValue; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.Serializable; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class FlinkKafkaConsumerBaseTest { + + /** + * Tests that not both types of 
timestamp extractors / watermark generators can be used. + */ + @Test + public void testEitherWatermarkExtractor() { + try { + new DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks<Object>) null); + fail(); + } catch (NullPointerException ignored) {} + + try { + new DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks<Object>) null); + fail(); + } catch (NullPointerException ignored) {} + + @SuppressWarnings("unchecked") + final AssignerWithPeriodicWatermarks<String> periodicAssigner = mock(AssignerWithPeriodicWatermarks.class); + @SuppressWarnings("unchecked") + final AssignerWithPunctuatedWatermarks<String> punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class); + + DummyFlinkKafkaConsumer<String> c1 = new DummyFlinkKafkaConsumer<>(); + c1.assignTimestampsAndWatermarks(periodicAssigner); + try { + c1.assignTimestampsAndWatermarks(punctuatedAssigner); + fail(); + } catch (IllegalStateException ignored) {} + + DummyFlinkKafkaConsumer<String> c2 = new DummyFlinkKafkaConsumer<>(); + c2.assignTimestampsAndWatermarks(punctuatedAssigner); + try { + c2.assignTimestampsAndWatermarks(periodicAssigner); + fail(); + } catch (IllegalStateException ignored) {} + } + + /** + * Tests that no checkpoints happen when the fetcher is not running. 
+ */ + @Test + public void ignoreCheckpointWhenNotRunning() throws Exception { + @SuppressWarnings("unchecked") + final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class); + + FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, new LinkedMap(), false); + OperatorStateStore operatorStateStore = mock(OperatorStateStore.class); + TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = new TestingListState<>(); + when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState); + + consumer.snapshotState(new StateSnapshotContextSynchronousImpl(1, 1)); + + assertFalse(listState.get().iterator().hasNext()); + consumer.notifyCheckpointComplete(66L); + } + + /** + * Tests that no checkpoints happen when the fetcher is not running. + */ + @Test + public void checkRestoredCheckpointWhenFetcherNotReady() throws Exception { + OperatorStateStore operatorStateStore = mock(OperatorStateStore.class); + + TestingListState<Serializable> listState = new TestingListState<>(); + listState.add(Tuple2.of(new KafkaTopicPartition("abc", 13), 16768L)); + listState.add(Tuple2.of(new KafkaTopicPartition("def", 7), 987654321L)); + + FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true); + + when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState); + + StateInitializationContext initializationContext = mock(StateInitializationContext.class); + + when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore); + when(initializationContext.isRestored()).thenReturn(true); + + consumer.initializeState(initializationContext); + + consumer.snapshotState(new StateSnapshotContextSynchronousImpl(17, 17)); + + // ensure that the list was cleared and refilled. while this is an implementation detail, we use it here + // to figure out that snapshotState() actually did something. 
+ Assert.assertTrue(listState.isClearCalled()); + + Set<Serializable> expected = new HashSet<>(); + + for (Serializable serializable : listState.get()) { + expected.add(serializable); + } + + int counter = 0; + + for (Serializable serializable : listState.get()) { + assertTrue(expected.contains(serializable)); + counter++; + } + + assertEquals(expected.size(), counter); + } + + /** + * Tests that no checkpoints happen when the fetcher is not running. + */ + @Test + public void checkRestoredNullCheckpointWhenFetcherNotReady() throws Exception { + FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true); + + OperatorStateStore operatorStateStore = mock(OperatorStateStore.class); + TestingListState<Serializable> listState = new TestingListState<>(); + when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState); + + StateInitializationContext initializationContext = mock(StateInitializationContext.class); + + when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore); + when(initializationContext.isRestored()).thenReturn(false); + + consumer.initializeState(initializationContext); + + consumer.snapshotState(new StateSnapshotContextSynchronousImpl(17, 17)); + + assertFalse(listState.get().iterator().hasNext()); + } + + /** + * Tests that on snapshots, states and offsets to commit to Kafka are correct + */ + @Test + public void checkUseFetcherWhenNoCheckpoint() throws Exception { + + FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true); + List<KafkaTopicPartition> partitionList = new ArrayList<>(1); + partitionList.add(new KafkaTopicPartition("test", 0)); + consumer.setSubscribedPartitions(partitionList); + + OperatorStateStore operatorStateStore = mock(OperatorStateStore.class); + TestingListState<Serializable> listState = new TestingListState<>(); + when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState); + 
+ StateInitializationContext initializationContext = mock(StateInitializationContext.class); + + when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore); + + // make the context signal that there is no restored state, then validate that + when(initializationContext.isRestored()).thenReturn(false); + consumer.initializeState(initializationContext); + consumer.run(mock(SourceFunction.SourceContext.class)); + } + + @Test + @SuppressWarnings("unchecked") + public void testSnapshotState() throws Exception { + + // -------------------------------------------------------------------- + // prepare fake states + // -------------------------------------------------------------------- + + final HashMap<KafkaTopicPartition, Long> state1 = new HashMap<>(); + state1.put(new KafkaTopicPartition("abc", 13), 16768L); + state1.put(new KafkaTopicPartition("def", 7), 987654321L); + + final HashMap<KafkaTopicPartition, Long> state2 = new HashMap<>(); + state2.put(new KafkaTopicPartition("abc", 13), 16770L); + state2.put(new KafkaTopicPartition("def", 7), 987654329L); + + final HashMap<KafkaTopicPartition, Long> state3 = new HashMap<>(); + state3.put(new KafkaTopicPartition("abc", 13), 16780L); + state3.put(new KafkaTopicPartition("def", 7), 987654377L); + + // -------------------------------------------------------------------- + + final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class); + when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3); + + final LinkedMap pendingOffsetsToCommit = new LinkedMap(); + + FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, pendingOffsetsToCommit, true); + assertEquals(0, pendingOffsetsToCommit.size()); + + OperatorStateStore backend = mock(OperatorStateStore.class); + + TestingListState<Serializable> listState = new TestingListState<>(); + + when(backend.getSerializableListState(Matchers.any(String.class))).thenReturn(listState); + + StateInitializationContext 
initializationContext = mock(StateInitializationContext.class); + + when(initializationContext.getOperatorStateStore()).thenReturn(backend); + when(initializationContext.isRestored()).thenReturn(false, true, true, true); + + consumer.initializeState(initializationContext); + + // checkpoint 1 + consumer.snapshotState(new StateSnapshotContextSynchronousImpl(138, 138)); + + HashMap<KafkaTopicPartition, Long> snapshot1 = new HashMap<>(); + + for (Serializable serializable : listState.get()) { + Tuple2<KafkaTopicPartition, Long> kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) serializable; + snapshot1.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1); + } + + assertEquals(state1, snapshot1); + assertEquals(1, pendingOffsetsToCommit.size()); + assertEquals(state1, pendingOffsetsToCommit.get(138L)); + + // checkpoint 2 + consumer.snapshotState(new StateSnapshotContextSynchronousImpl(140, 140)); + + HashMap<KafkaTopicPartition, Long> snapshot2 = new HashMap<>(); + + for (Serializable serializable : listState.get()) { + Tuple2<KafkaTopicPartition, Long> kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) serializable; + snapshot2.put(kafkaTopicPartitionLongTuple2.f0, kafkaTopicPartitionLongTuple2.f1); + } + + assertEquals(state2, snapshot2); + assertEquals(2, pendingOffsetsToCommit.size()); + assertEquals(state2, pendingOffsetsToCommit.get(140L)); + + // ack checkpoint 1 + consumer.notifyCheckpointComplete(138L); + assertEquals(1, pendingOffsetsToCommit.size()); + assertTrue(pendingOffsetsToCommit.containsKey(140L)); + + // checkpoint 3 + consumer.snapshotState(new StateSnapshotContextSynchronousImpl(141, 141)); + + HashMap<KafkaTopicPartition, Long> snapshot3 = new HashMap<>(); + + for (Serializable serializable : listState.get()) { + Tuple2<KafkaTopicPartition, Long> kafkaTopicPartitionLongTuple2 = (Tuple2<KafkaTopicPartition, Long>) serializable; + snapshot3.put(kafkaTopicPartitionLongTuple2.f0, 
kafkaTopicPartitionLongTuple2.f1); + } + + assertEquals(state3, snapshot3); + assertEquals(2, pendingOffsetsToCommit.size()); + assertEquals(state3, pendingOffsetsToCommit.get(141L)); + + // ack checkpoint 3, subsumes number 2 + consumer.notifyCheckpointComplete(141L); + assertEquals(0, pendingOffsetsToCommit.size()); + + + consumer.notifyCheckpointComplete(666); // invalid checkpoint + assertEquals(0, pendingOffsetsToCommit.size()); + + OperatorStateStore operatorStateStore = mock(OperatorStateStore.class); + listState = new TestingListState<>(); + when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState); + + // create 500 snapshots + for (int i = 100; i < 600; i++) { + consumer.snapshotState(new StateSnapshotContextSynchronousImpl(i, i)); + listState.clear(); + } + assertEquals(FlinkKafkaConsumerBase.MAX_NUM_PENDING_CHECKPOINTS, pendingOffsetsToCommit.size()); + + // commit only the second last + consumer.notifyCheckpointComplete(598); + assertEquals(1, pendingOffsetsToCommit.size()); + + // access invalid checkpoint + consumer.notifyCheckpointComplete(590); + + // and the last + consumer.notifyCheckpointComplete(599); + assertEquals(0, pendingOffsetsToCommit.size()); + } + + // ------------------------------------------------------------------------ + + private static <T> FlinkKafkaConsumerBase<T> getConsumer( + AbstractFetcher<T, ?> fetcher, LinkedMap pendingOffsetsToCommit, boolean running) throws Exception + { + FlinkKafkaConsumerBase<T> consumer = new DummyFlinkKafkaConsumer<>(); + + Field fetcherField = FlinkKafkaConsumerBase.class.getDeclaredField("kafkaFetcher"); + fetcherField.setAccessible(true); + fetcherField.set(consumer, fetcher); + + Field mapField = FlinkKafkaConsumerBase.class.getDeclaredField("pendingOffsetsToCommit"); + mapField.setAccessible(true); + mapField.set(consumer, pendingOffsetsToCommit); + + Field runningField = FlinkKafkaConsumerBase.class.getDeclaredField("running"); + 
runningField.setAccessible(true); + runningField.set(consumer, running); + + return consumer; + } + + // ------------------------------------------------------------------------ + + private static class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> { + private static final long serialVersionUID = 1L; + + @SuppressWarnings("unchecked") + public DummyFlinkKafkaConsumer() { + super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema < T >) mock(KeyedDeserializationSchema.class)); + } + + @Override + protected AbstractFetcher<T, ?> createFetcher(SourceContext<T> sourceContext, List<KafkaTopicPartition> thisSubtaskPartitions, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext) throws Exception { + AbstractFetcher<T, ?> fetcher = mock(AbstractFetcher.class); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + Assert.fail("Trying to restore offsets even though there was no restore state."); + return null; + } + }).when(fetcher).restoreOffsets(any(HashMap.class)); + return fetcher; + } + + @Override + protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) { + return Collections.emptyList(); + } + + @Override + public RuntimeContext getRuntimeContext() { + return mock(StreamingRuntimeContext.class); + } + } + + private static final class TestingListState<T> implements ListState<T> { + + private final List<T> list = new ArrayList<>(); + private boolean clearCalled = false; + + @Override + public void clear() { + list.clear(); + clearCalled = true; + } + + @Override + public Iterable<T> get() throws Exception { + return list; + } + + @Override + public void add(T value) throws Exception { + list.add(value); + } + + public List<T> getList() { + return list; + } + + public boolean isClearCalled() { + return clearCalled; + } + } +}
