http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java new file mode 100644 index 0000000..9fec52d --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java @@ -0,0 +1,652 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import kafka.api.FetchRequestBuilder; +import kafka.api.OffsetRequest; +import kafka.api.PartitionOffsetRequestInfo; +import kafka.common.ErrorMapping; +import kafka.common.TopicAndPartition; +import kafka.javaapi.FetchResponse; +import kafka.javaapi.OffsetResponse; +import kafka.javaapi.consumer.SimpleConsumer; +import kafka.javaapi.message.ByteBufferMessageSet; +import kafka.message.MessageAndOffset; + +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; + +import org.apache.flink.util.StringUtils; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * This fetcher uses Kafka's low-level API to pull data from a specific + * set of topics and partitions. 
+ * + * <p>This code is in parts based on the tutorial code for the low-level Kafka consumer.</p> + */ +public class LegacyFetcher implements Fetcher { + + private static final Logger LOG = LoggerFactory.getLogger(LegacyFetcher.class); + + + /** The properties that configure the Kafka connection */ + private final Properties config; + + /** The task name, to give more readable names to the spawned threads */ + private final String taskName; + + /** The first error that occurred in a connection thread */ + private final AtomicReference<Throwable> error; + + /** The partitions that the fetcher should read, with their starting offsets */ + private Map<KafkaTopicPartitionLeader, Long> partitionsToRead; + + /** The seek() method might receive KafkaTopicPartition's without leader information + * (for example when restoring). + * If there are elements in this list, we'll fetch the leader from Kafka. + **/ + private Map<KafkaTopicPartition, Long> partitionsToReadWithoutLeader; + + /** Reference the the thread that executed the run() method. 
 */ + private volatile Thread mainThread; + + /** Flag to shut the fetcher down */ + private volatile boolean running = true; + + public LegacyFetcher(List<KafkaTopicPartitionLeader> partitions, Properties props, String taskName) { + this.config = checkNotNull(props, "The config properties cannot be null"); + //this.topic = checkNotNull(topic, "The topic cannot be null"); + this.partitionsToRead = new HashMap<>(); + for (KafkaTopicPartitionLeader p: partitions) { + partitionsToRead.put(p, FlinkKafkaConsumer08.OFFSET_NOT_SET); + } + this.taskName = taskName; + this.error = new AtomicReference<>(); + } + + // ------------------------------------------------------------------------ + // Fetcher methods + // ------------------------------------------------------------------------ + + @Override + public void seek(KafkaTopicPartition topicPartition, long offsetToRead) { + if (partitionsToRead == null) { + throw new IllegalArgumentException("No partitions to read set"); + } + if (!topicPartition.isContained(partitionsToRead)) { + throw new IllegalArgumentException("Can not set offset on a partition (" + topicPartition + + ") we are not going to read. 
Partitions to read " + partitionsToRead); + } + if (partitionsToReadWithoutLeader == null) { + partitionsToReadWithoutLeader = new HashMap<>(); + } + partitionsToReadWithoutLeader.put(topicPartition, offsetToRead); + } + + @Override + public void close() { + // flag needs to be checked by the run() method that creates the spawned threads + this.running = false; + + // all other cleanup is made by the run method itself + } + + @Override + public <T> void run(SourceFunction.SourceContext<T> sourceContext, + KeyedDeserializationSchema<T> deserializer, + HashMap<KafkaTopicPartition, Long> lastOffsets) throws Exception { + + if (partitionsToRead == null || partitionsToRead.size() == 0) { + throw new IllegalArgumentException("No partitions set"); + } + + // NOTE: This method needs to always release all resources it acquires + + this.mainThread = Thread.currentThread(); + + LOG.info("Reading from partitions " + partitionsToRead + " using the legacy fetcher"); + + // get lead broker if necessary + if (partitionsToReadWithoutLeader != null && partitionsToReadWithoutLeader.size() > 0) { + LOG.info("Refreshing leader information for partitions {}", KafkaTopicPartition.toString(partitionsToReadWithoutLeader)); + // NOTE: The kafka client apparently locks itself in an infinite loop sometimes + // when it is interrupted, so we run it only in a separate thread. + // since it sometimes refuses to shut down, we resort to the admittedly harsh + // means of killing the thread after a timeout. 
+ PartitionInfoFetcher infoFetcher = new PartitionInfoFetcher(KafkaTopicPartition.getTopics(partitionsToReadWithoutLeader), config); + infoFetcher.start(); + + KillerWatchDog watchDog = new KillerWatchDog(infoFetcher, 60000); + watchDog.start(); + + List<KafkaTopicPartitionLeader> topicPartitionWithLeaderList = infoFetcher.getPartitions(); + + // replace potentially outdated leader information in partitionsToRead with fresh data from topicPartitionWithLeader + for (Map.Entry<KafkaTopicPartition, Long> pt: partitionsToReadWithoutLeader.entrySet()) { + KafkaTopicPartitionLeader topicPartitionWithLeader = null; + // go through list + for (KafkaTopicPartitionLeader withLeader: topicPartitionWithLeaderList) { + if (withLeader.getTopicPartition().equals(pt.getKey())) { + topicPartitionWithLeader = withLeader; + break; + } + } + if (topicPartitionWithLeader == null) { + throw new IllegalStateException("Unable to find topic/partition leader information"); + } + Long removed = KafkaTopicPartitionLeader.replaceIgnoringLeader(topicPartitionWithLeader, pt.getValue(), partitionsToRead); + if (removed == null) { + throw new IllegalStateException("Seek request on unknown topic partition"); + } + } + } + + + // build a map for each broker with its partitions + Map<Node, List<FetchPartition>> fetchBrokers = new HashMap<>(); + + for (Map.Entry<KafkaTopicPartitionLeader, Long> entry : partitionsToRead.entrySet()) { + final KafkaTopicPartitionLeader topicPartition = entry.getKey(); + final long offset = entry.getValue(); + + List<FetchPartition> partitions = fetchBrokers.get(topicPartition.getLeader()); + if (partitions == null) { + partitions = new ArrayList<>(); + fetchBrokers.put(topicPartition.getLeader(), partitions); + } + + partitions.add(new FetchPartition(topicPartition.getTopicPartition().getTopic(), topicPartition.getTopicPartition().getPartition(), offset)); + } + + // create SimpleConsumers for each broker + ArrayList<SimpleConsumerThread<?>> consumers = new 
ArrayList<>(fetchBrokers.size()); + + for (Map.Entry<Node, List<FetchPartition>> brokerInfo : fetchBrokers.entrySet()) { + final Node broker = brokerInfo.getKey(); + final List<FetchPartition> partitionsList = brokerInfo.getValue(); + + FetchPartition[] partitions = partitionsList.toArray(new FetchPartition[partitionsList.size()]); + + SimpleConsumerThread<T> thread = new SimpleConsumerThread<>(this, config, + broker, partitions, sourceContext, deserializer, lastOffsets); + + thread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)", + taskName, broker.id(), broker.host(), broker.port())); + thread.setDaemon(true); + consumers.add(thread); + } + + // last check whether we should abort. + if (!running) { + return; + } + + // start all consumer threads + for (SimpleConsumerThread<?> t : consumers) { + LOG.info("Starting thread {}", t.getName()); + t.start(); + } + + // wait until all consumer threads are done, or until we are aborted, or until + // an error occurred in one of the fetcher threads + try { + boolean someConsumersRunning = true; + while (running && error.get() == null && someConsumersRunning) { + try { + // wait for the consumer threads. if an error occurs, we are interrupted + for (SimpleConsumerThread<?> t : consumers) { + t.join(); + } + + // safety net + someConsumersRunning = false; + for (SimpleConsumerThread<?> t : consumers) { + someConsumersRunning |= t.isAlive(); + } + } + catch (InterruptedException e) { + // ignore. we should notice what happened in the next loop check + } + } + + // make sure any asynchronous error is noticed + Throwable error = this.error.get(); + if (error != null) { + throw new Exception(error.getMessage(), error); + } + } + finally { + // make sure that in any case (completion, abort, error), all spawned threads are stopped + for (SimpleConsumerThread<?> t : consumers) { + if (t.isAlive()) { + t.cancel(); + } + } + } + } + + /** + * Reports an error from a fetch thread. 
This will cause the main thread to see this error, + * abort, and cancel all other fetch threads. + * + * @param error The error to report. + */ + @Override + public void stopWithError(Throwable error) { + if (this.error.compareAndSet(null, error)) { + // we are the first to report an error + if (mainThread != null) { + mainThread.interrupt(); + } + } + } + + // ------------------------------------------------------------------------ + + /** + * Representation of a partition to fetch. + */ + private static class FetchPartition { + + final String topic; + + /** ID of the partition within the topic (0 indexed, as given by Kafka) */ + final int partition; + + /** Offset pointing at the next element to read from that partition. */ + long nextOffsetToRead; + + FetchPartition(String topic, int partition, long nextOffsetToRead) { + this.topic = topic; + this.partition = partition; + this.nextOffsetToRead = nextOffsetToRead; + } + + @Override + public String toString() { + return "FetchPartition {topic=" + topic +", partition=" + partition + ", offset=" + nextOffsetToRead + '}'; + } + } + + // ------------------------------------------------------------------------ + // Per broker fetcher + // ------------------------------------------------------------------------ + + /** + * Each broker needs its separate connection. This thread implements the connection to + * one broker. The connection can fetch multiple partitions from the broker. + * + * @param <T> The data type fetched. 
+ */ + private static class SimpleConsumerThread<T> extends Thread { + + private final SourceFunction.SourceContext<T> sourceContext; + private final KeyedDeserializationSchema<T> deserializer; + private final HashMap<KafkaTopicPartition, Long> offsetsState; + + private final FetchPartition[] partitions; + + private final Node broker; + + private final Properties config; + + private final LegacyFetcher owner; + + private SimpleConsumer consumer; + + private volatile boolean running = true; + + + // exceptions are thrown locally + public SimpleConsumerThread(LegacyFetcher owner, + Properties config, + Node broker, + FetchPartition[] partitions, + SourceFunction.SourceContext<T> sourceContext, + KeyedDeserializationSchema<T> deserializer, + HashMap<KafkaTopicPartition, Long> offsetsState) { + this.owner = owner; + this.config = config; + this.broker = broker; + this.partitions = partitions; + this.sourceContext = checkNotNull(sourceContext); + this.deserializer = checkNotNull(deserializer); + this.offsetsState = checkNotNull(offsetsState); + } + + @Override + public void run() { + LOG.info("Starting to fetch from {}", Arrays.toString(this.partitions)); + try { + // set up the config values + final String clientId = "flink-kafka-consumer-legacy-" + broker.id(); + + // these are the actual configuration values of Kafka + their original default values. 
+ final int soTimeout = Integer.valueOf(config.getProperty("socket.timeout.ms", "30000")); + final int bufferSize = Integer.valueOf(config.getProperty("socket.receive.buffer.bytes", "65536")); + final int fetchSize = Integer.valueOf(config.getProperty("fetch.message.max.bytes", "1048576")); + final int maxWait = Integer.valueOf(config.getProperty("fetch.wait.max.ms", "100")); + final int minBytes = Integer.valueOf(config.getProperty("fetch.min.bytes", "1")); + + // create the Kafka consumer that we actually use for fetching + consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId); + + // make sure that all partitions have some offsets to start with + // those partitions that do not have an offset from a checkpoint need to get + // their start offset from ZooKeeper + { + List<FetchPartition> partitionsToGetOffsetsFor = new ArrayList<>(); + + for (FetchPartition fp : partitions) { + if (fp.nextOffsetToRead == FlinkKafkaConsumer08.OFFSET_NOT_SET) { + // retrieve the offset from the consumer + partitionsToGetOffsetsFor.add(fp); + } + } + if (partitionsToGetOffsetsFor.size() > 0) { + getLastOffset(consumer, partitionsToGetOffsetsFor, getInvalidOffsetBehavior(config)); + LOG.info("No prior offsets found for some partitions. 
Fetched the following start offsets {}", partitionsToGetOffsetsFor); + } + } + + // Now, the actual work starts :-) + int offsetOutOfRangeCount = 0; + fetchLoop: while (running) { + FetchRequestBuilder frb = new FetchRequestBuilder(); + frb.clientId(clientId); + frb.maxWait(maxWait); + frb.minBytes(minBytes); + + for (FetchPartition fp : partitions) { + frb.addFetch(fp.topic, fp.partition, fp.nextOffsetToRead, fetchSize); + } + kafka.api.FetchRequest fetchRequest = frb.build(); + LOG.debug("Issuing fetch request {}", fetchRequest); + + FetchResponse fetchResponse = consumer.fetch(fetchRequest); + + if (fetchResponse.hasError()) { + String exception = ""; + List<FetchPartition> partitionsToGetOffsetsFor = new ArrayList<>(); + for (FetchPartition fp : partitions) { + short code = fetchResponse.errorCode(fp.topic, fp.partition); + + if (code == ErrorMapping.OffsetOutOfRangeCode()) { + // we were asked to read from an out-of-range-offset (maybe set wrong in Zookeeper) + // Kafka's high level consumer is resetting the offset according to 'auto.offset.reset' + partitionsToGetOffsetsFor.add(fp); + } else if (code != ErrorMapping.NoError()) { + exception += "\nException for partition " + fp.partition + ": " + + StringUtils.stringifyException(ErrorMapping.exceptionFor(code)); + } + } + if (partitionsToGetOffsetsFor.size() > 0) { + // safeguard against an infinite loop. + if (offsetOutOfRangeCount++ > 0) { + throw new RuntimeException("Found invalid offsets more than once in partitions "+partitionsToGetOffsetsFor.toString()+" " + + "Exceptions: "+exception); + } + // get valid offsets for these partitions and try again. + LOG.warn("The following partitions had an invalid offset: {}", partitionsToGetOffsetsFor); + getLastOffset(consumer, partitionsToGetOffsetsFor, getInvalidOffsetBehavior(config)); + LOG.warn("The new partition offsets are {}", partitionsToGetOffsetsFor); + continue; // jump back to create a new fetch request. The offset has not been touched. 
+ } else { + // all partitions failed on an error + throw new IOException("Error while fetching from broker: " + exception); + } + } + + int messagesInFetch = 0; + int deletedMessages = 0; + for (FetchPartition fp : partitions) { + final ByteBufferMessageSet messageSet = fetchResponse.messageSet(fp.topic, fp.partition); + final KafkaTopicPartition topicPartition = new KafkaTopicPartition(fp.topic, fp.partition); + + for (MessageAndOffset msg : messageSet) { + if (running) { + messagesInFetch++; + if (msg.offset() < fp.nextOffsetToRead) { + // we have seen this message already + LOG.info("Skipping message with offset " + msg.offset() + + " because we have seen messages until " + fp.nextOffsetToRead + + " from partition " + fp.partition + " already"); + continue; + } + + final long offset = msg.offset(); + + ByteBuffer payload = msg.message().payload(); + + // If the message value is null, this represents a delete command for the message key. + // Log this and pass it on to the client who might want to also receive delete messages. + byte[] valueBytes; + if (payload == null) { + deletedMessages++; + valueBytes = null; + } else { + valueBytes = new byte[payload.remaining()]; + payload.get(valueBytes); + } + + // put key into byte array + byte[] keyBytes = null; + int keySize = msg.message().keySize(); + + if (keySize >= 0) { // message().hasKey() is doing the same. 
We save one int deserialization + ByteBuffer keyPayload = msg.message().key(); + keyBytes = new byte[keySize]; + keyPayload.get(keyBytes); + } + + final T value = deserializer.deserialize(keyBytes, valueBytes, fp.topic, fp.partition, offset); + if(deserializer.isEndOfStream(value)) { + running = false; + break fetchLoop; // leave running loop + } + synchronized (sourceContext.getCheckpointLock()) { + sourceContext.collect(value); + offsetsState.put(topicPartition, offset); + } + + // advance offset for the next request + fp.nextOffsetToRead = offset + 1; + } + else { + // no longer running + return; + } + } + } + LOG.debug("This fetch contained {} messages ({} deleted messages)", messagesInFetch, deletedMessages); + } + } + catch (Throwable t) { + // report to the main thread + owner.stopWithError(t); + } + finally { + // end of run loop. close connection to consumer + if (consumer != null) { + // closing the consumer should not fail the program + try { + consumer.close(); + } + catch (Throwable t) { + LOG.error("Error while closing the Kafka simple consumer", t); + } + } + } + } + + /** + * Cancels this fetch thread. The thread will release all resources and terminate. + */ + public void cancel() { + this.running = false; + + // interrupt whatever the consumer is doing + if (consumer != null) { + consumer.close(); + } + + this.interrupt(); + } + + /** + * Request latest offsets for a set of partitions, via a Kafka consumer. + * + * @param consumer The consumer connected to lead broker + * @param partitions The list of partitions we need offsets for + * @param whichTime The type of time we are requesting. 
-1 and -2 are special constants (See OffsetRequest) + */ + private static void getLastOffset(SimpleConsumer consumer, List<FetchPartition> partitions, long whichTime) { + + Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>(); + for (FetchPartition fp: partitions) { + TopicAndPartition topicAndPartition = new TopicAndPartition(fp.topic, fp.partition); + requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); + } + + kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId()); + OffsetResponse response = consumer.getOffsetsBefore(request); + + if (response.hasError()) { + String exception = ""; + for (FetchPartition fp: partitions) { + short code; + if ( (code=response.errorCode(fp.topic, fp.partition)) != ErrorMapping.NoError()) { + exception += "\nException for partition "+fp.partition+": "+ StringUtils.stringifyException(ErrorMapping.exceptionFor(code)); + } + } + throw new RuntimeException("Unable to get last offset for partitions " + partitions + ". " + exception); + } + + for (FetchPartition fp: partitions) { + // the resulting offset is the next offset we are going to read + // for not-yet-consumed partitions, it is 0. 
+ fp.nextOffsetToRead = response.offsets(fp.topic, fp.partition)[0]; + } + } + + private static long getInvalidOffsetBehavior(Properties config) { + long timeType; + if (config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest").equals("latest")) { + timeType = OffsetRequest.LatestTime(); + } else { + timeType = OffsetRequest.EarliestTime(); + } + return timeType; + } + } + + + private static class PartitionInfoFetcher extends Thread { + + private final List<String> topics; + private final Properties properties; + + private volatile List<KafkaTopicPartitionLeader> result; + private volatile Throwable error; + + + PartitionInfoFetcher(List<String> topics, Properties properties) { + this.topics = topics; + this.properties = properties; + } + + @Override + public void run() { + try { + result = FlinkKafkaConsumer08.getPartitionsForTopic(topics, properties); + } + catch (Throwable t) { + this.error = t; + } + } + + public List<KafkaTopicPartitionLeader> getPartitions() throws Exception { + try { + this.join(); + } + catch (InterruptedException e) { + throw new Exception("Partition fetching was cancelled before completion"); + } + + if (error != null) { + throw new Exception("Failed to fetch partitions for topics " + topics.toString(), error); + } + if (result != null) { + return result; + } + throw new Exception("Partition fetching failed"); + } + } + + private static class KillerWatchDog extends Thread { + + private final Thread toKill; + private final long timeout; + + private KillerWatchDog(Thread toKill, long timeout) { + super("KillerWatchDog"); + setDaemon(true); + + this.toKill = toKill; + this.timeout = timeout; + } + + @SuppressWarnings("deprecation") + @Override + public void run() { + final long deadline = System.currentTimeMillis() + timeout; + long now; + + while (toKill.isAlive() && (now = System.currentTimeMillis()) < deadline) { + try { + toKill.join(deadline - now); + } + catch (InterruptedException e) { + // ignore here, our job is 
important! + } + } + + // this is harsh, but this watchdog is a last resort + if (toKill.isAlive()) { + toKill.stop(); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java new file mode 100644 index 0000000..fdd89c6 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internals; + + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * The offset handler is responsible for locating the initial partition offsets + * where the source should start reading, as well as committing offsets from completed + * checkpoints. 
+ */ +public interface OffsetHandler { + + /** + * Commits the given offset for the partitions. May commit the offsets to the Kafka broker, + * or to ZooKeeper, based on its configured behavior. + * + * @param offsetsToCommit The offset to commit, per partition. + */ + void commit(Map<KafkaTopicPartition, Long> offsetsToCommit) throws Exception; + + /** + * Positions the given fetcher to the initial read offsets where the stream consumption + * will start from. + * + * @param partitions The partitions for which to seek the fetcher to the beginning. + * @param fetcher The fetcher that will pull data from Kafka and must be positioned. + */ + void seekFetcherToInitialOffsets(List<KafkaTopicPartitionLeader> partitions, Fetcher fetcher) throws Exception; + + /** + * Closes the offset handler, releasing all resources. + * + * @throws IOException Thrown, if the closing fails. + */ + void close() throws IOException; +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java new file mode 100644 index 0000000..a38c3bd --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka.internals; + +import kafka.producer.Partitioner; +import kafka.utils.VerifiableProperties; + +/** + * Hacky wrapper to send an object instance through a Properties - map. + * + * This works as follows: + * The recommended way of creating a KafkaSink is specifying a classname for the partitioner. + * + * Otherwise (if the user gave a (serializable) class instance), we give Kafka the PartitionerWrapper class of Flink. + * This is set in the key-value (java.util.Properties) map. + * In addition to that, we use the Properties.put(Object, Object) to store the instance of the (serializable). + * This is a hack because the put() method is called on the underlying Hashmap. + * + * This PartitionerWrapper is called with the Properties. From there, we extract the wrapped Partitioner instance. + * + * The serializable PartitionerWrapper is serialized into the Properties Hashmap and also deserialized from there. 
+ */ +public class PartitionerWrapper implements Partitioner { + public final static String SERIALIZED_WRAPPER_NAME = "flink.kafka.wrapper.serialized"; + + private Partitioner wrapped; + public PartitionerWrapper(VerifiableProperties properties) { + wrapped = (Partitioner) properties.props().get(SERIALIZED_WRAPPER_NAME); + } + + @Override + public int partition(Object value, int numberOfPartitions) { + return wrapped.partition(value, numberOfPartitions); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java new file mode 100644 index 0000000..1eca4dd --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import kafka.utils.ZKGroupTopicDirs; + +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; +import org.apache.kafka.clients.consumer.ConsumerConfig; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * Handler for committing Kafka offsets to Zookeeper and to retrieve them again. + */ +public class ZookeeperOffsetHandler implements OffsetHandler { + + private static final Logger LOG = LoggerFactory.getLogger(ZookeeperOffsetHandler.class); + + private static final long OFFSET_NOT_SET = FlinkKafkaConsumer08.OFFSET_NOT_SET; + + private final String groupId; + + private final CuratorFramework curatorClient; + + + public ZookeeperOffsetHandler(Properties props) { + this.groupId = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG); + if (this.groupId == null) { + throw new IllegalArgumentException("Required property '" + + ConsumerConfig.GROUP_ID_CONFIG + "' has not been set"); + } + + String zkConnect = props.getProperty("zookeeper.connect"); + if (zkConnect == null) { + throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set"); + } + + // we use Curator's default timeouts + int sessionTimeoutMs = Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "60000")); + int connectionTimeoutMs = Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "15000")); + + // undocumented config options allowing users to configure the retry policy. (they are "flink." 
prefixed as they are no official kafka configs) + int backoffBaseSleepTime = Integer.valueOf(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100")); + int backoffMaxRetries = Integer.valueOf(props.getProperty("flink.zookeeper.max-retries", "10")); + + RetryPolicy retryPolicy = new ExponentialBackoffRetry(backoffBaseSleepTime, backoffMaxRetries); + curatorClient = CuratorFrameworkFactory.newClient(zkConnect, sessionTimeoutMs, connectionTimeoutMs, retryPolicy); + curatorClient.start(); + } + + + @Override + public void commit(Map<KafkaTopicPartition, Long> offsetsToCommit) throws Exception { + for (Map.Entry<KafkaTopicPartition, Long> entry : offsetsToCommit.entrySet()) { + KafkaTopicPartition tp = entry.getKey(); + long offset = entry.getValue(); + + if (offset >= 0) { + setOffsetInZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition(), offset); + } + } + } + + @Override + public void seekFetcherToInitialOffsets(List<KafkaTopicPartitionLeader> partitions, Fetcher fetcher) throws Exception { + for (KafkaTopicPartitionLeader tp : partitions) { + long offset = getOffsetFromZooKeeper(curatorClient, groupId, tp.getTopicPartition().getTopic(), tp.getTopicPartition().getPartition()); + + if (offset != OFFSET_NOT_SET) { + LOG.info("Offset for partition {} was set to {} in ZooKeeper. Seeking fetcher to that position.", + tp.getTopicPartition().getPartition(), offset); + + // the offset in Zookeeper was the last read offset, seek is accepting the next-to-read-offset. 
+ fetcher.seek(tp.getTopicPartition(), offset + 1); + } + } + } + + @Override + public void close() throws IOException { + curatorClient.close(); + } + + // ------------------------------------------------------------------------ + // Communication with Zookeeper + // ------------------------------------------------------------------------ + + public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset) throws Exception { + ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic); + String path = topicDirs.consumerOffsetDir() + "/" + partition; + curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient()); + byte[] data = Long.toString(offset).getBytes(); + curatorClient.setData().forPath(path, data); + } + + public static long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception { + ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic); + String path = topicDirs.consumerOffsetDir() + "/" + partition; + curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient()); + + byte[] data = curatorClient.getData().forPath(path); + + if (data == null) { + return OFFSET_NOT_SET; + } else { + String asString = new String(data); + if (asString.length() == 0) { + return OFFSET_NOT_SET; + } else { + try { + return Long.parseLong(asString); + } catch (NumberFormatException e) { + throw new Exception(String.format( + "The offset in ZooKeeper for group '%s', topic '%s', partition %d is a malformed string: %s", + groupId, topic, partition, asString)); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java new file mode 100644 index 0000000..26e31f5 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.api.java.functions.FlatMapIterator; +import org.apache.flink.api.java.operators.DataSource; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler; +import org.junit.Test; + +import java.util.Iterator; +import java.util.Properties; + +import static org.junit.Assert.assertTrue; + + +public class Kafka08ITCase extends KafkaConsumerTestBase { + + // ------------------------------------------------------------------------ + // Suite of Tests + // ------------------------------------------------------------------------ + + @Test(timeout = 60000) + public void testCheckpointing() throws Exception { + runCheckpointingTest(); + } + + @Test(timeout = 60000) + public void testFailOnNoBroker() throws Exception { + runFailOnNoBrokerTest(); + } + + + @Test(timeout = 60000) + public void testConcurrentProducerConsumerTopology() throws Exception { + runSimpleConcurrentProducerConsumerTopology(); + } + + @Test(timeout = 60000) + public void testKeyValueSupport() throws Exception { + runKeyValueTest(); + } + + // --- canceling / failures --- + + @Test(timeout = 60000) + public void testCancelingEmptyTopic() throws Exception { + runCancelingOnEmptyInputTest(); + } + + @Test(timeout = 60000) + public void testCancelingFullTopic() throws Exception { + runCancelingOnFullInputTest(); + } + + @Test(timeout = 60000) + public void testFailOnDeploy() throws Exception { + runFailOnDeployTest(); + } + + @Test(timeout = 60000) + public void testInvalidOffset() throws 
Exception { + final String topic = "invalidOffsetTopic"; + final int parallelism = 1; + + // create topic + createTestTopic(topic, parallelism, 1); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + + // write 20 messages into topic: + writeSequence(env, topic, 20, parallelism); + + // set invalid offset: + CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient(); + ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardCC.groupId(), topic, 0, 1234); + curatorClient.close(); + + // read from topic + final int valuesCount = 20; + final int startFrom = 0; + readSequence(env, standardCC.props().props(), parallelism, topic, valuesCount, startFrom); + + deleteTestTopic(topic); + } + + // --- source to partition mappings and exactly once --- + + @Test(timeout = 60000) + public void testOneToOneSources() throws Exception { + runOneToOneExactlyOnceTest(); + } + + @Test(timeout = 60000) + public void testOneSourceMultiplePartitions() throws Exception { + runOneSourceMultiplePartitionsExactlyOnceTest(); + } + + @Test(timeout = 60000) + public void testMultipleSourcesOnePartition() throws Exception { + runMultipleSourcesOnePartitionExactlyOnceTest(); + } + + // --- broker failure --- + + @Test(timeout = 60000) + public void testBrokerFailure() throws Exception { + runBrokerFailureTest(); + } + + // --- special executions --- + + @Test(timeout = 60000) + public void testBigRecordJob() throws Exception { + runBigRecordTestTopology(); + } + + @Test(timeout = 60000) + public void testMultipleTopics() throws Exception { + runConsumeMultipleTopics(); + } + + @Test(timeout = 60000) + public void testAllDeletes() throws Exception { + runAllDeletesTest(); + } + + @Test(timeout=60000) + public void testMetricsAndEndOfStream() throws Exception { + runMetricsAndEndOfStreamTest(); + } + + + /** + * Tests that offsets are properly committed to ZooKeeper and initial 
offsets are read from ZooKeeper. + * + * This test is only applicable if the Flink Kafka Consumer uses the ZooKeeperOffsetHandler. + */ + @Test(timeout = 60000) + public void testOffsetInZookeeper() throws Exception { + final String topicName = "testOffsetInZK"; + final int parallelism = 3; + + createTestTopic(topicName, parallelism, 1); + + StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env1.getConfig().disableSysoutLogging(); + env1.enableCheckpointing(50); + env1.setNumberOfExecutionRetries(0); + env1.setParallelism(parallelism); + + StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env2.getConfig().disableSysoutLogging(); + env2.enableCheckpointing(50); + env2.setNumberOfExecutionRetries(0); + env2.setParallelism(parallelism); + + StreamExecutionEnvironment env3 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env3.getConfig().disableSysoutLogging(); + env3.enableCheckpointing(50); + env3.setNumberOfExecutionRetries(0); + env3.setParallelism(parallelism); + + // write a sequence from 0 to 99 to each of the 3 partitions. 
+ writeSequence(env1, topicName, 100, parallelism); + + readSequence(env2, standardProps, parallelism, topicName, 100, 0); + + CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient(); + + long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardCC.groupId(), topicName, 0); + long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardCC.groupId(), topicName, 1); + long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardCC.groupId(), topicName, 2); + + LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3); + + assertTrue(o1 == FlinkKafkaConsumer08.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100)); + assertTrue(o2 == FlinkKafkaConsumer08.OFFSET_NOT_SET || (o2 >= 0 && o2 <= 100)); + assertTrue(o3 == FlinkKafkaConsumer08.OFFSET_NOT_SET || (o3 >= 0 && o3 <= 100)); + + LOG.info("Manipulating offsets"); + + // set the offset to 50 for the three partitions + ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardCC.groupId(), topicName, 0, 49); + ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardCC.groupId(), topicName, 1, 49); + ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardCC.groupId(), topicName, 2, 49); + + curatorClient.close(); + + // create new env + readSequence(env3, standardProps, parallelism, topicName, 50, 50); + + deleteTestTopic(topicName); + } + + @Test(timeout = 60000) + public void testOffsetAutocommitTest() throws Exception { + final String topicName = "testOffsetAutocommit"; + final int parallelism = 3; + + createTestTopic(topicName, parallelism, 1); + + StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env1.getConfig().disableSysoutLogging(); + env1.setNumberOfExecutionRetries(0); + env1.setParallelism(parallelism); + + StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); 
+ // NOTE: We are not enabling the checkpointing! + env2.getConfig().disableSysoutLogging(); + env2.setNumberOfExecutionRetries(0); + env2.setParallelism(parallelism); + + + // write a sequence from 0 to 99 to each of the 3 partitions. + writeSequence(env1, topicName, 100, parallelism); + + + // the readSequence operation sleeps for 20 ms between each record. + // setting a delay of 25*20 = 500 for the commit interval makes + // sure that we commit roughly 3-4 times while reading, however + // at least once. + Properties readProps = new Properties(); + readProps.putAll(standardProps); + readProps.setProperty("auto.commit.interval.ms", "500"); + + // read so that the offset can be committed to ZK + readSequence(env2, readProps, parallelism, topicName, 100, 0); + + // get the offset + CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient(); + + long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardCC.groupId(), topicName, 0); + long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardCC.groupId(), topicName, 1); + long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardCC.groupId(), topicName, 2); + + LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3); + + // ensure that the offset has been committed + assertTrue("Offset of o1=" + o1 + " was not in range", o1 > 0 && o1 <= 100); + assertTrue("Offset of o2=" + o2 + " was not in range", o2 > 0 && o2 <= 100); + assertTrue("Offset of o3=" + o3 + " was not in range", o3 > 0 && o3 <= 100); + + deleteTestTopic(topicName); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java new file mode 100644 index 0000000..fc13719 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + + +import org.junit.Test; + + +@SuppressWarnings("serial") +public class Kafka08ProducerITCase extends KafkaProducerTestBase { + + @Test + public void testCustomPartitioning() { + runCustomPartitioningTest(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java new file mode 100644 index 0000000..113ad71 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.commons.collections.map.LinkedMap; + +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.junit.Ignore; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +public class KafkaConsumerTest { + + @Test + public void testValidateZooKeeperConfig() { + try { + // empty + Properties emptyProperties = new Properties(); + try { + FlinkKafkaConsumer08.validateZooKeeperConfig(emptyProperties); + fail("should fail with an exception"); + } + catch (IllegalArgumentException e) { + // expected + } + + // no connect string (only group string) + Properties noConnect = new Properties(); + noConnect.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-test-group"); + try { + FlinkKafkaConsumer08.validateZooKeeperConfig(noConnect); + fail("should fail with an exception"); + } + catch (IllegalArgumentException e) { + // expected + } + + // no group string (only connect string) + Properties noGroup = new Properties(); + noGroup.put("zookeeper.connect", "localhost:47574"); + try { + FlinkKafkaConsumer08.validateZooKeeperConfig(noGroup); + fail("should fail with an exception"); + } + catch (IllegalArgumentException e) { + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testSnapshot() { + try { + Field offsetsField = FlinkKafkaConsumerBase.class.getDeclaredField("offsetsState"); + Field runningField = FlinkKafkaConsumerBase.class.getDeclaredField("running"); + Field mapField = FlinkKafkaConsumerBase.class.getDeclaredField("pendingCheckpoints"); + + offsetsField.setAccessible(true); 
+ runningField.setAccessible(true); + mapField.setAccessible(true); + + FlinkKafkaConsumer08<?> consumer = mock(FlinkKafkaConsumer08.class); + when(consumer.snapshotState(anyLong(), anyLong())).thenCallRealMethod(); + + + HashMap<KafkaTopicPartition, Long> testOffsets = new HashMap<>(); + long[] offsets = new long[] { 43, 6146, 133, 16, 162, 616 }; + int j = 0; + for (long i: offsets) { + KafkaTopicPartition ktp = new KafkaTopicPartition("topic", j++); + testOffsets.put(ktp, i); + } + + LinkedMap map = new LinkedMap(); + + offsetsField.set(consumer, testOffsets); + runningField.set(consumer, true); + mapField.set(consumer, map); + + assertTrue(map.isEmpty()); + + // make multiple checkpoints + for (long checkpointId = 10L; checkpointId <= 2000L; checkpointId += 9L) { + HashMap<KafkaTopicPartition, Long> checkpoint = consumer.snapshotState(checkpointId, 47 * checkpointId); + assertEquals(testOffsets, checkpoint); + + // change the offsets, make sure the snapshot did not change + HashMap<KafkaTopicPartition, Long> checkpointCopy = (HashMap<KafkaTopicPartition, Long>) checkpoint.clone(); + + for (Map.Entry<KafkaTopicPartition, Long> e: testOffsets.entrySet()) { + testOffsets.put(e.getKey(), e.getValue() + 1); + } + + assertEquals(checkpointCopy, checkpoint); + + assertTrue(map.size() > 0); + assertTrue(map.size() <= FlinkKafkaConsumer08.MAX_NUM_PENDING_CHECKPOINTS); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + @Ignore("Kafka consumer internally makes an infinite loop") + public void testCreateSourceWithoutCluster() { + try { + Properties props = new Properties(); + props.setProperty("zookeeper.connect", "localhost:56794"); + props.setProperty("bootstrap.servers", "localhost:11111, localhost:22222"); + props.setProperty("group.id", "non-existent-group"); + + new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"), new SimpleStringSchema(), props); + } + catch (Exception e) { + e.printStackTrace(); + 
fail(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java new file mode 100644 index 0000000..72d2772 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import kafka.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaLocalSystemTime implements Time { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaLocalSystemTime.class); + + @Override + public long milliseconds() { + return System.currentTimeMillis(); + } + + @Override + public long nanoseconds() { + return System.nanoTime(); + } + + @Override + public void sleep(long ms) { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + LOG.warn("Interruption", e); + } + } + +} + http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java new file mode 100644 index 0000000..8602ffe --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.TestLogger;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.Future;


import static org.mockito.Mockito.*;
import static org.powermock.api.mockito.PowerMockito.whenNew;

import static org.junit.Assert.*;

/**
 * Tests the error-propagation behavior of the Flink Kafka producer: send
 * failures must either be rethrown on the next invoke() or, with
 * setLogFailuresOnly(true), only be logged. The KafkaProducer is replaced
 * by a Mockito mock via PowerMock's constructor interception.
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest(FlinkKafkaProducerBase.class)
public class KafkaProducerTest extends TestLogger {

	// declares 'throws Exception' instead of catch-and-fail so an unexpected
	// failure is reported with its full stack trace
	@Test
	@SuppressWarnings("unchecked")
	public void testPropagateExceptions() throws Exception {
		// mock kafka producer
		KafkaProducer<?, ?> kafkaProducerMock = mock(KafkaProducer.class);

		// partition setup
		when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
				Arrays.asList(new PartitionInfo("mock_topic", 42, null, null, null)));

		// failure when trying to send an element: complete every send's
		// callback with an error immediately
		when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class)))
			.thenAnswer(new Answer<Future<RecordMetadata>>() {
				@Override
				public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
					Callback callback = (Callback) invocation.getArguments()[1];
					callback.onCompletion(null, new Exception("Test error"));
					return null;
				}
			});

		// make sure the FlinkKafkaProducer instantiates our mock producer
		whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);

		// (1) producer that propagates errors

		FlinkKafkaProducer08<String> producerPropagating = new FlinkKafkaProducer08<>(
				"mock_topic", new SimpleStringSchema(), new Properties(), null);

		producerPropagating.setRuntimeContext(new MockRuntimeContext(17, 3));
		producerPropagating.open(new Configuration());

		try {
			// the first invoke triggers the async failure; the second must surface it
			producerPropagating.invoke("value");
			producerPropagating.invoke("value");
			fail("This should fail with an exception");
		}
		catch (Exception e) {
			assertNotNull(e.getCause());
			assertNotNull(e.getCause().getMessage());
			assertTrue(e.getCause().getMessage().contains("Test error"));
		}

		// (2) producer that only logs errors

		FlinkKafkaProducer08<String> producerLogging = new FlinkKafkaProducer08<>(
				"mock_topic", new SimpleStringSchema(), new Properties(), null);
		producerLogging.setLogFailuresOnly(true);

		producerLogging.setRuntimeContext(new MockRuntimeContext(17, 3));
		producerLogging.open(new Configuration());

		// must not throw despite the failing sends
		producerLogging.invoke("value");
		producerLogging.invoke("value");
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---------------------------------------------------------------------- diff --git
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java new file mode 100644 index 0000000..348b75d --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.streaming.connectors.kafka; + +import kafka.admin.AdminUtils; +import kafka.api.PartitionMetadata; +import kafka.common.KafkaException; +import kafka.consumer.ConsumerConfig; +import kafka.network.SocketServer; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import org.I0Itec.zkclient.ZkClient; +import org.apache.commons.io.FileUtils; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.test.TestingServer; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader; +import org.apache.flink.streaming.connectors.kafka.internals.ZooKeeperStringSerializer; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; +import org.apache.flink.util.NetUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.Seq; + +import java.io.File; +import java.net.BindException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.UUID; + +import static org.apache.flink.util.NetUtils.hostAndPortToUrlString; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * An implementation of the KafkaServerProvider for Kafka 0.8 + */ +public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { + + protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class); + private File tmpZkDir; + private File tmpKafkaParent; + private List<File> tmpKafkaDirs; + private List<KafkaServer> brokers; + private TestingServer zookeeper; + private String 
zookeeperConnectionString; + private String brokerConnectionString = ""; + private Properties standardProps; + private ConsumerConfig standardCC; + + + public String getBrokerConnectionString() { + return brokerConnectionString; + } + + + @Override + public ConsumerConfig getStandardConsumerConfig() { + return standardCC; + } + + @Override + public Properties getStandardProperties() { + return standardProps; + } + + @Override + public String getVersion() { + return "0.8"; + } + + @Override + public List<KafkaServer> getBrokers() { + return brokers; + } + + @Override + public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) { + return new FlinkKafkaConsumer08<>(topics, readSchema, props); + } + + @Override + public <T> FlinkKafkaProducerBase<T> getProducer(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) { + return new FlinkKafkaProducer08<T>(topic, serSchema, props, partitioner); + } + + @Override + public void restartBroker(int leaderId) throws Exception { + brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId), KAFKA_HOST, zookeeperConnectionString)); + } + + @Override + public int getLeaderToShutDown(String topic) throws Exception { + ZkClient zkClient = createZkClient(); + PartitionMetadata firstPart = null; + do { + if (firstPart != null) { + LOG.info("Unable to find leader. error code {}", firstPart.errorCode()); + // not the first try. 
Sleep a bit + Thread.sleep(150); + } + + Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata(); + firstPart = partitionMetadata.head(); + } + while (firstPart.errorCode() != 0); + zkClient.close(); + + return firstPart.leader().get().id(); + } + + @Override + public int getBrokerId(KafkaServer server) { + return server.socketServer().brokerId(); + } + + + @Override + public void prepare(int numKafkaServers) { + File tempDir = new File(System.getProperty("java.io.tmpdir")); + + tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString())); + assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs()); + + tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir*" + (UUID.randomUUID().toString())); + assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs()); + + tmpKafkaDirs = new ArrayList<>(numKafkaServers); + for (int i = 0; i < numKafkaServers; i++) { + File tmpDir = new File(tmpKafkaParent, "server-" + i); + assertTrue("cannot create kafka temp dir", tmpDir.mkdir()); + tmpKafkaDirs.add(tmpDir); + } + + zookeeper = null; + brokers = null; + + try { + LOG.info("Starting Zookeeper"); + zookeeper = new TestingServer(-1, tmpZkDir); + zookeeperConnectionString = zookeeper.getConnectString(); + + LOG.info("Starting KafkaServer"); + brokers = new ArrayList<>(numKafkaServers); + + for (int i = 0; i < numKafkaServers; i++) { + brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i), KafkaTestEnvironment.KAFKA_HOST, zookeeperConnectionString)); + SocketServer socketServer = brokers.get(i).socketServer(); + + String host = socketServer.host() == null ? 
"localhost" : socketServer.host(); + brokerConnectionString += hostAndPortToUrlString(host, socketServer.port()) + ","; + } + + LOG.info("ZK and KafkaServer started."); + } + catch (Throwable t) { + t.printStackTrace(); + fail("Test setup failed: " + t.getMessage()); + } + + standardProps = new Properties(); + standardProps.setProperty("zookeeper.connect", zookeeperConnectionString); + standardProps.setProperty("bootstrap.servers", brokerConnectionString); + standardProps.setProperty("group.id", "flink-tests"); + standardProps.setProperty("auto.commit.enable", "false"); + standardProps.setProperty("zookeeper.session.timeout.ms", "12000"); // 6 seconds is default. Seems to be too small for travis. + standardProps.setProperty("zookeeper.connection.timeout.ms", "20000"); + standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. + standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!) + + Properties consumerConfigProps = new Properties(); + consumerConfigProps.putAll(standardProps); + consumerConfigProps.setProperty("auto.offset.reset", "smallest"); + standardCC = new ConsumerConfig(consumerConfigProps); + } + + @Override + public void shutdown() { + for (KafkaServer broker : brokers) { + if (broker != null) { + broker.shutdown(); + } + } + brokers.clear(); + + if (zookeeper != null) { + try { + zookeeper.stop(); + } + catch (Exception e) { + LOG.warn("ZK.stop() failed", e); + } + zookeeper = null; + } + + // clean up the temp spaces + + if (tmpKafkaParent != null && tmpKafkaParent.exists()) { + try { + FileUtils.deleteDirectory(tmpKafkaParent); + } + catch (Exception e) { + // ignore + } + } + if (tmpZkDir != null && tmpZkDir.exists()) { + try { + FileUtils.deleteDirectory(tmpZkDir); + } + catch (Exception e) { + // ignore + } + } + } + + @Override + public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) { + // create topic with one client + 
Properties topicConfig = new Properties(); + LOG.info("Creating topic {}", topic); + + ZkClient creator = createZkClient(); + + AdminUtils.createTopic(creator, topic, numberOfPartitions, replicationFactor, topicConfig); + creator.close(); + + // validate that the topic has been created + final long deadline = System.currentTimeMillis() + 30000; + do { + try { + Thread.sleep(100); + } + catch (InterruptedException e) { + // restore interrupted state + } + List<KafkaTopicPartitionLeader> partitions = FlinkKafkaConsumer08.getPartitionsForTopic(Collections.singletonList(topic), standardProps); + if (partitions != null && partitions.size() > 0) { + return; + } + } + while (System.currentTimeMillis() < deadline); + fail ("Test topic could not be created"); + } + + @Override + public void deleteTestTopic(String topic) { + LOG.info("Deleting topic {}", topic); + + ZkClient zk = createZkClient(); + AdminUtils.deleteTopic(zk, topic); + zk.close(); + } + + private ZkClient createZkClient() { + return new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), + standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer()); + } + + /** + * Only for the 0.8 server we need access to the zk client. 
+ */ + public CuratorFramework createCuratorClient() { + RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 10); + CuratorFramework curatorClient = CuratorFrameworkFactory.newClient(standardProps.getProperty("zookeeper.connect"), retryPolicy); + curatorClient.start(); + return curatorClient; + } + + /** + * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed) + */ + protected static KafkaServer getKafkaServer(int brokerId, File tmpFolder, + String kafkaHost, + String zookeeperConnectionString) throws Exception { + LOG.info("Starting broker with id {}", brokerId); + Properties kafkaProperties = new Properties(); + + // properties have to be Strings + kafkaProperties.put("advertised.host.name", kafkaHost); + kafkaProperties.put("broker.id", Integer.toString(brokerId)); + kafkaProperties.put("log.dir", tmpFolder.toString()); + kafkaProperties.put("zookeeper.connect", zookeeperConnectionString); + kafkaProperties.put("message.max.bytes", String.valueOf(50 * 1024 * 1024)); + kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024)); + + // for CI stability, increase zookeeper session timeout + kafkaProperties.put("zookeeper.session.timeout.ms", "20000"); + + final int numTries = 5; + + for (int i = 1; i <= numTries; i++) { + int kafkaPort = NetUtils.getAvailablePort(); + kafkaProperties.put("port", Integer.toString(kafkaPort)); + KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties); + + try { + KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime()); + server.startup(); + return server; + } + catch (KafkaException e) { + if (e.getCause() instanceof BindException) { + // port conflict, retry... + LOG.info("Port conflict when starting Kafka Broker. 
Retrying..."); + } + else { + throw e; + } + } + } + + throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts."); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java new file mode 100644 index 0000000..c99e133 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl; +import org.apache.flink.streaming.connectors.kafka.KafkaTestBase; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class ZookeeperOffsetHandlerTest extends KafkaTestBase { + + @Test + public void runOffsetManipulationinZooKeeperTest() { + try { + final String topicName = "ZookeeperOffsetHandlerTest-Topic"; + final String groupId = "ZookeeperOffsetHandlerTest-Group"; + + final long offset = (long) (Math.random() * Long.MAX_VALUE); + + CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer ).createCuratorClient(); + kafkaServer.createTestTopic(topicName, 3, 2); + + ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorFramework, groupId, topicName, 0, offset); + + long fetchedOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, groupId, topicName, 0); + + curatorFramework.close(); + + assertEquals(offset, fetchedOffset); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..6bdfb48 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties @@ -0,0 +1,29 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more 
contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger + + http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/logback-test.xml b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/logback-test.xml new file mode 100644 index 0000000..45b3b92 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/logback-test.xml @@ -0,0 +1,30 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. 
See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <root level="WARN"> + <appender-ref ref="STDOUT"/> + </root> + <logger name="org.apache.flink.streaming" level="WARN"/> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml new file mode 100644 index 0000000..b3c9749 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml @@ -0,0 +1,131 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-connectors-parent</artifactId> + <version>1.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-kafka-0.9</artifactId> + <name>flink-connector-kafka-0.9</name> + + <packaging>jar</packaging> + + <!-- Allow users to pass custom connector versions --> + <properties> + <kafka.version>0.9.0.0</kafka.version> + </properties> + + <dependencies> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka-base</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka-base</artifactId> + <version>${project.version}</version> + <exclusions> + <!-- exclude 0.8 dependencies --> + <exclusion> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> + </exclusion> + </exclusions> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${kafka.version}</version> + </dependency> + + 
<dependency> + <!-- include 0.9 server for tests --> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.binary.version}</artifactId> + <version>${kafka.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-tests</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> + + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + </dependencies> + + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + <configuration> + <!-- Enforce single fork execution due to heavy mini cluster use in the tests --> + <forkCount>1</forkCount> + </configuration> + </plugin> + </plugins> + </build> + +</project>
