Repository: flume
Updated Branches:
  refs/heads/trunk ea2fcdaa5 -> aef02df10


FLUME-2500: Add a channel that uses Kafka

(Hari Shreedharan via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/aef02df1
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/aef02df1
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/aef02df1

Branch: refs/heads/trunk
Commit: aef02df10a26a6b6911b771a506994f2069857cf
Parents: ea2fcda
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Tue Oct 28 14:59:23 2014 -0700
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Tue Oct 28 14:59:23 2014 -0700

----------------------------------------------------------------------
 flume-ng-channels/flume-kafka-channel/pom.xml   |  54 +++
 .../flume/channel/kafka/KafkaChannel.java       | 411 ++++++++++++++++++
 .../kafka/KafkaChannelConfiguration.java        |  44 ++
 .../flume/channel/kafka/TestKafkaChannel.java   | 418 +++++++++++++++++++
 .../src/test/resources/kafka-server.properties  | 118 ++++++
 .../src/test/resources/log4j.properties         |  78 ++++
 .../src/test/resources/zookeeper.properties     |  20 +
 flume-ng-channels/pom.xml                       |   1 +
 flume-ng-sinks/flume-ng-kafka-sink/pom.xml      |  11 +
 .../flume/sink/kafka/util/KafkaConsumer.java    |   2 +-
 .../apache/flume/sink/kafka/util/TestUtil.java  |   1 +
 11 files changed, 1157 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/aef02df1/flume-ng-channels/flume-kafka-channel/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/pom.xml 
b/flume-ng-channels/flume-kafka-channel/pom.xml
new file mode 100644
index 0000000..2da98b9
--- /dev/null
+++ b/flume-ng-channels/flume-kafka-channel/pom.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>flume-ng-channels</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>1.6.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>org.apache.flume.flume-ng-channels</groupId>
+  <artifactId>flume-kafka-channel</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.10</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flume.flume-ng-sinks</groupId>
+      <artifactId>flume-ng-kafka-sink</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/aef02df1/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
 
b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
new file mode 100644
index 0000000..d767aac
--- /dev/null
+++ 
b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.kafka;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import kafka.consumer.*;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import org.apache.avro.io.*;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.flume.*;
+import org.apache.flume.channel.BasicChannelSemantics;
+import org.apache.flume.channel.BasicTransactionSemantics;
+import org.apache.flume.conf.ConfigurationException;
+
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
+
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class KafkaChannel extends BasicChannelSemantics {
+
+  private final static Logger LOGGER =
+    LoggerFactory.getLogger(KafkaChannel.class);
+
+
+  private final Properties kafkaConf = new Properties();
+  private Producer<String, byte[]> producer;
+  private final String channelUUID = UUID.randomUUID().toString();
+
+  private AtomicReference<String> topic = new AtomicReference<String>();
+  private boolean parseAsFlumeEvent = DEFAULT_PARSE_AS_FLUME_EVENT;
+  private final Map<String, Integer> topicCountMap =
+    Collections.synchronizedMap(new HashMap<String, Integer>());
+
+  // Track all consumers to close them eventually.
+  private final List<ConsumerAndIterator> consumers =
+    Collections.synchronizedList(new LinkedList<ConsumerAndIterator>());
+
+  /* Each ConsumerConnector commit will commit all partitions owned by it. To
+   * ensure that each partition is only committed when all events are
+   * actually done, we will need to keep a ConsumerConnector per thread.
+   * See Neha's answer here:
+   * http://grokbase.com/t/kafka/users/13b4gmk2jk/commit-offset-per-topic
+   * Since only one consumer connector will own a partition at any point in time,
+   * when we commit the partition we would have committed all events to the
+   * final destination from that partition.
+   *
+   * If a new partition gets assigned to this connector,
+   * my understanding is that all messages from the last partition commit will
+   * get replayed which may cause duplicates -- which is fine as this
+   * happens only on partition rebalancing which is on failure or new nodes
+   * coming up, which is rare.
+   */
+  private final ThreadLocal<ConsumerAndIterator> consumerAndIter = new
+    ThreadLocal<ConsumerAndIterator>() {
+      @Override
+      public ConsumerAndIterator initialValue() {
+        return createConsumerAndIter();
+      }
+    };
+
+  @Override
+  public void start() {
+    try {
+      LOGGER.info("Starting Kafka Channel: " + getName());
+      producer = new Producer<String, byte[]>(new ProducerConfig(kafkaConf));
+      // We always have just one topic being read by one thread
+      LOGGER.info("Topic = " + topic.get());
+      topicCountMap.put(topic.get(), 1);
+      super.start();
+    } catch (Exception e) {
+      LOGGER.error("Could not start producer");
+      throw new FlumeException("Unable to create Kafka Connections. " +
+        "Check whether Kafka Brokers are up and that the " +
+        "Flume agent can connect to it.", e);
+    }
+  }
+
+  @Override
+  public void stop() {
+    for (ConsumerAndIterator c : consumers) {
+      try {
+        decommissionConsumerAndIterator(c);
+      } catch (Exception ex) {
+        LOGGER.warn("Error while shutting down consumer.", ex);
+      }
+    }
+    producer.close();
+    super.stop();
+  }
+
+  @Override
+  protected BasicTransactionSemantics createTransaction() {
+    return new KafkaTransaction();
+  }
+
+  private synchronized ConsumerAndIterator createConsumerAndIter() {
+    try {
+      ConsumerConfig consumerConfig = new ConsumerConfig(kafkaConf);
+      ConsumerConnector consumer =
+        Consumer.createJavaConsumerConnector(consumerConfig);
+      Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
+        consumer.createMessageStreams(topicCountMap);
+      final List<KafkaStream<byte[], byte[]>> streamList = consumerMap
+        .get(topic.get());
+      KafkaStream<byte[], byte[]> stream = streamList.remove(0);
+      ConsumerAndIterator ret =
+        new ConsumerAndIterator(consumer, stream.iterator(), channelUUID);
+      consumers.add(ret);
+      LOGGER.info("Created new consumer to connect to Kafka");
+      return ret;
+    } catch (Exception e) {
+      throw new FlumeException("Unable to connect to Kafka", e);
+    }
+  }
+
+  Properties getKafkaConf() {
+    return kafkaConf;
+  }
+
+  @Override
+  public void configure(Context ctx) {
+    String topicStr = ctx.getString(TOPIC);
+    if (topicStr == null || topicStr.isEmpty()) {
+      topicStr = DEFAULT_TOPIC;
+      LOGGER
+        .info("Topic was not specified. Using " + topicStr + " as the topic.");
+    }
+    topic.set(topicStr);
+    String groupId = ctx.getString(GROUP_ID_FLUME);
+    if (groupId == null || groupId.isEmpty()) {
+      groupId = DEFAULT_GROUP_ID;
+      LOGGER.info(
+        "Group ID was not specified. Using " + groupId + " as the group id.");
+    }
+    String brokerList = ctx.getString(BROKER_LIST_FLUME_KEY);
+    if (brokerList == null || brokerList.isEmpty()) {
+      throw new ConfigurationException("Broker List must be specified");
+    }
+    String zkConnect = ctx.getString(ZOOKEEPER_CONNECT_FLUME_KEY);
+    if (zkConnect == null || zkConnect.isEmpty()) {
+      throw new ConfigurationException(
+        "Zookeeper Connection must be specified");
+    }
+    Long timeout = ctx.getLong(TIMEOUT, Long.valueOf(DEFAULT_TIMEOUT));
+    kafkaConf.putAll(ctx.getSubProperties(KAFKA_PREFIX));
+    kafkaConf.put(GROUP_ID, groupId);
+    kafkaConf.put(BROKER_LIST_KEY, brokerList);
+    kafkaConf.put(ZOOKEEPER_CONNECT, zkConnect);
+    kafkaConf.put(AUTO_COMMIT_ENABLED, String.valueOf(false));
+    kafkaConf.put(CONSUMER_TIMEOUT, String.valueOf(timeout));
+    kafkaConf.put(REQUIRED_ACKS_KEY, "-1");
+    LOGGER.info(kafkaConf.toString());
+    parseAsFlumeEvent =
+      ctx.getBoolean(PARSE_AS_FLUME_EVENT, DEFAULT_PARSE_AS_FLUME_EVENT);
+
+    boolean readSmallest = ctx.getBoolean(READ_SMALLEST_OFFSET,
+      DEFAULT_READ_SMALLEST_OFFSET);
+    // If the data is to be parsed as Flume events, we always read the 
smallest.
+    // Else, we read the configuration, which by default reads the largest.
+    if (parseAsFlumeEvent || readSmallest) {
+      // readSmallest is eval-ed only if parseAsFlumeEvent is false.
+      // The default is largest, so we don't need to set it explicitly.
+      kafkaConf.put("auto.offset.reset", "smallest");
+    }
+
+  }
+
+  private void decommissionConsumerAndIterator(ConsumerAndIterator c) {
+    if (c.failedEvents.isEmpty()) {
+      c.consumer.commitOffsets();
+    }
+    c.failedEvents.clear();
+    c.consumer.shutdown();
+  }
+
+  // Force a consumer to be initialized. There are many duplicates in
+  // tests due to rebalancing - making testing tricky. In production,
+  // this is less of an issue as
+  // rebalancing would happen only on startup.
+  @VisibleForTesting
+  void registerThread() {
+    consumerAndIter.get();
+  }
+
+  private enum TransactionType {
+    PUT,
+    TAKE,
+    NONE
+  }
+
+
+  private class KafkaTransaction extends BasicTransactionSemantics {
+
+    private TransactionType type = TransactionType.NONE;
+    // For Puts
+    private Optional<ByteArrayOutputStream> tempOutStream = Optional
+      .absent();
+
+    // For put transactions, serialize the events and batch them and send it.
+    private Optional<LinkedList<byte[]>> serializedEvents = Optional.absent();
+    // For take transactions, deserialize and hold them till commit goes 
through
+    private Optional<LinkedList<Event>> events = Optional.absent();
+    private Optional<SpecificDatumWriter<AvroFlumeEvent>> writer =
+      Optional.absent();
+    private Optional<SpecificDatumReader<AvroFlumeEvent>> reader =
+      Optional.absent();
+
+    // Fine to use null for initial value, Avro will create new ones if this
+    // is null
+    private BinaryEncoder encoder = null;
+    private BinaryDecoder decoder = null;
+    private final String batchUUID = UUID.randomUUID().toString();
+    private boolean eventTaken = false;
+
+    @Override
+    protected void doPut(Event event) throws InterruptedException {
+      type = TransactionType.PUT;
+      if (!serializedEvents.isPresent()) {
+        serializedEvents = Optional.of(new LinkedList<byte[]>());
+      }
+
+      try {
+        if (!tempOutStream.isPresent()) {
+          tempOutStream = Optional.of(new ByteArrayOutputStream());
+        }
+        if (!writer.isPresent()) {
+          writer = Optional.of(new
+            SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class));
+        }
+        tempOutStream.get().reset();
+        AvroFlumeEvent e = new AvroFlumeEvent(
+          toCharSeqMap(event.getHeaders()), ByteBuffer.wrap(event.getBody()));
+        encoder = EncoderFactory.get()
+          .directBinaryEncoder(tempOutStream.get(), encoder);
+        writer.get().write(e, encoder);
+        // Not really possible to avoid this copy :(
+        serializedEvents.get().add(tempOutStream.get().toByteArray());
+      } catch (Exception e) {
+        throw new ChannelException("Error while serializing event", e);
+      }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Event doTake() throws InterruptedException {
+      type = TransactionType.TAKE;
+      try {
+        if (!(consumerAndIter.get().uuid.equals(channelUUID))) {
+          LOGGER.info("UUID mismatch, creating new consumer");
+          decommissionConsumerAndIterator(consumerAndIter.get());
+          consumerAndIter.remove();
+        }
+      } catch (Exception ex) {
+        LOGGER.warn("Error while shutting down consumer", ex);
+      }
+      if (!events.isPresent()) {
+        events = Optional.of(new LinkedList<Event>());
+      }
+      Event e;
+      if (!consumerAndIter.get().failedEvents.isEmpty()) {
+        e = consumerAndIter.get().failedEvents.removeFirst();
+      } else {
+        try {
+          ConsumerIterator<byte[], byte[]> it = consumerAndIter.get().iterator;
+          it.hasNext();
+          if (parseAsFlumeEvent) {
+            ByteArrayInputStream in =
+              new ByteArrayInputStream(it.next().message());
+            decoder = DecoderFactory.get().directBinaryDecoder(in, decoder);
+            if (!reader.isPresent()) {
+              reader = Optional.of(
+                new SpecificDatumReader<AvroFlumeEvent>(AvroFlumeEvent.class));
+            }
+            AvroFlumeEvent event = reader.get().read(null, decoder);
+            e = EventBuilder.withBody(event.getBody().array(),
+              toStringMap(event.getHeaders()));
+          } else {
+            e = EventBuilder.withBody(it.next().message(),
+              Collections.EMPTY_MAP);
+          }
+
+        } catch (ConsumerTimeoutException ex) {
+          if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Timed out while waiting for data to come from Kafka",
+              ex);
+          }
+          return null;
+        } catch (Exception ex) {
+          LOGGER.warn("Error while getting events from Kafka", ex);
+          throw new ChannelException("Error while getting events from Kafka",
+            ex);
+        }
+      }
+      eventTaken = true;
+      events.get().add(e);
+      return e;
+    }
+
+    @Override
+    protected void doCommit() throws InterruptedException {
+      if (type.equals(TransactionType.NONE)) {
+        return;
+      }
+      if (type.equals(TransactionType.PUT)) {
+        try {
+          List<KeyedMessage<String, byte[]>> messages = new
+            ArrayList<KeyedMessage<String, byte[]>>(serializedEvents.get()
+            .size());
+          for (byte[] event : serializedEvents.get()) {
+            messages.add(new KeyedMessage<String, byte[]>(topic.get(), null,
+              batchUUID, event));
+          }
+          producer.send(messages);
+          serializedEvents.get().clear();
+        } catch (Exception ex) {
+          LOGGER.warn("Sending events to Kafka failed", ex);
+          throw new ChannelException("Commit failed as send to Kafka failed",
+            ex);
+        }
+      } else {
+        if (consumerAndIter.get().failedEvents.isEmpty() && eventTaken) {
+          consumerAndIter.get().consumer.commitOffsets();
+        }
+        events.get().clear();
+      }
+    }
+
+    @Override
+    protected void doRollback() throws InterruptedException {
+      if (type.equals(TransactionType.NONE)) {
+        return;
+      }
+      if (type.equals(TransactionType.PUT)) {
+        serializedEvents.get().clear();
+      } else {
+        consumerAndIter.get().failedEvents.addAll(events.get());
+        events.get().clear();
+      }
+    }
+  }
+
+
+  private class ConsumerAndIterator {
+    final ConsumerConnector consumer;
+    final ConsumerIterator<byte[], byte[]> iterator;
+    final String uuid;
+    final LinkedList<Event> failedEvents = new LinkedList<Event>();
+
+    ConsumerAndIterator(ConsumerConnector consumerConnector,
+      ConsumerIterator<byte[], byte[]> iterator, String uuid) {
+      this.consumer = consumerConnector;
+      this.iterator = iterator;
+      this.uuid = uuid;
+    }
+  }
+
+  /**
+   * Helper function to convert a map of String to a map of CharSequence.
+   */
+  private static Map<CharSequence, CharSequence> toCharSeqMap(
+    Map<String, String> stringMap) {
+    Map<CharSequence, CharSequence> charSeqMap =
+      new HashMap<CharSequence, CharSequence>();
+    for (Map.Entry<String, String> entry : stringMap.entrySet()) {
+      charSeqMap.put(entry.getKey(), entry.getValue());
+    }
+    return charSeqMap;
+  }
+
+  /**
+   * Helper function to convert a map of CharSequence to a map of String.
+   */
+  private static Map<String, String> toStringMap(
+    Map<CharSequence, CharSequence> charSeqMap) {
+    Map<String, String> stringMap =
+      new HashMap<String, String>();
+    for (Map.Entry<CharSequence, CharSequence> entry : charSeqMap.entrySet()) {
+      stringMap.put(entry.getKey().toString(), entry.getValue().toString());
+    }
+    return stringMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/aef02df1/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
 
b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
new file mode 100644
index 0000000..9a342ef
--- /dev/null
+++ 
b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannelConfiguration.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.kafka;
+
+public class KafkaChannelConfiguration {
+
+  public static final String KAFKA_PREFIX = "kafka.";
+  public static final String BROKER_LIST_KEY = "metadata.broker.list";
+  public static final String REQUIRED_ACKS_KEY = "request.required.acks";
+  public static final String BROKER_LIST_FLUME_KEY = "brokerList";
+  public static final String TOPIC = "topic";
+  public static final String GROUP_ID = "group.id";
+  public static final String GROUP_ID_FLUME = "groupId";
+  public static final String AUTO_COMMIT_ENABLED = "auto.commit.enable";
+  public static final String ZOOKEEPER_CONNECT = "zookeeper.connect";
+  public static final String ZOOKEEPER_CONNECT_FLUME_KEY = "zookeeperConnect";
+  public static final String DEFAULT_GROUP_ID = "flume";
+  public static final String DEFAULT_TOPIC = "flume-channel";
+  public static final String TIMEOUT = "timeout";
+  public static final String DEFAULT_TIMEOUT = "100";
+  public static final String CONSUMER_TIMEOUT = "consumer.timeout.ms";
+
+  public static final String PARSE_AS_FLUME_EVENT = "parseAsFlumeEvent";
+  public static final boolean DEFAULT_PARSE_AS_FLUME_EVENT = true;
+
+  public static final String READ_SMALLEST_OFFSET = "readSmallestOffset";
+  public static final boolean DEFAULT_READ_SMALLEST_OFFSET = false;
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/aef02df1/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
 
b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
new file mode 100644
index 0000000..e665431
--- /dev/null
+++ 
b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.kafka;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import kafka.admin.AdminUtils;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import kafka.utils.ZKStringSerializer$;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.sink.kafka.util.TestUtil;
+import org.junit.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class TestKafkaChannel {
+
+  private static TestUtil testUtil = TestUtil.getInstance();
+  private String topic = null;
+  private final Set<String> usedTopics = new HashSet<String>();
+  private CountDownLatch latch = null;
+
+  @BeforeClass
+  public static void setupClass() throws Exception {
+    testUtil.prepare();
+    Thread.sleep(2500);
+  }
+
+  @Before
+  public void setup() throws Exception {
+    boolean topicFound = false;
+    while (!topicFound) {
+      topic = RandomStringUtils.randomAlphabetic(8);
+      if (!usedTopics.contains(topic)) {
+        usedTopics.add(topic);
+        topicFound = true;
+      }
+    }
+    try {
+      createTopic(topic);
+    } catch (Exception e) {
+    }
+    Thread.sleep(2500);
+    latch = new CountDownLatch(5);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    testUtil.tearDown();
+  }
+
+  @Test
+  public void testSuccess() throws Exception {
+    doTestSuccessRollback(false, false);
+  }
+
+  @Test
+  public void testSuccessInterleave() throws Exception {
+    doTestSuccessRollback(false, true);
+  }
+
+  @Test
+  public void testRollbacks() throws Exception {
+    doTestSuccessRollback(true, false);
+  }
+
+  @Test
+  public void testRollbacksInterleave() throws Exception {
+    doTestSuccessRollback(true, true);
+  }
+
+  private void doTestSuccessRollback(final boolean rollback,
+    final boolean interleave) throws Exception {
+    final KafkaChannel channel = startChannel(true);
+    writeAndVerify(rollback, channel, interleave);
+    channel.stop();
+  }
+
+
+  @Test
+  public void testStopAndStart() throws Exception {
+    doTestStopAndStart(false, false);
+  }
+
+  @Test
+  public void testStopAndStartWithRollback() throws Exception {
+    doTestStopAndStart(true, true);
+  }
+
+  @Test
+  public void testStopAndStartWithRollbackAndNoRetry() throws Exception {
+    doTestStopAndStart(true, false);
+  }
+
+  @Test
+  public void testNoParsingAsFlumeAgent() throws Exception {
+    final KafkaChannel channel = startChannel(false);
+    Producer<String, byte[]> producer = new Producer<String, byte[]>(
+      new ProducerConfig(channel.getKafkaConf()));
+    List<KeyedMessage<String, byte[]>> original = Lists.newArrayList();
+    for (int i = 0; i < 50; i++) {
+      KeyedMessage<String, byte[]> data = new KeyedMessage<String,
+        byte[]>(topic, null, RandomStringUtils.randomAlphabetic(6),
+        String.valueOf(i).getBytes());
+      original.add(data);
+    }
+    producer.send(original);
+    ExecutorCompletionService<Void> submitterSvc = new
+      ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
+    List<Event> events = pullEvents(channel, submitterSvc,
+      50, false, false);
+    wait(submitterSvc, 5);
+    Set<Integer> finals = Sets.newHashSet();
+    for (int i = 0; i < 50; i++) {
+      finals.add(Integer.parseInt(new String(events.get(i).getBody())));
+    }
+    for (int i = 0; i < 50; i++) {
+      Assert.assertTrue(finals.contains(i));
+      finals.remove(i);
+    }
+    Assert.assertTrue(finals.isEmpty());
+    channel.stop();
+  }
+
+  /**
+   * This method starts a channel, puts events into it. The channel is then
+   * stopped and restarted. Then we check to make sure if all events we put
+   * come out. Optionally, 10 events are rolled back,
+   * and optionally we restart the agent immediately after and we try to pull 
it
+   * out.
+   *
+   * @param rollback
+   * @param retryAfterRollback
+   * @throws Exception
+   */
+  private void doTestStopAndStart(boolean rollback,
+    boolean retryAfterRollback) throws Exception {
+    final KafkaChannel channel = startChannel(true);
+    ExecutorService underlying = Executors
+      .newCachedThreadPool();
+    ExecutorCompletionService<Void> submitterSvc =
+      new ExecutorCompletionService<Void>(underlying);
+    final List<List<Event>> events = createBaseList();
+    putEvents(channel, events, submitterSvc);
+    int completed = 0;
+    wait(submitterSvc, 5);
+    channel.stop();
+    final KafkaChannel channel2 = startChannel(true);
+    int total = 50;
+    if (rollback && !retryAfterRollback) {
+      total = 40;
+    }
+    final List<Event> eventsPulled =
+      pullEvents(channel2, submitterSvc, total, rollback, retryAfterRollback);
+    wait(submitterSvc, 5);
+    channel2.stop();
+    if (!retryAfterRollback && rollback) {
+      final KafkaChannel channel3 = startChannel(true);
+      int expectedRemaining = 50 - eventsPulled.size();
+      final List<Event> eventsPulled2 =
+        pullEvents(channel3, submitterSvc, expectedRemaining, false, false);
+      wait(submitterSvc, 5);
+      Assert.assertEquals(expectedRemaining, eventsPulled2.size());
+      eventsPulled.addAll(eventsPulled2);
+      channel3.stop();
+    }
+    underlying.shutdownNow();
+    verify(eventsPulled);
+  }
+
+  private KafkaChannel startChannel(boolean parseAsFlume) throws Exception {
+    Context context = prepareDefaultContext(parseAsFlume);
+    final KafkaChannel channel = new KafkaChannel();
+    Configurables.configure(channel, context);
+    channel.start();
+    return channel;
+  }
+
+  private void writeAndVerify(final boolean testRollbacks,
+    final KafkaChannel channel) throws Exception {
+    writeAndVerify(testRollbacks, channel, false);
+  }
+
+  private void writeAndVerify(final boolean testRollbacks,
+    final KafkaChannel channel, final boolean interleave) throws Exception {
+
+    final List<List<Event>> events = createBaseList();
+
+    ExecutorCompletionService<Void> submitterSvc =
+      new ExecutorCompletionService<Void>(Executors
+        .newCachedThreadPool());
+
+    putEvents(channel, events, submitterSvc);
+
+    if (interleave) {
+      wait(submitterSvc, 5);
+    }
+
+    ExecutorCompletionService<Void> submitterSvc2 =
+      new ExecutorCompletionService<Void>(Executors
+        .newCachedThreadPool());
+
+    final List<Event> eventsPulled =
+      pullEvents(channel, submitterSvc2, 50, testRollbacks, true);
+
+    if (!interleave) {
+      wait(submitterSvc, 5);
+    }
+    wait(submitterSvc2, 5);
+
+    verify(eventsPulled);
+  }
+
+  private List<List<Event>> createBaseList() {
+    final List<List<Event>> events = new ArrayList<List<Event>>();
+    for (int i = 0; i < 5; i++) {
+      List<Event> eventList = new ArrayList<Event>(10);
+      events.add(eventList);
+      for (int j = 0; j < 10; j++) {
+        Map<String, String> hdrs = new HashMap<String, String>();
+        String v = (String.valueOf(i) + " - " + String
+          .valueOf(j));
+        hdrs.put("header", v);
+        eventList.add(EventBuilder.withBody(v.getBytes(), hdrs));
+      }
+    }
+    return events;
+  }
+
+  /**
+   * Submits five concurrent writer tasks, one per pre-built event list. Each
+   * writer puts its 10 events inside a single channel transaction. Failures
+   * propagate through the completion service's Futures.
+   */
+  private void putEvents(final KafkaChannel channel, final List<List<Event>>
+    events, ExecutorCompletionService<Void> submitterSvc) {
+    for (int i = 0; i < 5; i++) {
+      final int index = i;
+      submitterSvc.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          Transaction tx = channel.getTransaction();
+          tx.begin();
+          try {
+            List<Event> eventsToPut = events.get(index);
+            for (int j = 0; j < 10; j++) {
+              channel.put(eventsToPut.get(j));
+            }
+            tx.commit();
+          } catch (Exception ex) {
+            // The original leaked the transaction if put() threw (and closed
+            // a non-committed txn if commit() threw); always roll back before
+            // closing, then rethrow so the Future reports the failure.
+            tx.rollback();
+            throw ex;
+          } finally {
+            tx.close();
+          }
+          return null;
+        }
+      });
+    }
+  }
+
+  /**
+   * Spawns 5 consumer tasks that collectively drain events from the channel
+   * until 'total' (minus any abandoned-by-rollback) events have been
+   * committed. Returns the synchronized list the tasks append to; callers
+   * must wait on submitterSvc before inspecting it.
+   *
+   * When testRollbacks is true, the thread with index 4 rolls back its first
+   * non-empty batch exactly once. With retryAfterRollback the rolled-back
+   * events are expected to be redelivered; otherwise that thread exits and
+   * the target count is reduced by the abandoned batch size.
+   */
+  private List<Event> pullEvents(final KafkaChannel channel,
+    ExecutorCompletionService<Void> submitterSvc, final int total,
+    final boolean testRollbacks, final boolean retryAfterRollback) {
+    final List<Event> eventsPulled = Collections.synchronizedList(new
+      ArrayList<Event>(50));
+    final CyclicBarrier barrier = new CyclicBarrier(5);
+    // Number of events committed so far across all 5 threads.
+    final AtomicInteger counter = new AtomicInteger(0);
+    // Events abandoned by the single no-retry rollback (0 otherwise).
+    final AtomicInteger rolledBackCount = new AtomicInteger(0);
+    final AtomicBoolean startedGettingEvents = new AtomicBoolean(false);
+    // Ensures the deliberate rollback happens at most once.
+    final AtomicBoolean rolledBack = new AtomicBoolean(false);
+    for (int k = 0; k < 5; k++) {
+      final int index = k;
+      submitterSvc.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          Transaction tx = null;
+          // Events taken in the current (not yet committed) transaction.
+          final List<Event> eventsLocal = Lists.newLinkedList();
+          int takenByThisThread = 0;
+          channel.registerThread();
+          // Line all 5 consumers up so they start taking at the same time.
+          Thread.sleep(1000);
+          barrier.await();
+          while (counter.get() < (total - rolledBackCount.get())) {
+            if (tx == null) {
+              tx = channel.getTransaction();
+              tx.begin();
+            }
+            try {
+              Event e = channel.take();
+              if (e != null) {
+                startedGettingEvents.set(true);
+                eventsLocal.add(e);
+              } else {
+                // Channel is (momentarily) empty: this is a batch boundary.
+                if (testRollbacks &&
+                  index == 4 &&
+                  (!rolledBack.get()) &&
+                  startedGettingEvents.get()) {
+                  // Deliberately discard this thread's batch once.
+                  tx.rollback();
+                  tx.close();
+                  tx = null;
+                  rolledBack.set(true);
+                  final int eventsLocalSize = eventsLocal.size();
+                  eventsLocal.clear();
+                  if (!retryAfterRollback) {
+                    rolledBackCount.set(eventsLocalSize);
+                    return null;
+                  }
+                } else {
+                  // Commit the batch and publish it to the shared result.
+                  tx.commit();
+                  tx.close();
+                  tx = null;
+                  eventsPulled.addAll(eventsLocal);
+                  counter.getAndAdd(eventsLocal.size());
+                  eventsLocal.clear();
+                }
+              }
+            } catch (Exception ex) {
+              // Best-effort: drop the batch, roll back and keep consuming.
+              // NOTE(review): exceptions are intentionally only printed here;
+              // a persistent failure would make the loop spin until the
+              // other threads reach 'total'.
+              eventsLocal.clear();
+              if (tx != null) {
+                tx.rollback();
+                tx.close();
+              }
+              tx = null;
+              ex.printStackTrace();
+            }
+          }
+          // NOTE(review): if the loop exits while tx != null the transaction
+          // is never closed — confirm whether that can happen in practice.
+          return null;
+        }
+      });
+    }
+    return eventsPulled;
+  }
+
+  /**
+   * Blocks until 'max' tasks submitted to the completion service have
+   * finished. Calling get() on each completed Future rethrows any exception
+   * the task died with, so worker failures fail the test instead of being
+   * silently dropped (the original discarded the Futures).
+   */
+  private void wait(ExecutorCompletionService<Void> submitterSvc, int max)
+    throws Exception {
+    int completed = 0;
+    while (completed < max) {
+      submitterSvc.take().get();
+      completed++;
+    }
+  }
+
+  private void verify(List<Event> eventsPulled) {
+    Assert.assertFalse(eventsPulled.isEmpty());
+    Assert.assertEquals(50, eventsPulled.size());
+    Set<String> eventStrings = new HashSet<String>();
+    for (Event e : eventsPulled) {
+      Assert
+        .assertEquals(e.getHeaders().get("header"), new String(e.getBody()));
+      eventStrings.add(e.getHeaders().get("header"));
+    }
+    for (int i = 0; i < 5; i++) {
+      for (int j = 0; j < 10; j++) {
+        String v = String.valueOf(i) + " - " + String.valueOf(j);
+        Assert.assertTrue(eventStrings.contains(v));
+        eventStrings.remove(v);
+      }
+    }
+    Assert.assertTrue(eventStrings.isEmpty());
+  }
+
+  private Context prepareDefaultContext(boolean parseAsFlume) {
+    // Prepares a default context with Kafka Server Properties
+    Context context = new Context();
+    context.put(KafkaChannelConfiguration.BROKER_LIST_FLUME_KEY,
+      testUtil.getKafkaServerUrl());
+    context.put(KafkaChannelConfiguration.ZOOKEEPER_CONNECT_FLUME_KEY,
+      testUtil.getZkUrl());
+    context.put(KafkaChannelConfiguration.PARSE_AS_FLUME_EVENT,
+      String.valueOf(parseAsFlume));
+    context.put(KafkaChannelConfiguration.READ_SMALLEST_OFFSET, "true");
+    context.put(KafkaChannelConfiguration.TOPIC, topic);
+    return context;
+  }
+
+  /**
+   * Creates a 5-partition, replication-factor-1 topic on the embedded broker
+   * via ZooKeeper.
+   */
+  public static void createTopic(String topicName) {
+    int numPartitions = 5;
+    int sessionTimeoutMs = 10000;
+    int connectionTimeoutMs = 10000;
+    ZkClient zkClient = new ZkClient(testUtil.getZkUrl(),
+      sessionTimeoutMs, connectionTimeoutMs,
+      ZKStringSerializer$.MODULE$);
+    try {
+      int replicationFactor = 1;
+      Properties topicConfig = new Properties();
+      AdminUtils.createTopic(zkClient, topicName, numPartitions,
+        replicationFactor, topicConfig);
+    } finally {
+      // The original never closed this client, leaking a ZK session per call.
+      zkClient.close();
+    }
+  }
+
+  /**
+   * Deletes the given topic from the embedded broker via ZooKeeper.
+   */
+  public static void deleteTopic(String topicName) {
+    int sessionTimeoutMs = 10000;
+    int connectionTimeoutMs = 10000;
+    ZkClient zkClient = new ZkClient(testUtil.getZkUrl(),
+      sessionTimeoutMs, connectionTimeoutMs,
+      ZKStringSerializer$.MODULE$);
+    try {
+      AdminUtils.deleteTopic(zkClient, topicName);
+    } finally {
+      // The original never closed this client, leaking a ZK session per call.
+      zkClient.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/aef02df1/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
 
b/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
new file mode 100644
index 0000000..c10c89d
--- /dev/null
+++ 
b/flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=0
+
+############################# Socket Server Settings 
#############################
+
+# The port the socket server listens on
+port=9092
+
+# Hostname the broker will bind to. If not set, the server will bind to all 
interfaces
+#host.name=localhost
+
+# Hostname the broker will advertise to producers and consumers. If not set, 
it uses the
+# value for "host.name" if configured.  Otherwise, it will use the value 
returned from
+# java.net.InetAddress.getCanonicalHostName().
+#advertised.host.name=<hostname routable by clients>
+
+# The port to publish to ZooKeeper for clients to use. If this is not set,
+# it will publish the same port that the broker binds to.
+#advertised.port=<port accessible by clients>
+
+# The number of threads handling network requests
+num.network.threads=2
+
+# The number of threads doing disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=1048576
+
+# The maximum size of a request that the socket server will accept (protection 
against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma seperated list of directories under which to store log files
+log.dirs=target/kafka-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=5
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only 
fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data 
to disk.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data may be lost if you are not using 
replication.
+#    2. Latency: Very large flush intervals may lead to latency spikes when 
the flush does occur as there will be a lot of data to flush.
+#    3. Throughput: The flush is generally the most expensive operation, and a 
small flush interval may lead to exceessive seeks.
+# The settings below allow one to configure the flush policy to flush data 
after a period of time or
+# every N messages (or both). This can be done globally and overridden on a 
per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy 
#############################
+
+# The following configurations control the disposal of log segments. The 
policy can
+# be set to delete segments after a period of time, or after a given size has 
accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. 
Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as 
long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log 
segment will be created.
+log.segment.bytes=536870912
+
+# The interval at which log segments are checked to see if they can be deleted 
according
+# to the retention policies
+log.retention.check.interval.ms=60000
+
+# By default the log cleaner is disabled and the log retention policy will 
default to just delete segments after their retention expires.
+# If log.cleaner.enable=true is set the cleaner will be enabled and individual 
logs can then be marked for log compaction.
+log.cleaner.enable=false
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=localhost:2181
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=1000000

http://git-wip-us.apache.org/repos/asf/flume/blob/aef02df1/flume-ng-channels/flume-kafka-channel/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-kafka-channel/src/test/resources/log4j.properties 
b/flume-ng-channels/flume-kafka-channel/src/test/resources/log4j.properties
new file mode 100644
index 0000000..b86600b
--- /dev/null
+++ b/flume-ng-channels/flume-kafka-channel/src/test/resources/log4j.properties
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+kafka.logs.dir=target/logs
+
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
+log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
+log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
+log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
+log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
+log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+# Turn on all our debugging info
+#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
+#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
+#log4j.logger.kafka.perf=DEBUG, kafkaAppender
+#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, 
kafkaAppender
+#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
+log4j.logger.kafka=INFO, kafkaAppender
+
+log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
+log4j.additivity.kafka.network.RequestChannel$=false
+
+#log4j.logger.kafka.network.Processor=TRACE, requestAppender
+#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
+#log4j.additivity.kafka.server.KafkaApis=false
+log4j.logger.kafka.request.logger=WARN, requestAppender
+log4j.additivity.kafka.request.logger=false
+
+log4j.logger.kafka.controller=TRACE, controllerAppender
+log4j.additivity.kafka.controller=false
+
+log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
+log4j.additivity.kafka.log.LogCleaner=false
+
+log4j.logger.state.change.logger=TRACE, stateChangeAppender
+log4j.additivity.state.change.logger=false

http://git-wip-us.apache.org/repos/asf/flume/blob/aef02df1/flume-ng-channels/flume-kafka-channel/src/test/resources/zookeeper.properties
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-kafka-channel/src/test/resources/zookeeper.properties 
b/flume-ng-channels/flume-kafka-channel/src/test/resources/zookeeper.properties
new file mode 100644
index 0000000..89e1b5e
--- /dev/null
+++ 
b/flume-ng-channels/flume-kafka-channel/src/test/resources/zookeeper.properties
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir=target
+# the port at which the clients will connect
+clientPort=2181
+# disable the per-ip limit on the number of connections since this is a 
non-production config
+maxClientCnxns=0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/aef02df1/flume-ng-channels/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-channels/pom.xml b/flume-ng-channels/pom.xml
index dc8dbc6..f44171d 100644
--- a/flume-ng-channels/pom.xml
+++ b/flume-ng-channels/pom.xml
@@ -44,5 +44,6 @@ limitations under the License.
     <module>flume-jdbc-channel</module>
     <module>flume-file-channel</module>
     <module>flume-spillable-memory-channel</module>
+    <module>flume-kafka-channel</module>
   </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/flume/blob/aef02df1/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml 
b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
index 746a395..e323658 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
+++ b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
@@ -28,6 +28,17 @@
         <groupId>org.apache.rat</groupId>
         <artifactId>apache-rat-plugin</artifactId>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 

http://git-wip-us.apache.org/repos/asf/flume/blob/aef02df1/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java
 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java
index 1c98922..d5dfbd6 100644
--- 
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java
+++ 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java
@@ -57,7 +57,7 @@ public class KafkaConsumer {
     Properties props = new Properties();
     props.put("zookeeper.connect", zkUrl);
     props.put("group.id", groupId);
-    props.put("zookeeper.session.timeout.ms", "400");
+    props.put("zookeeper.session.timeout.ms", "1000");
     props.put("zookeeper.sync.time.ms", "200");
     props.put("auto.commit.interval.ms", "1000");
     props.put("auto.offset.reset", "smallest");

http://git-wip-us.apache.org/repos/asf/flume/blob/aef02df1/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
index 8855c53..6405d6c 100644
--- 
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
+++ 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
@@ -66,6 +66,7 @@ public class TestUtil {
     Properties kafkaProperties = new Properties();
     Properties zkProperties = new Properties();
 
+    logger.info("Starting kafka server.");
     try {
       //load properties
       zkProperties.load(Class.class.getResourceAsStream(

Reply via email to