Repository: flume
Updated Branches:
  refs/heads/trunk c8c0f9b84 -> cfbf11568


http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java
 
b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java
index 5423f8f..cdc09b5 100644
--- 
a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java
+++ 
b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java
@@ -61,6 +61,7 @@ public class TestJMSSource extends JMSMessageConsumerTestBase 
{
   private InitialContextFactory contextFactory;
   private File baseDir;
   private File passwordFile;
+
   @SuppressWarnings("unchecked")
   @Override
   void afterSetup() throws Exception {
@@ -79,10 +80,11 @@ public class TestJMSSource extends 
JMSMessageConsumerTestBase {
     }).when(channelProcessor).processEventBatch(any(List.class));
     consumerFactory = mock(JMSMessageConsumerFactory.class);
     consumer = spy(create());
-    when(consumerFactory.create(any(InitialContext.class), 
any(ConnectionFactory.class), anyString(),
-        any(JMSDestinationType.class), any(JMSDestinationLocator.class), 
anyString(), anyInt(), anyLong(),
-        any(JMSMessageConverter.class), any(Optional.class),
-        any(Optional.class))).thenReturn(consumer);
+    when(consumerFactory.create(any(InitialContext.class), 
any(ConnectionFactory.class),
+                                anyString(), any(JMSDestinationType.class),
+                                any(JMSDestinationLocator.class), anyString(), 
anyInt(), anyLong(),
+                                any(JMSMessageConverter.class), 
any(Optional.class),
+                                any(Optional.class))).thenReturn(consumer);
     when(initialContext.lookup(anyString())).thenReturn(connectionFactory);
     contextFactory = mock(InitialContextFactory.class);
     
when(contextFactory.create(any(Properties.class))).thenReturn(initialContext);
@@ -97,10 +99,12 @@ public class TestJMSSource extends 
JMSMessageConsumerTestBase {
     context.put(JMSSourceConfiguration.PROVIDER_URL, "dummy:1414");
     context.put(JMSSourceConfiguration.INITIAL_CONTEXT_FACTORY, 
"ldap://dummy:389");
   }
+
   @Override
   void afterTearDown() throws Exception {
     FileUtils.deleteDirectory(baseDir);
   }
+
   @Test
   public void testStop() throws Exception {
     source.configure(context);
@@ -108,38 +112,45 @@ public class TestJMSSource extends 
JMSMessageConsumerTestBase {
     source.stop();
     verify(consumer).close();
   }
+
   @Test(expected = IllegalArgumentException.class)
   public void testConfigureWithoutInitialContextFactory() throws Exception {
     context.put(JMSSourceConfiguration.INITIAL_CONTEXT_FACTORY, "");
     source.configure(context);
   }
+
   @Test(expected = IllegalArgumentException.class)
   public void testConfigureWithoutProviderURL() throws Exception {
     context.put(JMSSourceConfiguration.PROVIDER_URL, "");
     source.configure(context);
   }
+
   @Test(expected = IllegalArgumentException.class)
   public void testConfigureWithoutDestinationName() throws Exception {
     context.put(JMSSourceConfiguration.DESTINATION_NAME, "");
     source.configure(context);
   }
+
   @Test(expected = FlumeException.class)
   public void testConfigureWithBadDestinationType() throws Exception {
     context.put(JMSSourceConfiguration.DESTINATION_TYPE, "DUMMY");
     source.configure(context);
   }
+
   @Test(expected = IllegalArgumentException.class)
   public void testConfigureWithEmptyDestinationType() throws Exception {
     context.put(JMSSourceConfiguration.DESTINATION_TYPE, "");
     source.configure(context);
   }
+
   @SuppressWarnings("unchecked")
   @Test
   public void testStartConsumerCreateThrowsException() throws Exception {
-    when(consumerFactory.create(any(InitialContext.class), 
any(ConnectionFactory.class), anyString(),
-        any(JMSDestinationType.class), any(JMSDestinationLocator.class), 
anyString(), anyInt(), anyLong(),
-        any(JMSMessageConverter.class), any(Optional.class),
-        any(Optional.class))).thenThrow(new RuntimeException());
+    when(consumerFactory.create(any(InitialContext.class), 
any(ConnectionFactory.class),
+                                anyString(), any(JMSDestinationType.class),
+                                any(JMSDestinationLocator.class), anyString(), 
anyInt(), anyLong(),
+                                any(JMSMessageConverter.class), 
any(Optional.class),
+                                any(Optional.class))).thenThrow(new 
RuntimeException());
     source.configure(context);
     source.start();
     try {
@@ -149,28 +160,33 @@ public class TestJMSSource extends 
JMSMessageConsumerTestBase {
 
     }
   }
+
   @Test(expected = FlumeException.class)
   public void testConfigureWithContextLookupThrowsException() throws Exception 
{
     when(initialContext.lookup(anyString())).thenThrow(new NamingException());
     source.configure(context);
   }
+
   @Test(expected = FlumeException.class)
   public void testConfigureWithContextCreateThrowsException() throws Exception 
{
     when(contextFactory.create(any(Properties.class)))
       .thenThrow(new NamingException());
     source.configure(context);
   }
+
   @Test(expected = IllegalArgumentException.class)
   public void testConfigureWithInvalidBatchSize() throws Exception {
     context.put(JMSSourceConfiguration.BATCH_SIZE, "0");
     source.configure(context);
   }
+
   @Test(expected = FlumeException.class)
   public void testConfigureWithInvalidPasswordFile() throws Exception {
     context.put(JMSSourceConfiguration.PASSWORD_FILE,
         "/dev/does/not/exist/nor/will/ever/exist");
     source.configure(context);
   }
+
   @Test
   public void testConfigureWithUserNameButNoPasswordFile() throws Exception {
     context.put(JMSSourceConfiguration.USERNAME, "dummy");
@@ -180,6 +196,7 @@ public class TestJMSSource extends 
JMSMessageConsumerTestBase {
     Assert.assertEquals(batchSize, events.size());
     assertBodyIsExpected(events);
   }
+
   @Test
   public void testConfigureWithUserNameAndPasswordFile() throws Exception {
     context.put(JMSSourceConfiguration.USERNAME, "dummy");
@@ -191,11 +208,13 @@ public class TestJMSSource extends 
JMSMessageConsumerTestBase {
     Assert.assertEquals(batchSize, events.size());
     assertBodyIsExpected(events);
   }
+
   @Test(expected = FlumeException.class)
   public void testConfigureWithInvalidConverterClass() throws Exception {
     context.put(JMSSourceConfiguration.CONVERTER_TYPE, "not a valid 
classname");
     source.configure(context);
   }
+
   @Test
   public void testProcessNoStart() throws Exception {
     try {
@@ -205,6 +224,7 @@ public class TestJMSSource extends 
JMSMessageConsumerTestBase {
 
     }
   }
+
   @Test
   public void testNonDefaultConverter() throws Exception {
     // tests that a classname can be specified
@@ -217,24 +237,26 @@ public class TestJMSSource extends 
JMSMessageConsumerTestBase {
     assertBodyIsExpected(events);
     verify(consumer).commit();
   }
-  public static class NonBuilderNonConfigurableConverter
-  implements JMSMessageConverter {
+
+  public static class NonBuilderNonConfigurableConverter implements 
JMSMessageConverter {
     @Override
     public List<Event> convert(Message message) throws JMSException {
       throw new UnsupportedOperationException();
     }
   }
-  public static class NonBuilderConfigurableConverter
-  implements JMSMessageConverter, Configurable {
+
+  public static class NonBuilderConfigurableConverter implements 
JMSMessageConverter, Configurable {
     @Override
     public List<Event> convert(Message message) throws JMSException {
       throw new UnsupportedOperationException();
     }
+
     @Override
     public void configure(Context context) {
 
     }
   }
+
   @Test
   public void testNonBuilderConfigurableConverter() throws Exception {
     // tests that a non builder by configurable converter works
@@ -247,6 +269,7 @@ public class TestJMSSource extends 
JMSMessageConsumerTestBase {
     assertBodyIsExpected(events);
     verify(consumer).commit();
   }
+
   @Test
   public void testNonBuilderNonConfigurableConverter() throws Exception {
     // tests that a non builder non configurable converter
@@ -259,6 +282,7 @@ public class TestJMSSource extends 
JMSMessageConsumerTestBase {
     assertBodyIsExpected(events);
     verify(consumer).commit();
   }
+
   @Test
   public void testProcessFullBatch() throws Exception {
     source.configure(context);
@@ -268,6 +292,7 @@ public class TestJMSSource extends 
JMSMessageConsumerTestBase {
     assertBodyIsExpected(events);
     verify(consumer).commit();
   }
+
   @Test
   public void testProcessNoEvents() throws Exception {
     when(messageConsumer.receive(anyLong())).thenReturn(null);
@@ -277,6 +302,7 @@ public class TestJMSSource extends 
JMSMessageConsumerTestBase {
     Assert.assertEquals(0, events.size());
     verify(consumer).commit();
   }
+
   @Test
   public void testProcessPartialBatch() throws Exception {
     when(messageConsumer.receiveNoWait()).thenReturn(message, (Message)null);
@@ -287,6 +313,7 @@ public class TestJMSSource extends 
JMSMessageConsumerTestBase {
     assertBodyIsExpected(events);
     verify(consumer).commit();
   }
+
   @SuppressWarnings("unchecked")
   @Test
   public void testProcessChannelProcessorThrowsChannelException() throws 
Exception {
@@ -297,6 +324,7 @@ public class TestJMSSource extends 
JMSMessageConsumerTestBase {
     Assert.assertEquals(Status.BACKOFF, source.process());
     verify(consumer).rollback();
   }
+
   @SuppressWarnings("unchecked")
   @Test
   public void testProcessChannelProcessorThrowsError() throws Exception {
@@ -312,6 +340,7 @@ public class TestJMSSource extends 
JMSMessageConsumerTestBase {
     }
     verify(consumer).rollback();
   }
+
   @Test
   public void testProcessReconnect() throws Exception {
     source.configure(context);

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
 
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
index affac03..b72c532 100644
--- 
a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
+++ 
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
@@ -16,9 +16,9 @@
  */
 package org.apache.flume.source.kafka;
 
+import kafka.admin.AdminUtils;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServerStartable;
-import kafka.admin.AdminUtils;
 import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.io.FileUtils;
@@ -70,8 +70,9 @@ public class KafkaSourceEmbeddedKafka {
     props.put("host.name", "localhost");
     props.put("port", String.valueOf(serverPort));
     props.put("log.dir", dir.getAbsolutePath());
-    if (properties != null)
+    if (properties != null) {
       props.putAll(props);
+    }
     KafkaConfig config = new KafkaConfig(props);
     kafkaServer = new KafkaServerStartable(config);
     kafkaServer.startup();
@@ -134,12 +135,12 @@ public class KafkaSourceEmbeddedKafka {
     // Create a ZooKeeper client
     int sessionTimeoutMs = 10000;
     int connectionTimeoutMs = 10000;
-    ZkClient zkClient = ZkUtils.createZkClient(HOST + ":" + zkPort, 
sessionTimeoutMs, connectionTimeoutMs);
+    ZkClient zkClient =
+        ZkUtils.createZkClient(HOST + ":" + zkPort, sessionTimeoutMs, 
connectionTimeoutMs);
     ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
     int replicationFactor = 1;
     Properties topicConfig = new Properties();
-    AdminUtils.createTopic(zkUtils, topicName, numPartitions,
-            replicationFactor, topicConfig);
+    AdminUtils.createTopic(zkUtils, topicName, numPartitions, 
replicationFactor, topicConfig);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java
 
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java
index db144c2..f04fc64 100644
--- 
a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java
+++ 
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java
@@ -31,7 +31,6 @@ public class KafkaSourceEmbeddedZookeeper {
   private NIOServerCnxnFactory factory;
   File dir;
 
-
   public KafkaSourceEmbeddedZookeeper(int zkPort) {
     int tickTime = 2000;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
 
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
index b4250de..1598741 100644
--- 
a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
+++ 
b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
@@ -17,29 +17,18 @@
 
 package org.apache.flume.source.kafka;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.regex.Pattern;
-
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 import junit.framework.Assert;
 import kafka.common.TopicExistsException;
-
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.flume.*;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
 import org.apache.flume.PollableSource.Status;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.source.avro.AvroFlumeEvent;
@@ -52,11 +41,36 @@ import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.flume.source.kafka.KafkaSourceConstants.*;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+import static org.apache.flume.source.kafka.KafkaSourceConstants.AVRO_EVENT;
+import static 
org.apache.flume.source.kafka.KafkaSourceConstants.BATCH_DURATION_MS;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.BATCH_SIZE;
+import static 
org.apache.flume.source.kafka.KafkaSourceConstants.BOOTSTRAP_SERVERS;
+import static 
org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_AUTO_COMMIT;
+import static 
org.apache.flume.source.kafka.KafkaSourceConstants.KAFKA_CONSUMER_PREFIX;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.OLD_GROUP_ID;
+import static 
org.apache.flume.source.kafka.KafkaSourceConstants.PARTITION_HEADER;
+import static 
org.apache.flume.source.kafka.KafkaSourceConstants.TIMESTAMP_HEADER;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS_REGEX;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC_HEADER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
 
 public class TestKafkaSource {
-  private static final Logger log =
-          LoggerFactory.getLogger(TestKafkaSource.class);
+  private static final Logger log = 
LoggerFactory.getLogger(TestKafkaSource.class);
 
   private KafkaSource kafkaSource;
   private KafkaSourceEmbeddedKafka kafkaServer;
@@ -243,10 +257,10 @@ public class TestKafkaSource {
   }
 
   @SuppressWarnings("unchecked")
-  @Test(expected= FlumeException.class)
-  public void testNonExistingKafkaServer() throws EventDeliveryException,
-          SecurityException, NoSuchFieldException, IllegalArgumentException,
-          IllegalAccessException, InterruptedException {
+  @Test(expected = FlumeException.class)
+  public void testNonExistingKafkaServer() throws EventDeliveryException, 
SecurityException,
+                                                  NoSuchFieldException, 
IllegalArgumentException,
+                                                  IllegalAccessException, 
InterruptedException {
     context.put(TOPICS, topic0);
     context.put(BOOTSTRAP_SERVERS,"blabla:666");
     kafkaSource.configure(context);
@@ -258,8 +272,7 @@ public class TestKafkaSource {
   }
 
   @Test
-  public void testBatchTime() throws InterruptedException,
-          EventDeliveryException {
+  public void testBatchTime() throws InterruptedException, 
EventDeliveryException {
     context.put(TOPICS, topic0);
     context.put(BATCH_DURATION_MS, "250");
     kafkaSource.configure(context);
@@ -267,7 +280,7 @@ public class TestKafkaSource {
 
     Thread.sleep(500L);
 
-    for (int i=1; i<5000; i++) {
+    for (int i = 1; i < 5000; i++) {
       kafkaServer.produce(topic0, "", "hello, world " + i);
     }
     Thread.sleep(500L);
@@ -277,8 +290,7 @@ public class TestKafkaSource {
     Status status = kafkaSource.process();
     long endTime = System.currentTimeMillis();
     assertEquals(Status.READY, status);
-    assertTrue(endTime - startTime <
-            (context.getLong(BATCH_DURATION_MS) + error));
+    assertTrue(endTime - startTime < (context.getLong(BATCH_DURATION_MS) + 
error));
   }
 
   // Consume event, stop source, start again and make sure we are not
@@ -302,13 +314,11 @@ public class TestKafkaSource {
     kafkaSource.start();
     Thread.sleep(500L);
     Assert.assertEquals(Status.BACKOFF, kafkaSource.process());
-
   }
 
   // Remove channel processor and test if we can consume events again
   @Test
-  public void testNonCommit() throws EventDeliveryException,
-          InterruptedException {
+  public void testNonCommit() throws EventDeliveryException, 
InterruptedException {
     context.put(TOPICS, topic0);
     context.put(BATCH_SIZE,"1");
     context.put(BATCH_DURATION_MS,"30000");
@@ -328,15 +338,11 @@ public class TestKafkaSource {
 
     log.debug("re-process to good channel - this should work");
     kafkaSource.process();
-    Assert.assertEquals("hello, world", new String(events.get(0).getBody(),
-            Charsets.UTF_8));
-
-
+    Assert.assertEquals("hello, world", new String(events.get(0).getBody(), 
Charsets.UTF_8));
   }
 
   @Test
-  public void testTwoBatches() throws InterruptedException,
-          EventDeliveryException {
+  public void testTwoBatches() throws InterruptedException, 
EventDeliveryException {
     context.put(TOPICS, topic0);
     context.put(BATCH_SIZE,"1");
     context.put(BATCH_DURATION_MS, "30000");
@@ -348,20 +354,17 @@ public class TestKafkaSource {
     Thread.sleep(500L);
 
     kafkaSource.process();
-    Assert.assertEquals("event 1", new String(events.get(0).getBody(),
-            Charsets.UTF_8));
+    Assert.assertEquals("event 1", new String(events.get(0).getBody(), 
Charsets.UTF_8));
     events.clear();
 
     kafkaServer.produce(topic0, "", "event 2");
     Thread.sleep(500L);
     kafkaSource.process();
-    Assert.assertEquals("event 2", new String(events.get(0).getBody(),
-            Charsets.UTF_8));
+    Assert.assertEquals("event 2", new String(events.get(0).getBody(), 
Charsets.UTF_8));
   }
 
   @Test
-  public void testTwoBatchesWithAutocommit() throws InterruptedException,
-          EventDeliveryException {
+  public void testTwoBatchesWithAutocommit() throws InterruptedException, 
EventDeliveryException {
     context.put(TOPICS, topic0);
     context.put(BATCH_SIZE,"1");
     context.put(BATCH_DURATION_MS,"30000");
@@ -374,23 +377,20 @@ public class TestKafkaSource {
     Thread.sleep(500L);
 
     kafkaSource.process();
-    Assert.assertEquals("event 1", new String(events.get(0).getBody(),
-            Charsets.UTF_8));
+    Assert.assertEquals("event 1", new String(events.get(0).getBody(), 
Charsets.UTF_8));
     events.clear();
 
     kafkaServer.produce(topic0, "", "event 2");
     Thread.sleep(500L);
     kafkaSource.process();
-    Assert.assertEquals("event 2", new String(events.get(0).getBody(),
-            Charsets.UTF_8));
-
+    Assert.assertEquals("event 2", new String(events.get(0).getBody(), 
Charsets.UTF_8));
   }
 
   @SuppressWarnings("unchecked")
   @Test
-  public void testNullKey() throws EventDeliveryException,
-      SecurityException, NoSuchFieldException, IllegalArgumentException,
-      IllegalAccessException, InterruptedException {
+  public void testNullKey() throws EventDeliveryException, SecurityException, 
NoSuchFieldException,
+                                   IllegalArgumentException, 
IllegalAccessException,
+                                   InterruptedException {
     context.put(TOPICS, topic0);
     context.put(BATCH_SIZE, "1");
     kafkaSource.configure(context);
@@ -406,8 +406,7 @@ public class TestKafkaSource {
     Assert.assertEquals(Status.BACKOFF, kafkaSource.process());
     Assert.assertEquals(1, events.size());
 
-    Assert.assertEquals("hello, world", new String(events.get(0).getBody(),
-        Charsets.UTF_8));
+    Assert.assertEquals("hello, world", new String(events.get(0).getBody(), 
Charsets.UTF_8));
   }
 
   @Test
@@ -430,7 +429,8 @@ public class TestKafkaSource {
   public void testKafkaProperties() {
     Context context = new Context();
     context.put(TOPICS, "test1, test2");
-    context.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG, 
"override.default.group.id");
+    context.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG,
+                "override.default.group.id");
     context.put(KAFKA_CONSUMER_PREFIX + "fake.property", 
"kafka.property.value");
     context.put(BOOTSTRAP_SERVERS, "real-bootstrap-servers-list");
     context.put(KAFKA_CONSUMER_PREFIX + "bootstrap.servers", 
"bad-bootstrap-servers-list");
@@ -439,21 +439,17 @@ public class TestKafkaSource {
     Properties kafkaProps = source.getConsumerProps();
 
     //check that we have defaults set
-    assertEquals(
-            String.valueOf(DEFAULT_AUTO_COMMIT),
-            kafkaProps.getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+    assertEquals(String.valueOf(DEFAULT_AUTO_COMMIT),
+                 
kafkaProps.getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
     //check that kafka properties override the default and get correct name
-    assertEquals(
-            "override.default.group.id",
-            kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
+    assertEquals("override.default.group.id",
+                 kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
     //check that any kafka property gets in
-    assertEquals(
-            "kafka.property.value",
-            kafkaProps.getProperty("fake.property"));
+    assertEquals("kafka.property.value",
+                 kafkaProps.getProperty("fake.property"));
     //check that documented property overrides defaults
-    assertEquals(
-            "real-bootstrap-servers-list",
-            kafkaProps.getProperty("bootstrap.servers"));
+    assertEquals("real-bootstrap-servers-list",
+                 kafkaProps.getProperty("bootstrap.servers"));
   }
 
   @Test
@@ -469,22 +465,16 @@ public class TestKafkaSource {
 
     KafkaSource.Subscriber<List<String>> subscriber = source.getSubscriber();
     //check topic was set
-    assertEquals(
-            "old.topic",
-            subscriber.get().get(0));
+    assertEquals("old.topic", subscriber.get().get(0));
     //check that kafka old properties override the default and get correct name
-    assertEquals(
-            "old.groupId",
-            kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
+    assertEquals("old.groupId", 
kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
 
     source = new KafkaSource();
     context.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG, 
"override.old.group.id");
     source.doConfigure(context);
     kafkaProps = source.getConsumerProps();
     //check that kafka new properties override old
-    assertEquals(
-            "override.old.group.id",
-            kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
+    assertEquals("override.old.group.id", 
kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
 
     context.clear();
     context.put(BOOTSTRAP_SERVERS, "real-bootstrap-servers-list");
@@ -493,9 +483,8 @@ public class TestKafkaSource {
     source.doConfigure(context);
     kafkaProps = source.getConsumerProps();
     //check defaults set
-    assertEquals(
-            KafkaSourceConstants.DEFAULT_GROUP_ID,
-            kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
+    assertEquals(KafkaSourceConstants.DEFAULT_GROUP_ID,
+                 kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
   }
 
   @Test
@@ -568,8 +557,7 @@ public class TestKafkaSource {
 
     Event event = events.get(0);
 
-    Assert.assertEquals("hello, world", new String(event.getBody(),
-            Charsets.UTF_8));
+    Assert.assertEquals("hello, world", new String(event.getBody(), 
Charsets.UTF_8));
 
     Assert.assertEquals("value1", e.getHeaders().get("header1"));
     Assert.assertEquals("value2", e.getHeaders().get("header2"));
@@ -577,8 +565,7 @@ public class TestKafkaSource {
 
     event = events.get(1);
 
-    Assert.assertEquals("hello, world2", new String(event.getBody(),
-            Charsets.UTF_8));
+    Assert.assertEquals("hello, world2", new String(event.getBody(), 
Charsets.UTF_8));
 
     Assert.assertEquals("value1", e.getHeaders().get("header1"));
     Assert.assertEquals("value2", e.getHeaders().get("header2"));
@@ -603,7 +590,6 @@ public class TestKafkaSource {
     }).when(channelProcessor).processEventBatch(any(List.class));
 
     return channelProcessor;
-
   }
 
   ChannelProcessor createBadChannel() {

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirEventReader.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirEventReader.java
 
b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirEventReader.java
index 1896883..bcfe4bb 100644
--- 
a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirEventReader.java
+++ 
b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirEventReader.java
@@ -19,20 +19,6 @@
 
 package org.apache.flume.source.taildir;
 
-import static 
org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.*;
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flume.Event;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
 import com.google.common.base.Charsets;
 import com.google.common.base.Throwables;
 import com.google.common.collect.HashBasedTable;
@@ -41,6 +27,22 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.collect.Table;
 import com.google.common.io.Files;
+import org.apache.flume.Event;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.flume.source.taildir.TaildirSourceConfigurationConstants
+                  .BYTE_OFFSET_HEADER_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class TestTaildirEventReader {
   private File tmpDir;
@@ -85,7 +87,8 @@ public class TestTaildirEventReader {
   }
 
   private ReliableTaildirEventReader getReader(boolean addByteOffset) {
-    Map<String, String> filePaths = ImmutableMap.of("testFiles", 
tmpDir.getAbsolutePath() + "/file.*");
+    Map<String, String> filePaths = ImmutableMap.of("testFiles",
+                                                    tmpDir.getAbsolutePath() + 
"/file.*");
     Table<String, String, String> headerTable = HashBasedTable.create();
     return getReader(filePaths, headerTable, addByteOffset);
   }
@@ -472,7 +475,8 @@ public class TestTaildirEventReader {
   @Test
   public void testNewLineBoundaries() throws IOException {
     File f1 = new File(tmpDir, "file1");
-    
Files.write("file1line1\nfile1line2\rfile1line2\nfile1line3\r\nfile1line4\n", 
f1, Charsets.UTF_8);
+    
Files.write("file1line1\nfile1line2\rfile1line2\nfile1line3\r\nfile1line4\n",
+                f1, Charsets.UTF_8);
 
     ReliableTaildirEventReader reader = getReader();
     List<String> out = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirMatcher.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirMatcher.java
 
b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirMatcher.java
index 4bff841..c341054 100644
--- 
a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirMatcher.java
+++ 
b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirMatcher.java
@@ -33,7 +33,10 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class TestTaildirMatcher {
   private File tmpDir;
@@ -55,10 +58,10 @@ public class TestTaildirMatcher {
    */
   private void append(String fileName) throws IOException {
     File f;
-    if(!files.containsKey(fileName)){
+    if (!files.containsKey(fileName)) {
       f = new File(tmpDir, fileName);
       files.put(fileName, f);
-    }else{
+    } else {
       f = files.get(fileName);
     }
     Files.append(fileName + "line\n", f, Charsets.UTF_8);
@@ -67,7 +70,7 @@ public class TestTaildirMatcher {
   /**
    * Translate a list of files to list of filename strings.
    */
-  private static List<String> filesToNames(List<File> origList){
+  private static List<String> filesToNames(List<File> origList) {
     Function<File, String> file2nameFn = new Function<File, String>() {
       @Override
       public String apply(File input) {
@@ -102,7 +105,9 @@ public class TestTaildirMatcher {
     append("file0");
     append("file1");
 
-    TaildirMatcher tm = new TaildirMatcher("f1", tmpDir.getAbsolutePath() + File.separator + "file.*", isCachingNeeded);
+    TaildirMatcher tm = new TaildirMatcher("f1",
+                                           tmpDir.getAbsolutePath() + File.separator + "file.*",
+                                           isCachingNeeded);
     List<String> files = filesToNames(tm.getMatchingFiles());
     assertEquals(msgAlreadyExistingFile, 2, files.size());
     assertTrue(msgAlreadyExistingFile, files.contains("file1"));
@@ -136,7 +141,9 @@ public class TestTaildirMatcher {
     append("file0");
     append("file1");
 
-    TaildirMatcher tm = new TaildirMatcher("f1", tmpDir.getAbsolutePath() + 
File.separator + "file.*", false);
+    TaildirMatcher tm = new TaildirMatcher("f1",
+                                           tmpDir.getAbsolutePath() + 
File.separator + "file.*",
+                                           false);
     List<String> files = filesToNames(tm.getMatchingFiles());
     assertEquals(msgAlreadyExistingFile, 2, files.size());
     assertTrue(msgAlreadyExistingFile, files.contains("file1"));
@@ -167,7 +174,9 @@ public class TestTaildirMatcher {
 
   @Test
   public void testEmtpyDirMatching() throws Exception {
-    TaildirMatcher tm = new TaildirMatcher("empty", tmpDir.getAbsolutePath() + 
File.separator + ".*", isCachingNeeded);
+    TaildirMatcher tm = new TaildirMatcher("empty",
+                                           tmpDir.getAbsolutePath() + 
File.separator + ".*",
+                                           isCachingNeeded);
     List<File> files = tm.getMatchingFiles();
     assertNotNull(msgEmptyDir, files);
     assertTrue(msgEmptyDir, files.isEmpty());
@@ -175,7 +184,10 @@ public class TestTaildirMatcher {
 
   @Test
   public void testNoMatching() throws Exception {
-    TaildirMatcher tm = new TaildirMatcher("nomatch", tmpDir.getAbsolutePath() 
+ File.separator + "abracadabra_nonexisting", isCachingNeeded);
+    TaildirMatcher tm = new TaildirMatcher(
+        "nomatch",
+        tmpDir.getAbsolutePath() + File.separator + "abracadabra_nonexisting",
+        isCachingNeeded);
     List<File> files = tm.getMatchingFiles();
     assertNotNull(msgNoMatch, files);
     assertTrue(msgNoMatch, files.isEmpty());
@@ -183,7 +195,8 @@ public class TestTaildirMatcher {
 
   @Test(expected = IllegalStateException.class)
   public void testNonExistingDir() {
-    TaildirMatcher tm = new TaildirMatcher("exception", 
"/abracadabra/doesntexist/.*", isCachingNeeded);
+    TaildirMatcher tm = new TaildirMatcher("exception", 
"/abracadabra/doesntexist/.*",
+                                           isCachingNeeded);
   }
 
   @Test
@@ -191,7 +204,8 @@ public class TestTaildirMatcher {
     new File(tmpDir, "outerFile").createNewFile();
     new File(tmpDir, "recursiveDir").mkdir();
     new File(tmpDir + File.separator + "recursiveDir", 
"innerFile").createNewFile();
-    TaildirMatcher tm = new TaildirMatcher("f1", tmpDir.getAbsolutePath() + 
File.separator + ".*", isCachingNeeded);
+    TaildirMatcher tm = new TaildirMatcher("f1", tmpDir.getAbsolutePath() + 
File.separator + ".*",
+                                           isCachingNeeded);
     List<String> files = filesToNames(tm.getMatchingFiles());
 
     assertEquals(msgSubDirs, 1, files.size());
@@ -207,9 +221,13 @@ public class TestTaildirMatcher {
     append("c.log.yyyy.MM-02");
 
     // Tail a.log and b.log
-    TaildirMatcher tm1 = new TaildirMatcher("ab", tmpDir.getAbsolutePath() + 
File.separator + "[ab].log", isCachingNeeded);
+    TaildirMatcher tm1 = new TaildirMatcher("ab",
+                                            tmpDir.getAbsolutePath() + 
File.separator + "[ab].log",
+                                            isCachingNeeded);
     // Tail files that starts with c.log
-    TaildirMatcher tm2 = new TaildirMatcher("c", tmpDir.getAbsolutePath() + 
File.separator + "c.log.*", isCachingNeeded);
+    TaildirMatcher tm2 = new TaildirMatcher("c",
+                                            tmpDir.getAbsolutePath() + 
File.separator + "c.log.*",
+                                            isCachingNeeded);
 
     List<String> files1 = filesToNames(tm1.getMatchingFiles());
     List<String> files2 = filesToNames(tm2.getMatchingFiles());
@@ -217,11 +235,16 @@ public class TestTaildirMatcher {
     assertEquals(2, files1.size());
     assertEquals(2, files2.size());
     // Make sure we got every file
-    assertTrue("Regex pattern for ab should have matched a.log file", files1.contains("a.log"));
-    assertFalse("Regex pattern for ab should NOT have matched a.log.1 file", files1.contains("a.log.1"));
-    assertTrue("Regex pattern for ab should have matched b.log file", files1.contains("b.log"));
-    assertTrue("Regex pattern for c should have matched c.log.yyyy-MM-01 file", files2.contains("c.log.yyyy.MM-01"));
-    assertTrue("Regex pattern for c should have matched c.log.yyyy-MM-02 file", files2.contains("c.log.yyyy.MM-02"));
+    assertTrue("Regex pattern for ab should have matched a.log file",
+               files1.contains("a.log"));
+    assertFalse("Regex pattern for ab should NOT have matched a.log.1 file",
+                files1.contains("a.log.1"));
+    assertTrue("Regex pattern for ab should have matched b.log file",
+               files1.contains("b.log"));
+    assertTrue("Regex pattern for c should have matched c.log.yyyy-MM-01 file",
+               files2.contains("c.log.yyyy.MM-01"));
+    assertTrue("Regex pattern for c should have matched c.log.yyyy-MM-02 file",
+               files2.contains("c.log.yyyy.MM-02"));
   }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
 
b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
index f6289cd..e090b74 100644
--- 
a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
+++ 
b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
@@ -17,14 +17,9 @@
 
 package org.apache.flume.source.taildir;
 
-import static 
org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.*;
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
@@ -40,9 +35,21 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.base.Charsets;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILE_GROUPS;
+import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants
+                  .FILE_GROUPS_PREFIX;
+import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.HEADERS_PREFIX;
+import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.POSITION_FILE;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class TestTaildirSource {
   static TaildirSource source;
@@ -274,10 +281,11 @@ public class TestTaildirSource {
                                                     line1b, line2b, line3b, // 
file2
                                                     line1d, line2d, line3d, // 
file4
                                                     line1c, line2c, line3c  // 
file3
-                                                     );
-    for(int i =0; i!=expected.size(); ++i) {
-      expected.set(i, expected.get(i).trim() );
+                                                   );
+    for (int i = 0; i != expected.size(); ++i) {
+      expected.set(i, expected.get(i).trim());
     }
-    assertArrayEquals("Files not consumed in expected order", 
expected.toArray(), consumedOrder.toArray());
+    assertArrayEquals("Files not consumed in expected order", 
expected.toArray(),
+                      consumedOrder.toArray());
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestFileChannel.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestFileChannel.java 
b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestFileChannel.java
index 4a80b8c..6a98292 100644
--- 
a/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestFileChannel.java
+++ 
b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestFileChannel.java
@@ -18,14 +18,8 @@
  */
 package org.apache.flume.test.agent;
 
-import java.io.File;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
-
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
 import org.apache.flume.test.util.StagedInstall;
 import org.apache.log4j.Logger;
 import org.junit.After;
@@ -33,9 +27,11 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.base.Charsets;
-import com.google.common.base.Splitter;
-import com.google.common.io.Files;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 public class TestFileChannel {
 
@@ -48,61 +44,57 @@ public class TestFileChannel {
 
   @Before
   public void setUp() throws Exception {
-      /* Create 3 temp dirs, each used as value within agentProps */
-
-      final File sinkOutputDir = Files.createTempDir();
-      tempResources.add(sinkOutputDir);
-      final String sinkOutputDirPath = sinkOutputDir.getCanonicalPath();
-      LOGGER.info("Created rolling file sink's output dir: "
-              + sinkOutputDirPath);
-
-      final File channelCheckpointDir = Files.createTempDir();
-      tempResources.add(channelCheckpointDir);
-      final String channelCheckpointDirPath = channelCheckpointDir
-              .getCanonicalPath();
-      LOGGER.info("Created file channel's checkpoint dir: "
-              + channelCheckpointDirPath);
-
-      final File channelDataDir = Files.createTempDir();
-      tempResources.add(channelDataDir);
-      final String channelDataDirPath = channelDataDir.getCanonicalPath();
-      LOGGER.info("Created file channel's data dir: "
-              + channelDataDirPath);
-
-      /* Build props to pass to flume agent */
-
-      Properties agentProps = new Properties();
-
-      // Active sets
-      agentProps.put("a1.channels", "c1");
-      agentProps.put("a1.sources", "r1");
-      agentProps.put("a1.sinks", "k1");
-
-      // c1
-      agentProps.put("a1.channels.c1.type", "FILE");
-      agentProps.put("a1.channels.c1.checkpointDir", channelCheckpointDirPath);
-      agentProps.put("a1.channels.c1.dataDirs", channelDataDirPath);
-
-      // r1
-      agentProps.put("a1.sources.r1.channels", "c1");
-      agentProps.put("a1.sources.r1.type", "EXEC");
-      agentProps.put("a1.sources.r1.command", "seq 1 100");
-
-      // k1
-      agentProps.put("a1.sinks.k1.channel", "c1");
-      agentProps.put("a1.sinks.k1.type", "FILE_ROLL");
-      agentProps.put("a1.sinks.k1.sink.directory", sinkOutputDirPath);
-      agentProps.put("a1.sinks.k1.sink.rollInterval", "0");
-
-      this.agentProps = agentProps;
-      this.sinkOutputDir = sinkOutputDir;
+    /* Create 3 temp dirs, each used as value within agentProps */
+
+    final File sinkOutputDir = Files.createTempDir();
+    tempResources.add(sinkOutputDir);
+    final String sinkOutputDirPath = sinkOutputDir.getCanonicalPath();
+    LOGGER.info("Created rolling file sink's output dir: " + sinkOutputDirPath);
+
+    final File channelCheckpointDir = Files.createTempDir();
+    tempResources.add(channelCheckpointDir);
+    final String channelCheckpointDirPath = channelCheckpointDir.getCanonicalPath();
+    LOGGER.info("Created file channel's checkpoint dir: " + channelCheckpointDirPath);
+
+    final File channelDataDir = Files.createTempDir();
+    tempResources.add(channelDataDir);
+    final String channelDataDirPath = channelDataDir.getCanonicalPath();
+    LOGGER.info("Created file channel's data dir: " + channelDataDirPath);
+
+    /* Build props to pass to flume agent */
+
+    Properties agentProps = new Properties();
+
+    // Active sets
+    agentProps.put("a1.channels", "c1");
+    agentProps.put("a1.sources", "r1");
+    agentProps.put("a1.sinks", "k1");
+
+    // c1
+    agentProps.put("a1.channels.c1.type", "FILE");
+    agentProps.put("a1.channels.c1.checkpointDir", channelCheckpointDirPath);
+    agentProps.put("a1.channels.c1.dataDirs", channelDataDirPath);
+
+    // r1
+    agentProps.put("a1.sources.r1.channels", "c1");
+    agentProps.put("a1.sources.r1.type", "EXEC");
+    agentProps.put("a1.sources.r1.command", "seq 1 100");
+
+    // k1
+    agentProps.put("a1.sinks.k1.channel", "c1");
+    agentProps.put("a1.sinks.k1.type", "FILE_ROLL");
+    agentProps.put("a1.sinks.k1.sink.directory", sinkOutputDirPath);
+    agentProps.put("a1.sinks.k1.sink.rollInterval", "0");
+
+    this.agentProps = agentProps;
+    this.sinkOutputDir = sinkOutputDir;
   }
 
   @After
   public void tearDown() throws Exception {
     StagedInstall.getInstance().stopAgent();
     for (File tempResource : tempResources) {
-        tempResource.delete();
+      tempResource.delete();
     }
     agentProps = null;
   }
@@ -110,7 +102,7 @@ public class TestFileChannel {
   /**
    * File channel in/out test. Verifies that all events inserted into the
    * file channel are received by the sink in order.
-   *
+   * <p>
    * The EXEC source creates 100 events where the event bodies have
    * sequential numbers. The source puts those events into the file channel,
    * and the FILE_ROLL The sink is expected to take all 100 events in FIFO
@@ -119,38 +111,36 @@ public class TestFileChannel {
    * @throws Exception
    */
   @Test
-   public void testInOut() throws Exception {
-      LOGGER.debug("testInOut() started.");
-
-      StagedInstall.getInstance().startAgent("a1", agentProps);
-      TimeUnit.SECONDS.sleep(10); // Wait for source and sink to finish
-                                  // TODO make this more deterministic
-
-      /* Create expected output */
-
-      StringBuffer sb = new StringBuffer();
-      for (int i = 1; i <= 100; i++) {
-          sb.append(i).append("\n");
-      }
-      String expectedOutput = sb.toString();
-      LOGGER.info("Created expected output: " + expectedOutput);
-
-      /* Create actual output file */
-
-      File[] sinkOutputDirChildren = sinkOutputDir.listFiles();
-      // Only 1 file should be in FILE_ROLL sink's dir (rolling is disabled)
-      Assert.assertEquals("Expected FILE_ROLL sink's dir to have only 1 
child," +
-              " but found " + sinkOutputDirChildren.length + " children.",
-              1, sinkOutputDirChildren.length);
-      File actualOutput = sinkOutputDirChildren[0];
-
-      if (!Files.toString(actualOutput, 
Charsets.UTF_8).equals(expectedOutput)) {
-          LOGGER.error("Actual output doesn't match expected output.\n");
-          throw new AssertionError("FILE_ROLL sink's actual output doesn't " +
-                  "match expected output.");
-      }
-
-      LOGGER.debug("testInOut() ended.");
-  }
+  public void testInOut() throws Exception {
+    LOGGER.debug("testInOut() started.");
 
+    StagedInstall.getInstance().startAgent("a1", agentProps);
+    TimeUnit.SECONDS.sleep(10); // Wait for source and sink to finish
+    // TODO make this more deterministic
+
+    /* Create expected output */
+    StringBuffer sb = new StringBuffer();
+    for (int i = 1; i <= 100; i++) {
+      sb.append(i).append("\n");
+    }
+    String expectedOutput = sb.toString();
+    LOGGER.info("Created expected output: " + expectedOutput);
+
+    /* Create actual output file */
+
+    File[] sinkOutputDirChildren = sinkOutputDir.listFiles();
+    // Only 1 file should be in FILE_ROLL sink's dir (rolling is disabled)
+    Assert.assertEquals("Expected FILE_ROLL sink's dir to have only 1 child," +
+                        " but found " + sinkOutputDirChildren.length + " children.",
+                        1, sinkOutputDirChildren.length);
+    File actualOutput = sinkOutputDirChildren[0];
+
+    if (!Files.toString(actualOutput, Charsets.UTF_8).equals(expectedOutput)) {
+      LOGGER.error("Actual output doesn't match expected output.\n");
+      throw new AssertionError("FILE_ROLL sink's actual output doesn't " +
+                               "match expected output.");
+    }
+
+    LOGGER.debug("testInOut() ended.");
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java 
b/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java
index 973ff4a..51194b6 100644
--- a/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java
+++ b/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java
@@ -18,6 +18,14 @@
  */
 package org.apache.flume.test.util;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.log4j.Logger;
+
 import java.io.File;
 import java.io.FileFilter;
 import java.io.FileInputStream;
@@ -29,18 +37,8 @@ import java.net.Socket;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.TimeUnit;
 import java.util.zip.GZIPInputStream;
 
-import com.google.common.base.Preconditions;
-import com.google.common.io.Files;
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
-import org.apache.log4j.Logger;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-
 
 /**
  * Attempts to setup a staged install using explicitly specified tar-ball
@@ -73,7 +71,7 @@ public class StagedInstall {
 
   private static StagedInstall INSTANCE;
 
-  public synchronized static StagedInstall getInstance() throws Exception {
+  public static synchronized StagedInstall getInstance() throws Exception {
     if (INSTANCE == null) {
       INSTANCE = new StagedInstall();
     }
@@ -124,8 +122,7 @@ public class StagedInstall {
     if (process != null) {
       throw new Exception("A process is already running");
     }
-    LOGGER.info("Starting process for agent: " + agentName + " using config: "
-       + properties);
+    LOGGER.info("Starting process for agent: " + agentName + " using config: " + properties);
 
     File configFile = createConfigurationFile(agentName, properties);
     configFilePath = configFile.getCanonicalPath();
@@ -252,7 +249,7 @@ public class StagedInstall {
     File[] listBaseDirs = stageDir.listFiles();
     if (listBaseDirs != null && listBaseDirs.length == 1
         && listBaseDirs[0].isDirectory()) {
-      rootDir =listBaseDirs[0];
+      rootDir = listBaseDirs[0];
     }
     baseDir = rootDir;
 
@@ -417,7 +414,6 @@ public class StagedInstall {
       if (testFile.exists() && testFile.isDirectory()) {
         LOGGER.info("Found candidate dir: " + testFile.getCanonicalPath());
         File[] candidateFiles = testFile.listFiles(new FileFilter() {
-
           @Override
           public boolean accept(File pathname) {
             String name = pathname.getName();
@@ -426,7 +422,8 @@ public class StagedInstall {
               return true;
             }
             return false;
-          }});
+          }
+        });
 
         // There should be at most one
         if (candidateFiles != null && candidateFiles.length > 0) {
@@ -466,22 +463,22 @@ public class StagedInstall {
   }
 
   public static void waitUntilPortOpens(String host, int port, long timeout)
-      throws IOException, InterruptedException{
+      throws IOException, InterruptedException {
     long startTime = System.currentTimeMillis();
     Socket socket;
     boolean connected = false;
     //See if port has opened for timeout.
-    while(System.currentTimeMillis() - startTime < timeout){
-      try{
+    while (System.currentTimeMillis() - startTime < timeout) {
+      try {
         socket = new Socket(host, port);
         socket.close();
         connected = true;
         break;
-      } catch (IOException e){
+      } catch (IOException e) {
         Thread.sleep(2000);
       }
     }
-    if(!connected) {
+    if (!connected) {
       throw new IOException("Port not opened within specified timeout.");
     }
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-tests/src/test/java/org/apache/flume/test/util/SyslogAgent.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-tests/src/test/java/org/apache/flume/test/util/SyslogAgent.java 
b/flume-ng-tests/src/test/java/org/apache/flume/test/util/SyslogAgent.java
index 7159549..c908fc1 100644
--- a/flume-ng-tests/src/test/java/org/apache/flume/test/util/SyslogAgent.java
+++ b/flume-ng-tests/src/test/java/org/apache/flume/test/util/SyslogAgent.java
@@ -60,7 +60,7 @@ public class SyslogAgent {
     public String toString() {
       return syslogSourceType;
     }
-  };
+  }
 
   private Properties agentProps;
   private File sinkOutputDir;
@@ -73,7 +73,6 @@ public class SyslogAgent {
 
   public SyslogAgent() throws IOException {
     hostname = "localhost";
-
     setRandomPort();
   }
 
@@ -141,7 +140,7 @@ public class SyslogAgent {
     while (client == null) {
       try {
         client = new BufferedOutputStream(new Socket(hostname, 
port).getOutputStream());
-      } catch(IOException e) {
+      } catch (IOException e) {
         if (++numberOfAttempts >= DEFAULT_ATTEMPTS) {
           throw new AssertionError("Could not connect to source after "
               + DEFAULT_ATTEMPTS + " attempts with " + DEFAULT_TIMEOUT + " ms 
timeout.");
@@ -206,8 +205,8 @@ public class SyslogAgent {
     // Only 1 file should be in FILE_ROLL sink's dir (rolling is disabled)
     File[] sinkOutputDirChildren = sinkOutputDir.listFiles();
     Assert.assertEquals("Expected FILE_ROLL sink's dir to have only 1 child," +
-        " but found " + sinkOutputDirChildren.length + " children.",
-    1, sinkOutputDirChildren.length);
+                        " but found " + sinkOutputDirChildren.length + " 
children.",
+                        1, sinkOutputDirChildren.length);
 
     /* Wait for output file stats to be as expected. */
     File outputDirChild = sinkOutputDirChildren[0];

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java
----------------------------------------------------------------------
diff --git 
a/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java
 
b/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java
index a11126d..9fc5f2c 100644
--- 
a/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java
+++ 
b/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java
@@ -34,7 +34,6 @@ import org.apache.flume.channel.file.WriteOrderOracle;
 import org.apache.flume.event.EventBuilder;
 import org.junit.After;
 import org.junit.AfterClass;
-import static org.fest.reflect.core.Reflection.*;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -47,6 +46,8 @@ import java.util.HashSet;
 import java.util.Random;
 import java.util.Set;
 
+import static org.fest.reflect.core.Reflection.field;
+import static org.fest.reflect.core.Reflection.method;
 
 public class TestFileChannelIntegrityTool {
   private static File baseDir;
@@ -61,7 +62,7 @@ public class TestFileChannelIntegrityTool {
   private static int invalidEvent = 0;
 
   @BeforeClass
-  public static void setUpClass() throws Exception{
+  public static void setUpClass() throws Exception {
     createDataFiles();
   }
 
@@ -74,13 +75,13 @@ public class TestFileChannelIntegrityTool {
     File[] dataFiles = origDataDir.listFiles(new FilenameFilter() {
       @Override
       public boolean accept(File dir, String name) {
-        if(name.contains("lock")) {
+        if (name.contains("lock")) {
           return false;
         }
         return true;
       }
     });
-    for(File dataFile : dataFiles) {
+    for (File dataFile : dataFiles) {
       Serialization.copyFile(dataFile, new File(dataDir, dataFile.getName()));
     }
   }
@@ -146,7 +147,7 @@ public class TestFileChannelIntegrityTool {
     Transaction tx = channel.getTransaction();
     tx.begin();
     int i = 0;
-    while(channel.take() != null) {
+    while (channel.take() != null) {
       i++;
     }
     tx.commit();
@@ -161,7 +162,7 @@ public class TestFileChannelIntegrityTool {
     File[] files = dataDir.listFiles(new FilenameFilter() {
       @Override
       public boolean accept(File dir, String name) {
-        if(name.contains("lock") || name.contains("meta")) {
+        if (name.contains("lock") || name.contains("meta")) {
           return false;
         }
         return true;
@@ -170,15 +171,13 @@ public class TestFileChannelIntegrityTool {
     Random random = new Random();
     int corrupted = 0;
     for (File dataFile : files) {
-      LogFile.SequentialReader reader =
-        new LogFileV3.SequentialReader(dataFile, null, true);
+      LogFile.SequentialReader reader = new LogFileV3.SequentialReader(dataFile, null, true);
       RandomAccessFile handle = new RandomAccessFile(dataFile, "rw");
       long eventPosition1 = reader.getPosition();
       LogRecord rec = reader.next();
       //No point corrupting commits, so ignore them
-      if(rec == null ||
-        rec.getEvent().getClass().getName().
-          equals("org.apache.flume.channel.file.Commit")) {
+      if (rec == null ||
+          rec.getEvent().getClass().getName().equals("org.apache.flume.channel.file.Commit")) {
         handle.close();
         reader.close();
         continue;
@@ -190,8 +189,7 @@ public class TestFileChannelIntegrityTool {
       corrupted++;
       corruptFiles.add(dataFile.getName());
       if (rec == null ||
-        rec.getEvent().getClass().getName().
-          equals("org.apache.flume.channel.file.Commit")) {
+          rec.getEvent().getClass().getName().equals("org.apache.flume.channel.file.Commit")) {
         handle.close();
         reader.close();
         continue;
@@ -231,7 +229,7 @@ public class TestFileChannelIntegrityTool {
     Transaction tx = channel.getTransaction();
     tx.begin();
     int i = 0;
-    while(channel.take() != null) {
+    while (channel.take() != null) {
       i++;
     }
     tx.commit();
@@ -241,14 +239,14 @@ public class TestFileChannelIntegrityTool {
     files = dataDir.listFiles(new FilenameFilter() {
       @Override
       public boolean accept(File dir, String name) {
-        if(name.contains(".bak")) {
+        if (name.contains(".bak")) {
           return true;
         }
         return false;
       }
     });
     Assert.assertEquals(corruptFiles.size(), files.length);
-    for(File file : files) {
+    for (File file : files) {
       String name = file.getName();
       name = name.replaceAll(".bak", "");
       Assert.assertTrue(corruptFiles.remove(name));
@@ -258,13 +256,13 @@ public class TestFileChannelIntegrityTool {
 
   private static void createDataFiles() throws Exception {
     final byte[] eventData = new byte[2000];
-    for(int i = 0; i < 2000; i++) {
+    for (int i = 0; i < 2000; i++) {
       eventData[i] = 1;
     }
     WriteOrderOracle.setSeed(System.currentTimeMillis());
     event = EventBuilder.withBody(eventData);
     baseDir = Files.createTempDir();
-    if(baseDir.exists()) {
+    if (baseDir.exists()) {
       FileUtils.deleteDirectory(baseDir);
     }
     baseDir = Files.createTempDir();
@@ -286,7 +284,7 @@ public class TestFileChannelIntegrityTool {
       Transaction tx = channel.getTransaction();
       tx.begin();
       for (int i = 0; i < 5; i++) {
-        if(i % 3 == 0) {
+        if (i % 3 == 0) {
           event.getBody()[0] = 0;
           invalidEvent++;
         } else {
@@ -297,22 +295,19 @@ public class TestFileChannelIntegrityTool {
       tx.commit();
       tx.close();
     }
-    Log log = field("log")
-      .ofType(Log.class)
-      .in(channel)
-      .get();
+    Log log = field("log").ofType(Log.class)
+                          .in(channel)
+                          .get();
 
     Assert.assertTrue("writeCheckpoint returned false",
-      method("writeCheckpoint")
-        .withReturnType(Boolean.class)
-        .withParameterTypes(Boolean.class)
-        .in(log)
-        .invoke(true));
+                      method("writeCheckpoint").withReturnType(Boolean.class)
+                                               .withParameterTypes(Boolean.class)
+                                               .in(log)
+                                               .invoke(true));
     channel.stop();
   }
 
   public static class DummyEventVerifier implements EventValidator {
-
     private int value = 0;
 
     private DummyEventVerifier(int val) {
@@ -325,7 +320,6 @@ public class TestFileChannelIntegrityTool {
     }
 
     public static class Builder implements EventValidator.Builder {
-
       private int binaryValidator = 0;
 
       @Override

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 85c0dc8..b50693e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -672,9 +672,7 @@ limitations under the License.
               
<suppressionsLocation>flume/checkstyle-suppressions.xml</suppressionsLocation>
               
<suppressionsFileExpression>checkstyle.suppressions.file</suppressionsFileExpression>
               <encoding>UTF-8</encoding>
-              <consoleOutput>true</consoleOutput>
-              <failsOnError>true</failsOnError>
-              <includeTestSourceDirectory>false</includeTestSourceDirectory>
+              <includeTestSourceDirectory>true</includeTestSourceDirectory>
               <linkXRef>false</linkXRef>
             </configuration>
             <goals>
@@ -1474,7 +1472,7 @@ limitations under the License.
           
<suppressionsLocation>flume/checkstyle-suppressions.xml</suppressionsLocation>
           
<suppressionsFileExpression>checkstyle.suppressions.file</suppressionsFileExpression>
           <encoding>UTF-8</encoding>
-          <includeTestSourceDirectory>false</includeTestSourceDirectory>
+          <includeTestSourceDirectory>true</includeTestSourceDirectory>
           <linkXRef>false</linkXRef>
         </configuration>
       </plugin>

Reply via email to