http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java deleted file mode 100644 index d015157..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java +++ /dev/null @@ -1,481 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kafka.internals; - -import kafka.common.TopicAndPartition; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.metrics.MetricGroup; -import org.apache.kafka.common.Node; - -import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; -import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.util.InstantiationUtil; -import org.apache.flink.util.SerializedValue; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * A fetcher that fetches data from Kafka brokers via the Kafka 0.8 low-level consumer API. - * The fetcher also handles the explicit communication with ZooKeeper to fetch initial offsets - * and to write offsets to ZooKeeper. - * - * @param <T> The type of elements produced by the fetcher. 
/**
 * A fetcher that fetches data from Kafka brokers via the Kafka 0.8 low-level consumer API.
 * The fetcher also handles the explicit communication with ZooKeeper to fetch initial offsets
 * and to write offsets to ZooKeeper.
 *
 * <p>The main work loop ({@link #runFetchLoop()}) hands partitions from a queue of unassigned
 * partitions to one {@link SimpleConsumerThread} per leader broker, and stops all spawned
 * threads when it exits.
 *
 * @param <T> The type of elements produced by the fetcher.
 */
public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {

    /**
     * Sentinel element that is put into the unassigned-partitions queue only to wake up the
     * main loop (see {@link #cancel()}). It is removed from every polled batch before the
     * partitions are assigned.
     */
    static final KafkaTopicPartitionState<TopicAndPartition> MARKER =
            new KafkaTopicPartitionState<>(new KafkaTopicPartition("n/a", -1), new TopicAndPartition("n/a", -1));

    private static final Logger LOG = LoggerFactory.getLogger(Kafka08Fetcher.class);

    // ------------------------------------------------------------------------

    /** The schema to convert between Kafka's byte messages and Flink's objects. */
    private final KeyedDeserializationSchema<T> deserializer;

    /** The properties that configure the Kafka connection. */
    private final Properties kafkaConfig;

    /** The subtask's runtime context. */
    private final RuntimeContext runtimeContext;

    /** The queue of partitions that are currently not assigned to a broker connection. */
    private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitionsQueue;

    /** The behavior to use in case an offset is not valid (any more) for a partition. */
    private final long invalidOffsetBehavior;

    /** The interval in which to automatically commit (the committer is only started if this is &gt; 0). */
    private final long autoCommitInterval;

    /**
     * The handler that reads/writes offsets from/to ZooKeeper.
     * Only set while {@link #runFetchLoop()} is executing; {@code null} otherwise.
     */
    private volatile ZookeeperOffsetHandler zookeeperOffsetHandler;

    /** Flag to track the main work loop as alive. */
    private volatile boolean running = true;


    /**
     * Creates the fetcher and places all subscribed partitions into the queue of
     * partitions that are not yet assigned to a broker connection.
     *
     * @param sourceContext The source context, passed on to the base fetcher.
     * @param assignedPartitions The partitions this fetcher is responsible for.
     * @param watermarksPeriodic The serialized periodic watermark assigner, passed on to the base fetcher.
     * @param watermarksPunctuated The serialized punctuated watermark assigner, passed on to the base fetcher.
     * @param runtimeContext The runtime context of the subtask.
     * @param deserializer The schema used to deserialize records; must not be null.
     * @param kafkaProperties The Kafka configuration; must not be null.
     * @param invalidOffsetBehavior The behavior for invalid offsets, handed to the consumer threads.
     * @param autoCommitInterval The periodic offset commit interval in milliseconds.
     * @param useMetrics Whether to register offset metrics.
     */
    public Kafka08Fetcher(
            SourceContext<T> sourceContext,
            List<KafkaTopicPartition> assignedPartitions,
            SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            StreamingRuntimeContext runtimeContext,
            KeyedDeserializationSchema<T> deserializer,
            Properties kafkaProperties,
            long invalidOffsetBehavior,
            long autoCommitInterval,
            boolean useMetrics) throws Exception
    {
        super(
                sourceContext,
                assignedPartitions,
                watermarksPeriodic,
                watermarksPunctuated,
                runtimeContext.getProcessingTimeService(),
                runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
                runtimeContext.getUserCodeClassLoader(),
                useMetrics);

        this.deserializer = checkNotNull(deserializer);
        this.kafkaConfig = checkNotNull(kafkaProperties);
        this.runtimeContext = runtimeContext;
        this.invalidOffsetBehavior = invalidOffsetBehavior;
        this.autoCommitInterval = autoCommitInterval;
        this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();

        // initially, all these partitions are not assigned to a specific broker connection
        for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
            unassignedPartitionsQueue.add(partition);
        }
    }

    // ------------------------------------------------------------------------
    //  Main Work Loop
    // ------------------------------------------------------------------------

    /**
     * Runs the main loop: looks up initial offsets in ZooKeeper for partitions without an
     * offset, optionally starts the periodic offset committer, and then keeps assigning
     * unassigned partitions to per-broker consumer threads until the fetcher is cancelled
     * or all consumer threads have finished and no partitions are left to assign.
     *
     * <p>On exit (normal, cancelled, or failed) the periodic committer and all consumer
     * threads are shut down and the ZooKeeper handler is closed.
     */
    @Override
    public void runFetchLoop() throws Exception {
        // the map from broker to the thread that is connected to that broker
        final Map<Node, SimpleConsumerThread<T>> brokerToThread = new HashMap<>();

        // this holds the exceptions possibly reported by the concurrent broker connection threads
        final ExceptionProxy errorHandler = new ExceptionProxy(Thread.currentThread());

        // the offset handler handles the communication with ZooKeeper, to commit externally visible offsets
        // (the local variable is used below so that the handler can still be closed after the
        // field has been reset to null in the finally block)
        final ZookeeperOffsetHandler zookeeperOffsetHandler = new ZookeeperOffsetHandler(kafkaConfig);
        this.zookeeperOffsetHandler = zookeeperOffsetHandler;

        PeriodicOffsetCommitter periodicCommitter = null;
        try {
            // read offsets from ZooKeeper for partitions that did not restore offsets
            {
                List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
                for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
                    if (!partition.isOffsetDefined()) {
                        partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
                    }
                }

                Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitionsWithNoOffset);
                for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
                    Long zkOffset = zkOffsets.get(partition.getKafkaTopicPartition());
                    if (zkOffset != null) {
                        // the offset in ZK represents the "next record to process", so we need to subtract 1
                        // from it to correctly represent our internally checkpointed offsets
                        partition.setOffset(zkOffset - 1);
                    }
                }
            }

            // start the periodic offset committer thread, if necessary
            if (autoCommitInterval > 0) {
                LOG.info("Starting periodic offset committer, with commit interval of {}ms", autoCommitInterval);

                periodicCommitter = new PeriodicOffsetCommitter(zookeeperOffsetHandler,
                        subscribedPartitions(), errorHandler, autoCommitInterval);
                periodicCommitter.setName("Periodic Kafka partition offset committer");
                periodicCommitter.setDaemon(true);
                periodicCommitter.start();
            }

            // register offset metrics
            if (useMetrics) {
                final MetricGroup kafkaMetricGroup = runtimeContext.getMetricGroup().addGroup("KafkaConsumer");
                addOffsetStateGauge(kafkaMetricGroup);
            }

            // Main loop polling elements from the unassignedPartitions queue to the threads
            while (running) {
                // re-throw any exception from the concurrent fetcher threads
                errorHandler.checkAndThrowException();

                // wait for max 5 seconds trying to get partitions to assign
                // if threads shut down, this poll returns earlier, because the threads inject the
                // special marker into the queue
                List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToAssign =
                        unassignedPartitionsQueue.getBatchBlocking(5000);
                partitionsToAssign.remove(MARKER);

                if (!partitionsToAssign.isEmpty()) {
                    LOG.info("Assigning {} partitions to broker threads", partitionsToAssign.size());
                    Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> partitionsWithLeaders =
                            findLeaderForPartitions(partitionsToAssign, kafkaConfig);

                    // assign the partitions to the leaders (maybe start the threads)
                    for (Map.Entry<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> partitionsWithLeader :
                            partitionsWithLeaders.entrySet())
                    {
                        final Node leader = partitionsWithLeader.getKey();
                        final List<KafkaTopicPartitionState<TopicAndPartition>> partitions = partitionsWithLeader.getValue();
                        SimpleConsumerThread<T> brokerThread = brokerToThread.get(leader);

                        // stop assigning as soon as the fetcher has been cancelled
                        if (!running) {
                            break;
                        }

                        if (brokerThread == null || !brokerThread.getNewPartitionsQueue().isOpen()) {
                            // no thread for this broker yet, or its queue is already closed: start new thread
                            brokerThread = createAndStartSimpleConsumerThread(partitions, leader, errorHandler);
                            brokerToThread.put(leader, brokerThread);
                        }
                        else {
                            // put elements into queue of thread
                            ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> newPartitionsQueue =
                                    brokerThread.getNewPartitionsQueue();

                            for (KafkaTopicPartitionState<TopicAndPartition> fp : partitions) {
                                if (!newPartitionsQueue.addIfOpen(fp)) {
                                    // we were unable to add the partition to the broker's queue
                                    // the broker has closed in the meantime (the thread will shut down)
                                    // create a new thread for connecting to this broker
                                    List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions = new ArrayList<>();
                                    seedPartitions.add(fp);
                                    brokerThread = createAndStartSimpleConsumerThread(seedPartitions, leader, errorHandler);
                                    brokerToThread.put(leader, brokerThread);
                                    newPartitionsQueue = brokerThread.getNewPartitionsQueue(); // update queue for the subsequent partitions
                                }
                            }
                        }
                    }
                }
                else {
                    // there were no partitions to assign. Check if any broker threads shut down.
                    // we get into this section of the code, if either the poll timed out, or the
                    // blocking poll was woken up by the marker element
                    Iterator<SimpleConsumerThread<T>> bttIterator = brokerToThread.values().iterator();
                    while (bttIterator.hasNext()) {
                        SimpleConsumerThread<T> thread = bttIterator.next();
                        if (!thread.getNewPartitionsQueue().isOpen()) {
                            LOG.info("Removing stopped consumer thread {}", thread.getName());
                            bttIterator.remove();
                        }
                    }
                }

                if (brokerToThread.size() == 0 && unassignedPartitionsQueue.isEmpty()) {
                    if (unassignedPartitionsQueue.close()) {
                        LOG.info("All consumer threads are finished, there are no more unassigned partitions. Stopping fetcher");
                        break;
                    }
                    // we end up here if somebody added something to the queue in the meantime --> continue to poll queue again
                }
            }
        }
        catch (InterruptedException e) {
            // this may be thrown because an exception on one of the concurrent fetcher threads
            // woke this thread up. make sure we throw the root exception instead in that case
            errorHandler.checkAndThrowException();

            // no other root exception, throw the interrupted exception
            throw e;
        }
        finally {
            this.running = false;
            this.zookeeperOffsetHandler = null;

            // if we run a periodic committer thread, shut that down
            if (periodicCommitter != null) {
                periodicCommitter.shutdown();
            }

            // clear the interruption flag
            // this allows the (best effort) joining on the consumer threads below to happen
            // even in case this thread has already been interrupted
            Thread.interrupted();

            // make sure that in any case (completion, abort, error), all spawned threads are stopped
            try {
                int runningThreads;
                do {
                    // check whether threads are alive and cancel them
                    runningThreads = 0;
                    Iterator<SimpleConsumerThread<T>> threads = brokerToThread.values().iterator();
                    while (threads.hasNext()) {
                        SimpleConsumerThread<?> t = threads.next();
                        if (t.isAlive()) {
                            t.cancel();
                            runningThreads++;
                        } else {
                            threads.remove();
                        }
                    }

                    // wait for the threads to finish, before issuing a cancel call again
                    // (the wait budget of roughly 500 ms is split across the running threads)
                    if (runningThreads > 0) {
                        for (SimpleConsumerThread<?> t : brokerToThread.values()) {
                            t.join(500 / runningThreads + 1);
                        }
                    }
                }
                while (runningThreads > 0);
            }
            catch (InterruptedException ignored) {
                // waiting for the thread shutdown apparently got interrupted
                // restore interrupted state and continue
                Thread.currentThread().interrupt();
            }
            catch (Throwable t) {
                // we catch all here to preserve the original exception
                LOG.error("Exception while shutting down consumer threads", t);
            }

            try {
                zookeeperOffsetHandler.close();
            }
            catch (Throwable t) {
                // we catch all here to preserve the original exception
                LOG.error("Exception while shutting down ZookeeperOffsetHandler", t);
            }
        }
    }

    /**
     * Cancels the fetcher: clears the running flag and wakes up the main loop by
     * adding the marker element to the unassigned-partitions queue (if it is still open).
     */
    @Override
    public void cancel() {
        // signal the main thread to exit
        this.running = false;

        // make sure the main thread wakes up soon
        this.unassignedPartitionsQueue.addIfOpen(MARKER);
    }

    // ------------------------------------------------------------------------
    //  Kafka 0.8 specific class instantiation
    // ------------------------------------------------------------------------

    /**
     * Creates the Kafka 0.8 partition handle for the given partition.
     *
     * @param partition The Flink-side description of the partition.
     * @return A {@code TopicAndPartition} with the same topic and partition number.
     */
    @Override
    public TopicAndPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
        return new TopicAndPartition(partition.getTopic(), partition.getPartition());
    }

    // ------------------------------------------------------------------------
    //  Offset handling
    // ------------------------------------------------------------------------

    /**
     * Commits the given offsets to ZooKeeper (if the ZooKeeper handler is currently available)
     * and records them as committed offsets in the partition states.
     *
     * @param offsets The offsets to commit, per partition.
     */
    @Override
    public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
        // read the volatile field once; it is reset to null when the fetch loop exits
        ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
        if (zkHandler != null) {
            // the ZK handler takes care of incrementing the offsets by 1 before committing
            zkHandler.prepareAndCommitOffsets(offsets);
        }

        // Set committed offsets in topic partition state
        // NOTE(review): this also happens when no ZK handler was available, i.e. when nothing
        // was actually written to ZooKeeper - confirm that this is intended
        KafkaTopicPartitionState<TopicAndPartition>[] partitions = subscribedPartitions();
        for (KafkaTopicPartitionState<TopicAndPartition> partition : partitions) {
            Long offset = offsets.get(partition.getKafkaTopicPartition());
            if (offset != null) {
                partition.setCommittedOffset(offset);
            }
        }
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    /**
     * Creates, names, and starts a daemon consumer thread for the given broker.
     *
     * @param seedPartitions The partitions the thread starts with.
     * @param leader The broker the thread connects to.
     * @param errorHandler The proxy through which the thread reports errors.
     * @return The started consumer thread.
     */
    private SimpleConsumerThread<T> createAndStartSimpleConsumerThread(
            List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions,
            Node leader,
            ExceptionProxy errorHandler) throws IOException, ClassNotFoundException
    {
        // each thread needs its own copy of the deserializer, because the deserializer is
        // not necessarily thread safe
        final KeyedDeserializationSchema<T> clonedDeserializer =
                InstantiationUtil.clone(deserializer, runtimeContext.getUserCodeClassLoader());

        // seed thread with list of fetch partitions (otherwise it would shut down immediately again)
        SimpleConsumerThread<T> brokerThread = new SimpleConsumerThread<>(
                this, errorHandler, kafkaConfig, leader, seedPartitions, unassignedPartitionsQueue,
                clonedDeserializer, invalidOffsetBehavior);

        brokerThread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)",
                runtimeContext.getTaskName(), leader.id(), leader.host(), leader.port()));
        brokerThread.setDaemon(true);
        brokerThread.start();

        LOG.info("Starting thread {}", brokerThread.getName());
        return brokerThread;
    }

    /**
     * Returns a list of the unique topics of the given partitions.
     *
     * @param partitions The partitions
     * @return A list of unique topics (in no particular order)
     */
    private static List<String> getTopics(List<KafkaTopicPartitionState<TopicAndPartition>> partitions) {
        HashSet<String> uniqueTopics = new HashSet<>();
        for (KafkaTopicPartitionState<TopicAndPartition> fp: partitions) {
            uniqueTopics.add(fp.getTopic());
        }
        return new ArrayList<>(uniqueTopics);
    }

    /**
     * Finds the leaders for the given partitions.
     *
     * <p>From a high level, the method does the following:
     *  - Get a list of fetch partitions (usually only a few partitions)
     *  - Get the list of topics from the fetch partitions and request the partitions for the topics.
     *    (Kafka doesn't support getting leaders for a set of partitions)
     *  - Build a {@code Map<Leader, List<FetchPartition>>} where only the requested partitions are contained.
     *
     * @param partitionsToAssign fetch partitions list; must not be empty
     * @param kafkaProperties the Kafka configuration used for the metadata request
     * @return leader to partitions map
     * @throws IllegalArgumentException if the list of partitions is empty
     * @throws RuntimeException if no leader could be found for some of the partitions
     */
    private static Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> findLeaderForPartitions(
            List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToAssign,
            Properties kafkaProperties) throws Exception
    {
        if (partitionsToAssign.isEmpty()) {
            throw new IllegalArgumentException("Leader request for empty partitions list");
        }

        LOG.info("Refreshing leader information for partitions {}", partitionsToAssign);

        // this request is based on the topic names
        PartitionInfoFetcher infoFetcher = new PartitionInfoFetcher(getTopics(partitionsToAssign), kafkaProperties);
        infoFetcher.start();

        // NOTE: The kafka client apparently locks itself up sometimes
        // when it is interrupted, so we run it only in a separate thread.
        // since it sometimes refuses to shut down, we resort to the admittedly harsh
        // means of killing the thread after a timeout.
        KillerWatchDog watchDog = new KillerWatchDog(infoFetcher, 60000);
        watchDog.start();

        // this list contains ALL partitions of the requested topics
        List<KafkaTopicPartitionLeader> topicPartitionWithLeaderList = infoFetcher.getPartitions();

        // copy list to track unassigned partitions
        List<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions = new ArrayList<>(partitionsToAssign);

        // final mapping from leader -> list(fetchPartition)
        Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> leaderToPartitions = new HashMap<>();

        for(KafkaTopicPartitionLeader partitionLeader: topicPartitionWithLeaderList) {
            if (unassignedPartitions.size() == 0) {
                // we are done: all partitions are assigned
                break;
            }

            Iterator<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitionsIterator = unassignedPartitions.iterator();
            while (unassignedPartitionsIterator.hasNext()) {
                KafkaTopicPartitionState<TopicAndPartition> unassignedPartition = unassignedPartitionsIterator.next();

                if (unassignedPartition.getKafkaTopicPartition().equals(partitionLeader.getTopicPartition())) {
                    // we found the leader for one of the fetch partitions
                    Node leader = partitionLeader.getLeader();

                    List<KafkaTopicPartitionState<TopicAndPartition>> partitionsOfLeader = leaderToPartitions.get(leader);
                    if (partitionsOfLeader == null) {
                        partitionsOfLeader = new ArrayList<>();
                        leaderToPartitions.put(leader, partitionsOfLeader);
                    }
                    partitionsOfLeader.add(unassignedPartition);
                    unassignedPartitionsIterator.remove(); // partition has been assigned
                    break;
                }
            }
        }

        if (unassignedPartitions.size() > 0) {
            throw new RuntimeException("Unable to find a leader for partitions: " + unassignedPartitions);
        }

        LOG.debug("Partitions with assigned leaders {}", leaderToPartitions);

        return leaderToPartitions;
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java deleted file mode 100644 index 4d61e53..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.internals; - -/** - * A watch dog thread that forcibly kills another thread, if that thread does not - * finish in time. - * - * <p>This uses the discouraged {@link Thread#stop()} method. While this is not - * advisable, this watch dog is only for extreme cases of thread that simply - * to not terminate otherwise. 
/**
 * A watch dog thread that forcibly kills another thread, if that thread does not
 * finish in time.
 *
 * <p>This uses the discouraged {@link Thread#stop()} method. While this is not
 * advisable, this watch dog is only for extreme cases of threads that simply
 * do not terminate otherwise.
 *
 * <p>On Java runtimes where {@link Thread#stop()} is no longer supported and throws an
 * {@link UnsupportedOperationException}, the watch dog falls back to interrupting
 * the target thread.
 */
class KillerWatchDog extends Thread {

    /** The thread to terminate once the timeout has expired. */
    private final Thread toKill;

    /** The time (in milliseconds) that the target thread is given to finish on its own. */
    private final long timeout;

    /**
     * Creates the watch dog as a daemon thread named "KillerWatchDog". The timeout starts
     * counting when the watch dog thread is started, not when it is created.
     *
     * @param toKill The thread to watch and, if necessary, kill; must not be null.
     * @param timeout The time in milliseconds to wait before killing the thread.
     * @throws NullPointerException if {@code toKill} is null
     */
    KillerWatchDog(Thread toKill, long timeout) {
        super("KillerWatchDog");
        setDaemon(true);

        // fail fast here instead of with an uncaught exception on the watch dog thread
        if (toKill == null) {
            throw new NullPointerException("toKill");
        }

        this.toKill = toKill;
        this.timeout = timeout;
    }

    /**
     * Waits until the target thread has finished or the timeout has expired, and
     * forcibly terminates the target thread if it is still alive afterwards.
     */
    @SuppressWarnings("deprecation")
    @Override
    public void run() {
        final long deadline = System.currentTimeMillis() + timeout;
        long now;

        // the loop condition guarantees (deadline - now) > 0, so join() never waits unbounded
        while (toKill.isAlive() && (now = System.currentTimeMillis()) < deadline) {
            try {
                toKill.join(deadline - now);
            }
            catch (InterruptedException ignored) {
                // ignore here, our job is important!
            }
        }

        // this is harsh, but this watchdog is a last resort
        if (toKill.isAlive()) {
            try {
                toKill.stop();
            }
            catch (UnsupportedOperationException e) {
                // newer Java runtimes do not support Thread.stop() any more;
                // the best remaining option is to interrupt the thread
                toKill.interrupt();
            }
        }
    }
}
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.internals; - -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; - -import java.util.List; -import java.util.Properties; - -class PartitionInfoFetcher extends Thread { - - private final List<String> topics; - private final Properties properties; - - private volatile List<KafkaTopicPartitionLeader> result; - private volatile Throwable error; - - - PartitionInfoFetcher(List<String> topics, Properties properties) { - this.topics = topics; - this.properties = properties; - } - - @Override - public void run() { - try { - result = FlinkKafkaConsumer08.getPartitionsForTopic(topics, properties); - } - catch (Throwable t) { - this.error = t; - } - } - - public List<KafkaTopicPartitionLeader> getPartitions() throws Exception { - try { - this.join(); - } - catch (InterruptedException e) { - throw new Exception("Partition fetching was cancelled before completion"); - } - - if (error != null) { - throw new Exception("Failed to fetch partitions for topics " + topics.toString(), error); - } - if (result != null) { - return result; - } - throw new Exception("Partition fetching failed"); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java deleted file mode 100644 index 27d90f2..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.internals; - -import java.util.HashMap; - -import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * A thread that periodically writes the current Kafka partition offsets to Zookeeper. 
- */ -public class PeriodicOffsetCommitter extends Thread { - - /** The ZooKeeper handler */ - private final ZookeeperOffsetHandler offsetHandler; - - private final KafkaTopicPartitionState<?>[] partitionStates; - - /** The proxy to forward exceptions to the main thread */ - private final ExceptionProxy errorHandler; - - /** Interval in which to commit, in milliseconds */ - private final long commitInterval; - - /** Flag to mark the periodic committer as running */ - private volatile boolean running = true; - - PeriodicOffsetCommitter(ZookeeperOffsetHandler offsetHandler, - KafkaTopicPartitionState<?>[] partitionStates, - ExceptionProxy errorHandler, - long commitInterval) - { - this.offsetHandler = checkNotNull(offsetHandler); - this.partitionStates = checkNotNull(partitionStates); - this.errorHandler = checkNotNull(errorHandler); - this.commitInterval = commitInterval; - - checkArgument(commitInterval > 0); - } - - @Override - public void run() { - try { - while (running) { - Thread.sleep(commitInterval); - - // create copy a deep copy of the current offsets - HashMap<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>(partitionStates.length); - for (KafkaTopicPartitionState<?> partitionState : partitionStates) { - offsetsToCommit.put(partitionState.getKafkaTopicPartition(), partitionState.getOffset()); - } - - offsetHandler.prepareAndCommitOffsets(offsetsToCommit); - } - } - catch (Throwable t) { - if (running) { - errorHandler.reportError( - new Exception("The periodic offset committer encountered an error: " + t.getMessage(), t)); - } - } - } - - public void shutdown() { - this.running = false; - this.interrupt(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java deleted file mode 100644 index 35e491a..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java +++ /dev/null @@ -1,504 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kafka.internals; - -import kafka.api.FetchRequestBuilder; -import kafka.api.PartitionOffsetRequestInfo; -import kafka.common.ErrorMapping; -import kafka.common.TopicAndPartition; -import kafka.javaapi.FetchResponse; -import kafka.javaapi.OffsetResponse; -import kafka.javaapi.consumer.SimpleConsumer; -import kafka.javaapi.message.ByteBufferMessageSet; -import kafka.message.MessageAndOffset; - -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.util.StringUtils; - -import org.apache.kafka.common.Node; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.ClosedChannelException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import static java.util.Objects.requireNonNull; -import static org.apache.flink.util.PropertiesUtil.getInt; - -/** - * This class implements a thread with a connection to a single Kafka broker. The thread - * pulls records for a set of topic partitions for which the connected broker is currently - * the leader. The thread deserializes these records and emits them. - * - * @param <T> The type of elements that this consumer thread creates from Kafka's byte messages - * and emits into the Flink DataStream. 
- */ -class SimpleConsumerThread<T> extends Thread { - - private static final Logger LOG = LoggerFactory.getLogger(SimpleConsumerThread.class); - - private static final KafkaTopicPartitionState<TopicAndPartition> MARKER = Kafka08Fetcher.MARKER; - - // ------------------------------------------------------------------------ - - private final Kafka08Fetcher<T> owner; - - private final KeyedDeserializationSchema<T> deserializer; - - private final List<KafkaTopicPartitionState<TopicAndPartition>> partitions; - - private final Node broker; - - /** Queue containing new fetch partitions for the consumer thread */ - private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> newPartitionsQueue; - - private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions; - - private final ExceptionProxy errorHandler; - - private final long invalidOffsetBehavior; - - private volatile boolean running = true; - - - // ----------------- Simple Consumer ---------------------- - private volatile SimpleConsumer consumer; - - private final int soTimeout; - private final int minBytes; - private final int maxWait; - private final int fetchSize; - private final int bufferSize; - private final int reconnectLimit; - - - // exceptions are thrown locally - public SimpleConsumerThread( - Kafka08Fetcher<T> owner, - ExceptionProxy errorHandler, - Properties config, - Node broker, - List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions, - ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions, - KeyedDeserializationSchema<T> deserializer, - long invalidOffsetBehavior) - { - this.owner = owner; - this.errorHandler = errorHandler; - this.broker = broker; - this.partitions = seedPartitions; - this.deserializer = requireNonNull(deserializer); - this.unassignedPartitions = requireNonNull(unassignedPartitions); - this.newPartitionsQueue = new ClosableBlockingQueue<>(); - this.invalidOffsetBehavior = 
invalidOffsetBehavior; - - // these are the actual configuration values of Kafka + their original default values. - this.soTimeout = getInt(config, "socket.timeout.ms", 30000); - this.minBytes = getInt(config, "fetch.min.bytes", 1); - this.maxWait = getInt(config, "fetch.wait.max.ms", 100); - this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576); - this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536); - this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3); - } - - public ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> getNewPartitionsQueue() { - return newPartitionsQueue; - } - - // ------------------------------------------------------------------------ - // main work loop - // ------------------------------------------------------------------------ - - @Override - public void run() { - LOG.info("Starting to fetch from {}", this.partitions); - - // set up the config values - final String clientId = "flink-kafka-consumer-legacy-" + broker.id(); - - try { - // create the Kafka consumer that we actually use for fetching - consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId); - - // make sure that all partitions have some offsets to start with - // those partitions that do not have an offset from a checkpoint need to get - // their start offset from ZooKeeper - getMissingOffsetsFromKafka(partitions); - - // Now, the actual work starts :-) - int offsetOutOfRangeCount = 0; - int reconnects = 0; - while (running) { - - // ----------------------------------- partitions list maintenance ---------------------------- - - // check queue for new partitions to read from: - List<KafkaTopicPartitionState<TopicAndPartition>> newPartitions = newPartitionsQueue.pollBatch(); - if (newPartitions != null) { - // found some new partitions for this thread's broker - - // check if the new partitions need an offset lookup - getMissingOffsetsFromKafka(newPartitions); - - // 
add the new partitions (and check they are not already in there) - for (KafkaTopicPartitionState<TopicAndPartition> newPartition: newPartitions) { - if (partitions.contains(newPartition)) { - throw new IllegalStateException("Adding partition " + newPartition + - " to subscribed partitions even though it is already subscribed"); - } - partitions.add(newPartition); - } - - LOG.info("Adding {} new partitions to consumer thread {}", newPartitions.size(), getName()); - LOG.debug("Partitions list: {}", newPartitions); - } - - if (partitions.size() == 0) { - if (newPartitionsQueue.close()) { - // close succeeded. Closing thread - running = false; - - LOG.info("Consumer thread {} does not have any partitions assigned anymore. Stopping thread.", - getName()); - - // add the wake-up marker into the queue to make the main thread - // immediately wake up and termination faster - unassignedPartitions.add(MARKER); - - break; - } else { - // close failed: fetcher main thread concurrently added new partitions into the queue. 
- // go to top of loop again and get the new partitions - continue; - } - } - - // ----------------------------------- request / response with kafka ---------------------------- - - FetchRequestBuilder frb = new FetchRequestBuilder(); - frb.clientId(clientId); - frb.maxWait(maxWait); - frb.minBytes(minBytes); - - for (KafkaTopicPartitionState<?> partition : partitions) { - frb.addFetch( - partition.getKafkaTopicPartition().getTopic(), - partition.getKafkaTopicPartition().getPartition(), - partition.getOffset() + 1, // request the next record - fetchSize); - } - - kafka.api.FetchRequest fetchRequest = frb.build(); - LOG.debug("Issuing fetch request {}", fetchRequest); - - FetchResponse fetchResponse; - try { - fetchResponse = consumer.fetch(fetchRequest); - } - catch (Throwable cce) { - //noinspection ConstantConditions - if (cce instanceof ClosedChannelException) { - LOG.warn("Fetch failed because of ClosedChannelException."); - LOG.debug("Full exception", cce); - - // we don't know if the broker is overloaded or unavailable. - // retry a few times, then return ALL partitions for new leader lookup - if (++reconnects >= reconnectLimit) { - LOG.warn("Unable to reach broker after {} retries. 
Returning all current partitions", reconnectLimit); - for (KafkaTopicPartitionState<TopicAndPartition> fp: this.partitions) { - unassignedPartitions.add(fp); - } - this.partitions.clear(); - continue; // jump to top of loop: will close thread or subscribe to new partitions - } - try { - consumer.close(); - } catch (Throwable t) { - LOG.warn("Error while closing consumer connection", t); - } - // delay & retry - Thread.sleep(100); - consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId); - continue; // retry - } else { - throw cce; - } - } - reconnects = 0; - - // ---------------------------------------- error handling ---------------------------- - - if (fetchResponse == null) { - throw new IOException("Fetch from Kafka failed (request returned null)"); - } - - if (fetchResponse.hasError()) { - String exception = ""; - List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = new ArrayList<>(); - - // iterate over partitions to get individual error codes - Iterator<KafkaTopicPartitionState<TopicAndPartition>> partitionsIterator = partitions.iterator(); - boolean partitionsRemoved = false; - - while (partitionsIterator.hasNext()) { - final KafkaTopicPartitionState<TopicAndPartition> fp = partitionsIterator.next(); - short code = fetchResponse.errorCode(fp.getTopic(), fp.getPartition()); - - if (code == ErrorMapping.OffsetOutOfRangeCode()) { - // we were asked to read from an out-of-range-offset (maybe set wrong in Zookeeper) - // Kafka's high level consumer is resetting the offset according to 'auto.offset.reset' - partitionsToGetOffsetsFor.add(fp); - } - else if (code == ErrorMapping.NotLeaderForPartitionCode() || - code == ErrorMapping.LeaderNotAvailableCode() || - code == ErrorMapping.BrokerNotAvailableCode() || - code == ErrorMapping.UnknownCode()) - { - // the broker we are connected to is not the leader for the partition. - LOG.warn("{} is not the leader of {}. 
Reassigning leader for partition", broker, fp); - LOG.debug("Error code = {}", code); - - unassignedPartitions.add(fp); - - partitionsIterator.remove(); // unsubscribe the partition ourselves - partitionsRemoved = true; - } - else if (code != ErrorMapping.NoError()) { - exception += "\nException for " + fp.getTopic() +":"+ fp.getPartition() + ": " + - StringUtils.stringifyException(ErrorMapping.exceptionFor(code)); - } - } - if (partitionsToGetOffsetsFor.size() > 0) { - // safeguard against an infinite loop. - if (offsetOutOfRangeCount++ > 3) { - throw new RuntimeException("Found invalid offsets more than three times in partitions " - + partitionsToGetOffsetsFor + " Exceptions: " + exception); - } - // get valid offsets for these partitions and try again. - LOG.warn("The following partitions had an invalid offset: {}", partitionsToGetOffsetsFor); - getLastOffsetFromKafka(consumer, partitionsToGetOffsetsFor, invalidOffsetBehavior); - - LOG.warn("The new partition offsets are {}", partitionsToGetOffsetsFor); - continue; // jump back to create a new fetch request. The offset has not been touched. - } - else if (partitionsRemoved) { - continue; // create new fetch request - } - else { - // partitions failed on an error - throw new IOException("Error while fetching from broker '" + broker +"': " + exception); - } - } else { - // successful fetch, reset offsetOutOfRangeCount. 
- offsetOutOfRangeCount = 0; - } - - // ----------------------------------- process fetch response ---------------------------- - - int messagesInFetch = 0; - int deletedMessages = 0; - Iterator<KafkaTopicPartitionState<TopicAndPartition>> partitionsIterator = partitions.iterator(); - - partitionsLoop: - while (partitionsIterator.hasNext()) { - final KafkaTopicPartitionState<TopicAndPartition> currentPartition = partitionsIterator.next(); - - final ByteBufferMessageSet messageSet = fetchResponse.messageSet( - currentPartition.getTopic(), currentPartition.getPartition()); - - for (MessageAndOffset msg : messageSet) { - if (running) { - messagesInFetch++; - final ByteBuffer payload = msg.message().payload(); - final long offset = msg.offset(); - - if (offset <= currentPartition.getOffset()) { - // we have seen this message already - LOG.info("Skipping message with offset " + msg.offset() - + " because we have seen messages until (including) " - + currentPartition.getOffset() - + " from topic/partition " + currentPartition.getTopic() + '/' - + currentPartition.getPartition() + " already"); - continue; - } - - // If the message value is null, this represents a delete command for the message key. - // Log this and pass it on to the client who might want to also receive delete messages. - byte[] valueBytes; - if (payload == null) { - deletedMessages++; - valueBytes = null; - } else { - valueBytes = new byte[payload.remaining()]; - payload.get(valueBytes); - } - - // put key into byte array - byte[] keyBytes = null; - int keySize = msg.message().keySize(); - - if (keySize >= 0) { // message().hasKey() is doing the same. 
We save one int deserialization - ByteBuffer keyPayload = msg.message().key(); - keyBytes = new byte[keySize]; - keyPayload.get(keyBytes); - } - - final T value = deserializer.deserialize(keyBytes, valueBytes, - currentPartition.getTopic(), currentPartition.getPartition(), offset); - - if (deserializer.isEndOfStream(value)) { - // remove partition from subscribed partitions. - partitionsIterator.remove(); - continue partitionsLoop; - } - - owner.emitRecord(value, currentPartition, offset); - } - else { - // no longer running - return; - } - } - } - LOG.debug("This fetch contained {} messages ({} deleted messages)", messagesInFetch, deletedMessages); - } // end of fetch loop - - if (!newPartitionsQueue.close()) { - throw new Exception("Bug: Cleanly leaving fetcher thread without having a closed queue."); - } - } - catch (Throwable t) { - // report to the fetcher's error handler - errorHandler.reportError(t); - } - finally { - if (consumer != null) { - // closing the consumer should not fail the program - try { - consumer.close(); - } - catch (Throwable t) { - LOG.error("Error while closing the Kafka simple consumer", t); - } - } - } - } - - private void getMissingOffsetsFromKafka( - List<KafkaTopicPartitionState<TopicAndPartition>> partitions) throws IOException - { - // collect which partitions we should fetch offsets for - List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = new ArrayList<>(); - for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) { - if (!part.isOffsetDefined()) { - // retrieve the offset from the consumer - partitionsToGetOffsetsFor.add(part); - } - } - - if (partitionsToGetOffsetsFor.size() > 0) { - getLastOffsetFromKafka(consumer, partitionsToGetOffsetsFor, invalidOffsetBehavior); - - LOG.info("No checkpoint/savepoint offsets found for some partitions. " + - "Fetched the following start offsets {}", partitionsToGetOffsetsFor); - } - } - - /** - * Cancels this fetch thread. 
The thread will release all resources and terminate. - */ - public void cancel() { - this.running = false; - - // interrupt whatever the consumer is doing - if (consumer != null) { - consumer.close(); - } - - this.interrupt(); - } - - // ------------------------------------------------------------------------ - // Kafka Request Utils - // ------------------------------------------------------------------------ - - /** - * Request latest offsets for a set of partitions, via a Kafka consumer. - * - * <p>This method retries three times if the response has an error. - * - * @param consumer The consumer connected to lead broker - * @param partitions The list of partitions we need offsets for - * @param whichTime The type of time we are requesting. -1 and -2 are special constants (See OffsetRequest) - */ - private static void getLastOffsetFromKafka( - SimpleConsumer consumer, - List<KafkaTopicPartitionState<TopicAndPartition>> partitions, - long whichTime) throws IOException - { - Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>(); - for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) { - requestInfo.put(part.getKafkaPartitionHandle(), new PartitionOffsetRequestInfo(whichTime, 1)); - } - - int retries = 0; - OffsetResponse response; - while (true) { - kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest( - requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId()); - response = consumer.getOffsetsBefore(request); - - if (response.hasError()) { - StringBuilder exception = new StringBuilder(); - for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) { - short code; - if ((code = response.errorCode(part.getTopic(), part.getPartition())) != ErrorMapping.NoError()) { - exception.append("\nException for topic=").append(part.getTopic()) - .append(" partition=").append(part.getPartition()).append(": ") - .append(StringUtils.stringifyException(ErrorMapping.exceptionFor(code))); - } - } - 
if (++retries >= 3) { - throw new IOException("Unable to get last offset for partitions " + partitions + ": " - + exception.toString()); - } else { - LOG.warn("Unable to get last offset for partitions: Exception(s): {}", exception); - } - } else { - break; // leave retry loop - } - } - - for (KafkaTopicPartitionState<TopicAndPartition> part: partitions) { - final long offset = response.offsets(part.getTopic(), part.getPartition())[0]; - - // the offset returned is that of the next record to fetch. because our state reflects the latest - // successfully emitted record, we subtract one - part.setOffset(offset - 1); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java deleted file mode 100644 index 8f2ef09..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.internals; - -import kafka.utils.ZKGroupTopicDirs; - -import org.apache.curator.RetryPolicy; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.retry.ExponentialBackoffRetry; -import org.apache.kafka.clients.consumer.ConsumerConfig; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -/** - * Handler for committing Kafka offsets to Zookeeper and to retrieve them again. 
- */ -public class ZookeeperOffsetHandler { - - private static final Logger LOG = LoggerFactory.getLogger(ZookeeperOffsetHandler.class); - - private final String groupId; - - private final CuratorFramework curatorClient; - - - public ZookeeperOffsetHandler(Properties props) { - this.groupId = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG); - if (this.groupId == null) { - throw new IllegalArgumentException("Required property '" - + ConsumerConfig.GROUP_ID_CONFIG + "' has not been set"); - } - - String zkConnect = props.getProperty("zookeeper.connect"); - if (zkConnect == null) { - throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set"); - } - - // we use Curator's default timeouts - int sessionTimeoutMs = Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "60000")); - int connectionTimeoutMs = Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "15000")); - - // undocumented config options allowing users to configure the retry policy. (they are "flink." prefixed as they are no official kafka configs) - int backoffBaseSleepTime = Integer.valueOf(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100")); - int backoffMaxRetries = Integer.valueOf(props.getProperty("flink.zookeeper.max-retries", "10")); - - RetryPolicy retryPolicy = new ExponentialBackoffRetry(backoffBaseSleepTime, backoffMaxRetries); - curatorClient = CuratorFrameworkFactory.newClient(zkConnect, sessionTimeoutMs, connectionTimeoutMs, retryPolicy); - curatorClient.start(); - } - - // ------------------------------------------------------------------------ - // Offset access and manipulation - // ------------------------------------------------------------------------ - - /** - * Commits offsets for Kafka partitions to ZooKeeper. 
The given offsets to this method should be the offsets of - * the last processed records; this method will take care of incrementing the offsets by 1 before committing them so - * that the committed offsets to Zookeeper represent the next record to process. - * - * @param internalOffsets The internal offsets (representing last processed records) for the partitions to commit. - * @throws Exception The method forwards exceptions. - */ - public void prepareAndCommitOffsets(Map<KafkaTopicPartition, Long> internalOffsets) throws Exception { - for (Map.Entry<KafkaTopicPartition, Long> entry : internalOffsets.entrySet()) { - KafkaTopicPartition tp = entry.getKey(); - - Long lastProcessedOffset = entry.getValue(); - if (lastProcessedOffset != null && lastProcessedOffset >= 0) { - setOffsetInZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition(), lastProcessedOffset + 1); - } - } - } - - /** - * @param partitions The partitions to read offsets for. - * @return The mapping from partition to offset. - * @throws Exception This method forwards exceptions. - */ - public Map<KafkaTopicPartition, Long> getCommittedOffsets(List<KafkaTopicPartition> partitions) throws Exception { - Map<KafkaTopicPartition, Long> ret = new HashMap<>(partitions.size()); - for (KafkaTopicPartition tp : partitions) { - Long offset = getOffsetFromZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition()); - - if (offset != null) { - LOG.info("Offset for TopicPartition {}:{} was set to {} in ZooKeeper. Seeking fetcher to that position.", - tp.getTopic(), tp.getPartition(), offset); - ret.put(tp, offset); - } - } - return ret; - } - - /** - * Closes the offset handler. - * - * @throws IOException Thrown, if the handler cannot be closed properly. 
- */ - public void close() throws IOException { - curatorClient.close(); - } - - // ------------------------------------------------------------------------ - // Communication with Zookeeper - // ------------------------------------------------------------------------ - - public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset) throws Exception { - ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic); - String path = topicDirs.consumerOffsetDir() + "/" + partition; - curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient()); - byte[] data = Long.toString(offset).getBytes(); - curatorClient.setData().forPath(path, data); - } - - public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception { - ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic); - String path = topicDirs.consumerOffsetDir() + "/" + partition; - curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient()); - - byte[] data = curatorClient.getData().forPath(path); - - if (data == null) { - return null; - } else { - String asString = new String(data); - if (asString.length() == 0) { - return null; - } else { - try { - return Long.valueOf(asString); - } - catch (NumberFormatException e) { - LOG.error( - "The offset in ZooKeeper for group '{}', topic '{}', partition {} is a malformed string: {}", - groupId, topic, partition, asString); - return null; - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java deleted file mode 100644 index fabb0fe..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java +++ /dev/null @@ -1,248 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.runtime.client.JobCancellationException; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.DiscardingSink; -import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler; -import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; - -import org.junit.Assert; -import org.junit.Test; - -import java.util.Properties; -import java.util.concurrent.atomic.AtomicReference; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -public class Kafka08ITCase extends KafkaConsumerTestBase { - - // ------------------------------------------------------------------------ - // Suite of Tests - // ------------------------------------------------------------------------ - - @Test(timeout = 60000) - public void testFailOnNoBroker() throws Exception { - runFailOnNoBrokerTest(); - } - - - @Test(timeout = 60000) - public void testConcurrentProducerConsumerTopology() throws Exception { - runSimpleConcurrentProducerConsumerTopology(); - } - -// @Test(timeout = 60000) -// public void testPunctuatedExplicitWMConsumer() throws Exception { -// runExplicitPunctuatedWMgeneratingConsumerTest(false); -// } - -// @Test(timeout = 60000) -// public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception { -// runExplicitPunctuatedWMgeneratingConsumerTest(true); -// } - - @Test(timeout = 60000) - public void testKeyValueSupport() throws Exception { - runKeyValueTest(); - } - - // --- canceling / failures --- - - @Test(timeout = 
60000) - public void testCancelingEmptyTopic() throws Exception { - runCancelingOnEmptyInputTest(); - } - - @Test(timeout = 60000) - public void testCancelingFullTopic() throws Exception { - runCancelingOnFullInputTest(); - } - - @Test(timeout = 60000) - public void testFailOnDeploy() throws Exception { - runFailOnDeployTest(); - } - - @Test(timeout = 60000) - public void testInvalidOffset() throws Exception { - - final int parallelism = 1; - - // write 20 messages into topic: - final String topic = writeSequence("invalidOffsetTopic", 20, parallelism, 1); - - // set invalid offset: - CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient(); - ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"), topic, 0, 1234); - curatorClient.close(); - - // read from topic - final int valuesCount = 20; - final int startFrom = 0; - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.getConfig().disableSysoutLogging(); - - readSequence(env, standardProps, parallelism, topic, valuesCount, startFrom); - - deleteTestTopic(topic); - } - - // --- source to partition mappings and exactly once --- - - @Test(timeout = 60000) - public void testOneToOneSources() throws Exception { - runOneToOneExactlyOnceTest(); - } - - @Test(timeout = 60000) - public void testOneSourceMultiplePartitions() throws Exception { - runOneSourceMultiplePartitionsExactlyOnceTest(); - } - - @Test(timeout = 60000) - public void testMultipleSourcesOnePartition() throws Exception { - runMultipleSourcesOnePartitionExactlyOnceTest(); - } - - // --- broker failure --- - - @Test(timeout = 60000) - public void testBrokerFailure() throws Exception { - runBrokerFailureTest(); - } - - // --- offset committing --- - - @Test(timeout = 60000) - public void testCommitOffsetsToZookeeper() throws Exception { - runCommitOffsetsToKafka(); - } - - @Test(timeout = 60000) - public void 
testStartFromZookeeperCommitOffsets() throws Exception { - runStartFromKafkaCommitOffsets(); - } - - @Test(timeout = 60000) - public void testAutoOffsetRetrievalAndCommitToZookeeper() throws Exception { - runAutoOffsetRetrievalAndCommitToKafka(); - } - - @Test - public void runOffsetManipulationInZooKeeperTest() { - try { - final String topicName = "ZookeeperOffsetHandlerTest-Topic"; - final String groupId = "ZookeeperOffsetHandlerTest-Group"; - - final Long offset = (long) (Math.random() * Long.MAX_VALUE); - - CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer ).createCuratorClient(); - kafkaServer.createTestTopic(topicName, 3, 2); - - ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorFramework, groupId, topicName, 0, offset); - - Long fetchedOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, groupId, topicName, 0); - - curatorFramework.close(); - - assertEquals(offset, fetchedOffset); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test(timeout = 60000) - public void testOffsetAutocommitTest() throws Exception { - final int parallelism = 3; - - // write a sequence from 0 to 99 to each of the 3 partitions. - final String topicName = writeSequence("testOffsetAutocommit", 100, parallelism, 1); - - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - // NOTE: We are not enabling the checkpointing! - env.getConfig().disableSysoutLogging(); - env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - env.setParallelism(parallelism); - - // the readSequence operation sleeps for 20 ms between each record. - // setting a delay of 25*20 = 500 for the commit interval makes - // sure that we commit roughly 3-4 times while reading, however - // at least once. 
- Properties readProps = new Properties(); - readProps.putAll(standardProps); - readProps.setProperty("auto.commit.interval.ms", "500"); - - // read so that the offset can be committed to ZK - readSequence(env, readProps, parallelism, topicName, 100, 0); - - // get the offset - CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient(); - - Long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 0); - Long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 1); - Long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 2); - curatorFramework.close(); - LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3); - - // ensure that the offset has been committed - boolean atLeastOneOffsetSet = (o1 != null && o1 > 0 && o1 <= 100) || - (o2 != null && o2 > 0 && o2 <= 100) || - (o3 != null && o3 > 0 && o3 <= 100); - assertTrue("Expecting at least one offset to be set o1="+o1+" o2="+o2+" o3="+o3, atLeastOneOffsetSet); - - deleteTestTopic(topicName); - } - - // --- special executions --- - - @Test(timeout = 60000) - public void testBigRecordJob() throws Exception { - runBigRecordTestTopology(); - } - - @Test(timeout = 60000) - public void testMultipleTopics() throws Exception { - runProduceConsumeMultipleTopics(); - } - - @Test(timeout = 60000) - public void testAllDeletes() throws Exception { - runAllDeletesTest(); - } - - @Test(timeout=60000) - public void testEndOfStream() throws Exception { - runEndOfStreamTest(); - } - - @Test(timeout = 60000) - public void testMetrics() throws Throwable { - runMetricsTest(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java 
---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java deleted file mode 100644 index 6d0b140..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.table.Row; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; -import org.apache.flink.streaming.util.serialization.SerializationSchema; - -import java.util.Properties; - -public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase { - - @Override - protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner, - final FlinkKafkaProducerBase<Row> kafkaProducer) { - - return new Kafka08JsonTableSink(topic, properties, partitioner) { - @Override - protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, - SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) { - return kafkaProducer; - } - }; - } - - @Override - @SuppressWarnings("unchecked") - protected SerializationSchema<Row> getSerializationSchema() { - return new JsonRowSerializationSchema(FIELD_NAMES); - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java deleted file mode 100644 index a2d66ac..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor 
license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import java.util.Properties; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.table.Row; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; -import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; - -public class Kafka08JsonTableSourceTest extends KafkaTableSourceTestBase { - - @Override - protected KafkaTableSource createTableSource(String topic, Properties properties, String[] fieldNames, TypeInformation<?>[] typeInfo) { - return new Kafka08JsonTableSource(topic, properties, fieldNames, typeInfo); - } - - @Override - @SuppressWarnings("unchecked") - protected Class<DeserializationSchema<Row>> getDeserializationSchema() { - return (Class) JsonRowDeserializationSchema.class; - } - - @Override - @SuppressWarnings("unchecked") - protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() { - return (Class) FlinkKafkaConsumer08.class; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java 
---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java deleted file mode 100644 index 5c951db..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kafka; - - -import org.junit.Test; - -@SuppressWarnings("serial") -public class Kafka08ProducerITCase extends KafkaProducerTestBase { - - @Test - public void testCustomPartitioning() { - runCustomPartitioningTest(); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java deleted file mode 100644 index 9520f55..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kafka; - - -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.util.Collections; -import java.util.Properties; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.junit.Test; - -public class KafkaConsumer08Test { - - @Test - public void testValidateZooKeeperConfig() { - try { - // empty - Properties emptyProperties = new Properties(); - try { - FlinkKafkaConsumer08.validateZooKeeperConfig(emptyProperties); - fail("should fail with an exception"); - } - catch (IllegalArgumentException e) { - // expected - } - - // no connect string (only group string) - Properties noConnect = new Properties(); - noConnect.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-test-group"); - try { - FlinkKafkaConsumer08.validateZooKeeperConfig(noConnect); - fail("should fail with an exception"); - } - catch (IllegalArgumentException e) { - // expected - } - - // no group string (only connect string) - Properties noGroup = new Properties(); - noGroup.put("zookeeper.connect", "localhost:47574"); - try { - FlinkKafkaConsumer08.validateZooKeeperConfig(noGroup); - fail("should fail with an exception"); - } - catch (IllegalArgumentException e) { - // expected - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCreateSourceWithoutCluster() { - try { - Properties props = new Properties(); - props.setProperty("zookeeper.connect", "localhost:56794"); - props.setProperty("bootstrap.servers", "localhost:11111, localhost:22222"); - props.setProperty("group.id", "non-existent-group"); - - FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"), new SimpleStringSchema(), props); - consumer.open(new Configuration()); - fail(); - } - catch (Exception e) { 
- assertTrue(e.getMessage().contains("Unable to retrieve any partitions")); - } - } - - @Test - public void testAllBoostrapServerHostsAreInvalid() { - try { - String zookeeperConnect = "localhost:56794"; - String bootstrapServers = "indexistentHost:11111"; - String groupId = "non-existent-group"; - Properties props = createKafkaProps(zookeeperConnect, bootstrapServers, groupId); - FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"), - new SimpleStringSchema(), props); - consumer.open(new Configuration()); - fail(); - } catch (Exception e) { - assertTrue("Exception should be thrown containing 'all bootstrap servers invalid' message!", - e.getMessage().contains("All the servers provided in: '" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG - + "' config are invalid")); - } - } - - @Test - public void testAtLeastOneBootstrapServerHostIsValid() { - try { - String zookeeperConnect = "localhost:56794"; - // we declare one valid boostrap server, namely the one with - // 'localhost' - String bootstrapServers = "indexistentHost:11111, localhost:22222"; - String groupId = "non-existent-group"; - Properties props = createKafkaProps(zookeeperConnect, bootstrapServers, groupId); - FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"), - new SimpleStringSchema(), props); - consumer.open(new Configuration()); - fail(); - } catch (Exception e) { - // test is not failing because we have one valid boostrap server - assertTrue("The cause of the exception should not be 'all boostrap server are invalid'!", - !e.getMessage().contains("All the hosts provided in: " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG - + " config are invalid")); - } - } - - private Properties createKafkaProps(String zookeeperConnect, String bootstrapServers, String groupId) { - Properties props = new Properties(); - props.setProperty("zookeeper.connect", zookeeperConnect); - props.setProperty("bootstrap.servers", 
bootstrapServers); - props.setProperty("group.id", groupId); - return props; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java deleted file mode 100644 index 72d2772..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kafka; - -import kafka.utils.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class KafkaLocalSystemTime implements Time { - - private static final Logger LOG = LoggerFactory.getLogger(KafkaLocalSystemTime.class); - - @Override - public long milliseconds() { - return System.currentTimeMillis(); - } - - @Override - public long nanoseconds() { - return System.nanoTime(); - } - - @Override - public void sleep(long ms) { - try { - Thread.sleep(ms); - } catch (InterruptedException e) { - LOG.warn("Interruption", e); - } - } - -} - http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java deleted file mode 100644 index 91fc286..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.streaming.api.operators.StreamSink; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; -import org.apache.flink.util.TestLogger; - -import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.PartitionInfo; -import org.junit.Test; -import org.junit.runner.RunWith; - -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import java.util.Collections; -import java.util.concurrent.Future; - - -import static org.mockito.Mockito.*; -import static org.powermock.api.mockito.PowerMockito.whenNew; - -import static org.junit.Assert.*; - -@RunWith(PowerMockRunner.class) -@PrepareForTest(FlinkKafkaProducerBase.class) -public class KafkaProducerTest extends TestLogger { - - @Test - @SuppressWarnings("unchecked") - public void testPropagateExceptions() { - try { - // mock kafka producer - KafkaProducer<?, ?> kafkaProducerMock = mock(KafkaProducer.class); - - // partition setup - 
when(kafkaProducerMock.partitionsFor(anyString())).thenReturn( - // returning a unmodifiable list to mimic KafkaProducer#partitionsFor() behaviour - Collections.singletonList(new PartitionInfo("mock_topic", 42, null, null, null))); - - // failure when trying to send an element - when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class))) - .thenAnswer(new Answer<Future<RecordMetadata>>() { - @Override - public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable { - Callback callback = (Callback) invocation.getArguments()[1]; - callback.onCompletion(null, new Exception("Test error")); - return null; - } - }); - - // make sure the FlinkKafkaProducer instantiates our mock producer - whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock); - - // (1) producer that propagates errors - - FlinkKafkaProducer08<String> producerPropagating = new FlinkKafkaProducer08<>( - "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null); - - OneInputStreamOperatorTestHarness<String, Object> testHarness = - new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producerPropagating)); - - testHarness.open(); - - try { - testHarness.processElement(new StreamRecord<>("value")); - testHarness.processElement(new StreamRecord<>("value")); - fail("This should fail with an exception"); - } - catch (Exception e) { - assertNotNull(e.getCause()); - assertNotNull(e.getCause().getMessage()); - assertTrue(e.getCause().getMessage().contains("Test error")); - } - - testHarness.close(); - - // (2) producer that only logs errors - - FlinkKafkaProducer08<String> producerLogging = new FlinkKafkaProducer08<>( - "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null); - producerLogging.setLogFailuresOnly(true); - - testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging)); - - testHarness.open(); - - testHarness.processElement(new StreamRecord<>("value")); - 
testHarness.processElement(new StreamRecord<>("value")); - - testHarness.close(); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } -}
