FLUME-2823: Flume-Kafka-Channel with new APIs (Jeff Holoman via Jarek Jarcec Cecho)
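Summary: this change moves the Kafka Channel off the old high-level consumer / Scala producer APIs and onto the Kafka 0.9 clients (KafkaConsumer/KafkaProducer), with ``kafka.``-prefixed configuration. As a rough sketch only (agent, channel, and broker names here are illustrative and not taken from the commit), the new-style channel configuration described in the FlumeUserGuide.rst hunk further down looks like:

    a1.channels = channel1
    a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
    # Kafka connectivity; replaces the old brokerList/zookeeperConnect settings
    a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
    a1.channels.channel1.kafka.topic = channel1
    a1.channels.channel1.kafka.consumer.group.id = flume-consumer
    # Properties under the kafka.producer./kafka.consumer. prefixes are passed through to the Kafka clients
    a1.channels.channel1.kafka.producer.acks = all
    a1.channels.channel1.kafka.consumer.auto.offset.reset = earliest

Anything under the kafka.producer. / kafka.consumer. prefixes is handed straight to the respective Kafka client (see setProducerProps/setConsumerProps in the diff), so arbitrary client properties can be set that way.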
Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/e8c4a7bf Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/e8c4a7bf Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/e8c4a7bf Branch: refs/heads/trunk Commit: e8c4a7bffc74f6ea10ae6cc45adbaf4919f45186 Parents: 7f588e6 Author: Jarek Jarcec Cecho <[email protected]> Authored: Tue Mar 29 09:45:40 2016 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Tue Mar 29 09:45:40 2016 -0700 ---------------------------------------------------------------------- flume-ng-channels/flume-kafka-channel/pom.xml | 6 + .../flume/channel/kafka/KafkaChannel.java | 641 ++++++++++++------- .../kafka/KafkaChannelConfiguration.java | 32 +- .../flume/channel/kafka/TestKafkaChannel.java | 219 ++++--- .../src/test/resources/kafka-server.properties | 2 +- flume-ng-doc/sphinx/FlumeUserGuide.rst | 88 ++- 6 files changed, 634 insertions(+), 354 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/e8c4a7bf/flume-ng-channels/flume-kafka-channel/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/pom.xml b/flume-ng-channels/flume-kafka-channel/pom.xml index fa1bd42..587b4b4 100644 --- a/flume-ng-channels/flume-kafka-channel/pom.xml +++ b/flume-ng-channels/flume-kafka-channel/pom.xml @@ -40,6 +40,12 @@ limitations under the License. <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${kafka.version}</version> </dependency> <dependency> <groupId>org.apache.flume.flume-ng-sinks</groupId> http://git-wip-us.apache.org/repos/asf/flume/blob/e8c4a7bf/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java index c0c1c66..2d9b0c6 100644 --- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java +++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java @@ -20,108 +20,120 @@ package org.apache.flume.channel.kafka; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; -import kafka.consumer.*; -import kafka.javaapi.consumer.ConsumerConnector; -import kafka.javaapi.producer.Producer; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; -import org.apache.avro.io.*; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.EncoderFactory; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificDatumWriter; -import org.apache.flume.*; +import org.apache.flume.ChannelException; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.FlumeException; import org.apache.flume.channel.BasicChannelSemantics; import org.apache.flume.channel.BasicTransactionSemantics; import org.apache.flume.conf.ConfigurationException; - 
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*; - import org.apache.flume.event.EventBuilder; import org.apache.flume.instrumentation.kafka.KafkaChannelCounter; import org.apache.flume.source.avro.AvroFlumeEvent; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.*; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.nio.ByteBuffer; -import java.util.*; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*; + public class KafkaChannel extends BasicChannelSemantics { - private final static Logger LOGGER = - LoggerFactory.getLogger(KafkaChannel.class); + private final static Logger logger = + LoggerFactory.getLogger(KafkaChannel.class); + private final Properties consumerProps = new Properties(); + private final Properties producerProps = new Properties(); - private final Properties kafkaConf = new Properties(); - private Producer<String, byte[]> producer; + private KafkaProducer<String, byte[]> producer; private final String channelUUID = UUID.randomUUID().toString(); private AtomicReference<String> topic = new AtomicReference<String>(); private boolean parseAsFlumeEvent = DEFAULT_PARSE_AS_FLUME_EVENT; - private final Map<String, Integer> topicCountMap = - Collections.synchronizedMap(new HashMap<String, Integer>()); + + //used to indicate if a rebalance has occurred during the current transaction + AtomicBoolean rebalanceFlag = new AtomicBoolean(); + //This isn't a Kafka property per se, but we allow it to be configurable + private long pollTimeout = DEFAULT_POLL_TIMEOUT; + // Track all consumers to close them eventually. - private final List<ConsumerAndIterator> consumers = - Collections.synchronizedList(new LinkedList<ConsumerAndIterator>()); + private final List<ConsumerAndRecords> consumers = + Collections.synchronizedList(new LinkedList<ConsumerAndRecords>()); private KafkaChannelCounter counter; - /* Each ConsumerConnector commit will commit all partitions owned by it. To + /* Each Consumer commit will commit all partitions owned by it. To * ensure that each partition is only committed when all events are - * actually done, we will need to keep a ConsumerConnector per thread. 
- * See Neha's answer here: - * http://grokbase.com/t/kafka/users/13b4gmk2jk/commit-offset-per-topic - * Since only one consumer connector will a partition at any point in time, - * when we commit the partition we would have committed all events to the - * final destination from that partition. - * - * If a new partition gets assigned to this connector, - * my understanding is that all message from the last partition commit will - * get replayed which may cause duplicates -- which is fine as this - * happens only on partition rebalancing which is on failure or new nodes - * coming up, which is rare. + * actually done, we will need to keep a Consumer per thread. */ - private final ThreadLocal<ConsumerAndIterator> consumerAndIter = new - ThreadLocal<ConsumerAndIterator>() { - @Override - public ConsumerAndIterator initialValue() { - return createConsumerAndIter(); - } - }; + + private final ThreadLocal<ConsumerAndRecords> consumerAndRecords = new + ThreadLocal<ConsumerAndRecords>() { + @Override + public ConsumerAndRecords initialValue() { + return createConsumerAndRecords(); + } + }; @Override public void start() { - try { - LOGGER.info("Starting Kafka Channel: " + getName()); - producer = new Producer<String, byte[]>(new ProducerConfig(kafkaConf)); + logger.info("Starting Kafka Channel: {}", getName()); + producer = new KafkaProducer<String, byte[]>(producerProps); // We always have just one topic being read by one thread - LOGGER.info("Topic = " + topic.get()); - topicCountMap.put(topic.get(), 1); + logger.info("Topic = {}", topic.get()); counter.start(); super.start(); - } catch (Exception e) { - LOGGER.error("Could not start producer"); - throw new FlumeException("Unable to create Kafka Connections. " + - "Check whether Kafka Brokers are up and that the " + - "Flume agent can connect to it.", e); - } } @Override public void stop() { - for (ConsumerAndIterator c : consumers) { + for (ConsumerAndRecords c : consumers) { try { - decommissionConsumerAndIterator(c); + decommissionConsumerAndRecords(c); } catch (Exception ex) { - LOGGER.warn("Error while shutting down consumer.", ex); + logger.warn("Error while shutting down consumer.", ex); } } producer.close(); counter.stop(); super.stop(); - LOGGER.info("Kafka channel {} stopped. Metrics: {}", getName(), - counter); + logger.info("Kafka channel {} stopped. 
Metrics: {}", getName(), + counter); } @Override @@ -129,98 +141,147 @@ public class KafkaChannel extends BasicChannelSemantics { return new KafkaTransaction(); } - private synchronized ConsumerAndIterator createConsumerAndIter() { - try { - ConsumerConfig consumerConfig = new ConsumerConfig(kafkaConf); - ConsumerConnector consumer = - Consumer.createJavaConsumerConnector(consumerConfig); - Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = - consumer.createMessageStreams(topicCountMap); - final List<KafkaStream<byte[], byte[]>> streamList = consumerMap - .get(topic.get()); - KafkaStream<byte[], byte[]> stream = streamList.remove(0); - ConsumerAndIterator ret = - new ConsumerAndIterator(consumer, stream.iterator(), channelUUID); - consumers.add(ret); - LOGGER.info("Created new consumer to connect to Kafka"); - return ret; - } catch (Exception e) { - throw new FlumeException("Unable to connect to Kafka", e); - } - } - - Properties getKafkaConf() { - return kafkaConf; - } - @Override public void configure(Context ctx) { - String topicStr = ctx.getString(TOPIC); + + //Can remove in the next release + translateOldProps(ctx); + + String topicStr = ctx.getString(TOPIC_CONFIG); if (topicStr == null || topicStr.isEmpty()) { topicStr = DEFAULT_TOPIC; - LOGGER - .info("Topic was not specified. Using " + topicStr + " as the topic."); + logger.info("Topic was not specified. Using {} as the topic.", topicStr); } topic.set(topicStr); - String groupId = ctx.getString(GROUP_ID_FLUME); - if (groupId == null || groupId.isEmpty()) { - groupId = DEFAULT_GROUP_ID; - LOGGER.info( - "Group ID was not specified. Using " + groupId + " as the group id."); + String bootStrapServers = ctx.getString(BOOTSTRAP_SERVERS_CONFIG); + if (bootStrapServers == null || bootStrapServers.isEmpty()) { + throw new ConfigurationException("Bootstrap Servers must be specified"); } - String brokerList = ctx.getString(BROKER_LIST_FLUME_KEY); - if (brokerList == null || brokerList.isEmpty()) { - throw new ConfigurationException("Broker List must be specified"); + + setProducerProps(ctx, bootStrapServers); + setConsumerProps(ctx, bootStrapServers); + + parseAsFlumeEvent = ctx.getBoolean(PARSE_AS_FLUME_EVENT, DEFAULT_PARSE_AS_FLUME_EVENT); + pollTimeout = ctx.getLong(POLL_TIMEOUT, DEFAULT_POLL_TIMEOUT); + + if (counter == null) { + counter = new KafkaChannelCounter(getName()); } - String zkConnect = ctx.getString(ZOOKEEPER_CONNECT_FLUME_KEY); - if (zkConnect == null || zkConnect.isEmpty()) { - throw new ConfigurationException( - "Zookeeper Connection must be specified"); + } + + // We can remove this once the properties are officially deprecated + private void translateOldProps(Context ctx) { + + if (!(ctx.containsKey(TOPIC_CONFIG))) { + ctx.put(TOPIC_CONFIG, ctx.getString("topic")); + logger.warn("{} is deprecated. 
Please use the parameter {}", "topic", TOPIC_CONFIG); } - kafkaConf.putAll(ctx.getSubProperties(KAFKA_PREFIX)); - kafkaConf.put(GROUP_ID, groupId); - kafkaConf.put(BROKER_LIST_KEY, brokerList); - kafkaConf.put(ZOOKEEPER_CONNECT, zkConnect); - kafkaConf.put(AUTO_COMMIT_ENABLED, String.valueOf(false)); - if(kafkaConf.get(CONSUMER_TIMEOUT) == null) { - kafkaConf.put(CONSUMER_TIMEOUT, DEFAULT_TIMEOUT); + + //Broker List + // If there is no value we need to check and set the old param and log a warning message + if (!(ctx.containsKey(BOOTSTRAP_SERVERS_CONFIG))) { + String brokerList = ctx.getString(BROKER_LIST_FLUME_KEY); + if (brokerList == null || brokerList.isEmpty()) { + throw new ConfigurationException("Bootstrap Servers must be specified"); + } else { + ctx.put(BOOTSTRAP_SERVERS_CONFIG, brokerList); + logger.warn("{} is deprecated. Please use the parameter {}", BROKER_LIST_FLUME_KEY, BOOTSTRAP_SERVERS_CONFIG); + } } - kafkaConf.put(REQUIRED_ACKS_KEY, "-1"); - LOGGER.info(kafkaConf.toString()); - parseAsFlumeEvent = - ctx.getBoolean(PARSE_AS_FLUME_EVENT, DEFAULT_PARSE_AS_FLUME_EVENT); - - boolean readSmallest = ctx.getBoolean(READ_SMALLEST_OFFSET, - DEFAULT_READ_SMALLEST_OFFSET); - // If the data is to be parsed as Flume events, we always read the smallest. - // Else, we read the configuration, which by default reads the largest. - if (parseAsFlumeEvent || readSmallest) { - // readSmallest is eval-ed only if parseAsFlumeEvent is false. - // The default is largest, so we don't need to set it explicitly. - kafkaConf.put("auto.offset.reset", "smallest"); + + //GroupId + // If there is an old Group Id set, then use that if no groupId is set. + if (!(ctx.containsKey(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG))) { + String oldGroupId = ctx.getString(GROUP_ID_FLUME); + if ( oldGroupId != null && !oldGroupId.isEmpty()) { + ctx.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG, oldGroupId); + logger.warn("{} is deprecated. Please use the parameter {}", GROUP_ID_FLUME, KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG); + } } - if (counter == null) { - counter = new KafkaChannelCounter(getName()); + if (!(ctx.containsKey((KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)))) { + Boolean oldReadSmallest = ctx.getBoolean(READ_SMALLEST_OFFSET); + String auto; + if (oldReadSmallest != null) { + if (oldReadSmallest) { + auto = "earliest"; + } else { + auto = "latest"; + } + ctx.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,auto); + logger.warn("{} is deprecated. 
Please use the parameter {}", READ_SMALLEST_OFFSET,KAFKA_CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); + } + + } + } + + + private void setProducerProps(Context ctx, String bootStrapServers) { + producerProps.put(ProducerConfig.ACKS_CONFIG, DEFAULT_ACKS); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, DEFAULT_KEY_SERIALIZER); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_SERIAIZER); + //Defaults overridden based on config + producerProps.putAll(ctx.getSubProperties(KAFKA_PRODUCER_PREFIX)); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers); + logger.info("Producer properties: " + producerProps.toString()); + } + + protected Properties getProducerProps() { + return producerProps; + } + + private void setConsumerProps(Context ctx, String bootStrapServers) { + String groupId = ctx.getString(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG); + if (groupId == null || groupId.isEmpty()) { + groupId = DEFAULT_GROUP_ID; + logger.info("Group ID was not specified. Using {} as the group id.", groupId); } + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, DEFAULT_KEY_DESERIALIZER); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_DESERIAIZER); + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, DEFAULT_AUTO_OFFSET_RESET); + //Defaults overridden based on config + consumerProps.putAll(ctx.getSubProperties(KAFKA_CONSUMER_PREFIX)); + //These always take precedence over config + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + + logger.info(consumerProps.toString()); + } + protected Properties getConsumerProps() { return consumerProps; } + + + private synchronized ConsumerAndRecords createConsumerAndRecords() { + try { + KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(consumerProps); + ConsumerAndRecords car = new ConsumerAndRecords(consumer, channelUUID); + logger.info("Created new consumer to connect to Kafka"); + car.consumer.subscribe(Arrays.asList(topic.get()), new ChannelRebalanceListener(rebalanceFlag)); + car.offsets = new HashMap<TopicPartition, OffsetAndMetadata>(); + consumers.add(car); + return car; + } catch (Exception e) { + throw new FlumeException("Unable to connect to Kafka", e); + } } - private void decommissionConsumerAndIterator(ConsumerAndIterator c) { + private void decommissionConsumerAndRecords(ConsumerAndRecords c) { if (c.failedEvents.isEmpty()) { - c.consumer.commitOffsets(); + c.commitOffsets(); } c.failedEvents.clear(); - c.consumer.shutdown(); + c.consumer.close(); } - // Force a consumer to be initialized. There are many duplicates in - // tests due to rebalancing - making testing tricky. In production, - // this is less of an issue as - // rebalancing would happen only on startup. 
@VisibleForTesting void registerThread() { - consumerAndIter.get(); + try { + consumerAndRecords.get(); + } catch (Exception e) { + logger.error(e.getMessage()); + e.printStackTrace(); + } } private enum TransactionType { @@ -233,54 +294,41 @@ public class KafkaChannel extends BasicChannelSemantics { private class KafkaTransaction extends BasicTransactionSemantics { private TransactionType type = TransactionType.NONE; - // For Puts private Optional<ByteArrayOutputStream> tempOutStream = Optional - .absent(); - - // For put transactions, serialize the events and batch them and send it. - private Optional<LinkedList<byte[]>> serializedEvents = Optional.absent(); + .absent(); + // For put transactions, serialize the events and hold them until the commit goes is requested. + private Optional<LinkedList<ProducerRecord<String, byte[]>>> producerRecords = Optional.absent(); // For take transactions, deserialize and hold them till commit goes through private Optional<LinkedList<Event>> events = Optional.absent(); private Optional<SpecificDatumWriter<AvroFlumeEvent>> writer = - Optional.absent(); + Optional.absent(); private Optional<SpecificDatumReader<AvroFlumeEvent>> reader = - Optional.absent(); + Optional.absent(); + private Optional<LinkedList<Future<RecordMetadata>>> kafkaFutures = + Optional.absent(); + private final String batchUUID = UUID.randomUUID().toString(); // Fine to use null for initial value, Avro will create new ones if this // is null private BinaryEncoder encoder = null; private BinaryDecoder decoder = null; - private final String batchUUID = UUID.randomUUID().toString(); private boolean eventTaken = false; @Override + protected void doBegin() throws InterruptedException { + rebalanceFlag.set(false); + } + + @Override protected void doPut(Event event) throws InterruptedException { type = TransactionType.PUT; - if (!serializedEvents.isPresent()) { - serializedEvents = Optional.of(new LinkedList<byte[]>()); + if (!producerRecords.isPresent()) { + producerRecords = Optional.of(new LinkedList<ProducerRecord<String, byte[]>>()); } - + String key = event.getHeaders().get(KEY_HEADER); try { - if (parseAsFlumeEvent) { - if (!tempOutStream.isPresent()) { - tempOutStream = Optional.of(new ByteArrayOutputStream()); - } - if (!writer.isPresent()) { - writer = Optional.of(new - SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class)); - } - tempOutStream.get().reset(); - AvroFlumeEvent e = new AvroFlumeEvent( - toCharSeqMap(event.getHeaders()), - ByteBuffer.wrap(event.getBody())); - encoder = EncoderFactory.get() - .directBinaryEncoder(tempOutStream.get(), encoder); - writer.get().write(e, encoder); - // Not really possible to avoid this copy :( - serializedEvents.get().add(tempOutStream.get().toByteArray()); - } else { - serializedEvents.get().add(event.getBody()); - } + producerRecords.get().add(new ProducerRecord<String, byte[]> + (topic.get(), key, serializeValue(event, parseAsFlumeEvent))); } catch (Exception e) { throw new ChannelException("Error while serializing event", e); } @@ -291,53 +339,64 @@ public class KafkaChannel extends BasicChannelSemantics { protected Event doTake() throws InterruptedException { type = TransactionType.TAKE; try { - if (!(consumerAndIter.get().uuid.equals(channelUUID))) { - LOGGER.info("UUID mismatch, creating new consumer"); - decommissionConsumerAndIterator(consumerAndIter.get()); - consumerAndIter.remove(); + if (!(consumerAndRecords.get().uuid.equals(channelUUID))) { + logger.info("UUID mismatch, creating new consumer"); + 
decommissionConsumerAndRecords(consumerAndRecords.get()); + consumerAndRecords.remove(); } } catch (Exception ex) { - LOGGER.warn("Error while shutting down consumer", ex); + logger.warn("Error while shutting down consumer", ex); } if (!events.isPresent()) { events = Optional.of(new LinkedList<Event>()); } Event e; - if (!consumerAndIter.get().failedEvents.isEmpty()) { - e = consumerAndIter.get().failedEvents.removeFirst(); + // Give the channel a chance to commit if there has been a rebalance + if (rebalanceFlag.get()) { + logger.debug("Returning null event after Consumer rebalance."); + return null; + } + if (!consumerAndRecords.get().failedEvents.isEmpty()) { + e = consumerAndRecords.get().failedEvents.removeFirst(); } else { + + if (logger.isDebugEnabled()) { + logger.debug("Assigment: {}", consumerAndRecords.get().consumer.assignment().toString()); + } + try { - ConsumerIterator<byte[], byte[]> it = consumerAndIter.get().iterator; long startTime = System.nanoTime(); - it.hasNext(); - long endTime = System.nanoTime(); - counter.addToKafkaEventGetTimer((endTime-startTime)/(1000*1000)); - if (parseAsFlumeEvent) { - ByteArrayInputStream in = - new ByteArrayInputStream(it.next().message()); - decoder = DecoderFactory.get().directBinaryDecoder(in, decoder); - if (!reader.isPresent()) { - reader = Optional.of( - new SpecificDatumReader<AvroFlumeEvent>(AvroFlumeEvent.class)); - } - AvroFlumeEvent event = reader.get().read(null, decoder); - e = EventBuilder.withBody(event.getBody().array(), - toStringMap(event.getHeaders())); - } else { - e = EventBuilder.withBody(it.next().message(), - Collections.EMPTY_MAP); + if (!consumerAndRecords.get().recordIterator.hasNext()) { + consumerAndRecords.get().poll(); } + if (consumerAndRecords.get().recordIterator.hasNext()) { + ConsumerRecord<String, byte[]> record = consumerAndRecords.get().recordIterator.next(); + e = deserializeValue(record.value(), parseAsFlumeEvent); + TopicPartition tp = new TopicPartition(record.topic(), record.partition()); + OffsetAndMetadata oam = new OffsetAndMetadata(record.offset() + 1, batchUUID); + consumerAndRecords.get().offsets.put(tp, oam); + + if (logger.isTraceEnabled()) { + logger.trace("Took offset: {}", consumerAndRecords.get().offsets.toString()); + } - } catch (ConsumerTimeoutException ex) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Timed out while waiting for data to come from Kafka", - ex); + //Add the key to the header + e.getHeaders().put(KEY_HEADER, record.key()); + + if (logger.isDebugEnabled()) { + logger.debug("Processed output from partition {} offset {}", record.partition(), record.offset()); + } + + long endTime = System.nanoTime(); + counter.addToKafkaEventGetTimer((endTime - startTime) / (1000 * 1000)); + } else { + return null; } - return null; } catch (Exception ex) { - LOGGER.warn("Error while getting events from Kafka", ex); + logger.warn("Error while getting events from Kafka. This is usually caused by trying to read " + + "a non-flume event. 
Ensure the setting for parseAsFlumeEvent is correct", ex); throw new ChannelException("Error while getting events from Kafka", - ex); + ex); } } eventTaken = true; @@ -351,32 +410,41 @@ public class KafkaChannel extends BasicChannelSemantics { return; } if (type.equals(TransactionType.PUT)) { + if (!kafkaFutures.isPresent()) { + kafkaFutures = Optional.of(new LinkedList<Future<RecordMetadata>>()); + } try { - List<KeyedMessage<String, byte[]>> messages = new - ArrayList<KeyedMessage<String, byte[]>>(serializedEvents.get() - .size()); - for (byte[] event : serializedEvents.get()) { - messages.add(new KeyedMessage<String, byte[]>(topic.get(), null, - batchUUID, event)); - } + long batchSize = producerRecords.get().size(); long startTime = System.nanoTime(); - producer.send(messages); + int index = 0; + for (ProducerRecord<String, byte[]> record : producerRecords.get()) { + index++; + kafkaFutures.get().add(producer.send(record, new ChannelCallback(index, startTime))); + } + //prevents linger.ms from being a problem + producer.flush(); + + for (Future<RecordMetadata> future : kafkaFutures.get()) { + future.get(); + } long endTime = System.nanoTime(); - counter.addToKafkaEventSendTimer((endTime-startTime)/(1000*1000)); - counter.addToEventPutSuccessCount(Long.valueOf(messages.size())); - serializedEvents.get().clear(); + counter.addToKafkaEventSendTimer((endTime - startTime) / (1000 * 1000)); + counter.addToEventPutSuccessCount(batchSize); + producerRecords.get().clear(); + kafkaFutures.get().clear(); } catch (Exception ex) { - LOGGER.warn("Sending events to Kafka failed", ex); + logger.warn("Sending events to Kafka failed", ex); throw new ChannelException("Commit failed as send to Kafka failed", - ex); + ex); } } else { - if (consumerAndIter.get().failedEvents.isEmpty() && eventTaken) { + if (consumerAndRecords.get().failedEvents.isEmpty() && eventTaken) { long startTime = System.nanoTime(); - consumerAndIter.get().consumer.commitOffsets(); + consumerAndRecords.get().commitOffsets(); long endTime = System.nanoTime(); - counter.addToKafkaCommitTimer((endTime-startTime)/(1000*1000)); - } + counter.addToKafkaCommitTimer((endTime - startTime) / (1000 * 1000)); + consumerAndRecords.get().printCurrentAssignment(); + } counter.addToEventTakeSuccessCount(Long.valueOf(events.get().size())); events.get().clear(); } @@ -388,37 +456,66 @@ public class KafkaChannel extends BasicChannelSemantics { return; } if (type.equals(TransactionType.PUT)) { - serializedEvents.get().clear(); + producerRecords.get().clear(); + kafkaFutures.get().clear(); } else { counter.addToRollbackCounter(Long.valueOf(events.get().size())); - consumerAndIter.get().failedEvents.addAll(events.get()); + consumerAndRecords.get().failedEvents.addAll(events.get()); events.get().clear(); } } - } - - private class ConsumerAndIterator { - final ConsumerConnector consumer; - final ConsumerIterator<byte[], byte[]> iterator; - final String uuid; - final LinkedList<Event> failedEvents = new LinkedList<Event>(); + private byte[] serializeValue(Event event, boolean parseAsFlumeEvent) throws IOException { + byte[] bytes; + if (parseAsFlumeEvent) { + if (!tempOutStream.isPresent()) { + tempOutStream = Optional.of(new ByteArrayOutputStream()); + } + if (!writer.isPresent()) { + writer = Optional.of(new + SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class)); + } + tempOutStream.get().reset(); + AvroFlumeEvent e = new AvroFlumeEvent( + toCharSeqMap(event.getHeaders()), + ByteBuffer.wrap(event.getBody())); + encoder = EncoderFactory.get() + 
.directBinaryEncoder(tempOutStream.get(), encoder); + writer.get().write(e, encoder); + encoder.flush(); + bytes = tempOutStream.get().toByteArray(); + } else { + bytes = event.getBody(); + } + return bytes; + } - ConsumerAndIterator(ConsumerConnector consumerConnector, - ConsumerIterator<byte[], byte[]> iterator, String uuid) { - this.consumer = consumerConnector; - this.iterator = iterator; - this.uuid = uuid; + private Event deserializeValue(byte[] value, boolean parseAsFlumeEvent) throws IOException { + Event e; + if (parseAsFlumeEvent) { + ByteArrayInputStream in = + new ByteArrayInputStream(value); + decoder = DecoderFactory.get().directBinaryDecoder(in, decoder); + if (!reader.isPresent()) { + reader = Optional.of( + new SpecificDatumReader<AvroFlumeEvent>(AvroFlumeEvent.class)); + } + AvroFlumeEvent event = reader.get().read(null, decoder); + e = EventBuilder.withBody(event.getBody().array(), + toStringMap(event.getHeaders())); + } else { + e = EventBuilder.withBody(value, Collections.EMPTY_MAP); + } + return e; } } /** * Helper function to convert a map of String to a map of CharSequence. */ - private static Map<CharSequence, CharSequence> toCharSeqMap( - Map<String, String> stringMap) { + private static Map<CharSequence, CharSequence> toCharSeqMap(Map<String, String> stringMap) { Map<CharSequence, CharSequence> charSeqMap = - new HashMap<CharSequence, CharSequence>(); + new HashMap<CharSequence, CharSequence>(); for (Map.Entry<String, String> entry : stringMap.entrySet()) { charSeqMap.put(entry.getKey(), entry.getValue()); } @@ -428,13 +525,105 @@ public class KafkaChannel extends BasicChannelSemantics { /** * Helper function to convert a map of CharSequence to a map of String. */ - private static Map<String, String> toStringMap( - Map<CharSequence, CharSequence> charSeqMap) { - Map<String, String> stringMap = - new HashMap<String, String>(); + private static Map<String, String> toStringMap(Map<CharSequence, CharSequence> charSeqMap) { + Map<String, String> stringMap = new HashMap<String, String>(); for (Map.Entry<CharSequence, CharSequence> entry : charSeqMap.entrySet()) { stringMap.put(entry.getKey().toString(), entry.getValue().toString()); } return stringMap; } + + /* Object to store our consumer */ + private class ConsumerAndRecords { + final KafkaConsumer<String, byte[]> consumer; + final String uuid; + final LinkedList<Event> failedEvents = new LinkedList<Event>(); + + ConsumerRecords<String, byte[]> records; + Iterator<ConsumerRecord<String, byte[]>> recordIterator; + Map<TopicPartition, OffsetAndMetadata> offsets; + + ConsumerAndRecords(KafkaConsumer<String, byte[]> consumer, String uuid) { + this.consumer = consumer; + this.uuid = uuid; + this.records = ConsumerRecords.empty(); + this.recordIterator = records.iterator(); + } + + void poll() { + this.records = consumer.poll(pollTimeout); + this.recordIterator = records.iterator(); + logger.trace("polling"); + } + + void commitOffsets() { + this.consumer.commitSync(offsets); + } + + // This will reset the latest assigned partitions to the last committed offsets; + + public void printCurrentAssignment() { + StringBuilder sb = new StringBuilder(); + for (TopicPartition tp : this.consumer.assignment()) { + try { + sb.append("Committed: [").append(tp).append(",").append(this.consumer.committed(tp).offset()) + .append(",").append(this.consumer.committed(tp).metadata()).append("]"); + if (logger.isDebugEnabled()) { + logger.debug(sb.toString()); + } + } catch (NullPointerException npe) { + if (logger.isDebugEnabled()) { + 
logger.debug("Committed {}", tp); + } + } + } + } + } +} + +// Throw exception if there is an error +class ChannelCallback implements Callback { + private static final Logger log = LoggerFactory.getLogger(ChannelCallback.class); + private int index; + private long startTime; + + public ChannelCallback(int index, long startTime) { + this.index = index; + this.startTime = startTime; + } + + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception != null) { + log.trace("Error sending message to Kafka due to " + exception.getMessage()); + } + if (log.isDebugEnabled()) { + long batchElapsedTime = System.currentTimeMillis() - startTime; + log.debug("Acked message_no " + index + ": " + metadata.topic() + "-" + metadata.partition() + "-" + + metadata.offset() + "-" + batchElapsedTime); + } + } +} + +class ChannelRebalanceListener implements ConsumerRebalanceListener { + private static final Logger log = LoggerFactory.getLogger(ChannelRebalanceListener.class); + private AtomicBoolean rebalanceFlag; + + public ChannelRebalanceListener(AtomicBoolean rebalanceFlag) { + this.rebalanceFlag = rebalanceFlag; + } + + // Set a flag that a rebalance has occurred. Then we can commit the currently written transactions + // on the next doTake() pass. + public void onPartitionsRevoked(Collection<TopicPartition> partitions) { + for (TopicPartition partition : partitions) { + log.info("topic {} - partition {} revoked.", partition.topic(), partition.partition()); + rebalanceFlag.set(true); + } + } + + public void onPartitionsAssigned(Collection<TopicPartition> partitions) { + for (TopicPartition partition : partitions) { + log.info("topic {} - partition {} assigned.", partition.topic(), partition.partition()); + } + } } http://git-wip-us.apache.org/repos/asf/flume/blob/e8c4a7bf/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java index 9a342ef..faf46b6 100644 --- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java +++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java @@ -18,27 +18,45 @@ */ package org.apache.flume.channel.kafka; +import org.apache.kafka.clients.CommonClientConfigs; + public class KafkaChannelConfiguration { public static final String KAFKA_PREFIX = "kafka."; + public static final String KAFKA_CONSUMER_PREFIX = KAFKA_PREFIX + "consumer."; + public static final String KAFKA_PRODUCER_PREFIX = KAFKA_PREFIX + "producer."; + public static final String DEFAULT_ACKS = "all"; + public static final String DEFAULT_KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; + public static final String DEFAULT_VALUE_SERIAIZER = "org.apache.kafka.common.serialization.ByteArraySerializer"; + public static final String DEFAULT_KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; + public static final String DEFAULT_VALUE_DESERIAIZER = "org.apache.kafka.common.serialization.ByteArrayDeserializer"; + public static final String TOPIC_CONFIG = KAFKA_PREFIX + "topic"; + public static final String BOOTSTRAP_SERVERS_CONFIG = KAFKA_PREFIX + 
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; + public static final String DEFAULT_TOPIC = "flume-channel"; + public static final String DEFAULT_GROUP_ID = "flume"; + public static final String POLL_TIMEOUT = KAFKA_PREFIX + "pollTimeout"; + public static final long DEFAULT_POLL_TIMEOUT = 500; + + public static final String KEY_HEADER = "key"; + + public static final String DEFAULT_AUTO_OFFSET_RESET = "earliest"; + + public static final String PARSE_AS_FLUME_EVENT = "parseAsFlumeEvent"; + public static final boolean DEFAULT_PARSE_AS_FLUME_EVENT = true; + + /*** Old Configuration Parameters ****/ public static final String BROKER_LIST_KEY = "metadata.broker.list"; public static final String REQUIRED_ACKS_KEY = "request.required.acks"; public static final String BROKER_LIST_FLUME_KEY = "brokerList"; - public static final String TOPIC = "topic"; - public static final String GROUP_ID = "group.id"; + //public static final String TOPIC = "topic"; public static final String GROUP_ID_FLUME = "groupId"; public static final String AUTO_COMMIT_ENABLED = "auto.commit.enable"; public static final String ZOOKEEPER_CONNECT = "zookeeper.connect"; public static final String ZOOKEEPER_CONNECT_FLUME_KEY = "zookeeperConnect"; - public static final String DEFAULT_GROUP_ID = "flume"; - public static final String DEFAULT_TOPIC = "flume-channel"; public static final String TIMEOUT = "timeout"; public static final String DEFAULT_TIMEOUT = "100"; public static final String CONSUMER_TIMEOUT = "consumer.timeout.ms"; - public static final String PARSE_AS_FLUME_EVENT = "parseAsFlumeEvent"; - public static final boolean DEFAULT_PARSE_AS_FLUME_EVENT = true; - public static final String READ_SMALLEST_OFFSET = "readSmallestOffset"; public static final boolean DEFAULT_READ_SMALLEST_OFFSET = false; } http://git-wip-us.apache.org/repos/asf/flume/blob/e8c4a7bf/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java index 319e779..637428d 100644 --- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java +++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java @@ -19,13 +19,8 @@ package org.apache.flume.channel.kafka; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import kafka.admin.AdminUtils; -import kafka.javaapi.producer.Producer; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; -import kafka.utils.ZKStringSerializer$; -import org.I0Itec.zkclient.ZkClient; +import kafka.utils.ZkUtils; import org.apache.commons.lang.RandomStringUtils; import org.apache.flume.Context; import org.apache.flume.Event; @@ -33,17 +28,26 @@ import org.apache.flume.Transaction; import org.apache.flume.conf.Configurables; import org.apache.flume.event.EventBuilder; import org.apache.flume.sink.kafka.util.TestUtil; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.*; - -import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*; + public class TestKafkaChannel { + private final static Logger LOGGER = + LoggerFactory.getLogger(TestKafkaChannel.class); + private static TestUtil testUtil = TestUtil.getInstance(); private String topic = null; private final Set<String> usedTopics = new HashSet<String>(); @@ -78,11 +82,52 @@ public class TestKafkaChannel { testUtil.tearDown(); } + //Make sure the props are picked up correctly. + @Test + public void testProps() throws Exception { + Context context = new Context(); + context.put("kafka.producer.some-parameter", "1"); + context.put("kafka.consumer.another-parameter", "1"); + context.put(BOOTSTRAP_SERVERS_CONFIG, testUtil.getKafkaServerUrl()); + context.put(TOPIC_CONFIG, topic); + + final KafkaChannel channel = new KafkaChannel(); + Configurables.configure(channel, context); + + Properties consumerProps = channel.getConsumerProps(); + Properties producerProps = channel.getProducerProps(); + + Assert.assertEquals(producerProps.getProperty("some-parameter"), "1"); + Assert.assertEquals(consumerProps.getProperty("another-parameter"), "1"); + } + + @Test + public void testOldConfig() throws Exception { + Context context = new Context(); + context.put(BROKER_LIST_FLUME_KEY,testUtil.getKafkaServerUrl()); + context.put(GROUP_ID_FLUME,"flume-something"); + context.put(READ_SMALLEST_OFFSET,"true"); + context.put("topic",topic); + + final KafkaChannel channel = new KafkaChannel(); + Configurables.configure(channel, context); + + Properties consumerProps = channel.getConsumerProps(); + Properties producerProps = channel.getProducerProps(); + + Assert.assertEquals(producerProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),testUtil.getKafkaServerUrl()); + Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG), "flume-something"); + Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); + + } + + @Test public void testSuccess() throws Exception { doTestSuccessRollback(false, false); } + @Test public void testSuccessInterleave() throws Exception { doTestSuccessRollback(false, true); @@ -99,7 +144,7 @@ public class TestKafkaChannel { } private void doTestSuccessRollback(final boolean rollback, - final boolean interleave) throws Exception { + final boolean interleave) throws Exception { final KafkaChannel channel = startChannel(true); writeAndVerify(rollback, channel, interleave); channel.stop(); @@ -122,82 +167,89 @@ public class TestKafkaChannel { } @Test - public void testNoParsingAsFlumeAgent() throws Exception { + public void testParseAsFlumeEventFalse() throws Exception { + doParseAsFlumeEventFalse(false); + } + + @Test + public void testParseAsFlumeEventFalseCheckHeader() throws Exception { + doParseAsFlumeEventFalse(true); + } + + @Test + public void testParseAsFlumeEventFalseAsSource() throws Exception { + doParseAsFlumeEventFalseAsSource(false); + } + + @Test + public void testParseAsFlumeEventFalseAsSourceCheckHeader() throws Exception { + doParseAsFlumeEventFalseAsSource(true); + } + + private void doParseAsFlumeEventFalse(Boolean checkHeaders) throws Exception { final KafkaChannel channel = startChannel(false); - Producer<String, byte[]> producer = new Producer<String, byte[]>( - new ProducerConfig(channel.getKafkaConf())); - 
List<KeyedMessage<String, byte[]>> original = Lists.newArrayList(); + Properties props = channel.getProducerProps(); + KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props); + for (int i = 0; i < 50; i++) { - KeyedMessage<String, byte[]> data = new KeyedMessage<String, - byte[]>(topic, null, RandomStringUtils.randomAlphabetic(6), - String.valueOf(i).getBytes()); - original.add(data); + ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, String.valueOf(i) + "-header", String.valueOf(i).getBytes()); + producer.send(data).get(); } - producer.send(original); ExecutorCompletionService<Void> submitterSvc = new - ExecutorCompletionService<Void>(Executors.newCachedThreadPool()); + ExecutorCompletionService<Void>(Executors.newCachedThreadPool()); List<Event> events = pullEvents(channel, submitterSvc, - 50, false, false); + 50, false, false); wait(submitterSvc, 5); - Set<Integer> finals = Sets.newHashSet(); + Map<Integer, String> finals = new HashMap<Integer, String>(); for (int i = 0; i < 50; i++) { - finals.add(Integer.parseInt(new String(events.get(i).getBody()))); + finals.put(Integer.parseInt(new String(events.get(i).getBody())), events.get(i).getHeaders().get(KEY_HEADER)); } for (int i = 0; i < 50; i++) { - Assert.assertTrue(finals.contains(i)); + Assert.assertTrue(finals.keySet().contains(i)); + if (checkHeaders) { + Assert.assertTrue(finals.containsValue(String.valueOf(i) + "-header")); + } finals.remove(i); } Assert.assertTrue(finals.isEmpty()); channel.stop(); } - @Test - public void testTimeoutConfig() throws Exception { - Context context = prepareDefaultContext(true); - KafkaChannel channel = new KafkaChannel(); - Configurables.configure(channel, context); - Assert.assertTrue(channel.getKafkaConf().get(CONSUMER_TIMEOUT) - .equals(DEFAULT_TIMEOUT)); - - String timeout = "1000"; - context.put("kafka."+CONSUMER_TIMEOUT, timeout); - channel = new KafkaChannel(); - Configurables.configure(channel, context); - Assert.assertTrue(channel.getKafkaConf().get(CONSUMER_TIMEOUT) - .equals(timeout)); - } - /** * Like the previous test but here we write to the channel like a Flume source would do * to verify that the events are written as text and not as an Avro object * * @throws Exception */ - @Test - public void testWritingToNoParsingAsFlumeAgent() throws Exception { + public void doParseAsFlumeEventFalseAsSource(Boolean checkHeaders) throws Exception { final KafkaChannel channel = startChannel(false); List<String> msgs = new ArrayList<String>(); - for (int i = 0; i < 50; i++){ + Map<String, String> headers = new HashMap<String, String>(); + for (int i = 0; i < 50; i++) { msgs.add(String.valueOf(i)); } Transaction tx = channel.getTransaction(); tx.begin(); - for (int i = 0; i < msgs.size(); i++){ - channel.put(EventBuilder.withBody(msgs.get(i).getBytes())); + for (int i = 0; i < msgs.size(); i++) { + headers.put(KEY_HEADER, String.valueOf(i) + "-header"); + channel.put(EventBuilder.withBody(msgs.get(i).getBytes(), headers)); } tx.commit(); ExecutorCompletionService<Void> submitterSvc = new ExecutorCompletionService<Void>(Executors.newCachedThreadPool()); List<Event> events = pullEvents(channel, submitterSvc, - 50, false, false); + 50, false, false); wait(submitterSvc, 5); - Set<Integer> finals = Sets.newHashSet(); + Map<Integer, String> finals = new HashMap<Integer, String>(); for (int i = 0; i < 50; i++) { - finals.add(Integer.parseInt(new String(events.get(i).getBody()))); + finals.put(Integer.parseInt(new String(events.get(i).getBody())), 
events.get(i).getHeaders().get(KEY_HEADER)); } for (int i = 0; i < 50; i++) { - Assert.assertTrue(finals.contains(i)); + Assert.assertTrue(finals.keySet().contains(i)); + if (checkHeaders) { + Assert.assertTrue(finals.containsValue(String.valueOf(i) + "-header")); + } finals.remove(i); } Assert.assertTrue(finals.isEmpty()); @@ -216,12 +268,12 @@ public class TestKafkaChannel { * @throws Exception */ private void doTestStopAndStart(boolean rollback, - boolean retryAfterRollback) throws Exception { + boolean retryAfterRollback) throws Exception { final KafkaChannel channel = startChannel(true); ExecutorService underlying = Executors - .newCachedThreadPool(); + .newCachedThreadPool(); ExecutorCompletionService<Void> submitterSvc = - new ExecutorCompletionService<Void>(underlying); + new ExecutorCompletionService<Void>(underlying); final List<List<Event>> events = createBaseList(); putEvents(channel, events, submitterSvc); int completed = 0; @@ -233,14 +285,14 @@ public class TestKafkaChannel { total = 40; } final List<Event> eventsPulled = - pullEvents(channel2, submitterSvc, total, rollback, retryAfterRollback); + pullEvents(channel2, submitterSvc, total, rollback, retryAfterRollback); wait(submitterSvc, 5); channel2.stop(); if (!retryAfterRollback && rollback) { final KafkaChannel channel3 = startChannel(true); int expectedRemaining = 50 - eventsPulled.size(); final List<Event> eventsPulled2 = - pullEvents(channel3, submitterSvc, expectedRemaining, false, false); + pullEvents(channel3, submitterSvc, expectedRemaining, false, false); wait(submitterSvc, 5); Assert.assertEquals(expectedRemaining, eventsPulled2.size()); eventsPulled.addAll(eventsPulled2); @@ -259,18 +311,18 @@ public class TestKafkaChannel { } private void writeAndVerify(final boolean testRollbacks, - final KafkaChannel channel) throws Exception { + final KafkaChannel channel) throws Exception { writeAndVerify(testRollbacks, channel, false); } private void writeAndVerify(final boolean testRollbacks, - final KafkaChannel channel, final boolean interleave) throws Exception { + final KafkaChannel channel, final boolean interleave) throws Exception { final List<List<Event>> events = createBaseList(); ExecutorCompletionService<Void> submitterSvc = - new ExecutorCompletionService<Void>(Executors - .newCachedThreadPool()); + new ExecutorCompletionService<Void>(Executors + .newCachedThreadPool()); putEvents(channel, events, submitterSvc); @@ -279,11 +331,11 @@ public class TestKafkaChannel { } ExecutorCompletionService<Void> submitterSvc2 = - new ExecutorCompletionService<Void>(Executors - .newCachedThreadPool()); + new ExecutorCompletionService<Void>(Executors + .newCachedThreadPool()); final List<Event> eventsPulled = - pullEvents(channel, submitterSvc2, 50, testRollbacks, true); + pullEvents(channel, submitterSvc2, 50, testRollbacks, true); if (!interleave) { wait(submitterSvc, 5); @@ -301,7 +353,7 @@ public class TestKafkaChannel { for (int j = 0; j < 10; j++) { Map<String, String> hdrs = new HashMap<String, String>(); String v = (String.valueOf(i) + " - " + String - .valueOf(j)); + .valueOf(j)); hdrs.put("header", v); eventList.add(EventBuilder.withBody(v.getBytes(), hdrs)); } @@ -310,7 +362,7 @@ public class TestKafkaChannel { } private void putEvents(final KafkaChannel channel, final List<List<Event>> - events, ExecutorCompletionService<Void> submitterSvc) { + events, ExecutorCompletionService<Void> submitterSvc) { for (int i = 0; i < 5; i++) { final int index = i; submitterSvc.submit(new Callable<Void>() { @@ -334,10 +386,10 @@ 
public class TestKafkaChannel { } private List<Event> pullEvents(final KafkaChannel channel, - ExecutorCompletionService<Void> submitterSvc, final int total, - final boolean testRollbacks, final boolean retryAfterRollback) { + ExecutorCompletionService<Void> submitterSvc, final int total, + final boolean testRollbacks, final boolean retryAfterRollback) { final List<Event> eventsPulled = Collections.synchronizedList(new - ArrayList<Event>(50)); + ArrayList<Event>(50)); final CyclicBarrier barrier = new CyclicBarrier(5); final AtomicInteger counter = new AtomicInteger(0); final AtomicInteger rolledBackCount = new AtomicInteger(0); @@ -366,9 +418,9 @@ public class TestKafkaChannel { eventsLocal.add(e); } else { if (testRollbacks && - index == 4 && - (!rolledBack.get()) && - startedGettingEvents.get()) { + index == 4 && + (!rolledBack.get()) && + startedGettingEvents.get()) { tx.rollback(); tx.close(); tx = null; @@ -407,7 +459,7 @@ public class TestKafkaChannel { } private void wait(ExecutorCompletionService<Void> submitterSvc, int max) - throws Exception { + throws Exception { int completed = 0; while (completed < max) { submitterSvc.take(); @@ -420,8 +472,7 @@ public class TestKafkaChannel { Assert.assertEquals(50, eventsPulled.size()); Set<String> eventStrings = new HashSet<String>(); for (Event e : eventsPulled) { - Assert - .assertEquals(e.getHeaders().get("header"), new String(e.getBody())); + Assert.assertEquals(e.getHeaders().get("header"), new String(e.getBody())); eventStrings.add(e.getHeaders().get("header")); } for (int i = 0; i < 5; i++) { @@ -437,14 +488,10 @@ public class TestKafkaChannel { private Context prepareDefaultContext(boolean parseAsFlume) { // Prepares a default context with Kafka Server Properties Context context = new Context(); - context.put(KafkaChannelConfiguration.BROKER_LIST_FLUME_KEY, - testUtil.getKafkaServerUrl()); - context.put(KafkaChannelConfiguration.ZOOKEEPER_CONNECT_FLUME_KEY, - testUtil.getZkUrl()); - context.put(KafkaChannelConfiguration.PARSE_AS_FLUME_EVENT, - String.valueOf(parseAsFlume)); - context.put(KafkaChannelConfiguration.READ_SMALLEST_OFFSET, "true"); - context.put(KafkaChannelConfiguration.TOPIC, topic); + context.put(BOOTSTRAP_SERVERS_CONFIG, testUtil.getKafkaServerUrl()); + context.put(PARSE_AS_FLUME_EVENT, String.valueOf(parseAsFlume)); + context.put(TOPIC_CONFIG, topic); + return context; } @@ -452,22 +499,18 @@ public class TestKafkaChannel { int numPartitions = 5; int sessionTimeoutMs = 10000; int connectionTimeoutMs = 10000; - ZkClient zkClient = new ZkClient(testUtil.getZkUrl(), - sessionTimeoutMs, connectionTimeoutMs, - ZKStringSerializer$.MODULE$); + ZkUtils zkUtils = ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false); int replicationFactor = 1; Properties topicConfig = new Properties(); - AdminUtils.createTopic(zkClient, topicName, numPartitions, - replicationFactor, topicConfig); + AdminUtils.createTopic(zkUtils, topicName, numPartitions, + replicationFactor, topicConfig); } public static void deleteTopic(String topicName) { int sessionTimeoutMs = 10000; int connectionTimeoutMs = 10000; - ZkClient zkClient = new ZkClient(testUtil.getZkUrl(), - sessionTimeoutMs, connectionTimeoutMs, - ZKStringSerializer$.MODULE$); - AdminUtils.deleteTopic(zkClient, topicName); + ZkUtils zkUtils = ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false); + AdminUtils.deleteTopic(zkUtils, topicName); } } 
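The testOldConfig case above exercises translateOldProps(), which rewrites the deprecated Flume-style properties into the new kafka.-prefixed ones at configure time and logs a deprecation warning; zookeeperConnect is no longer used by the channel. A hedged sketch of that mapping in agent-config form (agent and channel names are illustrative):

    # Deprecated form (still accepted; a deprecation warning is logged at startup)
    a1.channels.channel1.brokerList = kafka-1:9092
    a1.channels.channel1.topic = channel1
    a1.channels.channel1.groupId = flume-something
    a1.channels.channel1.readSmallestOffset = true
    # Equivalent new-style form after translation
    a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092
    a1.channels.channel1.kafka.topic = channel1
    a1.channels.channel1.kafka.consumer.group.id = flume-something
    a1.channels.channel1.kafka.consumer.auto.offset.reset = earliest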
http://git-wip-us.apache.org/repos/asf/flume/blob/e8c4a7bf/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties b/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties index c10c89d..216bfd8 100644 --- a/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties +++ b/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties @@ -38,7 +38,7 @@ port=9092 #advertised.port=<port accessible by clients> # The number of threads handling network requests -num.network.threads=2 +num.network.threads=4 # The number of threads doing disk I/O num.io.threads=8 http://git-wip-us.apache.org/repos/asf/flume/blob/e8c4a7bf/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 15f27c3..5149ab5 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -2679,36 +2679,60 @@ The Kafka channel can be used for multiple scenarios: * With Flume source and interceptor but no sink - it allows writing Flume events into a Kafka topic, for use by other apps * With Flume sink, but no source - it is a low-latency, fault tolerant way to send events from Kafka to Flume sources such as HDFS, HBase or Solr + +This version of Flume requires Kafka version 0.9 or greater due to the reliance on the Kafka clients shipped with that version. The configuration of +the channel has changed compared to previous Flume versions. + +The configuration parameters are organized as follows: +1) Configuration values related to the channel generically are applied at the channel config level, e.g. a1.channels.k1.type = +2) Configuration values related to Kafka or to how the channel operates are prefixed with "kafka." (these are analogous to the CommonClient configs), e.g. a1.channels.k1.kafka.topic and a1.channels.k1.kafka.bootstrap.servers. This is not dissimilar to how the HDFS sink operates. +3) Properties specific to the producer or consumer are prefixed by kafka.producer or kafka.consumer +4) Where possible, the Kafka parameter names are used, e.g. bootstrap.servers and acks + +This version of Flume is backwards-compatible with previous versions; however, deprecated properties are indicated in the table below, and a warning message +is logged on startup when they are present in the configuration file. + Required properties are in **bold**. -====================== ========================== =============================================================================================================== -Property Name Default Description -====================== ========================== =============================================================================================================== -**type** -- The component type name, needs to be ``org.apache.flume.channel.kafka.KafkaChannel`` -**brokerList** -- List of brokers in the Kafka cluster used by the channel - This can be a partial list of brokers, but we recommend at least two for HA. - The format is comma separated list of hostname:port -**zookeeperConnect** -- URI of ZooKeeper used by Kafka cluster - The format is comma separated list of hostname:port. If chroot is used, it is added once at the end.
- For example: zookeeper-1:2181,zookeeper-2:2182,zookeeper-3:2181/kafka -topic flume-channel Kafka topic which the channel will use -groupId flume Consumer group ID the channel uses to register with Kafka. - Multiple channels must use the same topic and group to ensure that when one agent fails another can get the data - Note that having non-channel consumers with the same ID can lead to data loss. -parseAsFlumeEvent true Expecting Avro datums with FlumeEvent schema in the channel. - This should be true if Flume source is writing to the channel - And false if other producers are writing into the topic that the channel is using - Flume source messages to Kafka can be parsed outside of Flume by using - org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk artifact -readSmallestOffset false When set to true, the channel will read all data in the topic, starting from the oldest event - when false, it will read only events written after the channel started - When "parseAsFlumeEvent" is true, this will be false. Flume source will start prior to the sinks and this - guarantees that events sent by source before sinks start will not be lost. -Other Kafka Properties -- These properties are used to configure the Kafka Producer and Consumer used by the channel. - Any property supported by Kafka can be used. - The only requirement is to prepend the property name with the prefix ``kafka.``. - For example: kafka.producer.type -====================== ========================== =============================================================================================================== +================================ ========================== =============================================================================================================== +Property Name Default Description +================================ ========================== =============================================================================================================== +**type** -- The component type name, needs to be ``org.apache.flume.channel.kafka.KafkaChannel`` +**kafka.bootstrap.servers** -- List of brokers in the Kafka cluster used by the channel + This can be a partial list of brokers, but we recommend at least two for HA. + The format is comma separated list of hostname:port +kafka.topic flume-channel Kafka topic which the channel will use +kafka.consumer.group.id flume Consumer group ID the channel uses to register with Kafka. + Multiple channels must use the same topic and group to ensure that when one agent fails another can get the data + Note that having non-channel consumers with the same ID can lead to data loss. + +parseAsFlumeEvent true Expecting Avro datums with FlumeEvent schema in the channel. + This should be true if Flume source is writing to the channel and false if other producers are + writing into the topic that the channel is using. Flume source messages to Kafka can be parsed outside of Flume by using + org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk artifact +pollTimeout 500 The amount of time (in milliseconds) to wait in the "poll()" call of the consumer. + https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long) +kafka.consumer.auto.offset.reset latest What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server + (e.g.
because that data has been deleted): + earliest: automatically reset the offset to the earliest offset + latest: automatically reset the offset to the latest offset + none: throw exception to the consumer if no previous offset is found for the consumer\'s group + anything else: throw exception to the consumer. +================================ ========================== =============================================================================================================== + +Deprecated Properties + +================================ ========================== =============================================================================================================== +Property Name Default Description +================================ ========================== =============================================================================================================== +brokerList -- List of brokers in the Kafka cluster used by the channel + This can be a partial list of brokers, but we recommend at least two for HA. + The format is comma separated list of hostname:port +topic flume-channel Use kafka.topic +groupId flume Use kafka.consumer.group.id +readSmallestOffset false Use kafka.consumer.auto.offset.reset + +================================ ========================== =============================================================================================================== .. note:: Due to the way the channel is load balanced, there may be duplicate events when the agent first starts up @@ -2716,12 +2740,12 @@ Example for agent named a1: .. code-block:: properties - a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel + a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.channel1.capacity = 10000 a1.channels.channel1.transactionCapacity = 1000 - a1.channels.channel1.brokerList=kafka-2:9092,kafka-3:9092 - a1.channels.channel1.topic=channel1 - a1.channels.channel1.zookeeperConnect=kafka-1:2181 + a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092 + a1.channels.channel1.kafka.topic = channel1 + a1.channels.channel1.kafka.consumer.group.id = flume-consumer File Channel ~~~~~~~~~~~~
