Repository: flume Updated Branches: refs/heads/trunk acc965134 -> 75f748cbd
FLUME-2251. Kafka Sink. (Thilina Buddhika, Gwen Shapira via Hari) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/75f748cb Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/75f748cb Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/75f748cb Branch: refs/heads/trunk Commit: 75f748cbd101d6efe8463a1c747fb87d2f668091 Parents: acc9651 Author: Hari Shreedharan <[email protected]> Authored: Mon Sep 15 14:26:19 2014 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Mon Sep 15 14:26:19 2014 -0700 ---------------------------------------------------------------------- flume-ng-dist/pom.xml | 4 + flume-ng-sinks/flume-ng-kafka-sink/pom.xml | 68 ++++++ .../org/apache/flume/sink/kafka/KafkaSink.java | 219 +++++++++++++++++++ .../flume/sink/kafka/KafkaSinkConstants.java | 31 +++ .../apache/flume/sink/kafka/TestConstants.java | 25 +++ .../apache/flume/sink/kafka/TestKafkaSink.java | 212 ++++++++++++++++++ .../flume/sink/kafka/util/KafkaConsumer.java | 98 +++++++++ .../flume/sink/kafka/util/KafkaLocal.java | 52 +++++ .../apache/flume/sink/kafka/util/TestUtil.java | 174 +++++++++++++++ .../flume/sink/kafka/util/ZooKeeperLocal.java | 62 ++++++ .../src/test/resources/kafka-server.properties | 117 ++++++++++ .../src/test/resources/log4j.properties | 78 +++++++ .../src/test/resources/zookeeper.properties | 20 ++ flume-ng-sinks/pom.xml | 1 + pom.xml | 6 + 15 files changed, 1167 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-dist/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-dist/pom.xml b/flume-ng-dist/pom.xml index 8c18af6..ca3cd8b 100644 --- a/flume-ng-dist/pom.xml +++ b/flume-ng-dist/pom.xml @@ -150,6 +150,10 @@ <artifactId>flume-ng-morphline-solr-sink</artifactId> </dependency> <dependency> + <groupId>org.apache.flume.flume-ng-sinks</groupId> + <artifactId>flume-ng-kafka-sink</artifactId> + </dependency> + <dependency> <groupId>org.apache.flume.flume-ng-sources</groupId> <artifactId>flume-scribe-source</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml new file mode 100644 index 0000000..307fa59 --- /dev/null +++ b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml @@ -0,0 +1,68 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>flume-ng-sinks</artifactId> + <groupId>org.apache.flume</groupId> + <version>1.6.0-SNAPSHOT</version> + </parent> + <groupId>org.apache.flume.flume-ng-sinks</groupId> + <artifactId>flume-ng-kafka-sink</artifactId> + <name>Flume Kafka Sink</name> + + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-sdk</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-core</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-configuration</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.10</artifactId> + <version>0.8.1.1</version> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java new file mode 100644 index 0000000..a6121ac --- /dev/null +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java @@ -0,0 +1,219 @@ +/** + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + limitations under the License. + */ + +package org.apache.flume.sink.kafka; + +import com.google.common.base.Throwables; +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; +import org.apache.flume.*; +import org.apache.flume.conf.Configurable; +import org.apache.flume.sink.AbstractSink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * A Flume Sink that can publish messages to Kafka. + * This is a general implementation that can be used with any Flume agent and + * a channel. + * The message can be any event and the key is a string that we read from the + * header + * For use of partitioning, use an interceptor to generate a header with the + * partition key + * <p/> + * Mandatory properties are: + * kafka.metadata.broker.list -- can be a partial list, + * but at least 2 are recommended for HA + * kafka.request.required.acks -- 0 (unsafe), 1 (accepted by at least one + * broker), -1 (accepted by all brokers) + * kafka.producer.type -- for safety, this should be sync + * <p/> + * <p/> + * however, any property starting with "kafka." will be passed along to the + * Kafka producer + * Read the Kafka producer documentation to see which configurations can be used + * <p/> + * Optional properties + * topic - there's a default, and also - this can be in the event header if + * you need to support events with + * different topics + * batchSize - how many messages to process in one batch. Larger batches + * improve throughput while adding latency. + * <p/> + * header properties (per event): + * topic + * key + */ +public class KafkaSink extends AbstractSink implements Configurable { + + private static final Logger logger = LoggerFactory.getLogger(KafkaSink.class); + public static final String KEY_HDR = "key"; + public static final String TOPIC_HDR = "topic"; + private Properties producerProps; + private Producer<String, byte[]> producer; + private String topic; + private int batchSize; + private List<KeyedMessage<String, byte[]>> messageList; + + @Override + public Status process() throws EventDeliveryException { + Status result = Status.READY; + Channel channel = getChannel(); + Transaction transaction = null; + Event event = null; + String eventTopic = null; + String eventKey = null; + + try { + long processedEvents = 0; + + transaction = channel.getTransaction(); + transaction.begin(); + + messageList.clear(); + for (; processedEvents < batchSize; processedEvents += 1) { + event = channel.take(); + + if (event == null) { + // no events available in channel + break; + } + + byte[] eventBody = event.getBody(); + Map<String, String> headers = event.getHeaders(); + + if ((eventTopic = headers.get(TOPIC_HDR)) == null) { + eventTopic = topic; + } + + eventKey = headers.get(KEY_HDR); + + if (logger.isDebugEnabled()) { + logger.debug("{Event} " + eventTopic + " : " + eventKey + " : " + + new String(eventBody, "UTF-8")); + logger.debug("event #{}", processedEvents); + } + + // create a message and add to buffer + KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]> + (eventTopic, eventKey, eventBody); + messageList.add(data); + + } + + // publish batch and commit. + if (processedEvents > 0) { + producer.send(messageList); + } + + transaction.commit(); + + } catch (Exception ex) { + String errorMsg = "Failed to publish events"; + logger.error("Failed to publish events", ex); + result = Status.BACKOFF; + if (transaction != null) { + try { + transaction.rollback(); + } catch (Exception e) { + logger.error("Transaction rollback failed", e); + throw Throwables.propagate(e); + } + } + throw new EventDeliveryException(errorMsg, ex); + } finally { + if (transaction != null) { + transaction.close(); + } + } + + return result; + } + + @Override + public synchronized void start() { + // instantiate the producer + ProducerConfig config = new ProducerConfig(producerProps); + producer = new Producer<String, byte[]>(config); + super.start(); + } + + @Override + public synchronized void stop() { + producer.close(); + super.stop(); + } + + + @Override + public void configure(Context context) { + + batchSize = context.getInteger(KafkaSinkConstants.BATCH_SIZE, + KafkaSinkConstants.DEFAULT_BATCH_SIZE); + logger.debug("Using batch size: {}", batchSize); + messageList = + new ArrayList<KeyedMessage<String, byte[]>>(batchSize); + Map<String, String> params = context.getParameters(); + logger.debug("all params: " + params.entrySet().toString()); + setProducerProps(params); + if (!producerProps.contains("serializer.class")) { + producerProps.put("serializer.class", "kafka.serializer.DefaultEncoder"); + } + if (!producerProps.contains("key.serializer.class")) { + producerProps.put("key.serializer.class", + "kafka.serializer.StringEncoder"); + } + + topic = context.getString(KafkaSinkConstants.TOPIC, + KafkaSinkConstants.DEFAULT_TOPIC); + if (topic.equals(KafkaSinkConstants.DEFAULT_TOPIC)) { + logger.warn("The Properties 'preprocessor' or 'topic' is not set. " + + "Using the default topic name" + + KafkaSinkConstants.DEFAULT_TOPIC); + } else { + logger.info("Using the static topic: " + topic + + " this may be over-ridden by event headers"); + } + } + + + private void setProducerProps(Map<String, String> params) { + producerProps = new Properties(); + for (String key : params.keySet()) { + String value = params.get(key).trim(); + key = key.trim(); + if (key.startsWith(KafkaSinkConstants.PROPERTY_PREFIX)) { + // remove the prefix + key = key.substring(KafkaSinkConstants.PROPERTY_PREFIX.length() + 1, + key.length()); + producerProps.put(key.trim(), value); + if (logger.isDebugEnabled()) { + logger.debug("Reading a Kafka Producer Property: key: " + key + + ", value: " + value); + } + } + } + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java new file mode 100644 index 0000000..48d875e --- /dev/null +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkConstants.java @@ -0,0 +1,31 @@ +/** + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + limitations under the License. + */ + +package org.apache.flume.sink.kafka; + +public class KafkaSinkConstants { + + public static final String PROPERTY_PREFIX = "kafka"; + + /* Properties */ + public static final String DEFAULT_TOPIC = "default-flume-topic"; + public static final String TOPIC = "topic"; + public static final String BATCH_SIZE = "batchSize"; + + public static final int DEFAULT_BATCH_SIZE = 100; +} http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java new file mode 100644 index 0000000..f99be53 --- /dev/null +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestConstants.java @@ -0,0 +1,25 @@ +/** + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + limitations under the License. + */ + +package org.apache.flume.sink.kafka; + +public class TestConstants { + public static final String STATIC_TOPIC = "static-topic"; + public static final String CUSTOM_KEY = "custom-key"; + public static final String CUSTOM_TOPIC = "custom-topic"; +} http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java new file mode 100644 index 0000000..aed6dac --- /dev/null +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java @@ -0,0 +1,212 @@ +/** + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + limitations under the License. + */ + +package org.apache.flume.sink.kafka; + +import kafka.message.MessageAndMetadata; +import org.apache.flume.*; +import org.apache.flume.channel.MemoryChannel; +import org.apache.flume.conf.Configurables; +import org.apache.flume.event.EventBuilder; +import org.apache.flume.sink.kafka.util.TestUtil; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +/** + * Unit tests for Kafka Sink + */ +public class TestKafkaSink { + + private static TestUtil testUtil = TestUtil.getInstance(); + + @BeforeClass + public static void setup() { + testUtil.prepare(); + List<String> topics = new ArrayList<String>(3); + topics.add(KafkaSinkConstants.DEFAULT_TOPIC); + topics.add(TestConstants.STATIC_TOPIC); + topics.add(TestConstants.CUSTOM_TOPIC); + testUtil.initTopicList(topics); + } + + @AfterClass + public static void tearDown() { + testUtil.tearDown(); + } + + @Test + public void testDefaultTopic() { + Sink kafkaSink = new KafkaSink(); + Context context = prepareDefaultContext(); + Configurables.configure(kafkaSink, context); + Channel memoryChannel = new MemoryChannel(); + Configurables.configure(memoryChannel, context); + kafkaSink.setChannel(memoryChannel); + kafkaSink.start(); + + String msg = "default-topic-test"; + Transaction tx = memoryChannel.getTransaction(); + tx.begin(); + Event event = EventBuilder.withBody(msg.getBytes()); + memoryChannel.put(event); + tx.commit(); + tx.close(); + + try { + Sink.Status status = kafkaSink.process(); + if (status == Sink.Status.BACKOFF) { + fail("Error Occurred"); + } + } catch (EventDeliveryException ex) { + // ignore + } + + String fetchedMsg = new String((byte[]) + testUtil.getNextMessageFromConsumer(KafkaSinkConstants.DEFAULT_TOPIC) + .message()); + assertEquals(msg, fetchedMsg); + } + + @Test + public void testStaticTopic() { + Context context = prepareDefaultContext(); + // add the static topic + context.put(KafkaSinkConstants.TOPIC, TestConstants.STATIC_TOPIC); + String msg = "static-topic-test"; + + try { + Sink.Status status = prepareAndSend(context, msg); + if (status == Sink.Status.BACKOFF) { + fail("Error Occurred"); + } + } catch (EventDeliveryException ex) { + // ignore + } + + String fetchedMsg = new String((byte[]) testUtil.getNextMessageFromConsumer( + TestConstants.STATIC_TOPIC).message()); + assertEquals(msg, fetchedMsg); + } + + @Test + public void testTopicAndKeyFromHeader() throws UnsupportedEncodingException { + + + Sink kafkaSink = new KafkaSink(); + Context context = prepareDefaultContext(); + Configurables.configure(kafkaSink, context); + Channel memoryChannel = new MemoryChannel(); + Configurables.configure(memoryChannel, context); + kafkaSink.setChannel(memoryChannel); + kafkaSink.start(); + + String msg = "my message"; + Map<String, String> headers = new HashMap<String, String>(); + headers.put("topic", TestConstants.CUSTOM_TOPIC); + headers.put("key", TestConstants.CUSTOM_KEY); + Transaction tx = memoryChannel.getTransaction(); + tx.begin(); + Event event = EventBuilder.withBody(msg.getBytes(), headers); + memoryChannel.put(event); + tx.commit(); + tx.close(); + + try { + Sink.Status status = kafkaSink.process(); + if (status == Sink.Status.BACKOFF) { + fail("Error Occurred"); + } + } catch (EventDeliveryException ex) { + // ignore + } + + MessageAndMetadata fetchedMsg = + testUtil.getNextMessageFromConsumer(TestConstants.CUSTOM_TOPIC); + + assertEquals(msg, new String((byte[]) fetchedMsg.message(), "UTF-8")); + assertEquals(TestConstants.CUSTOM_KEY, + new String((byte[]) fetchedMsg.key(), "UTF-8")); + + } + + @Test + public void testEmptyChannel() throws UnsupportedEncodingException { + + + Sink kafkaSink = new KafkaSink(); + Context context = prepareDefaultContext(); + Configurables.configure(kafkaSink, context); + Channel memoryChannel = new MemoryChannel(); + Configurables.configure(memoryChannel, context); + kafkaSink.setChannel(memoryChannel); + kafkaSink.start(); + + try { + Sink.Status status = kafkaSink.process(); + if (status == Sink.Status.BACKOFF) { + fail("Error Occurred"); + } + } catch (EventDeliveryException ex) { + // ignore + } + assertNull( + testUtil.getNextMessageFromConsumer(KafkaSinkConstants.DEFAULT_TOPIC)); + + } + + + private Context prepareDefaultContext() { + // Prepares a default context with Kafka Server Properties + Context context = new Context(); + context.put("kafka.metadata.broker.list", testUtil.getKafkaServerUrl()); + context.put("kafka.request.required.acks", "1"); + context.put("batchSize", "1"); + return context; + } + + private Sink.Status prepareAndSend(Context context, String msg) + throws EventDeliveryException { + Sink kafkaSink = new KafkaSink(); + Configurables.configure(kafkaSink, context); + Channel memoryChannel = new MemoryChannel(); + Configurables.configure(memoryChannel, context); + kafkaSink.setChannel(memoryChannel); + kafkaSink.start(); + + Transaction tx = memoryChannel.getTransaction(); + tx.begin(); + Event event = EventBuilder.withBody(msg.getBytes()); + memoryChannel.put(event); + tx.commit(); + tx.close(); + + return kafkaSink.process(); + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java new file mode 100644 index 0000000..1c98922 --- /dev/null +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java @@ -0,0 +1,98 @@ +/** + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + limitations under the License. + */ + +package org.apache.flume.sink.kafka.util; + +import kafka.consumer.ConsumerConfig; +import kafka.consumer.ConsumerIterator; +import kafka.consumer.ConsumerTimeoutException; +import kafka.consumer.KafkaStream; +import kafka.javaapi.consumer.ConsumerConnector; +import kafka.message.MessageAndMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * A Kafka Consumer implementation. This uses the current thread to fetch the + * next message from the queue and doesn't use a multi threaded implementation. + * So this implements a synchronous blocking call. + * To avoid infinite waiting, a timeout is implemented to wait only for + * 10 seconds before concluding that the message will not be available. + */ +public class KafkaConsumer { + + private static final Logger logger = LoggerFactory.getLogger( + KafkaConsumer.class); + + private final ConsumerConnector consumer; + Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap; + + public KafkaConsumer() { + consumer = kafka.consumer.Consumer.createJavaConsumerConnector( + createConsumerConfig(TestUtil.getInstance().getZkUrl(), "group_1")); + } + + private static ConsumerConfig createConsumerConfig(String zkUrl, + String groupId) { + Properties props = new Properties(); + props.put("zookeeper.connect", zkUrl); + props.put("group.id", groupId); + props.put("zookeeper.session.timeout.ms", "400"); + props.put("zookeeper.sync.time.ms", "200"); + props.put("auto.commit.interval.ms", "1000"); + props.put("auto.offset.reset", "smallest"); + props.put("consumer.timeout.ms","1000"); + return new ConsumerConfig(props); + } + + public void initTopicList(List<String> topics) { + Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); + for (String topic : topics) { + // we need only single threaded consumers + topicCountMap.put(topic, new Integer(1)); + } + consumerMap = consumer.createMessageStreams(topicCountMap); + } + + public MessageAndMetadata getNextMessage(String topic) { + List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); + // it has only a single stream, because there is only one consumer + KafkaStream stream = streams.get(0); + final ConsumerIterator<byte[], byte[]> it = stream.iterator(); + int counter = 0; + try { + if (it.hasNext()) { + return it.next(); + } else { + return null; + } + } catch (ConsumerTimeoutException e) { + logger.error("0 messages available to fetch for the topic " + topic); + return null; + } + } + + public void shutdown() { + consumer.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java new file mode 100644 index 0000000..3c6e064 --- /dev/null +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java @@ -0,0 +1,52 @@ +/** + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + limitations under the License. + */ + +package org.apache.flume.sink.kafka.util; + +import kafka.server.KafkaConfig; +import kafka.server.KafkaServerStartable; + +import java.io.IOException; +import java.util.Properties; + +/** + * A local Kafka server for running unit tests. + * Reference: https://gist.github.com/fjavieralba/7930018/ + */ +public class KafkaLocal { + + public KafkaServerStartable kafka; + public ZooKeeperLocal zookeeper; + + public KafkaLocal(Properties kafkaProperties) throws IOException, + InterruptedException{ + KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties); + + //start local kafka broker + kafka = new KafkaServerStartable(kafkaConfig); + } + + public void start() throws Exception{ + kafka.startup(); + } + + public void stop(){ + kafka.shutdown(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java new file mode 100644 index 0000000..8855c53 --- /dev/null +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java @@ -0,0 +1,174 @@ +/** + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + limitations under the License. + */ + +package org.apache.flume.sink.kafka.util; + +import kafka.message.MessageAndMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.BindException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.List; +import java.util.Properties; +import java.util.Random; + +/** + * A utility class for starting/stopping Kafka Server. + */ +public class TestUtil { + + private static final Logger logger = LoggerFactory.getLogger(TestUtil.class); + private static TestUtil instance = new TestUtil(); + + private Random randPortGen = new Random(System.currentTimeMillis()); + private KafkaLocal kafkaServer; + private KafkaConsumer kafkaConsumer; + private String hostname = "localhost"; + private int kafkaLocalPort; + private int zkLocalPort; + + private TestUtil() { + init(); + } + + public static TestUtil getInstance() { + return instance; + } + + private void init() { + // get the localhost. + try { + hostname = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + logger.warn("Error getting the value of localhost. " + + "Proceeding with 'localhost'.", e); + } + } + + private boolean startKafkaServer() { + Properties kafkaProperties = new Properties(); + Properties zkProperties = new Properties(); + + try { + //load properties + zkProperties.load(Class.class.getResourceAsStream( + "/zookeeper.properties")); + + ZooKeeperLocal zookeeper; + while (true) { + //start local Zookeeper + try { + zkLocalPort = getNextPort(); + // override the Zookeeper client port with the generated one. + zkProperties.setProperty("clientPort", Integer.toString(zkLocalPort)); + zookeeper = new ZooKeeperLocal(zkProperties); + break; + } catch (BindException bindEx) { + // bind exception. port is already in use. Try a different port. + } + } + logger.info("ZooKeeper instance is successfully started on port " + + zkLocalPort); + + kafkaProperties.load(Class.class.getResourceAsStream( + "/kafka-server.properties")); + // override the Zookeeper url. + kafkaProperties.setProperty("zookeeper.connect", getZkUrl()); + while (true) { + kafkaLocalPort = getNextPort(); + // override the Kafka server port + kafkaProperties.setProperty("port", Integer.toString(kafkaLocalPort)); + kafkaServer = new KafkaLocal(kafkaProperties); + try { + kafkaServer.start(); + break; + } catch (BindException bindEx) { + // let's try another port. + } + } + logger.info("Kafka Server is successfully started on port " + + kafkaLocalPort); + return true; + + } catch (Exception e) { + logger.error("Error starting the Kafka Server.", e); + return false; + } + } + + private KafkaConsumer getKafkaConsumer() { + synchronized (this) { + if (kafkaConsumer == null) { + kafkaConsumer = new KafkaConsumer(); + } + } + return kafkaConsumer; + } + + public void initTopicList(List<String> topics) { + getKafkaConsumer().initTopicList(topics); + } + + public MessageAndMetadata getNextMessageFromConsumer(String topic) { + return getKafkaConsumer().getNextMessage(topic); + } + + public void prepare() { + boolean startStatus = startKafkaServer(); + if (!startStatus) { + throw new RuntimeException("Error starting the server!"); + } + try { + Thread.sleep(3 * 1000); // add this sleep time to + // ensure that the server is fully started before proceeding with tests. + } catch (InterruptedException e) { + // ignore + } + getKafkaConsumer(); + logger.info("Completed the prepare phase."); + } + + public void tearDown() { + logger.info("Shutting down the Kafka Consumer."); + getKafkaConsumer().shutdown(); + try { + Thread.sleep(3 * 1000); // add this sleep time to + // ensure that the server is fully started before proceeding with tests. + } catch (InterruptedException e) { + // ignore + } + logger.info("Shutting down the kafka Server."); + kafkaServer.stop(); + logger.info("Completed the tearDown phase."); + } + + private synchronized int getNextPort() { + // generate a random port number between 49152 and 65535 + return randPortGen.nextInt(65535 - 49152) + 49152; + } + + public String getZkUrl() { + return hostname + ":" + zkLocalPort; + } + + public String getKafkaServerUrl() { + return hostname + ":" + kafkaLocalPort; + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/ZooKeeperLocal.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/ZooKeeperLocal.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/ZooKeeperLocal.java new file mode 100644 index 0000000..1a5728f --- /dev/null +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/ZooKeeperLocal.java @@ -0,0 +1,62 @@ +/** + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + limitations under the License. + */ + +package org.apache.flume.sink.kafka.util; + +import org.apache.zookeeper.server.ServerConfig; +import org.apache.zookeeper.server.ZooKeeperServerMain; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Properties; + +/** + * A local Zookeeper server for running unit tests. + * Reference: https://gist.github.com/fjavieralba/7930018/ + */ +public class ZooKeeperLocal { + + private static final Logger logger = + LoggerFactory.getLogger(ZooKeeperLocal.class); + private ZooKeeperServerMain zooKeeperServer; + + public ZooKeeperLocal(Properties zkProperties) throws IOException{ + QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig(); + try { + quorumConfiguration.parseProperties(zkProperties); + } catch(Exception e) { + throw new RuntimeException(e); + } + + zooKeeperServer = new ZooKeeperServerMain(); + final ServerConfig configuration = new ServerConfig(); + configuration.readFrom(quorumConfiguration); + + new Thread() { + public void run() { + try { + zooKeeperServer.runFromConfig(configuration); + } catch (IOException e) { + logger.error("Zookeeper startup failed.", e); + } + } + }.start(); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties new file mode 100644 index 0000000..c07cdea --- /dev/null +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties @@ -0,0 +1,117 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +broker.id=0 + +############################# Socket Server Settings ############################# + +# The port the socket server listens on +port=9092 + +# Hostname the broker will bind to. If not set, the server will bind to all interfaces +#host.name=localhost + +# Hostname the broker will advertise to producers and consumers. If not set, it uses the +# value for "host.name" if configured. Otherwise, it will use the value returned from +# java.net.InetAddress.getCanonicalHostName(). +#advertised.host.name=<hostname routable by clients> + +# The port to publish to ZooKeeper for clients to use. If this is not set, +# it will publish the same port that the broker binds to. +#advertised.port=<port accessible by clients> + +# The number of threads handling network requests +num.network.threads=2 + +# The number of threads doing disk I/O +num.io.threads=8 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer.bytes=1048576 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer.bytes=1048576 + +# The maximum size of a request that the socket server will accept (protection against OOM) +socket.request.max.bytes=104857600 + + +############################# Log Basics ############################# + +# A comma seperated list of directories under which to store log files +log.dirs=target/kafka-logs + +# The default number of log partitions per topic. More partitions allow greater +# parallelism for consumption, but this will also result in more files across +# the brokers. +num.partitions=2 + +############################# Log Flush Policy ############################# + +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The following configurations control the flush of data to disk. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data may be lost if you are not using replication. +# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. +# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +#log.flush.interval.messages=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +#log.flush.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion +log.retention.hours=168 + +# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining +# segments don't drop below log.retention.bytes. +#log.retention.bytes=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +log.segment.bytes=536870912 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.retention.check.interval.ms=60000 + +# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires. +# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. +log.cleaner.enable=false + +############################# Zookeeper ############################# + +# Zookeeper connection string (see zookeeper docs for details). +# This is a comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. +zookeeper.connect=localhost:2181 + +# Timeout in ms for connecting to zookeeper +zookeeper.connection.timeout.ms=1000000 http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/log4j.properties b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/log4j.properties new file mode 100644 index 0000000..bdcb643 --- /dev/null +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/log4j.properties @@ -0,0 +1,78 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +kafka.logs.dir=target/logs + +log4j.rootLogger=INFO, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log +log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log +log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log +log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log +log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log +log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n + +# Turn on all our debugging info +#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender +#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender +#log4j.logger.kafka.perf=DEBUG, kafkaAppender +#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender +#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG +log4j.logger.kafka=INFO, kafkaAppender + +log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender +log4j.additivity.kafka.network.RequestChannel$=false + +#log4j.logger.kafka.network.Processor=TRACE, requestAppender +#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender +#log4j.additivity.kafka.server.KafkaApis=false +log4j.logger.kafka.request.logger=WARN, requestAppender +log4j.additivity.kafka.request.logger=false + +log4j.logger.kafka.controller=TRACE, controllerAppender +log4j.additivity.kafka.controller=false + +log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender +log4j.additivity.kafka.log.LogCleaner=false + +log4j.logger.state.change.logger=TRACE, stateChangeAppender +log4j.additivity.state.change.logger=false http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/zookeeper.properties ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/zookeeper.properties b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/zookeeper.properties new file mode 100644 index 0000000..89e1b5e --- /dev/null +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/zookeeper.properties @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# the directory where the snapshot is stored. +dataDir=target +# the port at which the clients will connect +clientPort=2181 +# disable the per-ip limit on the number of connections since this is a non-production config +maxClientCnxns=0 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/flume-ng-sinks/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/pom.xml b/flume-ng-sinks/pom.xml index 3381bde..4bac019 100644 --- a/flume-ng-sinks/pom.xml +++ b/flume-ng-sinks/pom.xml @@ -46,6 +46,7 @@ limitations under the License. <module>flume-ng-hbase-sink</module> <module>flume-ng-elasticsearch-sink</module> <module>flume-ng-morphline-solr-sink</module> + <module>flume-ng-kafka-sink</module> </modules> <profiles> http://git-wip-us.apache.org/repos/asf/flume/blob/75f748cb/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 150db2e..740edc2 100644 --- a/pom.xml +++ b/pom.xml @@ -1139,6 +1139,12 @@ limitations under the License. </dependency> <dependency> + <groupId>org.apache.flume.flume-ng-sinks</groupId> + <artifactId>flume-ng-kafka-sink</artifactId> + <version>1.6.0-SNAPSHOT</version> + </dependency> + + <dependency> <groupId>org.apache.flume.flume-ng-sources</groupId> <artifactId>flume-scribe-source</artifactId> <version>1.6.0-SNAPSHOT</version>
