Repository: flume
Updated Branches:
  refs/heads/flume-1.6 57d651105 -> 9cc850825


FLUME-2250. Kafka Source.

(Frank Yao, Ashish Paliwal, Gwen Shapira via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/9cc85082
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/9cc85082
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/9cc85082

Branch: refs/heads/flume-1.6
Commit: 9cc8508255078b5a03f5b2899cc2795f376ad2b5
Parents: 57d6511
Author: Hari Shreedharan <[email protected]>
Authored: Tue Sep 16 21:24:37 2014 -0700
Committer: Hari Shreedharan <[email protected]>
Committed: Tue Sep 16 21:26:48 2014 -0700

----------------------------------------------------------------------
 flume-ng-dist/pom.xml                           |   4 +
 flume-ng-doc/sphinx/FlumeUserGuide.rst          | 126 ++++++----
 flume-ng-sinks/flume-ng-kafka-sink/pom.xml      |   1 -
 flume-ng-sources/flume-kafka-source/pom.xml     |  70 ++++++
 .../apache/flume/source/kafka/KafkaSource.java  | 231 +++++++++++++++++++
 .../source/kafka/KafkaSourceConstants.java      |  36 +++
 .../flume/source/kafka/KafkaSourceUtil.java     |  63 +++++
 .../source/kafka/KafkaSourceEmbeddedKafka.java  |  92 ++++++++
 .../kafka/KafkaSourceEmbeddedZookeeper.java     |  64 +++++
 .../flume/source/kafka/KafkaSourceTest.java     | 195 ++++++++++++++++
 .../flume/source/kafka/KafkaSourceUtilTest.java |  75 ++++++
 .../src/test/resources/log4j.properties         |  25 ++
 flume-ng-sources/pom.xml                        |   1 +
 pom.xml                                         |  20 ++
 14 files changed, 961 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/9cc85082/flume-ng-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-dist/pom.xml b/flume-ng-dist/pom.xml
index ca3cd8b..a5db0c7 100644
--- a/flume-ng-dist/pom.xml
+++ b/flume-ng-dist/pom.xml
@@ -166,6 +166,10 @@
       <artifactId>flume-twitter-source</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.flume.flume-ng-sources</groupId>
+      <artifactId>flume-kafka-source</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.flume.flume-ng-legacy-sources</groupId>
       <artifactId>flume-avro-source</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/flume/blob/9cc85082/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst 
b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index a718fbf..3a47560 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -994,47 +994,6 @@ Example for an agent named agent-1:
   agent-1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
   agent-1.sources.src-1.fileHeader = true
 
-Twitter 1% firehose Source (experimental)
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-.. warning::
-  This source is hightly experimental and may change between minor versions of 
Flume.
-  Use at your own risk.
-
-Experimental source that connects via Streaming API to the 1% sample twitter
-firehose, continously downloads tweets, converts them to Avro format and
-sends Avro events to a downstream Flume sink. Requires the consumer and 
-access tokens and secrets of a Twitter developer account.
-Required properties are in **bold**.
-
-====================== ===========  
===================================================
-Property Name          Default      Description
-====================== ===========  
===================================================
-**channels**           --
-**type**               --           The component type name, needs to be 
``org.apache.flume.source.twitter.TwitterSource``
-**consumerKey**        --           OAuth consumer key
-**consumerSecret**     --           OAuth consumer secret
-**accessToken**        --           OAuth access token
-**accessTokenSecret**  --           OAuth toekn secret 
-maxBatchSize           1000         Maximum number of twitter messages to put 
in a single batch
-maxBatchDurationMillis 1000         Maximum number of milliseconds to wait 
before closing a batch
-====================== ===========  
===================================================
-
-Example for agent named a1:
-
-.. code-block:: properties
-
-  a1.sources = r1
-  a1.channels = c1
-  a1.sources.r1.type = org.apache.flume.source.twitter.TwitterSource
-  a1.sources.r1.channels = c1
-  a1.sources.r1.consumerKey = YOUR_TWITTER_CONSUMER_KEY
-  a1.sources.r1.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
-  a1.sources.r1.accessToken = YOUR_TWITTER_ACCESS_TOKEN
-  a1.sources.r1.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET
-  a1.sources.r1.maxBatchSize = 10
-  a1.sources.r1.maxBatchDurationMillis = 200
-
 Event Deserializers
 '''''''''''''''''''
 
@@ -1094,6 +1053,91 @@ Property Name               Default             
Description
 deserializer.maxBlobLength  100000000           The maximum number of bytes to 
read and buffer for a given request
 ==========================  ==================  
=======================================================================
 
+Twitter 1% firehose Source (experimental)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. warning::
+  This source is hightly experimental and may change between minor versions of 
Flume.
+  Use at your own risk.
+
+Experimental source that connects via Streaming API to the 1% sample twitter
+firehose, continously downloads tweets, converts them to Avro format and
+sends Avro events to a downstream Flume sink. Requires the consumer and 
+access tokens and secrets of a Twitter developer account.
+Required properties are in **bold**.
+
+====================== ===========  
===================================================
+Property Name          Default      Description
+====================== ===========  
===================================================
+**channels**           --
+**type**               --           The component type name, needs to be 
``org.apache.flume.source.twitter.TwitterSource``
+**consumerKey**        --           OAuth consumer key
+**consumerSecret**     --           OAuth consumer secret
+**accessToken**        --           OAuth access token
+**accessTokenSecret**  --           OAuth toekn secret 
+maxBatchSize           1000         Maximum number of twitter messages to put 
in a single batch
+maxBatchDurationMillis 1000         Maximum number of milliseconds to wait 
before closing a batch
+====================== ===========  
===================================================
+
+Example for agent named a1:
+
+.. code-block:: properties
+
+  a1.sources = r1
+  a1.channels = c1
+  a1.sources.r1.type = org.apache.flume.source.twitter.TwitterSource
+  a1.sources.r1.channels = c1
+  a1.sources.r1.consumerKey = YOUR_TWITTER_CONSUMER_KEY
+  a1.sources.r1.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
+  a1.sources.r1.accessToken = YOUR_TWITTER_ACCESS_TOKEN
+  a1.sources.r1.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET
+  a1.sources.r1.maxBatchSize = 10
+  a1.sources.r1.maxBatchDurationMillis = 200
+
+Kafka Source
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Kafka Source is an Apache Kafka consumer that reads messages from a Kafka 
topic.
+If you have multiple Kafka sources running, you can configure them with the 
same Consumer Group
+so each will read a unique set of partitions for the topic.
+
+The properties below are required properties, but you can specify any Kafka 
parameter you want
+and it will be passed to the consumer. Check `Kafka documentation 
<https://kafka.apache.org/08/configuration.html#consumerconfigs>`_
+for details
+
+===========================     ===========  
===================================================
+Property Name                   Default      Description
+===========================     ===========  
===================================================
+**channels**                    --
+**type**                        --           The component type name, needs to 
be ``org.apache.flume.source.kafka,KafkaSource``
+**kafka.zookeeper.connect**     --           URI of ZooKeeper used by Kafka 
cluster
+**kadka.group.id**              --           Unique identified of consumer 
group. Setting the same id in multiple sources or agents
+                                             indicates that they are part of 
the same consumer group
+**topic**                       --           Kafka topic we'll read messages 
from. At the time, this is a single topic only.
+batchSize                       1000         Maximum number of messages 
written to Channel in one batch
+batchDurationMillis             1000         Maximum time (in ms) before a 
batch will be written to Channel
+                                             The batch will be written 
whenever the first of size and time will be reached.
+kafka.auto.commit.enable        false        If true, Kafka will commit events 
automatically - faster but less durable option.
+                                             when false, the Kafka Source will 
commit events before writing batch to channel
+consumer.timeout.ms             10           Polling interval for new data for 
batch.
+                                             Low value means more CPU usage.
+                                             High value means the 
maxBatchDurationMillis may be missed while waiting for
+                                             additional data.
+===========================     ===========  
===================================================
+
+Example for agent named tier1:
+
+.. code-block:: properties
+
+    tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
+    tier1.sources.source1.channels = channel1
+    tier1.sources.source1.kafka.zookeeper.connect = localhost:2181
+    tier1.sources.source1.topic = test1
+    tier1.sources.source1.kafka.group.id = flume
+    tier1.sources.source1.kafka.consumer.timeout.ms = 100
+
+
+
 
 NetCat Source
 ~~~~~~~~~~~~~

http://git-wip-us.apache.org/repos/asf/flume/blob/9cc85082/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml 
b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
index 307fa59..746a395 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
+++ b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
@@ -61,7 +61,6 @@
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.10</artifactId>
-      <version>0.8.1.1</version>
     </dependency>
   </dependencies>
 

http://git-wip-us.apache.org/repos/asf/flume/blob/9cc85082/flume-ng-sources/flume-kafka-source/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/pom.xml 
b/flume-ng-sources/flume-kafka-source/pom.xml
new file mode 100644
index 0000000..8ad29d7
--- /dev/null
+++ b/flume-ng-sources/flume-kafka-source/pom.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <parent>
+    <artifactId>flume-ng-sources</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>1.6.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>org.apache.flume.flume-ng-sources</groupId>
+  <artifactId>flume-kafka-source</artifactId>
+  <name>Flume Kafka Source</name>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.10</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.10</artifactId>
+      <classifier>test</classifier>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/9cc85082/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
 
b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
new file mode 100644
index 0000000..da78f80
--- /dev/null
+++ 
b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.source.kafka;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.ConsumerTimeoutException;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+
+import org.apache.flume.*;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.ConfigurationException;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.source.AbstractSource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A Source for Kafka which reads messages from kafka.
+ * I use this in company production environment and its performance is good.
+ * Over 100k messages per second can be read from kafka in one source.<p>
+ * <tt>kafka.zookeeper.connect: </tt> the zookeeper ip kafka use.<p>
+ * <tt>kafka.group.id: </tt> the groupid of consumer group.<p>
+ * <tt>topic: </tt> the topic to read from kafka.<p>
+ * maxBatchSize - maximum number of messages written to Channel in one batch
+ * maxBatchDurationMillis - maximum time before a batch (of any size)
+ *                          will be written to Channel
+ * kafka.auto.commit.enable - if true, commit automatically every time period.
+ *                      if false, commit on each batch.
+ * kafka.consumer.timeout.ms -  polling interval for new data for batch.
+ *                        Low value means more CPU usage.
+ *                        High value means the time.upper.limit may be missed.
+ *
+ * Any property starting with "kafka" will be passed to the kafka consumer
+ * So you can use any configuration supported by Kafka 0.8.1.1
+ */
+public class KafkaSource extends AbstractSource
+        implements Configurable, PollableSource {
+  private static final Logger log = LoggerFactory.getLogger(KafkaSource.class);
+  private ConsumerConnector consumer;
+  private ConsumerIterator<byte[],byte[]> it;
+  private String topic;
+  private int batchUpperLimit;
+  private int timeUpperLimit;
+  private int consumerTimeout;
+  private boolean kafkaAutoCommitEnabled;
+  private Context context;
+  private final List<Event> eventList = new ArrayList<Event>();
+
+  public Status process() throws EventDeliveryException {
+    eventList.clear();
+    byte[] bytes;
+    Event event;
+    Map<String, String> headers;
+    try {
+      int eventCounter = 0;
+      int timeWaited = 0;
+      IterStatus iterStatus = new IterStatus(false, -1);
+      while (eventCounter < batchUpperLimit && timeWaited < timeUpperLimit) {
+        iterStatus = timedHasNext();
+        if (iterStatus.hasData()) {
+          // get next message
+          bytes = it.next().message();
+
+          headers = new HashMap<String, String>();
+          headers.put(KafkaSourceConstants.TIMESTAMP,
+                  String.valueOf(System.currentTimeMillis()));
+          headers.put(KafkaSourceConstants.TOPIC,topic);
+          if (log.isDebugEnabled()) {
+            log.debug("Message: {}", new String(bytes));
+          }
+          event = EventBuilder.withBody(bytes, headers);
+          eventList.add(event);
+          eventCounter++;
+        }
+        timeWaited += iterStatus.getWaitTime();
+        if (log.isDebugEnabled()) {
+          log.debug("Waited: {} ", timeWaited);
+          log.debug("Event #: {}", eventCounter);
+        }
+      }
+      // If we have events, send events to channel
+      // and commit if Kafka doesn't auto-commit
+      if (eventCounter > 0) {
+        getChannelProcessor().processEventBatch(eventList);
+        if (!kafkaAutoCommitEnabled) {
+          // commit the read transactions to Kafka to avoid duplicates
+          consumer.commitOffsets();
+        }
+      }
+      if (!iterStatus.hasData()) {
+        if (log.isDebugEnabled()) {
+          log.debug("Returning with backoff. No more data to read");
+        }
+        return Status.BACKOFF;
+      }
+      return Status.READY;
+    } catch (Exception e) {
+      log.error("KafkaSource EXCEPTION, {}", e);
+      return Status.BACKOFF;
+    }
+  }
+
+  public void configure(Context context) {
+    this.context = context;
+    batchUpperLimit = context.getInteger(KafkaSourceConstants.BATCH_SIZE,
+            KafkaSourceConstants.DEFAULT_BATCH_SIZE);
+    timeUpperLimit = context.getInteger(KafkaSourceConstants.BATCH_DURATION_MS,
+            KafkaSourceConstants.DEFAULT_BATCH_DURATION);
+    topic = context.getString(KafkaSourceConstants.TOPIC);
+
+    //if consumer timeout and autocommit were not set by user,
+    // set them to 10ms and false
+    consumerTimeout = context.getInteger(KafkaSourceConstants.CONSUMER_TIMEOUT,
+            KafkaSourceConstants.DEFAULT_CONSUMER_TIMEOUT);
+    context.put(KafkaSourceConstants.CONSUMER_TIMEOUT,
+            Integer.toString(consumerTimeout));
+    String autoCommit = context.getString(
+            KafkaSourceConstants.AUTO_COMMIT_ENABLED,
+            String.valueOf(KafkaSourceConstants.DEFAULT_AUTO_COMMIT));
+    kafkaAutoCommitEnabled = Boolean.valueOf(autoCommit);
+    context.put(KafkaSourceConstants.AUTO_COMMIT_ENABLED,autoCommit);
+
+    if(topic == null) {
+      throw new ConfigurationException("Kafka topic must be specified.");
+    }
+  }
+
+  @Override
+  public synchronized void start() {
+    log.info("Starting {}...", this);
+
+    try {
+      //initialize a consumer. This creates the connection to ZooKeeper
+      consumer = KafkaSourceUtil.getConsumer(context);
+    } catch (Exception e) {
+      throw new FlumeException("Unable to create consumer. " +
+              "Check whether the ZooKeeper server is up and that the " +
+              "Flume agent can connect to it.", e);
+    }
+
+    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
+    // We always have just one topic being read by one thread
+    topicCountMap.put(topic, 1);
+
+    // Get the message iterator for our topic
+    // Note that this succeeds even if the topic doesn't exist
+    // in that case we simply get no messages for the topic
+    // Also note that currently we only support a single topic
+    try {
+      Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
+              consumer.createMessageStreams(topicCountMap);
+      List<KafkaStream<byte[], byte[]>> topicList = consumerMap.get(topic);
+      KafkaStream<byte[], byte[]> stream = topicList.get(0);
+      it = stream.iterator();
+    } catch (Exception e) {
+      throw new FlumeException("Unable to get message iterator from Kafka", e);
+    }
+    log.info("Kafka source {} started.", getName());
+    super.start();
+  }
+
+  @Override
+  public synchronized void stop() {
+    if (consumer != null) {
+      // exit cleanly. This syncs offsets of messages read to ZooKeeper
+      // to avoid reading the same messages again
+      consumer.shutdown();
+    }
+    super.stop();
+  }
+
+
+  /**
+   * Check if there are messages waiting in Kafka,
+   * waiting until timeout (10ms by default) for messages to arrive.
+   * And timing our wait.
+   * @return IterStatus object.
+   * Indicating whether a message was found and how long we waited for it
+   */
+  IterStatus timedHasNext() {
+    try {
+      long startTime = System.currentTimeMillis();
+      it.hasNext();
+      long endTime = System.currentTimeMillis();
+      return new IterStatus(true, endTime - startTime);
+    } catch (ConsumerTimeoutException e) {
+      return new IterStatus(false, consumerTimeout);
+    }
+  }
+
+  private class IterStatus {
+    private long waitTime;
+    private boolean hasData;
+
+
+    private IterStatus(boolean hasData,long waitTime) {
+      this.waitTime = waitTime;
+      this.hasData = hasData;
+    }
+
+    public long getWaitTime() {
+      return waitTime;
+    }
+
+    public boolean hasData() {
+      return hasData;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/9cc85082/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
 
b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
new file mode 100644
index 0000000..ac86f65
--- /dev/null
+++ 
b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.source.kafka;
+
+public class KafkaSourceConstants {
+  public static final String TOPIC = "topic";
+  public static final String TIMESTAMP = "timestamp";
+  public static final String BATCH_SIZE = "batchSize";
+  public static final String BATCH_DURATION_MS = "batchDurationMillis";
+  public static final String CONSUMER_TIMEOUT = "kafka.consumer.timeout.ms";
+  public static final String AUTO_COMMIT_ENABLED = "kafka.auto.commit.enabled";
+  public static final String ZOOKEEPER_CONNECT = "kafka.zookeeper.connect";
+  public static final String GROUP_ID = "kafka.group.id";
+  public static final String PROPERTY_PREFIX = "kafka";
+
+
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+  public static final int DEFAULT_BATCH_DURATION = 1000;
+  public static final int DEFAULT_CONSUMER_TIMEOUT = 10;
+  public static final boolean DEFAULT_AUTO_COMMIT =  false;
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/9cc85082/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java
 
b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java
new file mode 100644
index 0000000..8397272
--- /dev/null
+++ 
b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.source.kafka;
+
+import java.util.Map;
+import java.util.Properties;
+
+import kafka.common.KafkaException;
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.javaapi.consumer.ConsumerConnector;
+
+import org.apache.flume.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaSourceUtil {
+  private static final Logger log =
+          LoggerFactory.getLogger(KafkaSourceUtil.class);
+
+  public static Properties getKafkaConfigProperties(Context context) {
+    log.info("context={}",context.toString());
+    Properties props = new Properties();
+    Map<String, String> contextMap = context.getParameters();
+    for(String key : contextMap.keySet()) {
+      String value = contextMap.get(key).trim();
+      key = key.trim();
+      if (key.startsWith(KafkaSourceConstants.PROPERTY_PREFIX)) {
+      // remove the prefix
+      key = key.substring(KafkaSourceConstants.PROPERTY_PREFIX.length() + 1,
+              key.length());
+        props.put(key, value);
+        if (log.isDebugEnabled()) {
+          log.debug("Reading a Kafka Producer Property: key: " + key +
+                  ", value: " + value);
+        }
+      }
+    }
+    return props;
+  }
+
+  public static ConsumerConnector getConsumer(Context context) {
+    ConsumerConfig consumerConfig =
+            new ConsumerConfig(getKafkaConfigProperties(context));
+    ConsumerConnector consumer =
+            Consumer.createJavaConsumerConnector(consumerConfig);
+    return consumer;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/9cc85082/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
 
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
new file mode 100644
index 0000000..26c5c9d
--- /dev/null
+++ 
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.source.kafka;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import kafka.admin.AdminUtils;
+import org.I0Itec.zkclient.ZkClient;
+import kafka.utils.ZKStringSerializer$;
+
+import java.io.IOException;
+import java.util.Properties;
+
+public class KafkaSourceEmbeddedKafka {
+  KafkaServerStartable kafkaServer;
+  KafkaSourceEmbeddedZookeeper zookeeper;
+  int zkPort = 21818; // none-standard
+  Producer<String,String> producer;
+
+  public KafkaSourceEmbeddedKafka() {
+    zookeeper = new KafkaSourceEmbeddedZookeeper(zkPort);
+    Properties props = new Properties();
+    props.put("zookeeper.connect",zookeeper.getConnectString());
+    props.put("broker.id","1");
+    KafkaConfig config = new KafkaConfig(props);
+    kafkaServer = new KafkaServerStartable(config);
+    kafkaServer.startup();
+    initProducer();
+  }
+
+  public void stop() throws IOException {
+    producer.close();
+    kafkaServer.shutdown();
+    zookeeper.stopZookeeper();
+  }
+
+  public String getZkConnectString() {
+    return zookeeper.getConnectString();
+  }
+
+  private void initProducer()
+  {
+    Properties props = new Properties();
+    props.put("metadata.broker.list","127.0.0.1:" +
+            kafkaServer.serverConfig().port());
+    props.put("serializer.class","kafka.serializer.StringEncoder");
+    props.put("request.required.acks", "1");
+
+    ProducerConfig config = new ProducerConfig(props);
+
+    producer = new Producer<String,String>(config);
+
+  }
+
+  public void produce(String topic, String k, String v) {
+    KeyedMessage<String,String> message = new 
KeyedMessage<String,String>(topic,k,v);
+    producer.send(message);
+  }
+
+  public void createTopic(String topicName) {
+    // Create a ZooKeeper client
+    int sessionTimeoutMs = 10000;
+    int connectionTimeoutMs = 10000;
+    ZkClient zkClient = new ZkClient(zookeeper.getConnectString(),
+            sessionTimeoutMs, connectionTimeoutMs,
+            ZKStringSerializer$.MODULE$);
+
+    int numPartitions = 1;
+    int replicationFactor = 1;
+    Properties topicConfig = new Properties();
+    AdminUtils.createTopic(zkClient, topicName, numPartitions,
+            replicationFactor, topicConfig);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/9cc85082/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java
 
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java
new file mode 100644
index 0000000..1b8a271
--- /dev/null
+++ 
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.source.kafka;
+
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+public class KafkaSourceEmbeddedZookeeper {
+  private int zkPort;
+  private ZooKeeperServer zookeeper;
+  private NIOServerCnxnFactory factory;
+  File dir;
+
+
+  public KafkaSourceEmbeddedZookeeper(int zkPort){
+    int numConnections = 5000;
+    int tickTime = 2000;
+
+    this.zkPort = zkPort;
+
+    String dataDirectory = System.getProperty("java.io.tmpdir");
+    dir = new File(dataDirectory, "zookeeper").getAbsoluteFile();
+
+    try {
+      this.zookeeper = new ZooKeeperServer(dir,dir,tickTime);
+      this.factory = new NIOServerCnxnFactory();
+      factory.configure(new InetSocketAddress("127.0.0.1",zkPort),0);
+      factory.startup(zookeeper);
+    } catch (IOException e) {
+      e.printStackTrace();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+  public void stopZookeeper() throws IOException {
+    zookeeper.shutdown();
+    factory.shutdown();
+    FileUtils.deleteDirectory(dir);
+  }
+
+  public String getConnectString() {
+    return "127.0.0.1:"+zkPort;
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/9cc85082/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java
 
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java
new file mode 100644
index 0000000..1009f1c
--- /dev/null
+++ 
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.source.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Mockito.*;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import junit.framework.Assert;
+import kafka.common.TopicExistsException;
+import kafka.consumer.ConsumerIterator;
+import kafka.message.Message;
+
+import kafka.message.MessageAndMetadata;
+
+import org.apache.flume.*;
+import org.apache.flume.PollableSource.Status;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.source.AbstractSource;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaSourceTest {
+  private static final Logger log =
+          LoggerFactory.getLogger(KafkaSourceTest.class);
+
+  private KafkaSource kafkaSource;
+  private KafkaSourceEmbeddedKafka kafkaServer;
+  private ConsumerIterator<byte[], byte[]> mockIt;
+  private Message message;
+  private Context context;
+  private List<Event> events;
+  private String topicName = "test1";
+
+
+  @SuppressWarnings("unchecked")
+  @Before
+  public void setup() throws Exception {
+
+    kafkaSource = new KafkaSource();
+    kafkaServer = new KafkaSourceEmbeddedKafka();
+    try {
+      kafkaServer.createTopic(topicName);
+    } catch (TopicExistsException e) {
+      //do nothing
+    }
+
+    context = new Context();
+    context.put(KafkaSourceConstants.ZOOKEEPER_CONNECT,
+            kafkaServer.getZkConnectString());
+    context.put(KafkaSourceConstants.GROUP_ID,"flume");
+    context.put(KafkaSourceConstants.TOPIC,topicName);
+    context.put(KafkaSourceConstants.CONSUMER_TIMEOUT,"100");
+
+    ChannelProcessor channelProcessor = mock(ChannelProcessor.class);
+
+    events = Lists.newArrayList();
+
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        events.addAll((List<Event>)invocation.getArguments()[0]);
+        return null;
+      }
+    }).when(channelProcessor).processEventBatch(any(List.class));
+    kafkaSource.setChannelProcessor(channelProcessor);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    kafkaSource.stop();
+    kafkaServer.stop();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testProcessItNotEmpty() throws EventDeliveryException,
+          SecurityException, NoSuchFieldException, IllegalArgumentException,
+          IllegalAccessException, InterruptedException {
+    context.put(KafkaSourceConstants.BATCH_SIZE,"1");
+    kafkaSource.configure(context);
+    kafkaSource.start();
+
+    Thread.sleep(500L);
+
+    kafkaServer.produce(topicName, "", "hello, world");
+
+    Thread.sleep(500L);
+
+    Assert.assertEquals(Status.READY, kafkaSource.process());
+    Assert.assertEquals(Status.BACKOFF, kafkaSource.process());
+    Assert.assertEquals(1, events.size());
+
+    Assert.assertEquals("hello, world", new String(events.get(0).getBody(),
+            Charsets.UTF_8));
+
+
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testProcessItNotEmptyBatch() throws EventDeliveryException,
+          SecurityException, NoSuchFieldException, IllegalArgumentException,
+          IllegalAccessException, InterruptedException {
+    context.put(KafkaSourceConstants.BATCH_SIZE,"2");
+    kafkaSource.configure(context);
+    kafkaSource.start();
+
+    Thread.sleep(500L);
+
+    kafkaServer.produce(topicName, "", "hello, world");
+    kafkaServer.produce(topicName, "", "foo, bar");
+
+    Thread.sleep(500L);
+
+    Status status = kafkaSource.process();
+    assertEquals(Status.READY, status);
+    Assert.assertEquals("hello, world", new String(events.get(0).getBody(),
+            Charsets.UTF_8));
+    Assert.assertEquals("foo, bar", new String(events.get(1).getBody(),
+            Charsets.UTF_8));
+
+  }
+
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testProcessItEmpty() throws EventDeliveryException,
+          SecurityException, NoSuchFieldException, IllegalArgumentException,
+          IllegalAccessException, InterruptedException {
+    kafkaSource.configure(context);
+    kafkaSource.start();
+    Thread.sleep(500L);
+
+    Status status = kafkaSource.process();
+    assertEquals(Status.BACKOFF, status);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testNonExistingTopic() throws EventDeliveryException,
+          SecurityException, NoSuchFieldException, IllegalArgumentException,
+          IllegalAccessException, InterruptedException {
+    context.put(KafkaSourceConstants.TOPIC,"faketopic");
+    kafkaSource.configure(context);
+    kafkaSource.start();
+    Thread.sleep(500L);
+
+    Status status = kafkaSource.process();
+    assertEquals(Status.BACKOFF, status);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test(expected= FlumeException.class)
+  public void testNonExistingZk() throws EventDeliveryException,
+          SecurityException, NoSuchFieldException, IllegalArgumentException,
+          IllegalAccessException, InterruptedException {
+    context.put(KafkaSourceConstants.ZOOKEEPER_CONNECT,"blabla:666");
+    kafkaSource.configure(context);
+    kafkaSource.start();
+    Thread.sleep(500L);
+
+    Status status = kafkaSource.process();
+    assertEquals(Status.BACKOFF, status);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/9cc85082/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java
 
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java
new file mode 100644
index 0000000..b9a1b25
--- /dev/null
+++ 
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceUtilTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.source.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import kafka.javaapi.consumer.ConsumerConnector;
+import org.apache.flume.Context;
+import org.apache.zookeeper.server.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class KafkaSourceUtilTest {
+  private Properties props = new Properties();
+  private Context context = new Context();
+  private int zkPort = 21818; // none-standard
+  private KafkaSourceEmbeddedZookeeper zookeeper;
+
+  @Before
+  public void setUp() throws Exception {
+    context.put("consumer.timeout", "10");
+    context.put("type", "KafkaSource");
+    context.put("topic", "test");
+    props = KafkaSourceUtil.getKafkaConfigProperties(context);
+    zookeeper = new KafkaSourceEmbeddedZookeeper(zkPort);
+
+
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    zookeeper.stopZookeeper();
+  }
+
+  @Test
+  public void testGetKafkaConfigParameter() {
+    assertEquals("10",props.getProperty("consumer.timeout"));
+    assertEquals("test",props.getProperty("topic"));
+    assertNull(props.getProperty("type"));
+  }
+
+
+  @Test
+  public void testGetConsumer() {
+    context.put("zookeeper.connect", "127.0.0.1:"+zkPort);
+    context.put("group.id","test");
+
+    ConsumerConnector cc = KafkaSourceUtil.getConsumer(context);
+    assertNotNull(cc);
+
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/9cc85082/flume-ng-sources/flume-kafka-source/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-kafka-source/src/test/resources/log4j.properties 
b/flume-ng-sources/flume-kafka-source/src/test/resources/log4j.properties
new file mode 100644
index 0000000..78b1067
--- /dev/null
+++ b/flume-ng-sources/flume-kafka-source/src/test/resources/log4j.properties
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+log4j.rootLogger = INFO, out
+
+log4j.appender.out = org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout = org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n
+
+log4j.logger.org.apache.flume = INFO
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/9cc85082/flume-ng-sources/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sources/pom.xml b/flume-ng-sources/pom.xml
index c03307a..ab8eca4 100644
--- a/flume-ng-sources/pom.xml
+++ b/flume-ng-sources/pom.xml
@@ -44,6 +44,7 @@ limitations under the License.
     <module>flume-scribe-source</module>
     <module>flume-jms-source</module>
     <module>flume-twitter-source</module>
+    <module>flume-kafka-source</module>
   </modules>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flume/blob/9cc85082/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 740edc2..8ee82f3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1169,6 +1169,12 @@ limitations under the License.
       </dependency>
 
       <dependency>
+        <groupId>org.apache.flume.flume-ng-sources</groupId>
+        <artifactId>flume-kafka-source</artifactId>
+        <version>1.6.0-SNAPSHOT</version>
+      </dependency>
+
+      <dependency>
         <groupId>org.apache.flume.flume-ng-legacy-sources</groupId>
         <artifactId>flume-avro-source</artifactId>
         <version>1.6.0-SNAPSHOT</version>
@@ -1270,6 +1276,20 @@ limitations under the License.
         <version>3.0.3</version>
       </dependency>
 
+      <!-- Dependencies of Kafka source -->
+      <dependency>
+        <groupId>org.apache.kafka</groupId>
+        <artifactId>kafka_2.10</artifactId>
+        <version>0.8.1.1</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.kafka</groupId>
+        <artifactId>kafka_2.10</artifactId>
+        <version>0.8.1.1</version>
+        <classifier>test</classifier>
+        <scope>test</scope>
+      </dependency>
+
       <dependency>
         <groupId>org.kitesdk</groupId>
         <artifactId>kite-data-core</artifactId>

Reply via email to