Repository: flume Updated Branches: refs/heads/trunk 186a3b808 -> bde2c2821
FLUME-2470. Kafka Sink and Source must use camel case for all configs. (Gwen Shapira via Hari) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/bde2c282 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/bde2c282 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/bde2c282 Branch: refs/heads/trunk Commit: bde2c28211a2d05a9930f1599cb15864ad3cdba0 Parents: 186a3b8 Author: Hari Shreedharan <[email protected]> Authored: Tue Sep 23 23:10:25 2014 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Tue Sep 23 23:10:25 2014 -0700 ---------------------------------------------------------------------- flume-ng-doc/sphinx/FlumeUserGuide.rst | 67 ++++++++------- .../org/apache/flume/sink/kafka/KafkaSink.java | 65 ++++++--------- .../flume/sink/kafka/KafkaSinkConstants.java | 20 ++++- .../apache/flume/sink/kafka/TestKafkaSink.java | 23 ++---- .../src/test/resources/kafka-server.properties | 1 + .../src/test/resources/log4j.properties | 2 +- .../apache/flume/source/kafka/KafkaSource.java | 40 ++++++--- .../source/kafka/KafkaSourceConstants.java | 15 ++-- .../flume/source/kafka/KafkaSourceUtil.java | 87 +++++++++++++++----- .../flume/source/kafka/KafkaSourceTest.java | 12 ++- .../flume/source/kafka/KafkaSourceUtilTest.java | 43 +++++++--- 11 files changed, 232 insertions(+), 143 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 11c1ad7..ce52946 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1101,29 +1101,33 @@ Kafka Source is an Apache Kafka consumer that reads messages from a Kafka topic. 
If you have multiple Kafka sources running, you can configure them with the same Consumer Group so each will read a unique set of partitions for the topic. -The properties below are required properties, but you can specify any Kafka parameter you want -and it will be passed to the consumer. Check `Kafka documentation <https://kafka.apache.org/08/configuration.html#consumerconfigs>`_ -for details - -=========================== =========== =================================================== -Property Name Default Description -=========================== =========== =================================================== -**channels** -- -**type** -- The component type name, needs to be ``org.apache.flume.source.kafka,KafkaSource`` -**kafka.zookeeper.connect** -- URI of ZooKeeper used by Kafka cluster -**kadka.group.id** -- Unique identified of consumer group. Setting the same id in multiple sources or agents - indicates that they are part of the same consumer group -**topic** -- Kafka topic we'll read messages from. At the time, this is a single topic only. -batchSize 1000 Maximum number of messages written to Channel in one batch -batchDurationMillis 1000 Maximum time (in ms) before a batch will be written to Channel - The batch will be written whenever the first of size and time will be reached. -kafka.auto.commit.enable false If true, Kafka will commit events automatically - faster but less durable option. - when false, the Kafka Source will commit events before writing batch to channel -consumer.timeout.ms 10 Polling interval for new data for batch. - Low value means more CPU usage. - High value means the maxBatchDurationMillis may be missed while waiting for - additional data. 
-=========================== =========== =================================================== + + +=============================== =========== =================================================== +Property Name Default Description +=============================== =========== =================================================== +**channels** -- +**type** -- The component type name, needs to be ``org.apache.flume.source.kafka.KafkaSource`` +**zookeeperConnect** -- URI of ZooKeeper used by Kafka cluster +**groupId** flume Unique identifier of consumer group. Setting the same id in multiple sources or agents + indicates that they are part of the same consumer group +**topic** -- Kafka topic we'll read messages from. At this time, this is a single topic only. +batchSize 1000 Maximum number of messages written to Channel in one batch +batchDurationMillis 1000 Maximum time (in ms) before a batch will be written to Channel + The batch will be written whenever the first of size and time will be reached. +Other Kafka Consumer Properties -- These properties are used to configure the Kafka Consumer. Any consumer property supported + by Kafka can be used. The only requirement is to prepend the property name with the prefix ``kafka.``. + For example: kafka.consumer.timeout.ms + Check `Kafka documentation <https://kafka.apache.org/08/configuration.html#consumerconfigs>`_ for details +=============================== =========== =================================================== + +.. note:: The Kafka Source overrides two Kafka consumer parameters: + auto.commit.enable is set to "false" by the source and we commit every batch. 
For improved performance + this can be set to "true", however, this can lead to loss of data + consumer.timeout.ms is set to 10ms, so when we check Kafka for new data we wait at most 10ms for the data to arrive + setting this to a higher value can reduce CPU utilization (we'll poll Kafka in less of a tight loop), but also means + higher latency in writing batches to channel (since we'll wait longer for data to arrive). + Example for agent named tier1: @@ -1131,9 +1135,9 @@ Example for agent named tier1: tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource tier1.sources.source1.channels = channel1 - tier1.sources.source1.kafka.zookeeper.connect = localhost:2181 + tier1.sources.source1.zookeeperConnect = localhost:2181 tier1.sources.source1.topic = test1 - tier1.sources.source1.kafka.group.id = flume + tier1.sources.source1.groupId = flume tier1.sources.source1.kafka.consumer.timeout.ms = 100 @@ -2152,7 +2156,7 @@ Required properties are marked in bold font. Property Name Default Description =============================== =================== ============================================================================================= **type** -- Must be set to ``org.apache.flume.sink.kafka.KafkaSink`` -**kafka.metadata.broker.list** -- List of brokers Kafka-Sink will connect to, to get the list of topic partitions +**brokerList** -- List of brokers Kafka-Sink will connect to, to get the list of topic partitions This can be a partial list of brokers, but we recommend at least two for HA. The format is comma separated list of hostname:port topic default-flume-topic The topic in Kafka to which the messages will be published. If this parameter is configured, @@ -2160,13 +2164,12 @@ topic default-flume-topic The topic in Kafka to whic If the event header contains a "topic" field, the event will be published to that topic overriding the topic configured here. batchSize 100 How many messages to process in one batch. 
Larger batches improve throughput while adding latency. -kafka.request.required.acks 0 How many replicas must acknowledge a message before its considered successfully written. +requiredAcks 1 How many replicas must acknowledge a message before it's considered successfully written. Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas) - The default is the fastest option, but we *highly recommend* setting this to -1 to avoid data loss -kafka.producer.type sync Whether messages should be sent to broker synchronously or using an asynchronous background thread. - Accepted values are sync (safest) and async (faster but potentially unsafe) + Set this to -1 to avoid data loss in some cases of leader failure. Other Kafka Producer Properties -- These properties are used to configure the Kafka Producer. Any producer property supported by Kafka can be used. The only requirement is to prepend the property name with the prefix ``kafka.``. + For example: kafka.producer.type =============================== =================== ============================================================================================= .. note:: Kafka Sink uses the ``topic`` and ``key`` properties from the FlumeEvent headers to send events to Kafka. @@ -2186,8 +2189,8 @@ argument. 
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.topic = mytopic - a1.sinks.k1.kafka.metadata.broker.list = localhost:9092 - a1.sinks.k1.kafka.request.required.acks = 1 + a1.sinks.k1.brokerList = localhost:9092 + a1.sinks.k1.requiredAcks = 1 a1.sinks.k1.batchSize = 20 a1.sinks.k1.channel = c1 http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java index a6121ac..a90b950 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java @@ -28,10 +28,10 @@ import org.apache.flume.sink.AbstractSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; +import java.util.Properties; import java.util.List; import java.util.Map; -import java.util.Properties; +import java.util.ArrayList; /** * A Flume Sink that can publish messages to Kafka. @@ -43,11 +43,7 @@ import java.util.Properties; * partition key * <p/> * Mandatory properties are: - * kafka.metadata.broker.list -- can be a partial list, - * but at least 2 are recommended for HA - * kafka.request.required.acks -- 0 (unsafe), 1 (accepted by at least one - * broker), -1 (accepted by all brokers) - * kafka.producer.type -- for safety, this should be sync + * brokerList -- can be a partial list, but at least 2 are recommended for HA * <p/> * <p/> * however, any property starting with "kafka." will be passed along to the @@ -60,6 +56,8 @@ import java.util.Properties; * different topics * batchSize - how many messages to process in one batch. 
Larger batches * improve throughput while adding latency. + * requiredAcks -- 0 (unsafe), 1 (accepted by at least one broker, default), + * -1 (accepted by all brokers) * <p/> * header properties (per event): * topic @@ -70,7 +68,7 @@ public class KafkaSink extends AbstractSink implements Configurable { private static final Logger logger = LoggerFactory.getLogger(KafkaSink.class); public static final String KEY_HDR = "key"; public static final String TOPIC_HDR = "topic"; - private Properties producerProps; + private Properties kafkaProps; private Producer<String, byte[]> producer; private String topic; private int batchSize; @@ -154,7 +152,7 @@ public class KafkaSink extends AbstractSink implements Configurable { @Override public synchronized void start() { // instantiate the producer - ProducerConfig config = new ProducerConfig(producerProps); + ProducerConfig config = new ProducerConfig(kafkaProps); producer = new Producer<String, byte[]>(config); super.start(); } @@ -166,54 +164,43 @@ public class KafkaSink extends AbstractSink implements Configurable { } + /** + * We configure the sink and generate properties for the Kafka Producer + * + * Kafka producer properties is generated as follows: + * 1. We generate a properties object with some static defaults that + * can be overridden by Sink configuration + * 2. We add the configuration users added for Kafka (parameters starting + * with .kafka. and must be valid Kafka Producer properties + * 3. 
We add the sink's documented parameters which can override other + * properties + * + * @param context + */ @Override public void configure(Context context) { batchSize = context.getInteger(KafkaSinkConstants.BATCH_SIZE, KafkaSinkConstants.DEFAULT_BATCH_SIZE); - logger.debug("Using batch size: {}", batchSize); messageList = new ArrayList<KeyedMessage<String, byte[]>>(batchSize); - Map<String, String> params = context.getParameters(); - logger.debug("all params: " + params.entrySet().toString()); - setProducerProps(params); - if (!producerProps.contains("serializer.class")) { - producerProps.put("serializer.class", "kafka.serializer.DefaultEncoder"); - } - if (!producerProps.contains("key.serializer.class")) { - producerProps.put("key.serializer.class", - "kafka.serializer.StringEncoder"); - } + logger.debug("Using batch size: {}", batchSize); topic = context.getString(KafkaSinkConstants.TOPIC, KafkaSinkConstants.DEFAULT_TOPIC); if (topic.equals(KafkaSinkConstants.DEFAULT_TOPIC)) { - logger.warn("The Properties 'preprocessor' or 'topic' is not set. " + - "Using the default topic name" + + logger.warn("The Property 'topic' is not set. 
" + + "Using the default topic name: " + KafkaSinkConstants.DEFAULT_TOPIC); } else { logger.info("Using the static topic: " + topic + " this may be over-ridden by event headers"); } - } + kafkaProps = KafkaSinkUtil.getKafkaProperties(context); - private void setProducerProps(Map<String, String> params) { - producerProps = new Properties(); - for (String key : params.keySet()) { - String value = params.get(key).trim(); - key = key.trim(); - if (key.startsWith(KafkaSinkConstants.PROPERTY_PREFIX)) { - // remove the prefix - key = key.substring(KafkaSinkConstants.PROPERTY_PREFIX.length() + 1, - key.length()); - producerProps.put(key.trim(), value); - if (logger.isDebugEnabled()) { - logger.debug("Reading a Kafka Producer Property: key: " + key + - ", value: " + value); - } - } + if (logger.isDebugEnabled()) { + logger.debug("Kafka producer properties: " + kafkaProps); } } - } http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java index 48d875e..3ee12de 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java @@ -18,14 +18,30 @@ package org.apache.flume.sink.kafka; +import kafka.serializer.StringDecoder; + public class KafkaSinkConstants { - public static final String PROPERTY_PREFIX = "kafka"; + public static final String PROPERTY_PREFIX = "kafka."; /* Properties */ - public static final String DEFAULT_TOPIC = "default-flume-topic"; + public static final String TOPIC = "topic"; public static final String BATCH_SIZE = 
"batchSize"; + public static final String MESSAGE_SERIALIZER_KEY = "serializer.class"; + public static final String KEY_SERIALIZER_KEY = "key.serializer.class"; + public static final String BROKER_LIST_KEY = "metadata.broker.list"; + public static final String REQUIRED_ACKS_KEY = "request.required.acks"; + public static final String BROKER_LIST_FLUME_KEY = "brokerList"; + public static final String REQUIRED_ACKS_FLUME_KEY = "requiredAcks"; + + public static final int DEFAULT_BATCH_SIZE = 100; + public static final String DEFAULT_TOPIC = "default-flume-topic"; + public static final String DEFAULT_MESSAGE_SERIALIZER = + "kafka.serializer.DefaultEncoder"; + public static final String DEFAULT_KEY_SERIALIZER = + "kafka.serializer.StringEncoder"; + public static final String DEFAULT_REQUIRED_ACKS = "1"; } http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java index aed6dac..80f764f 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java @@ -34,6 +34,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; @@ -126,7 +127,7 @@ public class TestKafkaSink { kafkaSink.setChannel(memoryChannel); kafkaSink.start(); - String msg = "my message"; + String msg = "test-topic-and-key-from-header"; Map<String, String> headers = new HashMap<String, String>(); headers.put("topic", TestConstants.CUSTOM_TOPIC); 
headers.put("key", TestConstants.CUSTOM_KEY); @@ -156,9 +157,8 @@ public class TestKafkaSink { } @Test - public void testEmptyChannel() throws UnsupportedEncodingException { - - + public void testEmptyChannel() throws UnsupportedEncodingException, + EventDeliveryException { Sink kafkaSink = new KafkaSink(); Context context = prepareDefaultContext(); Configurables.configure(kafkaSink, context); @@ -167,25 +167,20 @@ public class TestKafkaSink { kafkaSink.setChannel(memoryChannel); kafkaSink.start(); - try { - Sink.Status status = kafkaSink.process(); - if (status == Sink.Status.BACKOFF) { - fail("Error Occurred"); - } - } catch (EventDeliveryException ex) { - // ignore + Sink.Status status = kafkaSink.process(); + if (status == Sink.Status.BACKOFF) { + fail("Error Occurred"); } assertNull( testUtil.getNextMessageFromConsumer(KafkaSinkConstants.DEFAULT_TOPIC)); - } - private Context prepareDefaultContext() { // Prepares a default context with Kafka Server Properties Context context = new Context(); - context.put("kafka.metadata.broker.list", testUtil.getKafkaServerUrl()); + context.put("brokerList", testUtil.getKafkaServerUrl()); context.put("kafka.request.required.acks", "1"); + context.put("kafka.producer.type","sync"); context.put("batchSize", "1"); return context; } http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties index c07cdea..02a81e2 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties @@ -1,4 +1,5 @@ # Licensed to the Apache Software Foundation (ASF) under one or more +# Licensed to the Apache Software Foundation 
(ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/log4j.properties b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/log4j.properties index bdcb643..b86600b 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/log4j.properties +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/log4j.properties @@ -15,7 +15,7 @@ kafka.logs.dir=target/logs -log4j.rootLogger=INFO, stdout +log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java index da78f80..231ae42 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import kafka.consumer.ConsumerIterator; import kafka.consumer.ConsumerTimeoutException; @@ -66,6 +67,7 @@ public class KafkaSource extends AbstractSource private int 
consumerTimeout; private boolean kafkaAutoCommitEnabled; private Context context; + private Properties kafkaProps; private final List<Event> eventList = new ArrayList<Event>(); public Status process() throws EventDeliveryException { @@ -122,6 +124,19 @@ public class KafkaSource extends AbstractSource } } + /** + * We configure the source and generate properties for the Kafka Consumer + * + * Kafka Consumer properties are generated as follows: + * 1. Generate a properties object with some static defaults that + * can be overridden by Source configuration + * 2. We add the configuration users added for Kafka (parameters starting + * with kafka. and must be valid Kafka Consumer properties + * 3. We add the source documented parameters which can override other + * properties + * + * @param context + */ public void configure(Context context) { this.context = context; batchUpperLimit = context.getInteger(KafkaSourceConstants.BATCH_SIZE, @@ -130,21 +145,16 @@ public class KafkaSource extends AbstractSource KafkaSourceConstants.DEFAULT_BATCH_DURATION); topic = context.getString(KafkaSourceConstants.TOPIC); - //if consumer timeout and autocommit were not set by user, - // set them to 10ms and false - consumerTimeout = context.getInteger(KafkaSourceConstants.CONSUMER_TIMEOUT, - KafkaSourceConstants.DEFAULT_CONSUMER_TIMEOUT); - context.put(KafkaSourceConstants.CONSUMER_TIMEOUT, - Integer.toString(consumerTimeout)); - String autoCommit = context.getString( - KafkaSourceConstants.AUTO_COMMIT_ENABLED, - String.valueOf(KafkaSourceConstants.DEFAULT_AUTO_COMMIT)); - kafkaAutoCommitEnabled = Boolean.valueOf(autoCommit); - context.put(KafkaSourceConstants.AUTO_COMMIT_ENABLED,autoCommit); - if(topic == null) { throw new ConfigurationException("Kafka topic must be specified."); } + + kafkaProps = KafkaSourceUtil.getKafkaProperties(context); + consumerTimeout = Integer.parseInt(kafkaProps.getProperty( + KafkaSourceConstants.CONSUMER_TIMEOUT)); + kafkaAutoCommitEnabled = 
Boolean.parseBoolean(kafkaProps.getProperty( + KafkaSourceConstants.AUTO_COMMIT_ENABLED)); + } @Override @@ -153,7 +163,7 @@ public class KafkaSource extends AbstractSource try { //initialize a consumer. This creates the connection to ZooKeeper - consumer = KafkaSourceUtil.getConsumer(context); + consumer = KafkaSourceUtil.getConsumer(kafkaProps); } catch (Exception e) { throw new FlumeException("Unable to create consumer. " + "Check whether the ZooKeeper server is up and that the " + @@ -192,6 +202,10 @@ public class KafkaSource extends AbstractSource } + + + + /** * Check if there are messages waiting in Kafka, * waiting until timeout (10ms by default) for messages to arrive. http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java index ac86f65..169cc10 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java @@ -21,16 +21,19 @@ public class KafkaSourceConstants { public static final String TIMESTAMP = "timestamp"; public static final String BATCH_SIZE = "batchSize"; public static final String BATCH_DURATION_MS = "batchDurationMillis"; - public static final String CONSUMER_TIMEOUT = "kafka.consumer.timeout.ms"; - public static final String AUTO_COMMIT_ENABLED = "kafka.auto.commit.enabled"; - public static final String ZOOKEEPER_CONNECT = "kafka.zookeeper.connect"; - public static final String GROUP_ID = "kafka.group.id"; - public static final String PROPERTY_PREFIX = "kafka"; + public static 
final String CONSUMER_TIMEOUT = "consumer.timeout.ms"; + public static final String AUTO_COMMIT_ENABLED = "auto.commit.enabled"; + public static final String ZOOKEEPER_CONNECT = "zookeeper.connect"; + public static final String ZOOKEEPER_CONNECT_FLUME = "zookeeperConnect"; + public static final String GROUP_ID = "group.id"; + public static final String GROUP_ID_FLUME = "groupId"; + public static final String PROPERTY_PREFIX = "kafka."; public static final int DEFAULT_BATCH_SIZE = 1000; public static final int DEFAULT_BATCH_DURATION = 1000; - public static final int DEFAULT_CONSUMER_TIMEOUT = 10; + public static final String DEFAULT_CONSUMER_TIMEOUT = "10"; public static final boolean DEFAULT_AUTO_COMMIT = false; + public static final String DEFAULT_GROUP_ID = "flume"; } http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java index 8397272..4a4034b 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java @@ -25,6 +25,7 @@ import kafka.consumer.ConsumerConfig; import kafka.javaapi.consumer.ConsumerConnector; import org.apache.flume.Context; +import org.apache.flume.conf.ConfigurationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,32 +33,80 @@ public class KafkaSourceUtil { private static final Logger log = LoggerFactory.getLogger(KafkaSourceUtil.class); - public static Properties getKafkaConfigProperties(Context context) { + public static Properties getKafkaProperties(Context context) { 
log.info("context={}",context.toString()); - Properties props = new Properties(); - Map<String, String> contextMap = context.getParameters(); - for(String key : contextMap.keySet()) { - String value = contextMap.get(key).trim(); - key = key.trim(); - if (key.startsWith(KafkaSourceConstants.PROPERTY_PREFIX)) { - // remove the prefix - key = key.substring(KafkaSourceConstants.PROPERTY_PREFIX.length() + 1, - key.length()); - props.put(key, value); - if (log.isDebugEnabled()) { - log.debug("Reading a Kafka Producer Property: key: " + key + - ", value: " + value); - } - } - } + Properties props = generateDefaultKafkaProps(); + setKafkaProps(context,props); + addDocumentedKafkaProps(context,props); return props; } - public static ConsumerConnector getConsumer(Context context) { + public static ConsumerConnector getConsumer(Properties kafkaProps) { ConsumerConfig consumerConfig = - new ConsumerConfig(getKafkaConfigProperties(context)); + new ConsumerConfig(kafkaProps); ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig); return consumer; } + + /** + * Generate consumer properties object with some defaults + * @return + */ + private static Properties generateDefaultKafkaProps() { + Properties props = new Properties(); + props.put(KafkaSourceConstants.AUTO_COMMIT_ENABLED, + KafkaSourceConstants.DEFAULT_AUTO_COMMIT); + props.put(KafkaSourceConstants.CONSUMER_TIMEOUT, + KafkaSourceConstants.DEFAULT_CONSUMER_TIMEOUT); + props.put(KafkaSourceConstants.GROUP_ID, + KafkaSourceConstants.DEFAULT_GROUP_ID); + return props; + } + + /** + * Add all configuration parameters starting with "kafka" + * to consumer properties + */ + private static void setKafkaProps(Context context,Properties kafkaProps) { + + Map<String,String> kafkaProperties = + context.getSubProperties(KafkaSourceConstants.PROPERTY_PREFIX); + + for (Map.Entry<String,String> prop : kafkaProperties.entrySet()) { + + kafkaProps.put(prop.getKey(), prop.getValue()); + if (log.isDebugEnabled()) 
{ + log.debug("Reading a Kafka Producer Property: key: " + + prop.getKey() + ", value: " + prop.getValue()); + } + } + } + + /** + * Some of the producer properties are especially important + * We documented them and gave them a camel-case name to match Flume config + * If user set these, we will override any existing parameters with these + * settings. + * Knowledge of which properties are documented is maintained here for now. + * If this will become a maintenance issue we'll set a proper data structure. + */ + private static void addDocumentedKafkaProps(Context context, + Properties kafkaProps) + throws ConfigurationException { + String zookeeperConnect = context.getString( + KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME); + if (zookeeperConnect == null) { + throw new ConfigurationException("ZookeeperConnect must contain " + + "at least one ZooKeeper server"); + } + kafkaProps.put(KafkaSourceConstants.ZOOKEEPER_CONNECT, zookeeperConnect); + + String groupID = context.getString(KafkaSourceConstants.GROUP_ID_FLUME); + + if (groupID != null ) { + kafkaProps.put(KafkaSourceConstants.GROUP_ID, groupID); + } + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java index 1009f1c..d067e24 100644 --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.*; import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.util.List; +import 
java.util.Properties; import com.google.common.base.Charsets; import com.google.common.collect.Lists; @@ -38,6 +39,7 @@ import kafka.message.MessageAndMetadata; import org.apache.flume.*; import org.apache.flume.PollableSource.Status; import org.apache.flume.channel.ChannelProcessor; +import org.apache.flume.conf.Configurables; import org.apache.flume.source.AbstractSource; import org.junit.After; import org.junit.Before; @@ -74,11 +76,11 @@ public class KafkaSourceTest { } context = new Context(); - context.put(KafkaSourceConstants.ZOOKEEPER_CONNECT, + context.put(KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME, kafkaServer.getZkConnectString()); - context.put(KafkaSourceConstants.GROUP_ID,"flume"); + context.put(KafkaSourceConstants.GROUP_ID_FLUME,"flume"); context.put(KafkaSourceConstants.TOPIC,topicName); - context.put(KafkaSourceConstants.CONSUMER_TIMEOUT,"100"); + context.put("kafka.consumer.timeout.ms","100"); ChannelProcessor channelProcessor = mock(ChannelProcessor.class); @@ -183,7 +185,7 @@ public class KafkaSourceTest { public void testNonExistingZk() throws EventDeliveryException, SecurityException, NoSuchFieldException, IllegalArgumentException, IllegalAccessException, InterruptedException { - context.put(KafkaSourceConstants.ZOOKEEPER_CONNECT,"blabla:666"); + context.put(KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME,"blabla:666"); kafkaSource.configure(context); kafkaSource.start(); Thread.sleep(500L); @@ -192,4 +194,6 @@ public class KafkaSourceTest { assertEquals(Status.BACKOFF, status); } + + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/bde2c282/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java 
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java index b9a1b25..f87e5ae 100644 --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java @@ -39,10 +39,12 @@ public class KafkaSourceUtilTest { @Before public void setUp() throws Exception { - context.put("consumer.timeout", "10"); + context.put("kafka.consumer.timeout", "10"); context.put("type", "KafkaSource"); context.put("topic", "test"); - props = KafkaSourceUtil.getKafkaConfigProperties(context); + context.put("zookeeperConnect", "127.0.0.1:"+zkPort); + context.put("groupId","test"); + props = KafkaSourceUtil.getKafkaProperties(context); zookeeper = new KafkaSourceEmbeddedZookeeper(zkPort); @@ -53,23 +55,38 @@ public class KafkaSourceUtilTest { zookeeper.stopZookeeper(); } - @Test - public void testGetKafkaConfigParameter() { - assertEquals("10",props.getProperty("consumer.timeout")); - assertEquals("test",props.getProperty("topic")); - assertNull(props.getProperty("type")); - } - @Test public void testGetConsumer() { - context.put("zookeeper.connect", "127.0.0.1:"+zkPort); - context.put("group.id","test"); - - ConsumerConnector cc = KafkaSourceUtil.getConsumer(context); + ConsumerConnector cc = KafkaSourceUtil.getConsumer(props); assertNotNull(cc); } + @Test + public void testKafkaConsumerProperties() { + Context context = new Context(); + context.put("kafka.auto.commit.enabled", "override.default.autocommit"); + context.put("kafka.fake.property", "kafka.property.value"); + context.put("kafka.zookeeper.connect","bad-zookeeper-list"); + context.put("zookeeperConnect","real-zookeeper-list"); + Properties kafkaProps = KafkaSourceUtil.getKafkaProperties(context); + + //check that we have defaults set + assertEquals( + kafkaProps.getProperty(KafkaSourceConstants.GROUP_ID), + 
KafkaSourceConstants.DEFAULT_GROUP_ID); + //check that kafka properties override the default and get correct name + assertEquals( + kafkaProps.getProperty(KafkaSourceConstants.AUTO_COMMIT_ENABLED), + "override.default.autocommit"); + //check that any kafka property gets in + assertEquals(kafkaProps.getProperty("fake.property"), + "kafka.property.value"); + //check that documented property overrides defaults + assertEquals(kafkaProps.getProperty("zookeeper.connect") + ,"real-zookeeper-list"); + } + }
