Repository: flume Updated Branches: refs/heads/trunk c8c0f9b84 -> cfbf11568
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java index 5423f8f..cdc09b5 100644 --- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java +++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java @@ -61,6 +61,7 @@ public class TestJMSSource extends JMSMessageConsumerTestBase { private InitialContextFactory contextFactory; private File baseDir; private File passwordFile; + @SuppressWarnings("unchecked") @Override void afterSetup() throws Exception { @@ -79,10 +80,11 @@ public class TestJMSSource extends JMSMessageConsumerTestBase { }).when(channelProcessor).processEventBatch(any(List.class)); consumerFactory = mock(JMSMessageConsumerFactory.class); consumer = spy(create()); - when(consumerFactory.create(any(InitialContext.class), any(ConnectionFactory.class), anyString(), - any(JMSDestinationType.class), any(JMSDestinationLocator.class), anyString(), anyInt(), anyLong(), - any(JMSMessageConverter.class), any(Optional.class), - any(Optional.class))).thenReturn(consumer); + when(consumerFactory.create(any(InitialContext.class), any(ConnectionFactory.class), + anyString(), any(JMSDestinationType.class), + any(JMSDestinationLocator.class), anyString(), anyInt(), anyLong(), + any(JMSMessageConverter.class), any(Optional.class), + any(Optional.class))).thenReturn(consumer); when(initialContext.lookup(anyString())).thenReturn(connectionFactory); contextFactory = mock(InitialContextFactory.class); when(contextFactory.create(any(Properties.class))).thenReturn(initialContext); @@ -97,10 +99,12 @@ public class TestJMSSource extends JMSMessageConsumerTestBase { context.put(JMSSourceConfiguration.PROVIDER_URL, "dummy:1414"); context.put(JMSSourceConfiguration.INITIAL_CONTEXT_FACTORY, "ldap://dummy:389"); } + @Override void afterTearDown() throws Exception { FileUtils.deleteDirectory(baseDir); } + @Test public void testStop() throws Exception { source.configure(context); @@ -108,38 +112,45 @@ public class TestJMSSource extends JMSMessageConsumerTestBase { source.stop(); verify(consumer).close(); } + @Test(expected = IllegalArgumentException.class) public void testConfigureWithoutInitialContextFactory() throws Exception { context.put(JMSSourceConfiguration.INITIAL_CONTEXT_FACTORY, ""); source.configure(context); } + @Test(expected = IllegalArgumentException.class) public void testConfigureWithoutProviderURL() throws Exception { context.put(JMSSourceConfiguration.PROVIDER_URL, ""); source.configure(context); } + @Test(expected = IllegalArgumentException.class) public void testConfigureWithoutDestinationName() throws Exception { context.put(JMSSourceConfiguration.DESTINATION_NAME, ""); source.configure(context); } + @Test(expected = FlumeException.class) public void testConfigureWithBadDestinationType() throws Exception { context.put(JMSSourceConfiguration.DESTINATION_TYPE, "DUMMY"); source.configure(context); } + @Test(expected = IllegalArgumentException.class) public void testConfigureWithEmptyDestinationType() throws Exception { context.put(JMSSourceConfiguration.DESTINATION_TYPE, ""); source.configure(context); } + @SuppressWarnings("unchecked") @Test public void testStartConsumerCreateThrowsException() throws Exception { - when(consumerFactory.create(any(InitialContext.class), any(ConnectionFactory.class), anyString(), - any(JMSDestinationType.class), any(JMSDestinationLocator.class), anyString(), anyInt(), anyLong(), - any(JMSMessageConverter.class), any(Optional.class), - any(Optional.class))).thenThrow(new RuntimeException()); + when(consumerFactory.create(any(InitialContext.class), any(ConnectionFactory.class), + anyString(), any(JMSDestinationType.class), + any(JMSDestinationLocator.class), anyString(), anyInt(), anyLong(), + any(JMSMessageConverter.class), any(Optional.class), + any(Optional.class))).thenThrow(new RuntimeException()); source.configure(context); source.start(); try { @@ -149,28 +160,33 @@ public class TestJMSSource extends JMSMessageConsumerTestBase { } } + @Test(expected = FlumeException.class) public void testConfigureWithContextLookupThrowsException() throws Exception { when(initialContext.lookup(anyString())).thenThrow(new NamingException()); source.configure(context); } + @Test(expected = FlumeException.class) public void testConfigureWithContextCreateThrowsException() throws Exception { when(contextFactory.create(any(Properties.class))) .thenThrow(new NamingException()); source.configure(context); } + @Test(expected = IllegalArgumentException.class) public void testConfigureWithInvalidBatchSize() throws Exception { context.put(JMSSourceConfiguration.BATCH_SIZE, "0"); source.configure(context); } + @Test(expected = FlumeException.class) public void testConfigureWithInvalidPasswordFile() throws Exception { context.put(JMSSourceConfiguration.PASSWORD_FILE, "/dev/does/not/exist/nor/will/ever/exist"); source.configure(context); } + @Test public void testConfigureWithUserNameButNoPasswordFile() throws Exception { context.put(JMSSourceConfiguration.USERNAME, "dummy"); @@ -180,6 +196,7 @@ public class TestJMSSource extends JMSMessageConsumerTestBase { Assert.assertEquals(batchSize, events.size()); assertBodyIsExpected(events); } + @Test public void testConfigureWithUserNameAndPasswordFile() throws Exception { context.put(JMSSourceConfiguration.USERNAME, "dummy"); @@ -191,11 +208,13 @@ public class TestJMSSource extends JMSMessageConsumerTestBase { Assert.assertEquals(batchSize, events.size()); assertBodyIsExpected(events); } + @Test(expected = FlumeException.class) public void testConfigureWithInvalidConverterClass() throws Exception { context.put(JMSSourceConfiguration.CONVERTER_TYPE, "not a valid classname"); source.configure(context); } + @Test public void testProcessNoStart() throws Exception { try { @@ -205,6 +224,7 @@ public class TestJMSSource extends JMSMessageConsumerTestBase { } } + @Test public void testNonDefaultConverter() throws Exception { // tests that a classname can be specified @@ -217,24 +237,26 @@ public class TestJMSSource extends JMSMessageConsumerTestBase { assertBodyIsExpected(events); verify(consumer).commit(); } - public static class NonBuilderNonConfigurableConverter - implements JMSMessageConverter { + + public static class NonBuilderNonConfigurableConverter implements JMSMessageConverter { @Override public List<Event> convert(Message message) throws JMSException { throw new UnsupportedOperationException(); } } - public static class NonBuilderConfigurableConverter - implements JMSMessageConverter, Configurable { + + public static class NonBuilderConfigurableConverter implements JMSMessageConverter, Configurable { @Override public List<Event> convert(Message message) throws JMSException { throw new UnsupportedOperationException(); } + @Override public void configure(Context context) { } } + @Test public void testNonBuilderConfigurableConverter() throws Exception { // tests that a non builder by configurable converter works @@ -247,6 +269,7 @@ public class TestJMSSource extends JMSMessageConsumerTestBase { assertBodyIsExpected(events); verify(consumer).commit(); } + @Test public void testNonBuilderNonConfigurableConverter() throws Exception { // tests that a non builder non configurable converter @@ -259,6 +282,7 @@ public class TestJMSSource extends JMSMessageConsumerTestBase { assertBodyIsExpected(events); verify(consumer).commit(); } + @Test public void testProcessFullBatch() throws Exception { source.configure(context); @@ -268,6 +292,7 @@ public class TestJMSSource extends JMSMessageConsumerTestBase { assertBodyIsExpected(events); verify(consumer).commit(); } + @Test public void testProcessNoEvents() throws Exception { when(messageConsumer.receive(anyLong())).thenReturn(null); @@ -277,6 +302,7 @@ public class TestJMSSource extends JMSMessageConsumerTestBase { Assert.assertEquals(0, events.size()); verify(consumer).commit(); } + @Test public void testProcessPartialBatch() throws Exception { when(messageConsumer.receiveNoWait()).thenReturn(message, (Message)null); @@ -287,6 +313,7 @@ public class TestJMSSource extends JMSMessageConsumerTestBase { assertBodyIsExpected(events); verify(consumer).commit(); } + @SuppressWarnings("unchecked") @Test public void testProcessChannelProcessorThrowsChannelException() throws Exception { @@ -297,6 +324,7 @@ public class TestJMSSource extends JMSMessageConsumerTestBase { Assert.assertEquals(Status.BACKOFF, source.process()); verify(consumer).rollback(); } + @SuppressWarnings("unchecked") @Test public void testProcessChannelProcessorThrowsError() throws Exception { @@ -312,6 +340,7 @@ public class TestJMSSource extends JMSMessageConsumerTestBase { } verify(consumer).rollback(); } + @Test public void testProcessReconnect() throws Exception { source.configure(context); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java index affac03..b72c532 100644 --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java @@ -16,9 +16,9 @@ */ package org.apache.flume.source.kafka; +import kafka.admin.AdminUtils; import kafka.server.KafkaConfig; import kafka.server.KafkaServerStartable; -import kafka.admin.AdminUtils; import kafka.utils.ZkUtils; import org.I0Itec.zkclient.ZkClient; import org.apache.commons.io.FileUtils; @@ -70,8 +70,9 @@ public class KafkaSourceEmbeddedKafka { props.put("host.name", "localhost"); props.put("port", String.valueOf(serverPort)); props.put("log.dir", dir.getAbsolutePath()); - if (properties != null) + if (properties != null) { props.putAll(props); + } KafkaConfig config = new KafkaConfig(props); kafkaServer = new KafkaServerStartable(config); kafkaServer.startup(); @@ -134,12 +135,12 @@ public class KafkaSourceEmbeddedKafka { // Create a ZooKeeper client int sessionTimeoutMs = 10000; int connectionTimeoutMs = 10000; - ZkClient zkClient = ZkUtils.createZkClient(HOST + ":" + zkPort, sessionTimeoutMs, connectionTimeoutMs); + ZkClient zkClient = + ZkUtils.createZkClient(HOST + ":" + zkPort, sessionTimeoutMs, connectionTimeoutMs); ZkUtils zkUtils = ZkUtils.apply(zkClient, false); int replicationFactor = 1; Properties topicConfig = new Properties(); - AdminUtils.createTopic(zkUtils, topicName, numPartitions, - replicationFactor, topicConfig); + AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig); } } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java index db144c2..f04fc64 100644 --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java @@ -31,7 +31,6 @@ public class KafkaSourceEmbeddedZookeeper { private NIOServerCnxnFactory factory; File dir; - public KafkaSourceEmbeddedZookeeper(int zkPort) { int tickTime = 2000; http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java index b4250de..1598741 100644 --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java @@ -17,29 +17,18 @@ package org.apache.flume.source.kafka; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.*; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.regex.Pattern; - import com.google.common.base.Charsets; import com.google.common.collect.Lists; import junit.framework.Assert; import kafka.common.TopicExistsException; - import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.EncoderFactory; import org.apache.avro.specific.SpecificDatumWriter; -import org.apache.flume.*; +import org.apache.flume.ChannelException; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.FlumeException; import org.apache.flume.PollableSource.Status; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.source.avro.AvroFlumeEvent; @@ -52,11 +41,36 @@ import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.flume.source.kafka.KafkaSourceConstants.*; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.regex.Pattern; + +import static org.apache.flume.source.kafka.KafkaSourceConstants.AVRO_EVENT; +import static org.apache.flume.source.kafka.KafkaSourceConstants.BATCH_DURATION_MS; +import static org.apache.flume.source.kafka.KafkaSourceConstants.BATCH_SIZE; +import static org.apache.flume.source.kafka.KafkaSourceConstants.BOOTSTRAP_SERVERS; +import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_AUTO_COMMIT; +import static org.apache.flume.source.kafka.KafkaSourceConstants.KAFKA_CONSUMER_PREFIX; +import static org.apache.flume.source.kafka.KafkaSourceConstants.OLD_GROUP_ID; +import static org.apache.flume.source.kafka.KafkaSourceConstants.PARTITION_HEADER; +import static org.apache.flume.source.kafka.KafkaSourceConstants.TIMESTAMP_HEADER; +import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC; +import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS; +import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS_REGEX; +import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC_HEADER; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; public class TestKafkaSource { - private static final Logger log = - LoggerFactory.getLogger(TestKafkaSource.class); + private static final Logger log = LoggerFactory.getLogger(TestKafkaSource.class); private KafkaSource kafkaSource; private KafkaSourceEmbeddedKafka kafkaServer; @@ -243,10 +257,10 @@ public class TestKafkaSource { } @SuppressWarnings("unchecked") - @Test(expected= FlumeException.class) - public void testNonExistingKafkaServer() throws EventDeliveryException, - SecurityException, NoSuchFieldException, IllegalArgumentException, - IllegalAccessException, InterruptedException { + @Test(expected = FlumeException.class) + public void testNonExistingKafkaServer() throws EventDeliveryException, SecurityException, + NoSuchFieldException, IllegalArgumentException, + IllegalAccessException, InterruptedException { context.put(TOPICS, topic0); context.put(BOOTSTRAP_SERVERS,"blabla:666"); kafkaSource.configure(context); @@ -258,8 +272,7 @@ public class TestKafkaSource { } @Test - public void testBatchTime() throws InterruptedException, - EventDeliveryException { + public void testBatchTime() throws InterruptedException, EventDeliveryException { context.put(TOPICS, topic0); context.put(BATCH_DURATION_MS, "250"); kafkaSource.configure(context); @@ -267,7 +280,7 @@ public class TestKafkaSource { Thread.sleep(500L); - for (int i=1; i<5000; i++) { + for (int i = 1; i < 5000; i++) { kafkaServer.produce(topic0, "", "hello, world " + i); } Thread.sleep(500L); @@ -277,8 +290,7 @@ public class TestKafkaSource { Status status = kafkaSource.process(); long endTime = System.currentTimeMillis(); assertEquals(Status.READY, status); - assertTrue(endTime - startTime < - (context.getLong(BATCH_DURATION_MS) + error)); + assertTrue(endTime - startTime < (context.getLong(BATCH_DURATION_MS) + error)); } // Consume event, stop source, start again and make sure we are not @@ -302,13 +314,11 @@ public class TestKafkaSource { kafkaSource.start(); Thread.sleep(500L); Assert.assertEquals(Status.BACKOFF, kafkaSource.process()); - } // Remove channel processor and test if we can consume events again @Test - public void testNonCommit() throws EventDeliveryException, - InterruptedException { + public void testNonCommit() throws EventDeliveryException, InterruptedException { context.put(TOPICS, topic0); context.put(BATCH_SIZE,"1"); context.put(BATCH_DURATION_MS,"30000"); @@ -328,15 +338,11 @@ public class TestKafkaSource { log.debug("re-process to good channel - this should work"); kafkaSource.process(); - Assert.assertEquals("hello, world", new String(events.get(0).getBody(), - Charsets.UTF_8)); - - + Assert.assertEquals("hello, world", new String(events.get(0).getBody(), Charsets.UTF_8)); } @Test - public void testTwoBatches() throws InterruptedException, - EventDeliveryException { + public void testTwoBatches() throws InterruptedException, EventDeliveryException { context.put(TOPICS, topic0); context.put(BATCH_SIZE,"1"); context.put(BATCH_DURATION_MS, "30000"); @@ -348,20 +354,17 @@ public class TestKafkaSource { Thread.sleep(500L); kafkaSource.process(); - Assert.assertEquals("event 1", new String(events.get(0).getBody(), - Charsets.UTF_8)); + Assert.assertEquals("event 1", new String(events.get(0).getBody(), Charsets.UTF_8)); events.clear(); kafkaServer.produce(topic0, "", "event 2"); Thread.sleep(500L); kafkaSource.process(); - Assert.assertEquals("event 2", new String(events.get(0).getBody(), - Charsets.UTF_8)); + Assert.assertEquals("event 2", new String(events.get(0).getBody(), Charsets.UTF_8)); } @Test - public void testTwoBatchesWithAutocommit() throws InterruptedException, - EventDeliveryException { + public void testTwoBatchesWithAutocommit() throws InterruptedException, EventDeliveryException { context.put(TOPICS, topic0); context.put(BATCH_SIZE,"1"); context.put(BATCH_DURATION_MS,"30000"); @@ -374,23 +377,20 @@ public class TestKafkaSource { Thread.sleep(500L); kafkaSource.process(); - Assert.assertEquals("event 1", new String(events.get(0).getBody(), - Charsets.UTF_8)); + Assert.assertEquals("event 1", new String(events.get(0).getBody(), Charsets.UTF_8)); events.clear(); kafkaServer.produce(topic0, "", "event 2"); Thread.sleep(500L); kafkaSource.process(); - Assert.assertEquals("event 2", new String(events.get(0).getBody(), - Charsets.UTF_8)); - + Assert.assertEquals("event 2", new String(events.get(0).getBody(), Charsets.UTF_8)); } @SuppressWarnings("unchecked") @Test - public void testNullKey() throws EventDeliveryException, - SecurityException, NoSuchFieldException, IllegalArgumentException, - IllegalAccessException, InterruptedException { + public void testNullKey() throws EventDeliveryException, SecurityException, NoSuchFieldException, + IllegalArgumentException, IllegalAccessException, + InterruptedException { context.put(TOPICS, topic0); context.put(BATCH_SIZE, "1"); kafkaSource.configure(context); @@ -406,8 +406,7 @@ public class TestKafkaSource { Assert.assertEquals(Status.BACKOFF, kafkaSource.process()); Assert.assertEquals(1, events.size()); - Assert.assertEquals("hello, world", new String(events.get(0).getBody(), - Charsets.UTF_8)); + Assert.assertEquals("hello, world", new String(events.get(0).getBody(), Charsets.UTF_8)); } @Test @@ -430,7 +429,8 @@ public class TestKafkaSource { public void testKafkaProperties() { Context context = new Context(); context.put(TOPICS, "test1, test2"); - context.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG, "override.default.group.id"); + context.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG, + "override.default.group.id"); context.put(KAFKA_CONSUMER_PREFIX + "fake.property", "kafka.property.value"); context.put(BOOTSTRAP_SERVERS, "real-bootstrap-servers-list"); context.put(KAFKA_CONSUMER_PREFIX + "bootstrap.servers", "bad-bootstrap-servers-list"); @@ -439,21 +439,17 @@ public class TestKafkaSource { Properties kafkaProps = source.getConsumerProps(); //check that we have defaults set - assertEquals( - String.valueOf(DEFAULT_AUTO_COMMIT), - kafkaProps.getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)); + assertEquals(String.valueOf(DEFAULT_AUTO_COMMIT), + kafkaProps.getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)); //check that kafka properties override the default and get correct name - assertEquals( - "override.default.group.id", - kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG)); + assertEquals("override.default.group.id", + kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG)); //check that any kafka property gets in - assertEquals( - "kafka.property.value", - kafkaProps.getProperty("fake.property")); + assertEquals("kafka.property.value", + kafkaProps.getProperty("fake.property")); //check that documented property overrides defaults - assertEquals( - "real-bootstrap-servers-list", - kafkaProps.getProperty("bootstrap.servers")); + assertEquals("real-bootstrap-servers-list", + kafkaProps.getProperty("bootstrap.servers")); } @Test @@ -469,22 +465,16 @@ public class TestKafkaSource { KafkaSource.Subscriber<List<String>> subscriber = source.getSubscriber(); //check topic was set - assertEquals( - "old.topic", - subscriber.get().get(0)); + assertEquals("old.topic", subscriber.get().get(0)); //check that kafka old properties override the default and get correct name - assertEquals( - "old.groupId", - kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG)); + assertEquals("old.groupId", kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG)); source = new KafkaSource(); context.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG, "override.old.group.id"); source.doConfigure(context); kafkaProps = source.getConsumerProps(); //check that kafka new properties override old - assertEquals( - "override.old.group.id", - kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG)); + assertEquals("override.old.group.id", kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG)); context.clear(); context.put(BOOTSTRAP_SERVERS, "real-bootstrap-servers-list"); @@ -493,9 +483,8 @@ public class TestKafkaSource { source.doConfigure(context); kafkaProps = source.getConsumerProps(); //check defaults set - assertEquals( - KafkaSourceConstants.DEFAULT_GROUP_ID, - kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG)); + assertEquals(KafkaSourceConstants.DEFAULT_GROUP_ID, + kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG)); } @Test @@ -568,8 +557,7 @@ public class TestKafkaSource { Event event = events.get(0); - Assert.assertEquals("hello, world", new String(event.getBody(), - Charsets.UTF_8)); + Assert.assertEquals("hello, world", new String(event.getBody(), Charsets.UTF_8)); Assert.assertEquals("value1", e.getHeaders().get("header1")); Assert.assertEquals("value2", e.getHeaders().get("header2")); @@ -577,8 +565,7 @@ public class TestKafkaSource { event = events.get(1); - Assert.assertEquals("hello, world2", new String(event.getBody(), - Charsets.UTF_8)); + Assert.assertEquals("hello, world2", new String(event.getBody(), Charsets.UTF_8)); Assert.assertEquals("value1", e.getHeaders().get("header1")); Assert.assertEquals("value2", e.getHeaders().get("header2")); @@ -603,7 +590,6 @@ public class TestKafkaSource { }).when(channelProcessor).processEventBatch(any(List.class)); return channelProcessor; - } ChannelProcessor createBadChannel() { http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirEventReader.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirEventReader.java b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirEventReader.java index 1896883..bcfe4bb 100644 --- a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirEventReader.java +++ b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirEventReader.java @@ -19,20 +19,6 @@ package org.apache.flume.source.taildir; -import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.*; -import static org.junit.Assert.*; - -import java.io.File; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.flume.Event; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - import com.google.common.base.Charsets; import com.google.common.base.Throwables; import com.google.common.collect.HashBasedTable; @@ -41,6 +27,22 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.collect.Table; import com.google.common.io.Files; +import org.apache.flume.Event; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants + .BYTE_OFFSET_HEADER_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class TestTaildirEventReader { private File tmpDir; @@ -85,7 +87,8 @@ public class TestTaildirEventReader { } private ReliableTaildirEventReader getReader(boolean addByteOffset) { - Map<String, String> filePaths = ImmutableMap.of("testFiles", tmpDir.getAbsolutePath() + "/file.*"); + Map<String, String> filePaths = ImmutableMap.of("testFiles", + tmpDir.getAbsolutePath() + "/file.*"); Table<String, String, String> headerTable = HashBasedTable.create(); return getReader(filePaths, headerTable, addByteOffset); } @@ -472,7 +475,8 @@ public class TestTaildirEventReader { @Test public void testNewLineBoundaries() throws IOException { File f1 = new File(tmpDir, "file1"); - Files.write("file1line1\nfile1line2\rfile1line2\nfile1line3\r\nfile1line4\n", f1, Charsets.UTF_8); + Files.write("file1line1\nfile1line2\rfile1line2\nfile1line3\r\nfile1line4\n", + f1, Charsets.UTF_8); ReliableTaildirEventReader reader = getReader(); List<String> out = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirMatcher.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirMatcher.java b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirMatcher.java index 4bff841..c341054 100644 --- a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirMatcher.java +++ b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirMatcher.java @@ -33,7 +33,10 @@ import java.io.IOException; import java.util.List; import java.util.Map; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; public class TestTaildirMatcher { private File tmpDir; @@ -55,10 +58,10 @@ public class TestTaildirMatcher { */ private void append(String fileName) throws IOException { File f; - if(!files.containsKey(fileName)){ + if (!files.containsKey(fileName)) { f = new File(tmpDir, fileName); files.put(fileName, f); - }else{ + } else { f = files.get(fileName); } Files.append(fileName + "line\n", f, Charsets.UTF_8); @@ -67,7 +70,7 @@ public class TestTaildirMatcher { /** * Translate a list of files to list of filename strings. */ - private static List<String> filesToNames(List<File> origList){ + private static List<String> filesToNames(List<File> origList) { Function<File, String> file2nameFn = new Function<File, String>() { @Override public String apply(File input) { @@ -102,7 +105,9 @@ public class TestTaildirMatcher { append("file0"); append("file1"); - TaildirMatcher tm = new TaildirMatcher("f1", tmpDir.getAbsolutePath() + File.separator + "file.*", isCachingNeeded); + TaildirMatcher tm = new TaildirMatcher("f1", + tmpDir.getAbsolutePath() + File.separator + "file.*", + isCachingNeeded); List<String> files = filesToNames(tm.getMatchingFiles()); assertEquals(msgAlreadyExistingFile, 2, files.size()); assertTrue(msgAlreadyExistingFile, files.contains("file1")); @@ -136,7 +141,9 @@ public class TestTaildirMatcher { append("file0"); append("file1"); - TaildirMatcher tm = new TaildirMatcher("f1", tmpDir.getAbsolutePath() + File.separator + "file.*", false); + TaildirMatcher tm = new TaildirMatcher("f1", + tmpDir.getAbsolutePath() + File.separator + "file.*", + false); List<String> files = filesToNames(tm.getMatchingFiles()); assertEquals(msgAlreadyExistingFile, 2, files.size()); assertTrue(msgAlreadyExistingFile, files.contains("file1")); @@ -167,7 +174,9 @@ public class TestTaildirMatcher { @Test public void testEmtpyDirMatching() throws Exception { - TaildirMatcher tm = new TaildirMatcher("empty", tmpDir.getAbsolutePath() + File.separator + ".*", isCachingNeeded); + TaildirMatcher tm = new TaildirMatcher("empty", + tmpDir.getAbsolutePath() + File.separator + ".*", + isCachingNeeded); List<File> files = tm.getMatchingFiles(); assertNotNull(msgEmptyDir, files); assertTrue(msgEmptyDir, files.isEmpty()); @@ -175,7 +184,10 @@ public class TestTaildirMatcher { @Test public void testNoMatching() throws Exception { - TaildirMatcher tm = new TaildirMatcher("nomatch", tmpDir.getAbsolutePath() + File.separator + "abracadabra_nonexisting", isCachingNeeded); + TaildirMatcher tm = new TaildirMatcher( + "nomatch", + tmpDir.getAbsolutePath() + File.separator + "abracadabra_nonexisting", + isCachingNeeded); List<File> files = tm.getMatchingFiles(); assertNotNull(msgNoMatch, files); assertTrue(msgNoMatch, files.isEmpty()); @@ -183,7 +195,8 @@ public class TestTaildirMatcher { @Test(expected = IllegalStateException.class) public void testNonExistingDir() { - TaildirMatcher tm = new TaildirMatcher("exception", "/abracadabra/doesntexist/.*", isCachingNeeded); + TaildirMatcher tm = new TaildirMatcher("exception", "/abracadabra/doesntexist/.*", + isCachingNeeded); } @Test @@ -191,7 +204,8 @@ public class TestTaildirMatcher { new File(tmpDir, "outerFile").createNewFile(); new File(tmpDir, "recursiveDir").mkdir(); new File(tmpDir + File.separator + "recursiveDir", "innerFile").createNewFile(); - TaildirMatcher tm = new TaildirMatcher("f1", tmpDir.getAbsolutePath() + File.separator + ".*", isCachingNeeded); + TaildirMatcher tm = new TaildirMatcher("f1", tmpDir.getAbsolutePath() + File.separator + ".*", + isCachingNeeded); List<String> files = filesToNames(tm.getMatchingFiles()); assertEquals(msgSubDirs, 1, files.size()); @@ -207,9 +221,13 @@ public class TestTaildirMatcher { append("c.log.yyyy.MM-02"); // Tail a.log and b.log - TaildirMatcher tm1 = new TaildirMatcher("ab", tmpDir.getAbsolutePath() + File.separator + "[ab].log", isCachingNeeded); + TaildirMatcher tm1 = new TaildirMatcher("ab", + tmpDir.getAbsolutePath() + File.separator + "[ab].log", + isCachingNeeded); // Tail files that starts with c.log - TaildirMatcher tm2 = new TaildirMatcher("c", tmpDir.getAbsolutePath() + File.separator + "c.log.*", isCachingNeeded); + TaildirMatcher tm2 = new TaildirMatcher("c", + tmpDir.getAbsolutePath() + File.separator + "c.log.*", + isCachingNeeded); List<String> files1 = filesToNames(tm1.getMatchingFiles()); List<String> files2 = filesToNames(tm2.getMatchingFiles()); @@ -217,11 +235,16 @@ public class TestTaildirMatcher { assertEquals(2, files1.size()); assertEquals(2, files2.size()); // Make sure we got every file - assertTrue("Regex pattern for ab should have matched a.log file", files1.contains("a.log")); - assertFalse("Regex pattern for ab should NOT have matched a.log.1 file", files1.contains("a.log.1")); - assertTrue("Regex pattern for ab should have matched b.log file", files1.contains("b.log")); - assertTrue("Regex pattern for c should have matched c.log.yyyy-MM-01 file", files2.contains("c.log.yyyy.MM-01")); - assertTrue("Regex pattern for c should have matched c.log.yyyy-MM-02 file", files2.contains("c.log.yyyy.MM-02")); + assertTrue("Regex pattern for ab should have matched a.log file", + files1.contains("a.log")); + assertFalse("Regex pattern for ab should NOT have matched a.log.1 file", + files1.contains("a.log.1")); + assertTrue("Regex pattern for ab should have matched b.log file", + files1.contains("b.log")); + assertTrue("Regex pattern for c should have matched c.log.yyyy-MM-01 file", + files2.contains("c.log.yyyy.MM-01")); + assertTrue("Regex pattern for c should have matched c.log.yyyy-MM-02 file", + files2.contains("c.log.yyyy.MM-02")); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java index f6289cd..e090b74 100644 --- a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java +++ b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java @@ -17,14 +17,9 @@ package org.apache.flume.source.taildir; -import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.*; -import static org.junit.Assert.*; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - +import com.google.common.base.Charsets; +import com.google.common.collect.Lists; +import com.google.common.io.Files; import org.apache.flume.Channel; import org.apache.flume.ChannelSelector; import org.apache.flume.Context; @@ -40,9 +35,21 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import com.google.common.base.Charsets; -import com.google.common.collect.Lists; -import com.google.common.io.Files; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILE_GROUPS; +import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants + .FILE_GROUPS_PREFIX; +import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.HEADERS_PREFIX; +import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.POSITION_FILE; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class TestTaildirSource { static TaildirSource source; @@ -274,10 +281,11 @@ public class TestTaildirSource { line1b, line2b, line3b, // file2 line1d, line2d, line3d, // file4 line1c, line2c, line3c // file3 - ); - for(int i =0; i!=expected.size(); ++i) { - expected.set(i, expected.get(i).trim() ); + ); + for (int i = 0; i != expected.size(); ++i) { + expected.set(i, expected.get(i).trim()); } - assertArrayEquals("Files not consumed in expected order", expected.toArray(), consumedOrder.toArray()); + assertArrayEquals("Files not consumed in expected order", expected.toArray(), + consumedOrder.toArray()); } } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestFileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestFileChannel.java b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestFileChannel.java index 4a80b8c..6a98292 100644 --- a/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestFileChannel.java +++ b/flume-ng-tests/src/test/java/org/apache/flume/test/agent/TestFileChannel.java @@ -18,14 +18,8 @@ */ package org.apache.flume.test.agent; -import java.io.File; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Properties; -import java.util.concurrent.TimeUnit; -import java.util.regex.Pattern; - +import com.google.common.base.Charsets; +import com.google.common.io.Files; import org.apache.flume.test.util.StagedInstall; import org.apache.log4j.Logger; import org.junit.After; @@ -33,9 +27,11 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import com.google.common.base.Charsets; -import com.google.common.base.Splitter; -import com.google.common.io.Files; +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Properties; +import java.util.concurrent.TimeUnit; public class TestFileChannel { @@ -48,61 +44,57 @@ public class TestFileChannel { @Before public void setUp() throws Exception { - /* Create 3 temp dirs, each used as value within agentProps */ - - final File sinkOutputDir = Files.createTempDir(); - tempResources.add(sinkOutputDir); - final String sinkOutputDirPath = sinkOutputDir.getCanonicalPath(); - LOGGER.info("Created rolling file sink's output dir: " - + sinkOutputDirPath); - - final File channelCheckpointDir = Files.createTempDir(); - tempResources.add(channelCheckpointDir); - final String channelCheckpointDirPath = channelCheckpointDir - .getCanonicalPath(); - LOGGER.info("Created file channel's checkpoint dir: " - + channelCheckpointDirPath); - - final File channelDataDir = Files.createTempDir(); - tempResources.add(channelDataDir); - final String channelDataDirPath = channelDataDir.getCanonicalPath(); - LOGGER.info("Created file channel's data dir: " - + channelDataDirPath); - - /* Build props to pass to flume agent */ - - Properties agentProps = new Properties(); - - // Active sets - agentProps.put("a1.channels", "c1"); - agentProps.put("a1.sources", "r1"); - agentProps.put("a1.sinks", "k1"); - - // c1 - agentProps.put("a1.channels.c1.type", "FILE"); - agentProps.put("a1.channels.c1.checkpointDir", channelCheckpointDirPath); - agentProps.put("a1.channels.c1.dataDirs", channelDataDirPath); - - // r1 - agentProps.put("a1.sources.r1.channels", "c1"); - agentProps.put("a1.sources.r1.type", "EXEC"); - agentProps.put("a1.sources.r1.command", "seq 1 100"); - - // k1 - agentProps.put("a1.sinks.k1.channel", "c1"); - agentProps.put("a1.sinks.k1.type", "FILE_ROLL"); - agentProps.put("a1.sinks.k1.sink.directory", sinkOutputDirPath); - agentProps.put("a1.sinks.k1.sink.rollInterval", "0"); - - this.agentProps = agentProps; - this.sinkOutputDir = sinkOutputDir; + /* Create 3 temp dirs, each used as value within agentProps */ + + final File sinkOutputDir = Files.createTempDir(); + tempResources.add(sinkOutputDir); + final String sinkOutputDirPath = sinkOutputDir.getCanonicalPath(); + LOGGER.info("Created rolling file sink's output dir: " + sinkOutputDirPath); + + final File channelCheckpointDir = Files.createTempDir(); + tempResources.add(channelCheckpointDir); + final String channelCheckpointDirPath = channelCheckpointDir.getCanonicalPath(); + LOGGER.info("Created file channel's checkpoint dir: " + channelCheckpointDirPath); + + final File channelDataDir = Files.createTempDir(); + tempResources.add(channelDataDir); + final String channelDataDirPath = channelDataDir.getCanonicalPath(); + LOGGER.info("Created file channel's data dir: " + channelDataDirPath); + + /* Build props to pass to flume agent */ + + Properties agentProps = new Properties(); + + // Active sets + agentProps.put("a1.channels", "c1"); + agentProps.put("a1.sources", "r1"); + agentProps.put("a1.sinks", "k1"); + + // c1 + agentProps.put("a1.channels.c1.type", "FILE"); + agentProps.put("a1.channels.c1.checkpointDir", channelCheckpointDirPath); + agentProps.put("a1.channels.c1.dataDirs", channelDataDirPath); + + // r1 + agentProps.put("a1.sources.r1.channels", "c1"); + agentProps.put("a1.sources.r1.type", "EXEC"); + agentProps.put("a1.sources.r1.command", "seq 1 100"); + + // k1 + agentProps.put("a1.sinks.k1.channel", "c1"); + agentProps.put("a1.sinks.k1.type", "FILE_ROLL"); + agentProps.put("a1.sinks.k1.sink.directory", sinkOutputDirPath); + agentProps.put("a1.sinks.k1.sink.rollInterval", "0"); + + this.agentProps = agentProps; + this.sinkOutputDir = sinkOutputDir; } @After public void tearDown() throws Exception { StagedInstall.getInstance().stopAgent(); for (File tempResource : tempResources) { - tempResource.delete(); + tempResource.delete(); } agentProps = null; } @@ -110,7 +102,7 @@ public class TestFileChannel { /** * File channel in/out test. Verifies that all events inserted into the * file channel are received by the sink in order. - * + * <p> * The EXEC source creates 100 events where the event bodies have * sequential numbers. The source puts those events into the file channel, * and the FILE_ROLL The sink is expected to take all 100 events in FIFO @@ -119,38 +111,36 @@ public class TestFileChannel { * @throws Exception */ @Test - public void testInOut() throws Exception { - LOGGER.debug("testInOut() started."); - - StagedInstall.getInstance().startAgent("a1", agentProps); - TimeUnit.SECONDS.sleep(10); // Wait for source and sink to finish - // TODO make this more deterministic - - /* Create expected output */ - - StringBuffer sb = new StringBuffer(); - for (int i = 1; i <= 100; i++) { - sb.append(i).append("\n"); - } - String expectedOutput = sb.toString(); - LOGGER.info("Created expected output: " + expectedOutput); - - /* Create actual output file */ - - File[] sinkOutputDirChildren = sinkOutputDir.listFiles(); - // Only 1 file should be in FILE_ROLL sink's dir (rolling is disabled) - Assert.assertEquals("Expected FILE_ROLL sink's dir to have only 1 child," + - " but found " + sinkOutputDirChildren.length + " children.", - 1, sinkOutputDirChildren.length); - File actualOutput = sinkOutputDirChildren[0]; - - if (!Files.toString(actualOutput, Charsets.UTF_8).equals(expectedOutput)) { - LOGGER.error("Actual output doesn't match expected output.\n"); - throw new AssertionError("FILE_ROLL sink's actual output doesn't " + - "match expected output."); - } - - LOGGER.debug("testInOut() ended."); - } + public void testInOut() throws Exception { + LOGGER.debug("testInOut() started."); + StagedInstall.getInstance().startAgent("a1", agentProps); + TimeUnit.SECONDS.sleep(10); // Wait for source and sink to finish + // TODO make this more deterministic + + /* Create expected output */ + StringBuffer sb = new StringBuffer(); + for (int i = 1; i <= 100; i++) { + sb.append(i).append("\n"); + } + String expectedOutput = sb.toString(); + LOGGER.info("Created expected output: " + expectedOutput); + + /* Create actual output file */ + + File[] sinkOutputDirChildren = sinkOutputDir.listFiles(); + // Only 1 file should be in FILE_ROLL sink's dir (rolling is disabled) + Assert.assertEquals("Expected FILE_ROLL sink's dir to have only 1 child," + + " but found " + sinkOutputDirChildren.length + " children.", + 1, sinkOutputDirChildren.length); + File actualOutput = sinkOutputDirChildren[0]; + + if (!Files.toString(actualOutput, Charsets.UTF_8).equals(expectedOutput)) { + LOGGER.error("Actual output doesn't match expected output.\n"); + throw new AssertionError("FILE_ROLL sink's actual output doesn't " + + "match expected output."); + } + + LOGGER.debug("testInOut() ended."); + } } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java ---------------------------------------------------------------------- diff --git a/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java b/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java index 973ff4a..51194b6 100644 --- a/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java +++ b/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java @@ -18,6 +18,14 @@ */ package org.apache.flume.test.util; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.io.Files; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.log4j.Logger; + import java.io.File; import java.io.FileFilter; import java.io.FileInputStream; @@ -29,18 +37,8 @@ import java.net.Socket; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.concurrent.TimeUnit; import java.util.zip.GZIPInputStream; -import com.google.common.base.Preconditions; -import com.google.common.io.Files; -import org.apache.commons.compress.archivers.tar.TarArchiveEntry; -import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; -import org.apache.log4j.Logger; - -import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableList; - /** * Attempts to setup a staged install using explicitly specified tar-ball @@ -73,7 +71,7 @@ public class StagedInstall { private static StagedInstall INSTANCE; - public synchronized static StagedInstall getInstance() throws Exception { + public static synchronized StagedInstall getInstance() throws Exception { if (INSTANCE == null) { INSTANCE = new StagedInstall(); } @@ -124,8 +122,7 @@ public class StagedInstall { if (process != null) { throw new Exception("A process is already running"); } - LOGGER.info("Starting process for agent: " + agentName + " using config: " - + properties); + LOGGER.info("Starting process for agent: " + agentName + " using config: " + properties); File configFile = createConfigurationFile(agentName, properties); configFilePath = configFile.getCanonicalPath(); @@ -252,7 +249,7 @@ public class StagedInstall { File[] listBaseDirs = stageDir.listFiles(); if (listBaseDirs != null && listBaseDirs.length == 1 && listBaseDirs[0].isDirectory()) { - rootDir =listBaseDirs[0]; + rootDir = listBaseDirs[0]; } baseDir = rootDir; @@ -417,7 +414,6 @@ public class StagedInstall { if (testFile.exists() && testFile.isDirectory()) { LOGGER.info("Found candidate dir: " + testFile.getCanonicalPath()); File[] candidateFiles = testFile.listFiles(new FileFilter() { - @Override public boolean accept(File pathname) { String name = pathname.getName(); @@ -426,7 +422,8 @@ public class StagedInstall { return true; } return false; - }}); + } + }); // There should be at most one if (candidateFiles != null && candidateFiles.length > 0) { @@ -466,22 +463,22 @@ public class StagedInstall { } public static void waitUntilPortOpens(String host, int port, long timeout) - throws IOException, InterruptedException{ + throws IOException, InterruptedException { long startTime = System.currentTimeMillis(); Socket socket; boolean connected = false; //See if port has opened for timeout. - while(System.currentTimeMillis() - startTime < timeout){ - try{ + while (System.currentTimeMillis() - startTime < timeout) { + try { socket = new Socket(host, port); socket.close(); connected = true; break; - } catch (IOException e){ + } catch (IOException e) { Thread.sleep(2000); } } - if(!connected) { + if (!connected) { throw new IOException("Port not opened within specified timeout."); } } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-tests/src/test/java/org/apache/flume/test/util/SyslogAgent.java ---------------------------------------------------------------------- diff --git a/flume-ng-tests/src/test/java/org/apache/flume/test/util/SyslogAgent.java b/flume-ng-tests/src/test/java/org/apache/flume/test/util/SyslogAgent.java index 7159549..c908fc1 100644 --- a/flume-ng-tests/src/test/java/org/apache/flume/test/util/SyslogAgent.java +++ b/flume-ng-tests/src/test/java/org/apache/flume/test/util/SyslogAgent.java @@ -60,7 +60,7 @@ public class SyslogAgent { public String toString() { return syslogSourceType; } - }; + } private Properties agentProps; private File sinkOutputDir; @@ -73,7 +73,6 @@ public class SyslogAgent { public SyslogAgent() throws IOException { hostname = "localhost"; - setRandomPort(); } @@ -141,7 +140,7 @@ public class SyslogAgent { while (client == null) { try { client = new BufferedOutputStream(new Socket(hostname, port).getOutputStream()); - } catch(IOException e) { + } catch (IOException e) { if (++numberOfAttempts >= DEFAULT_ATTEMPTS) { throw new AssertionError("Could not connect to source after " + DEFAULT_ATTEMPTS + " attempts with " + DEFAULT_TIMEOUT + " ms timeout."); @@ -206,8 +205,8 @@ public class SyslogAgent { // Only 1 file should be in FILE_ROLL sink's dir (rolling is disabled) File[] sinkOutputDirChildren = sinkOutputDir.listFiles(); Assert.assertEquals("Expected FILE_ROLL sink's dir to have only 1 child," + - " but found " + sinkOutputDirChildren.length + " children.", - 1, sinkOutputDirChildren.length); + " but found " + sinkOutputDirChildren.length + " children.", + 1, sinkOutputDirChildren.length); /* Wait for output file stats to be as expected. */ File outputDirChild = sinkOutputDirChildren[0]; http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java ---------------------------------------------------------------------- diff --git a/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java b/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java index a11126d..9fc5f2c 100644 --- a/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java +++ b/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java @@ -34,7 +34,6 @@ import org.apache.flume.channel.file.WriteOrderOracle; import org.apache.flume.event.EventBuilder; import org.junit.After; import org.junit.AfterClass; -import static org.fest.reflect.core.Reflection.*; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; @@ -47,6 +46,8 @@ import java.util.HashSet; import java.util.Random; import java.util.Set; +import static org.fest.reflect.core.Reflection.field; +import static org.fest.reflect.core.Reflection.method; public class TestFileChannelIntegrityTool { private static File baseDir; @@ -61,7 +62,7 @@ public class TestFileChannelIntegrityTool { private static int invalidEvent = 0; @BeforeClass - public static void setUpClass() throws Exception{ + public static void setUpClass() throws Exception { createDataFiles(); } @@ -74,13 +75,13 @@ public class TestFileChannelIntegrityTool { File[] dataFiles = origDataDir.listFiles(new FilenameFilter() { @Override public boolean accept(File dir, String name) { - if(name.contains("lock")) { + if (name.contains("lock")) { return false; } return true; } }); - for(File dataFile : dataFiles) { + for (File dataFile : dataFiles) { Serialization.copyFile(dataFile, new File(dataDir, dataFile.getName())); } } @@ -146,7 +147,7 @@ public class TestFileChannelIntegrityTool { Transaction tx = channel.getTransaction(); tx.begin(); int i = 0; - while(channel.take() != null) { + while (channel.take() != null) { i++; } tx.commit(); @@ -161,7 +162,7 @@ public class TestFileChannelIntegrityTool { File[] files = dataDir.listFiles(new FilenameFilter() { @Override public boolean accept(File dir, String name) { - if(name.contains("lock") || name.contains("meta")) { + if (name.contains("lock") || name.contains("meta")) { return false; } return true; @@ -170,15 +171,13 @@ public class TestFileChannelIntegrityTool { Random random = new Random(); int corrupted = 0; for (File dataFile : files) { - LogFile.SequentialReader reader = - new LogFileV3.SequentialReader(dataFile, null, true); + LogFile.SequentialReader reader = new LogFileV3.SequentialReader(dataFile, null, true); RandomAccessFile handle = new RandomAccessFile(dataFile, "rw"); long eventPosition1 = reader.getPosition(); LogRecord rec = reader.next(); //No point corrupting commits, so ignore them - if(rec == null || - rec.getEvent().getClass().getName(). - equals("org.apache.flume.channel.file.Commit")) { + if (rec == null || + rec.getEvent().getClass().getName().equals("org.apache.flume.channel.file.Commit")) { handle.close(); reader.close(); continue; @@ -190,8 +189,7 @@ public class TestFileChannelIntegrityTool { corrupted++; corruptFiles.add(dataFile.getName()); if (rec == null || - rec.getEvent().getClass().getName(). - equals("org.apache.flume.channel.file.Commit")) { + rec.getEvent().getClass().getName().equals("org.apache.flume.channel.file.Commit")) { handle.close(); reader.close(); continue; @@ -231,7 +229,7 @@ public class TestFileChannelIntegrityTool { Transaction tx = channel.getTransaction(); tx.begin(); int i = 0; - while(channel.take() != null) { + while (channel.take() != null) { i++; } tx.commit(); @@ -241,14 +239,14 @@ public class TestFileChannelIntegrityTool { files = dataDir.listFiles(new FilenameFilter() { @Override public boolean accept(File dir, String name) { - if(name.contains(".bak")) { + if (name.contains(".bak")) { return true; } return false; } }); Assert.assertEquals(corruptFiles.size(), files.length); - for(File file : files) { + for (File file : files) { String name = file.getName(); name = name.replaceAll(".bak", ""); Assert.assertTrue(corruptFiles.remove(name)); @@ -258,13 +256,13 @@ public class TestFileChannelIntegrityTool { private static void createDataFiles() throws Exception { final byte[] eventData = new byte[2000]; - for(int i = 0; i < 2000; i++) { + for (int i = 0; i < 2000; i++) { eventData[i] = 1; } WriteOrderOracle.setSeed(System.currentTimeMillis()); event = EventBuilder.withBody(eventData); baseDir = Files.createTempDir(); - if(baseDir.exists()) { + if (baseDir.exists()) { FileUtils.deleteDirectory(baseDir); } baseDir = Files.createTempDir(); @@ -286,7 +284,7 @@ public class TestFileChannelIntegrityTool { Transaction tx = channel.getTransaction(); tx.begin(); for (int i = 0; i < 5; i++) { - if(i % 3 == 0) { + if (i % 3 == 0) { event.getBody()[0] = 0; invalidEvent++; } else { @@ -297,22 +295,19 @@ public class TestFileChannelIntegrityTool { tx.commit(); tx.close(); } - Log log = field("log") - .ofType(Log.class) - .in(channel) - .get(); + Log log = field("log").ofType(Log.class) + .in(channel) + .get(); Assert.assertTrue("writeCheckpoint returned false", - method("writeCheckpoint") - .withReturnType(Boolean.class) - .withParameterTypes(Boolean.class) - .in(log) - .invoke(true)); + method("writeCheckpoint").withReturnType(Boolean.class) + .withParameterTypes(Boolean.class) + .in(log) + .invoke(true)); channel.stop(); } public static class DummyEventVerifier implements EventValidator { - private int value = 0; private DummyEventVerifier(int val) { @@ -325,7 +320,6 @@ public class TestFileChannelIntegrityTool { } public static class Builder implements EventValidator.Builder { - private int binaryValidator = 0; @Override http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 85c0dc8..b50693e 100644 --- a/pom.xml +++ b/pom.xml @@ -672,9 +672,7 @@ limitations under the License. <suppressionsLocation>flume/checkstyle-suppressions.xml</suppressionsLocation> <suppressionsFileExpression>checkstyle.suppressions.file</suppressionsFileExpression> <encoding>UTF-8</encoding> - <consoleOutput>true</consoleOutput> - <failsOnError>true</failsOnError> - <includeTestSourceDirectory>false</includeTestSourceDirectory> + <includeTestSourceDirectory>true</includeTestSourceDirectory> <linkXRef>false</linkXRef> </configuration> <goals> @@ -1474,7 +1472,7 @@ limitations under the License. <suppressionsLocation>flume/checkstyle-suppressions.xml</suppressionsLocation> <suppressionsFileExpression>checkstyle.suppressions.file</suppressionsFileExpression> <encoding>UTF-8</encoding> - <includeTestSourceDirectory>false</includeTestSourceDirectory> + <includeTestSourceDirectory>true</includeTestSourceDirectory> <linkXRef>false</linkXRef> </configuration> </plugin>
