Repository: flume Updated Branches: refs/heads/flume-1.6 57d651105 -> 9cc850825
FLUME-2250. Kafka Source. (Frank Yao, Ashish Paliwal, Gwen Shapira via Hari) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/9cc85082 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/9cc85082 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/9cc85082 Branch: refs/heads/flume-1.6 Commit: 9cc8508255078b5a03f5b2899cc2795f376ad2b5 Parents: 57d6511 Author: Hari Shreedharan <[email protected]> Authored: Tue Sep 16 21:24:37 2014 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Tue Sep 16 21:26:48 2014 -0700 ---------------------------------------------------------------------- flume-ng-dist/pom.xml | 4 + flume-ng-doc/sphinx/FlumeUserGuide.rst | 126 ++++++---- flume-ng-sinks/flume-ng-kafka-sink/pom.xml | 1 - flume-ng-sources/flume-kafka-source/pom.xml | 70 ++++++ .../apache/flume/source/kafka/KafkaSource.java | 231 +++++++++++++++++++ .../source/kafka/KafkaSourceConstants.java | 36 +++ .../flume/source/kafka/KafkaSourceUtil.java | 63 +++++ .../source/kafka/KafkaSourceEmbeddedKafka.java | 92 ++++++++ .../kafka/KafkaSourceEmbeddedZookeeper.java | 64 +++++ .../flume/source/kafka/KafkaSourceTest.java | 195 ++++++++++++++++ .../flume/source/kafka/KafkaSourceUtilTest.java | 75 ++++++ .../src/test/resources/log4j.properties | 25 ++ flume-ng-sources/pom.xml | 1 + pom.xml | 20 ++ 14 files changed, 961 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/9cc85082/flume-ng-dist/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-dist/pom.xml b/flume-ng-dist/pom.xml index ca3cd8b..a5db0c7 100644 --- a/flume-ng-dist/pom.xml +++ b/flume-ng-dist/pom.xml @@ -166,6 +166,10 @@ <artifactId>flume-twitter-source</artifactId> </dependency> <dependency> + <groupId>org.apache.flume.flume-ng-sources</groupId> + <artifactId>flume-kafka-source</artifactId> 
+ </dependency> + <dependency> <groupId>org.apache.flume.flume-ng-legacy-sources</groupId> <artifactId>flume-avro-source</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/flume/blob/9cc85082/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index a718fbf..3a47560 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -994,47 +994,6 @@ Example for an agent named agent-1: agent-1.sources.src-1.spoolDir = /var/log/apache/flumeSpool agent-1.sources.src-1.fileHeader = true -Twitter 1% firehose Source (experimental) -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -.. warning:: - This source is hightly experimental and may change between minor versions of Flume. - Use at your own risk. - -Experimental source that connects via Streaming API to the 1% sample twitter -firehose, continously downloads tweets, converts them to Avro format and -sends Avro events to a downstream Flume sink. Requires the consumer and -access tokens and secrets of a Twitter developer account. -Required properties are in **bold**. 
- -====================== =========== =================================================== -Property Name Default Description -====================== =========== =================================================== -**channels** -- -**type** -- The component type name, needs to be ``org.apache.flume.source.twitter.TwitterSource`` -**consumerKey** -- OAuth consumer key -**consumerSecret** -- OAuth consumer secret -**accessToken** -- OAuth access token -**accessTokenSecret** -- OAuth toekn secret -maxBatchSize 1000 Maximum number of twitter messages to put in a single batch -maxBatchDurationMillis 1000 Maximum number of milliseconds to wait before closing a batch -====================== =========== =================================================== - -Example for agent named a1: - -.. code-block:: properties - - a1.sources = r1 - a1.channels = c1 - a1.sources.r1.type = org.apache.flume.source.twitter.TwitterSource - a1.sources.r1.channels = c1 - a1.sources.r1.consumerKey = YOUR_TWITTER_CONSUMER_KEY - a1.sources.r1.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET - a1.sources.r1.accessToken = YOUR_TWITTER_ACCESS_TOKEN - a1.sources.r1.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET - a1.sources.r1.maxBatchSize = 10 - a1.sources.r1.maxBatchDurationMillis = 200 - Event Deserializers ''''''''''''''''''' @@ -1094,6 +1053,91 @@ Property Name Default Description deserializer.maxBlobLength 100000000 The maximum number of bytes to read and buffer for a given request ========================== ================== ======================================================================= +Twitter 1% firehose Source (experimental) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. warning:: + This source is highly experimental and may change between minor versions of Flume. + Use at your own risk. 
+ +Experimental source that connects via Streaming API to the 1% sample twitter +firehose, continuously downloads tweets, converts them to Avro format and +sends Avro events to a downstream Flume sink. Requires the consumer and +access tokens and secrets of a Twitter developer account. +Required properties are in **bold**. + +====================== =========== =================================================== +Property Name Default Description +====================== =========== =================================================== +**channels** -- +**type** -- The component type name, needs to be ``org.apache.flume.source.twitter.TwitterSource`` +**consumerKey** -- OAuth consumer key +**consumerSecret** -- OAuth consumer secret +**accessToken** -- OAuth access token +**accessTokenSecret** -- OAuth token secret +maxBatchSize 1000 Maximum number of twitter messages to put in a single batch +maxBatchDurationMillis 1000 Maximum number of milliseconds to wait before closing a batch +====================== =========== =================================================== + +Example for agent named a1: + +.. code-block:: properties + + a1.sources = r1 + a1.channels = c1 + a1.sources.r1.type = org.apache.flume.source.twitter.TwitterSource + a1.sources.r1.channels = c1 + a1.sources.r1.consumerKey = YOUR_TWITTER_CONSUMER_KEY + a1.sources.r1.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET + a1.sources.r1.accessToken = YOUR_TWITTER_ACCESS_TOKEN + a1.sources.r1.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET + a1.sources.r1.maxBatchSize = 10 + a1.sources.r1.maxBatchDurationMillis = 200 + +Kafka Source +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Kafka Source is an Apache Kafka consumer that reads messages from a Kafka topic. +If you have multiple Kafka sources running, you can configure them with the same Consumer Group +so each will read a unique set of partitions for the topic. 
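+ +For illustration only (the topic and group names below are hypothetical, and this sketch is not part of the shipped example), two sources configured with the same ``kafka.group.id`` will split the topic's partitions between them: + +.. code-block:: properties + +    tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource +    tier1.sources.source1.channels = channel1 +    tier1.sources.source1.kafka.zookeeper.connect = localhost:2181 +    tier1.sources.source1.topic = test1 +    tier1.sources.source1.kafka.group.id = shared-group + +    tier1.sources.source2.type = org.apache.flume.source.kafka.KafkaSource +    tier1.sources.source2.channels = channel1 +    tier1.sources.source2.kafka.zookeeper.connect = localhost:2181 +    tier1.sources.source2.topic = test1 +    tier1.sources.source2.kafka.group.id = shared-group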
+ +The properties below are required, but you can specify any other Kafka consumer parameter you want +and it will be passed to the consumer. Check the `Kafka documentation <https://kafka.apache.org/08/configuration.html#consumerconfigs>`_ +for details. + +=========================== =========== =================================================== +Property Name Default Description +=========================== =========== =================================================== +**channels** -- +**type** -- The component type name, needs to be ``org.apache.flume.source.kafka.KafkaSource`` +**kafka.zookeeper.connect** -- URI of the ZooKeeper ensemble used by the Kafka cluster +**kafka.group.id** -- Unique identifier of the consumer group. Setting the same id in multiple sources or agents + indicates that they are part of the same consumer group +**topic** -- The Kafka topic to read messages from. Currently only a single topic is supported. +batchSize 1000 Maximum number of messages written to Channel in one batch +batchDurationMillis 1000 Maximum time (in ms) before a batch is written to Channel + The batch is written as soon as either the size or the time limit is reached. +kafka.auto.commit.enable false If true, Kafka commits offsets automatically: a faster but less durable option. + When false, the Kafka Source commits offsets before writing each batch to the channel +kafka.consumer.timeout.ms 10 Polling interval when waiting for new data. + A low value means more CPU usage. + A high value means batchDurationMillis may be overshot while waiting for + additional data. +=========================== =========== =================================================== + +Example for agent named tier1: + +.. 
code-block:: properties + + tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource + tier1.sources.source1.channels = channel1 + tier1.sources.source1.kafka.zookeeper.connect = localhost:2181 + tier1.sources.source1.topic = test1 + tier1.sources.source1.kafka.group.id = flume + tier1.sources.source1.kafka.consumer.timeout.ms = 100 + + + NetCat Source ~~~~~~~~~~~~~ http://git-wip-us.apache.org/repos/asf/flume/blob/9cc85082/flume-ng-sinks/flume-ng-kafka-sink/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml index 307fa59..746a395 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml +++ b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml @@ -61,7 +61,6 @@ <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> - <version>0.8.1.1</version> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/flume/blob/9cc85082/flume-ng-sources/flume-kafka-source/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/pom.xml b/flume-ng-sources/flume-kafka-source/pom.xml new file mode 100644 index 0000000..8ad29d7 --- /dev/null +++ b/flume-ng-sources/flume-kafka-source/pom.xml @@ -0,0 +1,70 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flume-ng-sources</artifactId> + <groupId>org.apache.flume</groupId> + <version>1.6.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <groupId>org.apache.flume.flume-ng-sources</groupId> + <artifactId>flume-kafka-source</artifactId> + <name>Flume Kafka Source</name> + + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </dependency> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.10</artifactId> + </dependency> + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.10</artifactId> + <classifier>test</classifier> + <scope>test</scope> + </dependency> + </dependencies> +</project> \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/flume/blob/9cc85082/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java new file mode 100644 index 0000000..da78f80 --- /dev/null +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flume.source.kafka; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import kafka.consumer.ConsumerIterator; +import kafka.consumer.ConsumerTimeoutException; +import kafka.consumer.KafkaStream; +import kafka.javaapi.consumer.ConsumerConnector; + +import org.apache.flume.*; +import org.apache.flume.conf.Configurable; +import org.apache.flume.conf.ConfigurationException; +import org.apache.flume.event.EventBuilder; +import org.apache.flume.source.AbstractSource; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A Source for Kafka which reads messages from a Kafka topic. + * This source has been used in a production environment with good performance; + * over 100k messages per second can be read from Kafka by a single source.<p> + * <tt>kafka.zookeeper.connect: </tt> the ZooKeeper connection string used by Kafka.<p> + * <tt>kafka.group.id: </tt> the group id of the consumer group.<p> + * <tt>topic: </tt> the topic to read from Kafka.<p> + * batchSize - maximum number of messages written to Channel in one batch + * batchDurationMillis - maximum time before a batch (of any size) + * will be written to Channel + * kafka.auto.commit.enable - if true, commit automatically every time period. + * if false, commit on each batch. + * kafka.consumer.timeout.ms - polling interval for new data for batch. + * Low value means more CPU usage. + * High value means batchDurationMillis may be overshot. 
+ * + * Any property starting with "kafka" will be passed to the kafka consumer + * So you can use any configuration supported by Kafka 0.8.1.1 + */ +public class KafkaSource extends AbstractSource + implements Configurable, PollableSource { + private static final Logger log = LoggerFactory.getLogger(KafkaSource.class); + private ConsumerConnector consumer; + private ConsumerIterator<byte[],byte[]> it; + private String topic; + private int batchUpperLimit; + private int timeUpperLimit; + private int consumerTimeout; + private boolean kafkaAutoCommitEnabled; + private Context context; + private final List<Event> eventList = new ArrayList<Event>(); + + public Status process() throws EventDeliveryException { + eventList.clear(); + byte[] bytes; + Event event; + Map<String, String> headers; + try { + int eventCounter = 0; + int timeWaited = 0; + IterStatus iterStatus = new IterStatus(false, -1); + while (eventCounter < batchUpperLimit && timeWaited < timeUpperLimit) { + iterStatus = timedHasNext(); + if (iterStatus.hasData()) { + // get next message + bytes = it.next().message(); + + headers = new HashMap<String, String>(); + headers.put(KafkaSourceConstants.TIMESTAMP, + String.valueOf(System.currentTimeMillis())); + headers.put(KafkaSourceConstants.TOPIC,topic); + if (log.isDebugEnabled()) { + log.debug("Message: {}", new String(bytes)); + } + event = EventBuilder.withBody(bytes, headers); + eventList.add(event); + eventCounter++; + } + timeWaited += iterStatus.getWaitTime(); + if (log.isDebugEnabled()) { + log.debug("Waited: {} ", timeWaited); + log.debug("Event #: {}", eventCounter); + } + } + // If we have events, send events to channel + // and commit if Kafka doesn't auto-commit + if (eventCounter > 0) { + getChannelProcessor().processEventBatch(eventList); + if (!kafkaAutoCommitEnabled) { + // commit the read transactions to Kafka to avoid duplicates + consumer.commitOffsets(); + } + } + if (!iterStatus.hasData()) { + if (log.isDebugEnabled()) { + 
log.debug("Returning with backoff. No more data to read"); + } + return Status.BACKOFF; + } + return Status.READY; + } catch (Exception e) { + log.error("KafkaSource EXCEPTION, {}", e); + return Status.BACKOFF; + } + } + + public void configure(Context context) { + this.context = context; + batchUpperLimit = context.getInteger(KafkaSourceConstants.BATCH_SIZE, + KafkaSourceConstants.DEFAULT_BATCH_SIZE); + timeUpperLimit = context.getInteger(KafkaSourceConstants.BATCH_DURATION_MS, + KafkaSourceConstants.DEFAULT_BATCH_DURATION); + topic = context.getString(KafkaSourceConstants.TOPIC); + + //if consumer timeout and autocommit were not set by user, + // set them to 10ms and false + consumerTimeout = context.getInteger(KafkaSourceConstants.CONSUMER_TIMEOUT, + KafkaSourceConstants.DEFAULT_CONSUMER_TIMEOUT); + context.put(KafkaSourceConstants.CONSUMER_TIMEOUT, + Integer.toString(consumerTimeout)); + String autoCommit = context.getString( + KafkaSourceConstants.AUTO_COMMIT_ENABLED, + String.valueOf(KafkaSourceConstants.DEFAULT_AUTO_COMMIT)); + kafkaAutoCommitEnabled = Boolean.valueOf(autoCommit); + context.put(KafkaSourceConstants.AUTO_COMMIT_ENABLED,autoCommit); + + if(topic == null) { + throw new ConfigurationException("Kafka topic must be specified."); + } + } + + @Override + public synchronized void start() { + log.info("Starting {}...", this); + + try { + //initialize a consumer. This creates the connection to ZooKeeper + consumer = KafkaSourceUtil.getConsumer(context); + } catch (Exception e) { + throw new FlumeException("Unable to create consumer. 
" + + "Check whether the ZooKeeper server is up and that the " + + "Flume agent can connect to it.", e); + } + + Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); + // We always have just one topic being read by one thread + topicCountMap.put(topic, 1); + + // Get the message iterator for our topic + // Note that this succeeds even if the topic doesn't exist + // in that case we simply get no messages for the topic + // Also note that currently we only support a single topic + try { + Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = + consumer.createMessageStreams(topicCountMap); + List<KafkaStream<byte[], byte[]>> topicList = consumerMap.get(topic); + KafkaStream<byte[], byte[]> stream = topicList.get(0); + it = stream.iterator(); + } catch (Exception e) { + throw new FlumeException("Unable to get message iterator from Kafka", e); + } + log.info("Kafka source {} started.", getName()); + super.start(); + } + + @Override + public synchronized void stop() { + if (consumer != null) { + // exit cleanly. This syncs offsets of messages read to ZooKeeper + // to avoid reading the same messages again + consumer.shutdown(); + } + super.stop(); + } + + + /** + * Check if there are messages waiting in Kafka, + * waiting until timeout (10ms by default) for messages to arrive. + * And timing our wait. + * @return IterStatus object. 
+ * Indicating whether a message was found and how long we waited for it + */ + IterStatus timedHasNext() { + try { + long startTime = System.currentTimeMillis(); + it.hasNext(); + long endTime = System.currentTimeMillis(); + return new IterStatus(true, endTime - startTime); + } catch (ConsumerTimeoutException e) { + return new IterStatus(false, consumerTimeout); + } + } + + private class IterStatus { + private long waitTime; + private boolean hasData; + + + private IterStatus(boolean hasData,long waitTime) { + this.waitTime = waitTime; + this.hasData = hasData; + } + + public long getWaitTime() { + return waitTime; + } + + public boolean hasData() { + return hasData; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/9cc85082/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java new file mode 100644 index 0000000..ac86f65 --- /dev/null +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.source.kafka; + +public class KafkaSourceConstants { + public static final String TOPIC = "topic"; + public static final String TIMESTAMP = "timestamp"; + public static final String BATCH_SIZE = "batchSize"; + public static final String BATCH_DURATION_MS = "batchDurationMillis"; + public static final String CONSUMER_TIMEOUT = "kafka.consumer.timeout.ms"; + // yields Kafka's "auto.commit.enable" once the "kafka." prefix is stripped + public static final String AUTO_COMMIT_ENABLED = "kafka.auto.commit.enable"; + public static final String ZOOKEEPER_CONNECT = "kafka.zookeeper.connect"; + public static final String GROUP_ID = "kafka.group.id"; + public static final String PROPERTY_PREFIX = "kafka"; + + + public static final int DEFAULT_BATCH_SIZE = 1000; + public static final int DEFAULT_BATCH_DURATION = 1000; + public static final int DEFAULT_CONSUMER_TIMEOUT = 10; + public static final boolean DEFAULT_AUTO_COMMIT = false; + +} http://git-wip-us.apache.org/repos/asf/flume/blob/9cc85082/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java new file mode 100644 index 0000000..8397272 --- /dev/null +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.source.kafka; + +import java.util.Map; +import java.util.Properties; + +import kafka.common.KafkaException; +import kafka.consumer.Consumer; +import kafka.consumer.ConsumerConfig; +import kafka.javaapi.consumer.ConsumerConnector; + +import org.apache.flume.Context; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaSourceUtil { + private static final Logger log = + LoggerFactory.getLogger(KafkaSourceUtil.class); + + public static Properties getKafkaConfigProperties(Context context) { + log.info("context={}",context.toString()); + Properties props = new Properties(); + Map<String, String> contextMap = context.getParameters(); + for(String key : contextMap.keySet()) { + String value = contextMap.get(key).trim(); + key = key.trim(); + if (key.startsWith(KafkaSourceConstants.PROPERTY_PREFIX)) { + // remove the prefix + key = key.substring(KafkaSourceConstants.PROPERTY_PREFIX.length() + 1, + key.length()); + props.put(key, value); + if (log.isDebugEnabled()) { + log.debug("Reading a Kafka Consumer Property: key: " + key + + ", value: " + value); + } + } + } + return props; + } + + public static ConsumerConnector getConsumer(Context context) { + ConsumerConfig 
consumerConfig = + new ConsumerConfig(getKafkaConfigProperties(context)); + ConsumerConnector consumer = + Consumer.createJavaConsumerConnector(consumerConfig); + return consumer; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/9cc85082/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java new file mode 100644 index 0000000..26c5c9d --- /dev/null +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flume.source.kafka; + +import kafka.server.KafkaConfig; +import kafka.server.KafkaServerStartable; +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; +import kafka.admin.AdminUtils; +import org.I0Itec.zkclient.ZkClient; +import kafka.utils.ZKStringSerializer$; + +import java.io.IOException; +import java.util.Properties; + +public class KafkaSourceEmbeddedKafka { + KafkaServerStartable kafkaServer; + KafkaSourceEmbeddedZookeeper zookeeper; + int zkPort = 21818; // non-standard port + Producer<String,String> producer; + + public KafkaSourceEmbeddedKafka() { + zookeeper = new KafkaSourceEmbeddedZookeeper(zkPort); + Properties props = new Properties(); + props.put("zookeeper.connect",zookeeper.getConnectString()); + props.put("broker.id","1"); + KafkaConfig config = new KafkaConfig(props); + kafkaServer = new KafkaServerStartable(config); + kafkaServer.startup(); + initProducer(); + } + + public void stop() throws IOException { + producer.close(); + kafkaServer.shutdown(); + zookeeper.stopZookeeper(); + } + + public String getZkConnectString() { + return zookeeper.getConnectString(); + } + + private void initProducer() + { + Properties props = new Properties(); + props.put("metadata.broker.list","127.0.0.1:" + + kafkaServer.serverConfig().port()); + props.put("serializer.class","kafka.serializer.StringEncoder"); + props.put("request.required.acks", "1"); + + ProducerConfig config = new ProducerConfig(props); + + producer = new Producer<String,String>(config); + + } + + public void produce(String topic, String k, String v) { + KeyedMessage<String,String> message = new KeyedMessage<String,String>(topic,k,v); + producer.send(message); + } + + public void createTopic(String topicName) { + // Create a ZooKeeper client + int sessionTimeoutMs = 10000; + int connectionTimeoutMs = 10000; + ZkClient zkClient = new ZkClient(zookeeper.getConnectString(), + sessionTimeoutMs, connectionTimeoutMs, 
+ ZKStringSerializer$.MODULE$); + + int numPartitions = 1; + int replicationFactor = 1; + Properties topicConfig = new Properties(); + AdminUtils.createTopic(zkClient, topicName, numPartitions, + replicationFactor, topicConfig); + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/9cc85082/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java new file mode 100644 index 0000000..1b8a271 --- /dev/null +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flume.source.kafka;
+
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+public class KafkaSourceEmbeddedZookeeper {
+  private int zkPort;
+  private ZooKeeperServer zookeeper;
+  private NIOServerCnxnFactory factory;
+  File dir;
+
+
+  public KafkaSourceEmbeddedZookeeper(int zkPort){
+    int numConnections = 5000;
+    int tickTime = 2000;
+
+    this.zkPort = zkPort;
+
+    String dataDirectory = System.getProperty("java.io.tmpdir");
+    dir = new File(dataDirectory, "zookeeper").getAbsoluteFile();
+
+    try {
+      this.zookeeper = new ZooKeeperServer(dir,dir,tickTime);
+      this.factory = new NIOServerCnxnFactory();
+      factory.configure(new InetSocketAddress("127.0.0.1",zkPort),0);
+      factory.startup(zookeeper);
+    } catch (IOException e) {
+      e.printStackTrace();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+  public void stopZookeeper() throws IOException {
+    zookeeper.shutdown();
+    factory.shutdown();
+    FileUtils.deleteDirectory(dir);
+  }
+
+  public String getConnectString() {
+    return "127.0.0.1:"+zkPort;
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/9cc85082/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java
new file mode 100644
index 0000000..1009f1c
--- /dev/null
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.source.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Mockito.*;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import junit.framework.Assert;
+import kafka.common.TopicExistsException;
+import kafka.consumer.ConsumerIterator;
+import kafka.message.Message;
+
+import kafka.message.MessageAndMetadata;
+
+import org.apache.flume.*;
+import org.apache.flume.PollableSource.Status;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.source.AbstractSource;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaSourceTest {
+  private static final Logger log =
+      LoggerFactory.getLogger(KafkaSourceTest.class);
+
+  private KafkaSource kafkaSource;
+  private KafkaSourceEmbeddedKafka kafkaServer;
+  private ConsumerIterator<byte[], byte[]> mockIt;
+  private Message message;
+  private Context context;
+  private List<Event> events;
+  private String topicName = "test1";
+
+
+  @SuppressWarnings("unchecked")
+  @Before
+  public void setup() throws Exception {
+
+    kafkaSource = new KafkaSource();
+    kafkaServer = new KafkaSourceEmbeddedKafka();
+    try {
+      kafkaServer.createTopic(topicName);
+    } catch (TopicExistsException e) {
+      //do nothing
+    }
+
+    context = new Context();
+    context.put(KafkaSourceConstants.ZOOKEEPER_CONNECT,
+        kafkaServer.getZkConnectString());
+    context.put(KafkaSourceConstants.GROUP_ID,"flume");
+    context.put(KafkaSourceConstants.TOPIC,topicName);
+    context.put(KafkaSourceConstants.CONSUMER_TIMEOUT,"100");
+
+    ChannelProcessor channelProcessor = mock(ChannelProcessor.class);
+
+    events = Lists.newArrayList();
+
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        events.addAll((List<Event>)invocation.getArguments()[0]);
+        return null;
+      }
+    }).when(channelProcessor).processEventBatch(any(List.class));
+    kafkaSource.setChannelProcessor(channelProcessor);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    kafkaSource.stop();
+    kafkaServer.stop();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testProcessItNotEmpty() throws EventDeliveryException,
+      SecurityException, NoSuchFieldException, IllegalArgumentException,
+      IllegalAccessException, InterruptedException {
+    context.put(KafkaSourceConstants.BATCH_SIZE,"1");
+    kafkaSource.configure(context);
+    kafkaSource.start();
+
+    Thread.sleep(500L);
+
+    kafkaServer.produce(topicName, "", "hello, world");
+
+    Thread.sleep(500L);
+
+    Assert.assertEquals(Status.READY, kafkaSource.process());
+    Assert.assertEquals(Status.BACKOFF, kafkaSource.process());
+    Assert.assertEquals(1, events.size());
+
+    Assert.assertEquals("hello, world", new String(events.get(0).getBody(),
+        Charsets.UTF_8));
+
+
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testProcessItNotEmptyBatch() throws EventDeliveryException,
+      SecurityException, NoSuchFieldException, IllegalArgumentException,
+      IllegalAccessException, InterruptedException {
+    context.put(KafkaSourceConstants.BATCH_SIZE,"2");
+    kafkaSource.configure(context);
+    kafkaSource.start();
+
+    Thread.sleep(500L);
+
+    kafkaServer.produce(topicName, "", "hello, world");
+    kafkaServer.produce(topicName, "", "foo, bar");
+
+    Thread.sleep(500L);
+
+    Status status = kafkaSource.process();
+    assertEquals(Status.READY, status);
+    Assert.assertEquals("hello, world", new String(events.get(0).getBody(),
+        Charsets.UTF_8));
+    Assert.assertEquals("foo, bar", new String(events.get(1).getBody(),
+        Charsets.UTF_8));
+
+  }
+
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testProcessItEmpty() throws EventDeliveryException,
+      SecurityException, NoSuchFieldException, IllegalArgumentException,
+      IllegalAccessException, InterruptedException {
+    kafkaSource.configure(context);
+    kafkaSource.start();
+    Thread.sleep(500L);
+
+    Status status = kafkaSource.process();
+    assertEquals(Status.BACKOFF, status);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testNonExistingTopic() throws EventDeliveryException,
+      SecurityException, NoSuchFieldException, IllegalArgumentException,
+      IllegalAccessException, InterruptedException {
+    context.put(KafkaSourceConstants.TOPIC,"faketopic");
+    kafkaSource.configure(context);
+    kafkaSource.start();
+    Thread.sleep(500L);
+
+    Status status = kafkaSource.process();
+    assertEquals(Status.BACKOFF, status);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test(expected= FlumeException.class)
+  public void testNonExistingZk() throws EventDeliveryException,
+      SecurityException, NoSuchFieldException, IllegalArgumentException,
+      IllegalAccessException, InterruptedException {
+    context.put(KafkaSourceConstants.ZOOKEEPER_CONNECT,"blabla:666");
+    kafkaSource.configure(context);
+    kafkaSource.start();
+    Thread.sleep(500L);
+
+    Status status = kafkaSource.process();
+    assertEquals(Status.BACKOFF, status);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/9cc85082/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java
new file mode 100644
index 0000000..b9a1b25
--- /dev/null
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.source.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import kafka.javaapi.consumer.ConsumerConnector;
+import org.apache.flume.Context;
+import org.apache.zookeeper.server.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class KafkaSourceUtilTest {
+  private Properties props = new Properties();
+  private Context context = new Context();
+  private int zkPort = 21818; // none-standard
+  private KafkaSourceEmbeddedZookeeper zookeeper;
+
+  @Before
+  public void setUp() throws Exception {
+    context.put("consumer.timeout", "10");
+    context.put("type", "KafkaSource");
+    context.put("topic", "test");
+    props = KafkaSourceUtil.getKafkaConfigProperties(context);
+    zookeeper = new KafkaSourceEmbeddedZookeeper(zkPort);
+
+
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    zookeeper.stopZookeeper();
+  }
+
+  @Test
+  public void testGetKafkaConfigParameter() {
+    assertEquals("10",props.getProperty("consumer.timeout"));
+    assertEquals("test",props.getProperty("topic"));
+    assertNull(props.getProperty("type"));
+  }
+
+
+  @Test
+  public void testGetConsumer() {
+    context.put("zookeeper.connect", "127.0.0.1:"+zkPort);
+    context.put("group.id","test");
+
+    ConsumerConnector cc = KafkaSourceUtil.getConsumer(context);
+    assertNotNull(cc);
+
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/9cc85082/flume-ng-sources/flume-kafka-source/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/test/resources/log4j.properties b/flume-ng-sources/flume-kafka-source/src/test/resources/log4j.properties
new file mode 100644
index 0000000..78b1067
--- /dev/null
+++ b/flume-ng-sources/flume-kafka-source/src/test/resources/log4j.properties
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+log4j.rootLogger = INFO, out
+
+log4j.appender.out = org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout = org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n
+
+log4j.logger.org.apache.flume = INFO
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/9cc85082/flume-ng-sources/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sources/pom.xml b/flume-ng-sources/pom.xml
index c03307a..ab8eca4 100644
--- a/flume-ng-sources/pom.xml
+++ b/flume-ng-sources/pom.xml
@@ -44,6 +44,7 @@ limitations under the License.
     <module>flume-scribe-source</module>
     <module>flume-jms-source</module>
     <module>flume-twitter-source</module>
+    <module>flume-kafka-source</module>
   </modules>

</project>

http://git-wip-us.apache.org/repos/asf/flume/blob/9cc85082/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 740edc2..8ee82f3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1169,6 +1169,12 @@ limitations under the License.
     </dependency>
     <dependency>
+      <groupId>org.apache.flume.flume-ng-sources</groupId>
+      <artifactId>flume-kafka-source</artifactId>
+      <version>1.6.0-SNAPSHOT</version>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.flume.flume-ng-legacy-sources</groupId>
       <artifactId>flume-avro-source</artifactId>
       <version>1.6.0-SNAPSHOT</version>
@@ -1270,6 +1276,20 @@ limitations under the License.
       <version>3.0.3</version>
     </dependency>

+    <!-- Dependencies of Kafka source -->
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.10</artifactId>
+      <version>0.8.1.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.10</artifactId>
+      <version>0.8.1.1</version>
+      <classifier>test</classifier>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.kitesdk</groupId>
       <artifactId>kite-data-core</artifactId>
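As context for what KafkaSourceTest is exercising: a Flume PollableSource's process() returns Status.READY while events were delivered and Status.BACKOFF once the consumer times out with nothing to hand over, which is exactly what the READY/BACKOFF assertions above check. A minimal, self-contained sketch of that contract; the Status enum and FakeSource here are illustrative stand-ins, not Flume's actual classes:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for Flume's PollableSource.Status -- not the real API.
enum Status { READY, BACKOFF }

// A fake source that serves a fixed number of "messages", then times out.
class FakeSource {
  private int remaining;

  FakeSource(int messages) { this.remaining = messages; }

  Status process() {
    if (remaining > 0) {
      remaining--;
      return Status.READY;       // a message was delivered this poll
    }
    return Status.BACKOFF;       // consumer timed out with nothing to deliver
  }
}

class StatusContractSketch {
  // Poll n times and record the statuses, mirroring the test assertions above.
  static List<String> poll(FakeSource src, int n) {
    List<String> out = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      out.add(src.process().name());
    }
    return out;
  }

  public static void main(String[] args) {
    // One message available: the first poll delivers, the second backs off.
    System.out.println(poll(new FakeSource(1), 2)); // [READY, BACKOFF]
  }
}
```

This is the same shape as testProcessItNotEmpty: one produced message yields READY on the first process() call and BACKOFF on the second.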

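The setup() in KafkaSourceTest captures the source's output by stubbing ChannelProcessor.processEventBatch with a Mockito Answer that copies each batch into a list. The same capture pattern can be sketched without Mockito using hand-rolled stand-ins; SimpleEvent and CapturingChannelProcessor below are hypothetical names for illustration, not Flume or Mockito API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-in for Flume's Event interface.
interface SimpleEvent {
  byte[] getBody();
}

// Hypothetical stand-in for a mocked ChannelProcessor: instead of writing
// batches to a channel, it remembers every batch for later assertions.
class CapturingChannelProcessor {
  final List<SimpleEvent> captured = new ArrayList<>();

  void processEventBatch(List<SimpleEvent> batch) {
    captured.addAll(batch);
  }
}

class CaptureSketch {
  // Decode every captured event body to a String, as the tests above do.
  static List<String> bodiesOf(CapturingChannelProcessor p) {
    List<String> out = new ArrayList<>();
    for (SimpleEvent e : p.captured) {
      out.add(new String(e.getBody()));
    }
    return out;
  }

  public static void main(String[] args) {
    CapturingChannelProcessor p = new CapturingChannelProcessor();
    SimpleEvent e = () -> "hello, world".getBytes();
    p.processEventBatch(Arrays.asList(e));
    System.out.println(bodiesOf(p)); // [hello, world]
  }
}
```

The design point is the same as in the test: the source under test only needs something that accepts event batches, so substituting a recorder lets assertions inspect exactly what would have reached the channel.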