http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
new file mode 100644
index 0000000..d015157
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import kafka.common.TopicAndPartition;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.kafka.common.Node;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.SerializedValue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A fetcher that fetches data from Kafka brokers via the Kafka 0.8 low-level consumer API.
+ * The fetcher also handles the explicit communication with ZooKeeper to fetch initial offsets
+ * and to write offsets to ZooKeeper.
+ *
+ * @param <T> The type of elements produced by the fetcher.
+ */
+public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
+
+	static final KafkaTopicPartitionState<TopicAndPartition> MARKER =
+			new KafkaTopicPartitionState<>(new KafkaTopicPartition("n/a", -1), new TopicAndPartition("n/a", -1));
+
+	private static final Logger LOG = LoggerFactory.getLogger(Kafka08Fetcher.class);
+
+	// ------------------------------------------------------------------------
+
+	/** The schema to convert between Kafka's byte messages, and Flink's objects */
+	private final KeyedDeserializationSchema<T> deserializer;
+
+	/** The properties that configure the Kafka connection */
+	private final Properties kafkaConfig;
+
+	/** The subtask's runtime context */
+	private final RuntimeContext runtimeContext;
+
+	/** The queue of partitions that are currently not assigned to a broker connection */
+	private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitionsQueue;
+
+	/** The behavior to use in case that an offset is not valid (any more) for a partition */
+	private final long invalidOffsetBehavior;
+
+	/** The interval in which to automatically commit (-1 if deactivated) */
+	private final long autoCommitInterval;
+
+	/** The handler that reads/writes offsets from/to ZooKeeper */
+	private volatile ZookeeperOffsetHandler zookeeperOffsetHandler;
+
+	/** Flag to track the main work loop as alive */
+	private volatile boolean running = true;
+
+
+	public Kafka08Fetcher(
+			SourceContext<T> sourceContext,
+			List<KafkaTopicPartition> assignedPartitions,
+			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+			StreamingRuntimeContext runtimeContext,
+			KeyedDeserializationSchema<T> deserializer,
+			Properties kafkaProperties,
+			long invalidOffsetBehavior,
+			long autoCommitInterval,
+			boolean useMetrics) throws Exception
+	{
+		super(
+				sourceContext,
+				assignedPartitions,
+				watermarksPeriodic,
+				watermarksPunctuated,
+				runtimeContext.getProcessingTimeService(),
+				runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
+				runtimeContext.getUserCodeClassLoader(),
+				useMetrics);
+
+		this.deserializer = checkNotNull(deserializer);
+		this.kafkaConfig = checkNotNull(kafkaProperties);
+		this.runtimeContext = runtimeContext;
+		this.invalidOffsetBehavior = invalidOffsetBehavior;
+		this.autoCommitInterval = autoCommitInterval;
+		this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
+
+		// initially, all these partitions are not assigned to a specific broker connection
+		for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
+			unassignedPartitionsQueue.add(partition);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// Main Work Loop
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void runFetchLoop() throws Exception {
+		// the map from broker to the thread that is connected to that broker
+		final Map<Node, SimpleConsumerThread<T>> brokerToThread = new HashMap<>();
+
+		// this holds possible exceptions from the concurrent broker connection threads
+		final ExceptionProxy errorHandler = new ExceptionProxy(Thread.currentThread());
+
+		// the offset handler handles the communication with ZooKeeper, to commit externally visible offsets
+		final ZookeeperOffsetHandler zookeeperOffsetHandler = new ZookeeperOffsetHandler(kafkaConfig);
+		this.zookeeperOffsetHandler = zookeeperOffsetHandler;
+
+		PeriodicOffsetCommitter periodicCommitter = null;
+		try {
+			// read offsets from ZooKeeper for partitions that did not restore offsets
+			{
+				List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
+				for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
+					if (!partition.isOffsetDefined()) {
+						partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
+					}
+				}
+
+				Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitionsWithNoOffset);
+				for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
+					Long zkOffset = zkOffsets.get(partition.getKafkaTopicPartition());
+					if (zkOffset != null) {
+						// the offset in ZK represents the "next record to process", so we need to subtract it by 1
+						// to correctly represent our internally checkpointed offsets
+						partition.setOffset(zkOffset - 1);
+					}
+				}
+			}
+
+			// start the periodic offset committer thread, if necessary
+			if (autoCommitInterval > 0) {
+				LOG.info("Starting periodic offset committer, with commit interval of {}ms", autoCommitInterval);
+
+				periodicCommitter = new PeriodicOffsetCommitter(zookeeperOffsetHandler,
+						subscribedPartitions(), errorHandler, autoCommitInterval);
+				periodicCommitter.setName("Periodic Kafka partition offset committer");
+				periodicCommitter.setDaemon(true);
+				periodicCommitter.start();
+			}
+
+			// register offset metrics
+			if (useMetrics) {
+				final MetricGroup kafkaMetricGroup = runtimeContext.getMetricGroup().addGroup("KafkaConsumer");
+				addOffsetStateGauge(kafkaMetricGroup);
+			}
+
+			// Main loop polling elements from the unassignedPartitions queue to the threads
+			while (running) {
+				// re-throw any exception from the concurrent fetcher threads
+				errorHandler.checkAndThrowException();
+
+				// wait for max 5 seconds trying to get partitions to assign
+				// if threads shut down, this poll returns earlier, because the threads inject the
+				// special marker into the queue (possibly more than once, so all markers are removed)
+				List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToAssign =
+						unassignedPartitionsQueue.getBatchBlocking(5000);
+				partitionsToAssign.removeAll(Collections.singleton(MARKER));
+
+				if (!partitionsToAssign.isEmpty()) {
+					LOG.info("Assigning {} partitions to broker threads", partitionsToAssign.size());
+					Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> partitionsWithLeaders =
+							findLeaderForPartitions(partitionsToAssign, kafkaConfig);
+
+					// assign the partitions to the leaders (maybe start the threads)
+					for (Map.Entry<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> partitionsWithLeader :
+							partitionsWithLeaders.entrySet())
+					{
+						final Node leader = partitionsWithLeader.getKey();
+						final List<KafkaTopicPartitionState<TopicAndPartition>> partitions = partitionsWithLeader.getValue();
+						SimpleConsumerThread<T> brokerThread = brokerToThread.get(leader);
+
+						if (!running) {
+							break;
+						}
+
+						if (brokerThread == null || !brokerThread.getNewPartitionsQueue().isOpen()) {
+							// start new thread
+							brokerThread = createAndStartSimpleConsumerThread(partitions, leader, errorHandler);
+							brokerToThread.put(leader, brokerThread);
+						}
+						else {
+							// put elements into queue of thread
+							ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> newPartitionsQueue =
+									brokerThread.getNewPartitionsQueue();
+
+							for (KafkaTopicPartitionState<TopicAndPartition> fp : partitions) {
+								if (!newPartitionsQueue.addIfOpen(fp)) {
+									// we were unable to add the partition to the broker's queue
+									// the broker has closed in the meantime (the thread will shut down)
+									// create a new thread for connecting to this broker
+									List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions = new ArrayList<>();
+									seedPartitions.add(fp);
+									brokerThread = createAndStartSimpleConsumerThread(seedPartitions, leader, errorHandler);
+									brokerToThread.put(leader, brokerThread);
+									newPartitionsQueue = brokerThread.getNewPartitionsQueue(); // update queue for the subsequent partitions
+								}
+							}
+						}
+					}
+				}
+				else {
+					// there were no partitions to assign. Check if any broker threads shut down.
+					// we get into this section of the code, if either the poll timed out, or the
+					// blocking poll was woken up by the marker element
+					Iterator<SimpleConsumerThread<T>> bttIterator = brokerToThread.values().iterator();
+					while (bttIterator.hasNext()) {
+						SimpleConsumerThread<T> thread = bttIterator.next();
+						if (!thread.getNewPartitionsQueue().isOpen()) {
+							LOG.info("Removing stopped consumer thread {}", thread.getName());
+							bttIterator.remove();
+						}
+					}
+				}
+
+				if (brokerToThread.isEmpty() && unassignedPartitionsQueue.isEmpty()) {
+					if (unassignedPartitionsQueue.close()) {
+						LOG.info("All consumer threads are finished, there are no more unassigned partitions. Stopping fetcher");
+						break;
+					}
+					// we end up here if somebody added something to the queue in the meantime --> continue to poll queue again
+				}
+			}
+		}
+		catch (InterruptedException e) {
+			// this may be thrown because an exception on one of the concurrent fetcher threads
+			// woke this thread up. make sure we throw the root exception instead in that case
+			errorHandler.checkAndThrowException();
+
+			// no other root exception, throw the interrupted exception
+			throw e;
+		}
+		finally {
+			this.running = false;
+			this.zookeeperOffsetHandler = null;
+
+			// if we run a periodic committer thread, shut that down
+			if (periodicCommitter != null) {
+				periodicCommitter.shutdown();
+			}
+
+			// clear the interruption flag
+			// this allows the joining on consumer threads (on best effort) to happen in
+			// case the initial interrupt already happened
+			Thread.interrupted();
+
+			// make sure that in any case (completion, abort, error), all spawned threads are stopped
+			try {
+				int runningThreads;
+				do {
+					// check whether threads are alive and cancel them
+					runningThreads = 0;
+					Iterator<SimpleConsumerThread<T>> threads = brokerToThread.values().iterator();
+					while (threads.hasNext()) {
+						SimpleConsumerThread<?> t = threads.next();
+						if (t.isAlive()) {
+							t.cancel();
+							runningThreads++;
+						} else {
+							threads.remove();
+						}
+					}
+
+					// wait for the threads to finish, before issuing a cancel call again
+					if (runningThreads > 0) {
+						for (SimpleConsumerThread<?> t : brokerToThread.values()) {
+							t.join(500 / runningThreads + 1);
+						}
+					}
+				}
+				while (runningThreads > 0);
+			}
+			catch (InterruptedException ignored) {
+				// waiting for the thread shutdown apparently got interrupted
+				// restore interrupted state and continue
+				Thread.currentThread().interrupt();
+			}
+			catch (Throwable t) {
+				// we catch all here to preserve the original exception
+				LOG.error("Exception while shutting down consumer threads", t);
+			}
+
+			try {
+				zookeeperOffsetHandler.close();
+			}
+			catch (Throwable t) {
+				// we catch all here to preserve the original exception
+				LOG.error("Exception while shutting down ZookeeperOffsetHandler", t);
+			}
+		}
+	}
+
+	@Override
+	public void cancel() {
+		// signal the main thread to exit
+		this.running = false;
+
+		// make sure the main thread wakes up soon
+		this.unassignedPartitionsQueue.addIfOpen(MARKER);
+	}
+
+	// ------------------------------------------------------------------------
+	// Kafka 0.8 specific class instantiation
+	// ------------------------------------------------------------------------
+
+	@Override
+	public TopicAndPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
+		return new TopicAndPartition(partition.getTopic(), partition.getPartition());
+	}
+
+	// ------------------------------------------------------------------------
+	// Offset handling
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+		ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
+		if (zkHandler != null) {
+			// the ZK handler takes care of incrementing the offsets by 1 before committing
+			zkHandler.prepareAndCommitOffsets(offsets);
+		}
+
+		// Set committed offsets in topic partition state
+		KafkaTopicPartitionState<TopicAndPartition>[] partitions = subscribedPartitions();
+		for (KafkaTopicPartitionState<TopicAndPartition> partition : partitions) {
+			Long offset = offsets.get(partition.getKafkaTopicPartition());
+			if (offset != null) {
+				partition.setCommittedOffset(offset);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// Utilities
+	// ------------------------------------------------------------------------
+
+	private SimpleConsumerThread<T> createAndStartSimpleConsumerThread(
+			List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions,
+			Node leader,
+			ExceptionProxy errorHandler) throws IOException, ClassNotFoundException
+	{
+		// each thread needs its own copy of the deserializer, because the deserializer is
+		// not necessarily thread safe
+		final KeyedDeserializationSchema<T> clonedDeserializer =
+				InstantiationUtil.clone(deserializer, runtimeContext.getUserCodeClassLoader());
+
+		// seed thread with list of fetch partitions (otherwise it would shut down immediately again)
+		SimpleConsumerThread<T> brokerThread = new SimpleConsumerThread<>(
+				this, errorHandler, kafkaConfig, leader, seedPartitions, unassignedPartitionsQueue,
+				clonedDeserializer, invalidOffsetBehavior);
+
+		brokerThread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)",
+				runtimeContext.getTaskName(), leader.id(), leader.host(), leader.port()));
+		brokerThread.setDaemon(true);
+		brokerThread.start();
+
+		LOG.info("Starting thread {}", brokerThread.getName());
+		return brokerThread;
+	}
+
+	/**
+	 * Returns a list of unique topics for the given partitions.
+	 *
+	 * @param partitions The partitions
+	 * @return A list of unique topics
+	 */
+	private static List<String> getTopics(List<KafkaTopicPartitionState<TopicAndPartition>> partitions) {
+		HashSet<String> uniqueTopics = new HashSet<>();
+		for (KafkaTopicPartitionState<TopicAndPartition> fp : partitions) {
+			uniqueTopics.add(fp.getTopic());
+		}
+		return new ArrayList<>(uniqueTopics);
+	}
+
+	/**
+	 * Finds the leaders for the given partitions.
+	 *
+	 * From a high level, the method does the following:
+	 *  - Get a list of FetchPartitions (usually only a few partitions)
+	 *  - Get the list of topics from the FetchPartitions list and request the partitions for the topics. (Kafka doesn't support getting leaders for a set of partitions)
+	 *  - Build a Map<Leader, List<FetchPartition>> where only the requested partitions are contained.
+	 *
+	 * @param partitionsToAssign fetch partitions list
+	 * @param kafkaProperties the Kafka properties used to request the partition information of the topics
+	 * @return leader to partitions map
+	 */
+	private static Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> findLeaderForPartitions(
+			List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToAssign,
+			Properties kafkaProperties) throws Exception
+	{
+		if (partitionsToAssign.isEmpty()) {
+			throw new IllegalArgumentException("Leader request for empty partitions list");
+		}
+
+		LOG.info("Refreshing leader information for partitions {}", partitionsToAssign);
+
+		// this request is based on the topic names
+		PartitionInfoFetcher infoFetcher = new PartitionInfoFetcher(getTopics(partitionsToAssign), kafkaProperties);
+		infoFetcher.start();
+
+		// NOTE: The kafka client apparently locks itself up sometimes
+		// when it is interrupted, so we run it only in a separate thread.
+		// since it sometimes refuses to shut down, we resort to the admittedly harsh
+		// means of killing the thread after a timeout.
+		KillerWatchDog watchDog = new KillerWatchDog(infoFetcher, 60000);
+		watchDog.start();
+
+		// this list contains ALL partitions of the requested topics
+		List<KafkaTopicPartitionLeader> topicPartitionWithLeaderList = infoFetcher.getPartitions();
+
+		// copy list to track unassigned partitions
+		List<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions = new ArrayList<>(partitionsToAssign);
+
+		// final mapping from leader -> list(fetchPartition)
+		Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> leaderToPartitions = new HashMap<>();
+
+		for (KafkaTopicPartitionLeader partitionLeader : topicPartitionWithLeaderList) {
+			if (unassignedPartitions.isEmpty()) {
+				// we are done: all partitions are assigned
+				break;
+			}
+
+			Iterator<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitionsIterator = unassignedPartitions.iterator();
+			while (unassignedPartitionsIterator.hasNext()) {
+				KafkaTopicPartitionState<TopicAndPartition> unassignedPartition = unassignedPartitionsIterator.next();
+
+				if (unassignedPartition.getKafkaTopicPartition().equals(partitionLeader.getTopicPartition())) {
+					// we found the leader for one of the fetch partitions
+					Node leader = partitionLeader.getLeader();
+
+					List<KafkaTopicPartitionState<TopicAndPartition>> partitionsOfLeader = leaderToPartitions.get(leader);
+					if (partitionsOfLeader == null) {
+						partitionsOfLeader = new ArrayList<>();
+						leaderToPartitions.put(leader, partitionsOfLeader);
+					}
+					partitionsOfLeader.add(unassignedPartition);
+					unassignedPartitionsIterator.remove(); // partition has been assigned
+					break;
+				}
+			}
+		}
+
+		if (!unassignedPartitions.isEmpty()) {
+			throw new RuntimeException("Unable to find a leader for partitions: " + unassignedPartitions);
+		}
+
+		LOG.debug("Partitions with assigned leaders {}", leaderToPartitions);
+
+		return leaderToPartitions;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
new file mode 100644
index 0000000..4d61e53
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+/**
+ * A watch dog thread that forcibly kills another thread, if that thread does not
+ * finish in time.
+ *
+ * <p>This uses the discouraged {@link Thread#stop()} method. While this is not
+ * advisable, this watch dog is only for extreme cases of threads that simply
+ * do not terminate otherwise.
+ */
+class KillerWatchDog extends Thread {
+
+	private final Thread toKill;
+	private final long timeout;
+
+	KillerWatchDog(Thread toKill, long timeout) {
+		super("KillerWatchDog");
+		setDaemon(true);
+
+		this.toKill = toKill;
+		this.timeout = timeout;
+	}
+
+	@SuppressWarnings("deprecation")
+	@Override
+	public void run() {
+		final long deadline = System.currentTimeMillis() + timeout;
+		long now;
+
+		while (toKill.isAlive() && (now = System.currentTimeMillis()) < deadline) {
+			try {
+				toKill.join(deadline - now);
+			}
+			catch (InterruptedException e) {
+				// ignore here, our job is important!
+			}
+		}
+
+		// this is harsh, but this watchdog is a last resort
+		if (toKill.isAlive()) {
+			toKill.stop();
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
new file mode 100644
index 0000000..d8d927d
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionInfoFetcher.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * A thread that fetches the partition information (including the partition leaders) for a list of topics.
+ * The result is retrieved via {@link #getPartitions()}, which waits for this thread to finish.
+ */
+class PartitionInfoFetcher extends Thread {
+
+	private final List<String> topics;
+	private final Properties properties;
+
+	private volatile List<KafkaTopicPartitionLeader> result;
+	private volatile Throwable error;
+
+
+	PartitionInfoFetcher(List<String> topics, Properties properties) {
+		this.topics = topics;
+		this.properties = properties;
+	}
+
+	@Override
+	public void run() {
+		try {
+			result = FlinkKafkaConsumer08.getPartitionsForTopic(topics, properties);
+		}
+		catch (Throwable t) {
+			this.error = t;
+		}
+	}
+
+	public List<KafkaTopicPartitionLeader> getPartitions() throws Exception {
+		try {
+			this.join();
+		}
+		catch (InterruptedException e) {
+			// restore the interruption flag for the caller and keep the interruption as the cause
+			Thread.currentThread().interrupt();
+			throw new Exception("Partition fetching was cancelled before completion", e);
+		}
+
+		if (error != null) {
+			throw new Exception("Failed to fetch partitions for topics " + topics.toString(), error);
+		}
+		if (result != null) {
+			return result;
+		}
+		throw new Exception("Partition fetching failed");
+	}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
new file mode 100644
index 0000000..27d90f2
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import java.util.HashMap;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A thread that periodically writes the current Kafka partition offsets to Zookeeper.
+ */
+public class PeriodicOffsetCommitter extends Thread {
+
+	/** The ZooKeeper handler */
+	private final ZookeeperOffsetHandler offsetHandler;
+
+	/** The partition states whose current offsets are committed in every interval */
+	private final KafkaTopicPartitionState<?>[] partitionStates;
+
+	/** The proxy to forward exceptions to the main thread */
+	private final ExceptionProxy errorHandler;
+
+	/** Interval in which to commit, in milliseconds */
+	private final long commitInterval;
+
+	/** Flag to mark the periodic committer as running */
+	private volatile boolean running = true;
+
+	PeriodicOffsetCommitter(ZookeeperOffsetHandler offsetHandler,
+			KafkaTopicPartitionState<?>[] partitionStates,
+			ExceptionProxy errorHandler,
+			long commitInterval)
+	{
+		this.offsetHandler = checkNotNull(offsetHandler);
+		this.partitionStates = checkNotNull(partitionStates);
+		this.errorHandler = checkNotNull(errorHandler);
+		this.commitInterval = commitInterval;
+
+		checkArgument(commitInterval > 0);
+	}
+
+	@Override
+	public void run() {
+		try {
+			while (running) {
+				Thread.sleep(commitInterval);
+
+				// create a deep copy of the current offsets
+				HashMap<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>(partitionStates.length);
+				for (KafkaTopicPartitionState<?> partitionState : partitionStates) {
+					offsetsToCommit.put(partitionState.getKafkaTopicPartition(), partitionState.getOffset());
+				}
+
+				offsetHandler.prepareAndCommitOffsets(offsetsToCommit);
+			}
+		}
+		catch (Throwable t) {
+			if (running) {
+				errorHandler.reportError(
+						new Exception("The periodic offset committer encountered an error: " + t.getMessage(), t));
+			}
+		}
+	}
+
+	public void shutdown() {
+		this.running = false;
+		this.interrupt();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
----------------------------------------------------------------------
diff --git
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java new file mode 100644 index 0000000..35e491a --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java @@ -0,0 +1,504 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import kafka.api.FetchRequestBuilder; +import kafka.api.PartitionOffsetRequestInfo; +import kafka.common.ErrorMapping; +import kafka.common.TopicAndPartition; +import kafka.javaapi.FetchResponse; +import kafka.javaapi.OffsetResponse; +import kafka.javaapi.consumer.SimpleConsumer; +import kafka.javaapi.message.ByteBufferMessageSet; +import kafka.message.MessageAndOffset; + +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.util.StringUtils; + +import org.apache.kafka.common.Node; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.PropertiesUtil.getInt; + +/** + * This class implements a thread with a connection to a single Kafka broker. The thread + * pulls records for a set of topic partitions for which the connected broker is currently + * the leader. The thread deserializes these records and emits them. + * + * @param <T> The type of elements that this consumer thread creates from Kafka's byte messages + * and emits into the Flink DataStream. 
+ */ +class SimpleConsumerThread<T> extends Thread { + + private static final Logger LOG = LoggerFactory.getLogger(SimpleConsumerThread.class); + + private static final KafkaTopicPartitionState<TopicAndPartition> MARKER = Kafka08Fetcher.MARKER; + + // ------------------------------------------------------------------------ + + private final Kafka08Fetcher<T> owner; + + private final KeyedDeserializationSchema<T> deserializer; + + private final List<KafkaTopicPartitionState<TopicAndPartition>> partitions; + + private final Node broker; + + /** Queue containing new fetch partitions for the consumer thread */ + private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> newPartitionsQueue; + + private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions; + + private final ExceptionProxy errorHandler; + + private final long invalidOffsetBehavior; + + private volatile boolean running = true; + + + // ----------------- Simple Consumer ---------------------- + private volatile SimpleConsumer consumer; + + private final int soTimeout; + private final int minBytes; + private final int maxWait; + private final int fetchSize; + private final int bufferSize; + private final int reconnectLimit; + + + // exceptions are thrown locally + public SimpleConsumerThread( + Kafka08Fetcher<T> owner, + ExceptionProxy errorHandler, + Properties config, + Node broker, + List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions, + ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions, + KeyedDeserializationSchema<T> deserializer, + long invalidOffsetBehavior) + { + this.owner = owner; + this.errorHandler = errorHandler; + this.broker = broker; + this.partitions = seedPartitions; + this.deserializer = requireNonNull(deserializer); + this.unassignedPartitions = requireNonNull(unassignedPartitions); + this.newPartitionsQueue = new ClosableBlockingQueue<>(); + this.invalidOffsetBehavior = 
invalidOffsetBehavior; + + // these are the actual configuration values of Kafka + their original default values. + this.soTimeout = getInt(config, "socket.timeout.ms", 30000); + this.minBytes = getInt(config, "fetch.min.bytes", 1); + this.maxWait = getInt(config, "fetch.wait.max.ms", 100); + this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576); + this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536); + this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3); + } + + public ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> getNewPartitionsQueue() { + return newPartitionsQueue; + } + + // ------------------------------------------------------------------------ + // main work loop + // ------------------------------------------------------------------------ + + @Override + public void run() { + LOG.info("Starting to fetch from {}", this.partitions); + + // set up the config values + final String clientId = "flink-kafka-consumer-legacy-" + broker.id(); + + try { + // create the Kafka consumer that we actually use for fetching + consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId); + + // make sure that all partitions have some offsets to start with + // those partitions that do not have an offset from a checkpoint need to get + // their start offset from ZooKeeper + getMissingOffsetsFromKafka(partitions); + + // Now, the actual work starts :-) + int offsetOutOfRangeCount = 0; + int reconnects = 0; + while (running) { + + // ----------------------------------- partitions list maintenance ---------------------------- + + // check queue for new partitions to read from: + List<KafkaTopicPartitionState<TopicAndPartition>> newPartitions = newPartitionsQueue.pollBatch(); + if (newPartitions != null) { + // found some new partitions for this thread's broker + + // check if the new partitions need an offset lookup + getMissingOffsetsFromKafka(newPartitions); + + // 
add the new partitions (and check they are not already in there) + for (KafkaTopicPartitionState<TopicAndPartition> newPartition: newPartitions) { + if (partitions.contains(newPartition)) { + throw new IllegalStateException("Adding partition " + newPartition + + " to subscribed partitions even though it is already subscribed"); + } + partitions.add(newPartition); + } + + LOG.info("Adding {} new partitions to consumer thread {}", newPartitions.size(), getName()); + LOG.debug("Partitions list: {}", newPartitions); + } + + if (partitions.size() == 0) { + if (newPartitionsQueue.close()) { + // close succeeded. Closing thread + running = false; + + LOG.info("Consumer thread {} does not have any partitions assigned anymore. Stopping thread.", + getName()); + + // add the wake-up marker into the queue to make the main thread + // immediately wake up and termination faster + unassignedPartitions.add(MARKER); + + break; + } else { + // close failed: fetcher main thread concurrently added new partitions into the queue. 
+ // go to top of loop again and get the new partitions + continue; + } + } + + // ----------------------------------- request / response with kafka ---------------------------- + + FetchRequestBuilder frb = new FetchRequestBuilder(); + frb.clientId(clientId); + frb.maxWait(maxWait); + frb.minBytes(minBytes); + + for (KafkaTopicPartitionState<?> partition : partitions) { + frb.addFetch( + partition.getKafkaTopicPartition().getTopic(), + partition.getKafkaTopicPartition().getPartition(), + partition.getOffset() + 1, // request the next record + fetchSize); + } + + kafka.api.FetchRequest fetchRequest = frb.build(); + LOG.debug("Issuing fetch request {}", fetchRequest); + + FetchResponse fetchResponse; + try { + fetchResponse = consumer.fetch(fetchRequest); + } + catch (Throwable cce) { + //noinspection ConstantConditions + if (cce instanceof ClosedChannelException) { + LOG.warn("Fetch failed because of ClosedChannelException."); + LOG.debug("Full exception", cce); + + // we don't know if the broker is overloaded or unavailable. + // retry a few times, then return ALL partitions for new leader lookup + if (++reconnects >= reconnectLimit) { + LOG.warn("Unable to reach broker after {} retries. 
Returning all current partitions", reconnectLimit); + for (KafkaTopicPartitionState<TopicAndPartition> fp: this.partitions) { + unassignedPartitions.add(fp); + } + this.partitions.clear(); + continue; // jump to top of loop: will close thread or subscribe to new partitions + } + try { + consumer.close(); + } catch (Throwable t) { + LOG.warn("Error while closing consumer connection", t); + } + // delay & retry + Thread.sleep(100); + consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId); + continue; // retry + } else { + throw cce; + } + } + reconnects = 0; + + // ---------------------------------------- error handling ---------------------------- + + if (fetchResponse == null) { + throw new IOException("Fetch from Kafka failed (request returned null)"); + } + + if (fetchResponse.hasError()) { + String exception = ""; + List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = new ArrayList<>(); + + // iterate over partitions to get individual error codes + Iterator<KafkaTopicPartitionState<TopicAndPartition>> partitionsIterator = partitions.iterator(); + boolean partitionsRemoved = false; + + while (partitionsIterator.hasNext()) { + final KafkaTopicPartitionState<TopicAndPartition> fp = partitionsIterator.next(); + short code = fetchResponse.errorCode(fp.getTopic(), fp.getPartition()); + + if (code == ErrorMapping.OffsetOutOfRangeCode()) { + // we were asked to read from an out-of-range-offset (maybe set wrong in Zookeeper) + // Kafka's high level consumer is resetting the offset according to 'auto.offset.reset' + partitionsToGetOffsetsFor.add(fp); + } + else if (code == ErrorMapping.NotLeaderForPartitionCode() || + code == ErrorMapping.LeaderNotAvailableCode() || + code == ErrorMapping.BrokerNotAvailableCode() || + code == ErrorMapping.UnknownCode()) + { + // the broker we are connected to is not the leader for the partition. + LOG.warn("{} is not the leader of {}. 
Reassigning leader for partition", broker, fp); + LOG.debug("Error code = {}", code); + + unassignedPartitions.add(fp); + + partitionsIterator.remove(); // unsubscribe the partition ourselves + partitionsRemoved = true; + } + else if (code != ErrorMapping.NoError()) { + exception += "\nException for " + fp.getTopic() +":"+ fp.getPartition() + ": " + + StringUtils.stringifyException(ErrorMapping.exceptionFor(code)); + } + } + if (partitionsToGetOffsetsFor.size() > 0) { + // safeguard against an infinite loop. + if (offsetOutOfRangeCount++ > 3) { + throw new RuntimeException("Found invalid offsets more than three times in partitions " + + partitionsToGetOffsetsFor + " Exceptions: " + exception); + } + // get valid offsets for these partitions and try again. + LOG.warn("The following partitions had an invalid offset: {}", partitionsToGetOffsetsFor); + getLastOffsetFromKafka(consumer, partitionsToGetOffsetsFor, invalidOffsetBehavior); + + LOG.warn("The new partition offsets are {}", partitionsToGetOffsetsFor); + continue; // jump back to create a new fetch request. The offset has not been touched. + } + else if (partitionsRemoved) { + continue; // create new fetch request + } + else { + // partitions failed on an error + throw new IOException("Error while fetching from broker '" + broker +"': " + exception); + } + } else { + // successful fetch, reset offsetOutOfRangeCount. 
+ offsetOutOfRangeCount = 0; + } + + // ----------------------------------- process fetch response ---------------------------- + + int messagesInFetch = 0; + int deletedMessages = 0; + Iterator<KafkaTopicPartitionState<TopicAndPartition>> partitionsIterator = partitions.iterator(); + + partitionsLoop: + while (partitionsIterator.hasNext()) { + final KafkaTopicPartitionState<TopicAndPartition> currentPartition = partitionsIterator.next(); + + final ByteBufferMessageSet messageSet = fetchResponse.messageSet( + currentPartition.getTopic(), currentPartition.getPartition()); + + for (MessageAndOffset msg : messageSet) { + if (running) { + messagesInFetch++; + final ByteBuffer payload = msg.message().payload(); + final long offset = msg.offset(); + + if (offset <= currentPartition.getOffset()) { + // we have seen this message already + LOG.info("Skipping message with offset " + msg.offset() + + " because we have seen messages until (including) " + + currentPartition.getOffset() + + " from topic/partition " + currentPartition.getTopic() + '/' + + currentPartition.getPartition() + " already"); + continue; + } + + // If the message value is null, this represents a delete command for the message key. + // Log this and pass it on to the client who might want to also receive delete messages. + byte[] valueBytes; + if (payload == null) { + deletedMessages++; + valueBytes = null; + } else { + valueBytes = new byte[payload.remaining()]; + payload.get(valueBytes); + } + + // put key into byte array + byte[] keyBytes = null; + int keySize = msg.message().keySize(); + + if (keySize >= 0) { // message().hasKey() is doing the same. 
We save one int deserialization + ByteBuffer keyPayload = msg.message().key(); + keyBytes = new byte[keySize]; + keyPayload.get(keyBytes); + } + + final T value = deserializer.deserialize(keyBytes, valueBytes, + currentPartition.getTopic(), currentPartition.getPartition(), offset); + + if (deserializer.isEndOfStream(value)) { + // remove partition from subscribed partitions. + partitionsIterator.remove(); + continue partitionsLoop; + } + + owner.emitRecord(value, currentPartition, offset); + } + else { + // no longer running + return; + } + } + } + LOG.debug("This fetch contained {} messages ({} deleted messages)", messagesInFetch, deletedMessages); + } // end of fetch loop + + if (!newPartitionsQueue.close()) { + throw new Exception("Bug: Cleanly leaving fetcher thread without having a closed queue."); + } + } + catch (Throwable t) { + // report to the fetcher's error handler + errorHandler.reportError(t); + } + finally { + if (consumer != null) { + // closing the consumer should not fail the program + try { + consumer.close(); + } + catch (Throwable t) { + LOG.error("Error while closing the Kafka simple consumer", t); + } + } + } + } + + private void getMissingOffsetsFromKafka( + List<KafkaTopicPartitionState<TopicAndPartition>> partitions) throws IOException + { + // collect which partitions we should fetch offsets for + List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = new ArrayList<>(); + for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) { + if (!part.isOffsetDefined()) { + // retrieve the offset from the consumer + partitionsToGetOffsetsFor.add(part); + } + } + + if (partitionsToGetOffsetsFor.size() > 0) { + getLastOffsetFromKafka(consumer, partitionsToGetOffsetsFor, invalidOffsetBehavior); + + LOG.info("No checkpoint/savepoint offsets found for some partitions. " + + "Fetched the following start offsets {}", partitionsToGetOffsetsFor); + } + } + + /** + * Cancels this fetch thread. 
The thread will release all resources and terminate. + */ + public void cancel() { + this.running = false; + + // interrupt whatever the consumer is doing + if (consumer != null) { + consumer.close(); + } + + this.interrupt(); + } + + // ------------------------------------------------------------------------ + // Kafka Request Utils + // ------------------------------------------------------------------------ + + /** + * Request latest offsets for a set of partitions, via a Kafka consumer. + * + * <p>This method retries three times if the response has an error. + * + * @param consumer The consumer connected to lead broker + * @param partitions The list of partitions we need offsets for + * @param whichTime The type of time we are requesting. -1 and -2 are special constants (See OffsetRequest) + */ + private static void getLastOffsetFromKafka( + SimpleConsumer consumer, + List<KafkaTopicPartitionState<TopicAndPartition>> partitions, + long whichTime) throws IOException + { + Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>(); + for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) { + requestInfo.put(part.getKafkaPartitionHandle(), new PartitionOffsetRequestInfo(whichTime, 1)); + } + + int retries = 0; + OffsetResponse response; + while (true) { + kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest( + requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId()); + response = consumer.getOffsetsBefore(request); + + if (response.hasError()) { + StringBuilder exception = new StringBuilder(); + for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) { + short code; + if ((code = response.errorCode(part.getTopic(), part.getPartition())) != ErrorMapping.NoError()) { + exception.append("\nException for topic=").append(part.getTopic()) + .append(" partition=").append(part.getPartition()).append(": ") + .append(StringUtils.stringifyException(ErrorMapping.exceptionFor(code))); + } + } + 
if (++retries >= 3) { + throw new IOException("Unable to get last offset for partitions " + partitions + ": " + + exception.toString()); + } else { + LOG.warn("Unable to get last offset for partitions: Exception(s): {}", exception); + } + } else { + break; // leave retry loop + } + } + + for (KafkaTopicPartitionState<TopicAndPartition> part: partitions) { + final long offset = response.offsets(part.getTopic(), part.getPartition())[0]; + + // the offset returned is that of the next record to fetch. because our state reflects the latest + // successfully emitted record, we subtract one + part.setOffset(offset - 1); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java new file mode 100644 index 0000000..8f2ef09 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import kafka.utils.ZKGroupTopicDirs; + +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.kafka.clients.consumer.ConsumerConfig; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * Handler for committing Kafka offsets to Zookeeper and to retrieve them again. 
+ */ +public class ZookeeperOffsetHandler { + + private static final Logger LOG = LoggerFactory.getLogger(ZookeeperOffsetHandler.class); + + private final String groupId; + + private final CuratorFramework curatorClient; + + + public ZookeeperOffsetHandler(Properties props) { + this.groupId = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG); + if (this.groupId == null) { + throw new IllegalArgumentException("Required property '" + + ConsumerConfig.GROUP_ID_CONFIG + "' has not been set"); + } + + String zkConnect = props.getProperty("zookeeper.connect"); + if (zkConnect == null) { + throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set"); + } + + // we use Curator's default timeouts + int sessionTimeoutMs = Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "60000")); + int connectionTimeoutMs = Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "15000")); + + // undocumented config options allowing users to configure the retry policy. (they are "flink." prefixed as they are no official kafka configs) + int backoffBaseSleepTime = Integer.valueOf(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100")); + int backoffMaxRetries = Integer.valueOf(props.getProperty("flink.zookeeper.max-retries", "10")); + + RetryPolicy retryPolicy = new ExponentialBackoffRetry(backoffBaseSleepTime, backoffMaxRetries); + curatorClient = CuratorFrameworkFactory.newClient(zkConnect, sessionTimeoutMs, connectionTimeoutMs, retryPolicy); + curatorClient.start(); + } + + // ------------------------------------------------------------------------ + // Offset access and manipulation + // ------------------------------------------------------------------------ + + /** + * Commits offsets for Kafka partitions to ZooKeeper. 
The given offsets to this method should be the offsets of + * the last processed records; this method will take care of incrementing the offsets by 1 before committing them so + * that the committed offsets to Zookeeper represent the next record to process. + * + * @param internalOffsets The internal offsets (representing last processed records) for the partitions to commit. + * @throws Exception The method forwards exceptions. + */ + public void prepareAndCommitOffsets(Map<KafkaTopicPartition, Long> internalOffsets) throws Exception { + for (Map.Entry<KafkaTopicPartition, Long> entry : internalOffsets.entrySet()) { + KafkaTopicPartition tp = entry.getKey(); + + Long lastProcessedOffset = entry.getValue(); + if (lastProcessedOffset != null && lastProcessedOffset >= 0) { + setOffsetInZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition(), lastProcessedOffset + 1); + } + } + } + + /** + * @param partitions The partitions to read offsets for. + * @return The mapping from partition to offset. + * @throws Exception This method forwards exceptions. + */ + public Map<KafkaTopicPartition, Long> getCommittedOffsets(List<KafkaTopicPartition> partitions) throws Exception { + Map<KafkaTopicPartition, Long> ret = new HashMap<>(partitions.size()); + for (KafkaTopicPartition tp : partitions) { + Long offset = getOffsetFromZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition()); + + if (offset != null) { + LOG.info("Offset for TopicPartition {}:{} was set to {} in ZooKeeper. Seeking fetcher to that position.", + tp.getTopic(), tp.getPartition(), offset); + ret.put(tp, offset); + } + } + return ret; + } + + /** + * Closes the offset handler. + * + * @throws IOException Thrown, if the handler cannot be closed properly. 
+ */ + public void close() throws IOException { + curatorClient.close(); + } + + // ------------------------------------------------------------------------ + // Communication with Zookeeper + // ------------------------------------------------------------------------ + + public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset) throws Exception { + ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic); + String path = topicDirs.consumerOffsetDir() + "/" + partition; + curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient()); + byte[] data = Long.toString(offset).getBytes(); + curatorClient.setData().forPath(path, data); + } + + public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception { + ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic); + String path = topicDirs.consumerOffsetDir() + "/" + partition; + curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient()); + + byte[] data = curatorClient.getData().forPath(path); + + if (data == null) { + return null; + } else { + String asString = new String(data); + if (asString.length() == 0) { + return null; + } else { + try { + return Long.valueOf(asString); + } + catch (NumberFormatException e) { + LOG.error( + "The offset in ZooKeeper for group '{}', topic '{}', partition {} is a malformed string: {}", + groupId, topic, partition, asString); + return null; + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java new file mode 100644 index 0000000..fabb0fe --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.runtime.client.JobCancellationException; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler; +import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class Kafka08ITCase extends KafkaConsumerTestBase { + + // ------------------------------------------------------------------------ + // Suite of Tests + // ------------------------------------------------------------------------ + + @Test(timeout = 60000) + public void testFailOnNoBroker() throws Exception { + runFailOnNoBrokerTest(); + } + + + @Test(timeout = 60000) + public void testConcurrentProducerConsumerTopology() throws Exception { + runSimpleConcurrentProducerConsumerTopology(); + } + +// @Test(timeout = 60000) +// public void testPunctuatedExplicitWMConsumer() throws Exception { +// runExplicitPunctuatedWMgeneratingConsumerTest(false); +// } + +// @Test(timeout = 60000) +// public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception { +// runExplicitPunctuatedWMgeneratingConsumerTest(true); +// } + + @Test(timeout = 60000) + public void testKeyValueSupport() throws Exception { + runKeyValueTest(); + } + + // --- canceling / failures --- + + @Test(timeout = 
60000) + public void testCancelingEmptyTopic() throws Exception { + runCancelingOnEmptyInputTest(); + } + + @Test(timeout = 60000) + public void testCancelingFullTopic() throws Exception { + runCancelingOnFullInputTest(); + } + + @Test(timeout = 60000) + public void testFailOnDeploy() throws Exception { + runFailOnDeployTest(); + } + + @Test(timeout = 60000) + public void testInvalidOffset() throws Exception { + + final int parallelism = 1; + + // write 20 messages into topic: + final String topic = writeSequence("invalidOffsetTopic", 20, parallelism, 1); + + // set invalid offset: + CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient(); + ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"), topic, 0, 1234); + curatorClient.close(); + + // read from topic + final int valuesCount = 20; + final int startFrom = 0; + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.getConfig().disableSysoutLogging(); + + readSequence(env, standardProps, parallelism, topic, valuesCount, startFrom); + + deleteTestTopic(topic); + } + + // --- source to partition mappings and exactly once --- + + @Test(timeout = 60000) + public void testOneToOneSources() throws Exception { + runOneToOneExactlyOnceTest(); + } + + @Test(timeout = 60000) + public void testOneSourceMultiplePartitions() throws Exception { + runOneSourceMultiplePartitionsExactlyOnceTest(); + } + + @Test(timeout = 60000) + public void testMultipleSourcesOnePartition() throws Exception { + runMultipleSourcesOnePartitionExactlyOnceTest(); + } + + // --- broker failure --- + + @Test(timeout = 60000) + public void testBrokerFailure() throws Exception { + runBrokerFailureTest(); + } + + // --- offset committing --- + + @Test(timeout = 60000) + public void testCommitOffsetsToZookeeper() throws Exception { + runCommitOffsetsToKafka(); + } + + @Test(timeout = 60000) + public void 
testStartFromZookeeperCommitOffsets() throws Exception { + runStartFromKafkaCommitOffsets(); + } + + @Test(timeout = 60000) + public void testAutoOffsetRetrievalAndCommitToZookeeper() throws Exception { + runAutoOffsetRetrievalAndCommitToKafka(); + } + + @Test + public void runOffsetManipulationInZooKeeperTest() { + try { + final String topicName = "ZookeeperOffsetHandlerTest-Topic"; + final String groupId = "ZookeeperOffsetHandlerTest-Group"; + + final Long offset = (long) (Math.random() * Long.MAX_VALUE); + + CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer ).createCuratorClient(); + kafkaServer.createTestTopic(topicName, 3, 2); + + ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorFramework, groupId, topicName, 0, offset); + + Long fetchedOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, groupId, topicName, 0); + + curatorFramework.close(); + + assertEquals(offset, fetchedOffset); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test(timeout = 60000) + public void testOffsetAutocommitTest() throws Exception { + final int parallelism = 3; + + // write a sequence from 0 to 99 to each of the 3 partitions. + final String topicName = writeSequence("testOffsetAutocommit", 100, parallelism, 1); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + // NOTE: We are not enabling the checkpointing! + env.getConfig().disableSysoutLogging(); + env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); + env.setParallelism(parallelism); + + // the readSequence operation sleeps for 20 ms between each record. + // setting a delay of 25*20 = 500 for the commit interval makes + // sure that we commit roughly 3-4 times while reading, however + // at least once. 
+ Properties readProps = new Properties(); + readProps.putAll(standardProps); + readProps.setProperty("auto.commit.interval.ms", "500"); + + // read so that the offset can be committed to ZK + readSequence(env, readProps, parallelism, topicName, 100, 0); + + // get the offset + CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient(); + + Long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 0); + Long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 1); + Long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 2); + curatorFramework.close(); + LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3); + + // ensure that the offset has been committed + boolean atLeastOneOffsetSet = (o1 != null && o1 > 0 && o1 <= 100) || + (o2 != null && o2 > 0 && o2 <= 100) || + (o3 != null && o3 > 0 && o3 <= 100); + assertTrue("Expecting at least one offset to be set o1="+o1+" o2="+o2+" o3="+o3, atLeastOneOffsetSet); + + deleteTestTopic(topicName); + } + + // --- special executions --- + + @Test(timeout = 60000) + public void testBigRecordJob() throws Exception { + runBigRecordTestTopology(); + } + + @Test(timeout = 60000) + public void testMultipleTopics() throws Exception { + runProduceConsumeMultipleTopics(); + } + + @Test(timeout = 60000) + public void testAllDeletes() throws Exception { + runAllDeletesTest(); + } + + @Test(timeout=60000) + public void testEndOfStream() throws Exception { + runEndOfStreamTest(); + } + + @Test(timeout = 60000) + public void testMetrics() throws Throwable { + runMetricsTest(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java 
---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java new file mode 100644 index 0000000..6d0b140 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.table.Row; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; +import org.apache.flink.streaming.util.serialization.SerializationSchema; + +import java.util.Properties; + +public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase { + + @Override + protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner, + final FlinkKafkaProducerBase<Row> kafkaProducer) { + + return new Kafka08JsonTableSink(topic, properties, partitioner) { + @Override + protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, + SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) { + return kafkaProducer; + } + }; + } + + @Override + @SuppressWarnings("unchecked") + protected SerializationSchema<Row> getSerializationSchema() { + return new JsonRowSerializationSchema(FIELD_NAMES); + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java new file mode 100644 index 0000000..a2d66ac --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import java.util.Properties; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.table.Row; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; + +public class Kafka08JsonTableSourceTest extends KafkaTableSourceTestBase { + + @Override + protected KafkaTableSource createTableSource(String topic, Properties properties, String[] fieldNames, TypeInformation<?>[] typeInfo) { + return new Kafka08JsonTableSource(topic, properties, fieldNames, typeInfo); + } + + @Override + @SuppressWarnings("unchecked") + protected Class<DeserializationSchema<Row>> getDeserializationSchema() { + return (Class) JsonRowDeserializationSchema.class; + } + + @Override + @SuppressWarnings("unchecked") + protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() { + return (Class) FlinkKafkaConsumer08.class; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java ---------------------------------------------------------------------- diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java new file mode 100644 index 0000000..5c951db --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + + +import org.junit.Test; + +@SuppressWarnings("serial") +public class Kafka08ProducerITCase extends KafkaProducerTestBase { + + @Test + public void testCustomPartitioning() { + runCustomPartitioningTest(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java new file mode 100644 index 0000000..9520f55 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.Collections; +import java.util.Properties; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.junit.Test; + +public class KafkaConsumer08Test { + + @Test + public void testValidateZooKeeperConfig() { + try { + // empty + Properties emptyProperties = new Properties(); + try { + FlinkKafkaConsumer08.validateZooKeeperConfig(emptyProperties); + fail("should fail with an exception"); + } + catch (IllegalArgumentException e) { + // expected + } + + // no connect string (only group string) + Properties noConnect = new Properties(); + noConnect.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-test-group"); + try { + FlinkKafkaConsumer08.validateZooKeeperConfig(noConnect); + fail("should fail with an exception"); + } + catch (IllegalArgumentException e) { + // expected + } + + // no group string (only connect string) + Properties noGroup = new Properties(); + noGroup.put("zookeeper.connect", "localhost:47574"); + try { + FlinkKafkaConsumer08.validateZooKeeperConfig(noGroup); + fail("should fail with an exception"); + } + catch (IllegalArgumentException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCreateSourceWithoutCluster() { + try { + Properties props = new Properties(); + props.setProperty("zookeeper.connect", "localhost:56794"); + props.setProperty("bootstrap.servers", "localhost:11111, localhost:22222"); + props.setProperty("group.id", "non-existent-group"); + + FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"), new SimpleStringSchema(), props); + consumer.open(new Configuration()); + fail(); + } + catch (Exception e) { 
+ assertTrue(e.getMessage().contains("Unable to retrieve any partitions")); + } + } + + @Test + public void testAllBoostrapServerHostsAreInvalid() { + try { + String zookeeperConnect = "localhost:56794"; + String bootstrapServers = "indexistentHost:11111"; + String groupId = "non-existent-group"; + Properties props = createKafkaProps(zookeeperConnect, bootstrapServers, groupId); + FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"), + new SimpleStringSchema(), props); + consumer.open(new Configuration()); + fail(); + } catch (Exception e) { + assertTrue("Exception should be thrown containing 'all bootstrap servers invalid' message!", + e.getMessage().contains("All the servers provided in: '" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + + "' config are invalid")); + } + } + + @Test + public void testAtLeastOneBootstrapServerHostIsValid() { + try { + String zookeeperConnect = "localhost:56794"; + // we declare one valid boostrap server, namely the one with + // 'localhost' + String bootstrapServers = "indexistentHost:11111, localhost:22222"; + String groupId = "non-existent-group"; + Properties props = createKafkaProps(zookeeperConnect, bootstrapServers, groupId); + FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"), + new SimpleStringSchema(), props); + consumer.open(new Configuration()); + fail(); + } catch (Exception e) { + // test is not failing because we have one valid boostrap server + assertTrue("The cause of the exception should not be 'all boostrap server are invalid'!", + !e.getMessage().contains("All the hosts provided in: " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + + " config are invalid")); + } + } + + private Properties createKafkaProps(String zookeeperConnect, String bootstrapServers, String groupId) { + Properties props = new Properties(); + props.setProperty("zookeeper.connect", zookeeperConnect); + props.setProperty("bootstrap.servers", 
bootstrapServers); + props.setProperty("group.id", groupId); + return props; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java new file mode 100644 index 0000000..72d2772 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import kafka.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaLocalSystemTime implements Time { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaLocalSystemTime.class); + + @Override + public long milliseconds() { + return System.currentTimeMillis(); + } + + @Override + public long nanoseconds() { + return System.nanoTime(); + } + + @Override + public void sleep(long ms) { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + LOG.warn("Interruption", e); + } + } + +} + http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java new file mode 100644 index 0000000..91fc286 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; +import org.apache.flink.util.TestLogger; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.PartitionInfo; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.Collections; +import java.util.concurrent.Future; + + +import static org.mockito.Mockito.*; +import static org.powermock.api.mockito.PowerMockito.whenNew; + +import static org.junit.Assert.*; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(FlinkKafkaProducerBase.class) +public class KafkaProducerTest extends TestLogger { + + @Test + @SuppressWarnings("unchecked") + public void testPropagateExceptions() { + try { + // mock kafka producer + KafkaProducer<?, ?> kafkaProducerMock = mock(KafkaProducer.class); + + // partition setup + 
when(kafkaProducerMock.partitionsFor(anyString())).thenReturn( + // returning a unmodifiable list to mimic KafkaProducer#partitionsFor() behaviour + Collections.singletonList(new PartitionInfo("mock_topic", 42, null, null, null))); + + // failure when trying to send an element + when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class))) + .thenAnswer(new Answer<Future<RecordMetadata>>() { + @Override + public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable { + Callback callback = (Callback) invocation.getArguments()[1]; + callback.onCompletion(null, new Exception("Test error")); + return null; + } + }); + + // make sure the FlinkKafkaProducer instantiates our mock producer + whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock); + + // (1) producer that propagates errors + + FlinkKafkaProducer08<String> producerPropagating = new FlinkKafkaProducer08<>( + "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null); + + OneInputStreamOperatorTestHarness<String, Object> testHarness = + new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producerPropagating)); + + testHarness.open(); + + try { + testHarness.processElement(new StreamRecord<>("value")); + testHarness.processElement(new StreamRecord<>("value")); + fail("This should fail with an exception"); + } + catch (Exception e) { + assertNotNull(e.getCause()); + assertNotNull(e.getCause().getMessage()); + assertTrue(e.getCause().getMessage().contains("Test error")); + } + + testHarness.close(); + + // (2) producer that only logs errors + + FlinkKafkaProducer08<String> producerLogging = new FlinkKafkaProducer08<>( + "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null); + producerLogging.setLogFailuresOnly(true); + + testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging)); + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>("value")); + 
testHarness.processElement(new StreamRecord<>("value")); + + testHarness.close(); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java new file mode 100644 index 0000000..c28799c --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.Test;
+
+/**
+ * Kafka 0.8 binding of the short-retention / offset-reset scenarios defined in
+ * {@link KafkaShortRetentionTestBase}.
+ */
+@SuppressWarnings("serial")
+public class KafkaShortRetention08ITCase extends KafkaShortRetentionTestBase {
+
+	/** Timeout applied to each test in this class, in milliseconds. */
+	private static final long TEST_TIMEOUT_MILLIS = 60000L;
+
+	/** Delegates to the inherited {@code runAutoOffsetResetTest()} scenario. */
+	@Test(timeout = TEST_TIMEOUT_MILLIS)
+	public void testAutoOffsetReset() throws Exception {
+		runAutoOffsetResetTest();
+	}
+
+	/** Delegates to the inherited {@code runFailOnAutoOffsetResetNoneEager()} scenario. */
+	@Test(timeout = TEST_TIMEOUT_MILLIS)
+	public void testAutoOffsetResetNone() throws Exception {
+		runFailOnAutoOffsetResetNoneEager();
+	}
+}
