FLUME-2941. Integrate checkstyle for test classes. Also make test code conform to style guidelines.
Additionally, this patch makes style violations fatal to the build. This patch is whitespace-only from a code perspective. After stripping line numbers, the generated test bytecode before and after these changes is identical. Code review: https://reviews.apache.org/r/49830/ Reviewed by Hari. Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/cfbf1156 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/cfbf1156 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/cfbf1156 Branch: refs/heads/trunk Commit: cfbf1156858af9ae26975fefc94594d91c8cd3f4 Parents: c8c0f9b Author: Mike Percy <[email protected]> Authored: Wed Jun 29 21:18:20 2016 -0700 Committer: Mike Percy <[email protected]> Committed: Fri Jul 8 15:48:26 2016 -0700 ---------------------------------------------------------------------- flume-checkstyle/pom.xml | 8 - .../resources/flume/checkstyle-suppressions.xml | 18 +- .../src/main/resources/flume/checkstyle.xml | 2 +- .../flume/auth/TestFlumeAuthenticator.java | 16 +- .../flume/channel/file/CountingSinkRunner.java | 22 +- .../channel/file/CountingSourceRunner.java | 21 +- .../flume/channel/file/TestCheckpoint.java | 4 +- .../file/TestEventQueueBackingStoreFactory.java | 140 ++++--- .../flume/channel/file/TestEventUtils.java | 4 +- .../flume/channel/file/TestFileChannel.java | 139 ++++--- .../file/TestFileChannelFormatRegression.java | 21 +- .../channel/file/TestFileChannelRestart.java | 208 +++++----- .../channel/file/TestFileChannelRollback.java | 16 +- .../flume/channel/file/TestFlumeEventQueue.java | 200 +++++----- .../flume/channel/file/TestIntegration.java | 33 +- .../org/apache/flume/channel/file/TestLog.java | 308 +++++++++------ .../apache/flume/channel/file/TestLogFile.java | 153 ++++---- .../file/TestTransactionEventRecordV2.java | 10 +- .../file/TestTransactionEventRecordV3.java | 34 +- .../apache/flume/channel/file/TestUtils.java | 122 +++--- 
.../encryption/CipherProviderTestSuite.java | 5 + .../file/encryption/EncryptionTestUtils.java | 95 +++-- .../encryption/TestAESCTRNoPaddingProvider.java | 10 +- .../encryption/TestFileChannelEncryption.java | 98 +++-- .../file/encryption/TestJCEFileKeyProvider.java | 69 ++-- .../jdbc/BaseJdbcChannelProviderTest.java | 28 +- .../apache/flume/channel/jdbc/MockEvent.java | 3 +- .../flume/channel/jdbc/MockEventUtils.java | 22 +- .../jdbc/TestDerbySchemaHandlerQueries.java | 2 - .../flume/channel/kafka/TestKafkaChannel.java | 67 ++-- .../channel/TestSpillableMemoryChannel.java | 386 +++++++++---------- .../TestLoadBalancingLog4jAppender.java | 51 ++- .../log4jappender/TestLog4jAppender.java | 32 +- .../TestLog4jAppenderWithAvro.java | 5 +- .../AbstractBasicChannelSemanticsTest.java | 7 +- .../flume/channel/TestChannelProcessor.java | 32 +- .../apache/flume/channel/TestMemoryChannel.java | 40 +- .../channel/TestMemoryChannelConcurrency.java | 100 ++--- .../channel/TestMemoryChannelTransaction.java | 12 +- .../TestReliableSpoolingFileEventReader.java | 201 +++++----- .../flume/formatter/output/TestBucketPath.java | 52 +-- .../TestMonitoredCounterGroup.java | 50 +-- .../http/TestHTTPMetricsServer.java | 24 +- .../kafka/KafkaSourceCounterTest.java | 78 ++-- ...gexExtractorInterceptorMillisSerializer.java | 10 +- ...tractorInterceptorPassThroughSerializer.java | 3 +- .../TestSearchAndReplaceInterceptor.java | 2 +- .../SyslogAvroEventSerializer.java | 45 ++- .../TestAvroEventDeserializer.java | 1 + .../TestDurablePositionTracker.java | 3 +- .../TestFlumeEventAvroEventSerializer.java | 24 +- .../TestResettableFileInputStream.java | 26 +- .../TestSyslogAvroEventSerializer.java | 2 +- .../org/apache/flume/sink/TestAvroSink.java | 87 ++--- .../flume/sink/TestDefaultSinkFactory.java | 3 +- .../flume/sink/TestFailoverSinkProcessor.java | 10 +- .../sink/TestLoadBalancingSinkProcessor.java | 60 ++- .../apache/flume/sink/TestRollingFileSink.java | 30 +- 
.../org/apache/flume/sink/TestThriftSink.java | 11 +- .../source/TestAbstractPollableSource.java | 24 +- .../org/apache/flume/source/TestAvroSource.java | 34 +- .../org/apache/flume/source/TestExecSource.java | 336 ++++++++-------- .../source/TestMultiportSyslogTCPSource.java | 1 + .../apache/flume/source/TestNetcatSource.java | 79 ++-- .../source/TestSequenceGeneratorSource.java | 8 +- .../flume/source/TestSpoolDirectorySource.java | 42 +- .../apache/flume/source/TestStressSource.java | 2 +- .../apache/flume/source/TestSyslogParser.java | 12 +- .../flume/source/TestSyslogTcpSource.java | 12 +- .../flume/source/TestSyslogUdpSource.java | 19 +- .../apache/flume/source/TestSyslogUtils.java | 162 ++++---- .../apache/flume/source/TestThriftSource.java | 29 +- .../http/FlumeHttpServletRequestWrapper.java | 4 +- .../flume/source/http/TestHTTPSource.java | 153 ++++---- .../flume/tools/TestTimestampRoundDownUtil.java | 10 +- .../org/apache/flume/tools/TestVersionInfo.java | 4 +- .../flume/agent/embedded/TestEmbeddedAgent.java | 24 +- .../TestEmbeddedAgentConfiguration.java | 18 +- .../TestEmbeddedAgentEmbeddedSource.java | 15 +- .../agent/embedded/TestEmbeddedAgentState.java | 30 +- .../source/avroLegacy/TestLegacyAvroSource.java | 27 +- .../thriftLegacy/TestThriftLegacySource.java | 30 +- .../node/TestAbstractConfigurationProvider.java | 71 ++-- ...tAbstractZooKeeperConfigurationProvider.java | 25 +- .../org/apache/flume/node/TestApplication.java | 2 +- ...lingPropertiesFileConfigurationProvider.java | 1 - ...TestPropertiesFileConfigurationProvider.java | 24 +- .../apache/flume/source/TestNetcatSource.java | 38 +- .../java/org/apache/flume/api/RpcTestUtils.java | 77 ++-- .../apache/flume/api/TestFailoverRpcClient.java | 4 +- .../flume/api/TestLoadBalancingRpcClient.java | 107 ++--- .../flume/api/TestNettyAvroRpcClient.java | 26 +- .../apache/flume/api/TestThriftRpcClient.java | 41 +- .../apache/flume/api/ThriftTestingSource.java | 27 +- 
.../apache/flume/sink/kite/TestDatasetSink.java | 79 ++-- .../flume/sink/hdfs/HDFSTestSeqWriter.java | 16 +- .../apache/flume/sink/hdfs/MockDataStream.java | 4 +- .../apache/flume/sink/hdfs/MockFileSystem.java | 23 +- .../flume/sink/hdfs/MockFsDataOutputStream.java | 12 +- .../apache/flume/sink/hdfs/MockHDFSWriter.java | 3 +- .../flume/sink/hdfs/TestBucketWriter.java | 274 ++++++------- .../flume/sink/hdfs/TestHDFSEventSink.java | 127 +++--- .../hdfs/TestSequenceFileSerializerFactory.java | 3 +- .../apache/flume/sink/hive/TestHiveSink.java | 21 +- .../apache/flume/sink/hive/TestHiveWriter.java | 50 ++- .../org/apache/flume/sink/hive/TestUtil.java | 33 +- .../org/apache/flume/sink/irc/TestIRCSink.java | 11 +- .../AbstractElasticSearchSinkTest.java | 25 +- ...ElasticSearchIndexRequestBuilderFactory.java | 30 +- ...estElasticSearchLogStashEventSerializer.java | 82 ++-- .../elasticsearch/TestElasticSearchSink.java | 52 +-- .../TestElasticSearchSinkCreation.java | 2 +- .../client/RoundRobinListTest.java | 3 +- .../client/TestElasticSearchClientFactory.java | 10 +- .../client/TestElasticSearchRestClient.java | 35 +- .../hbase/IncrementAsyncHBaseSerializer.java | 6 +- .../flume/sink/hbase/TestAsyncHBaseSink.java | 114 +++--- .../apache/flume/sink/hbase/TestHBaseSink.java | 129 ++++--- .../hbase/TestRegexHbaseEventSerializer.java | 37 +- .../apache/flume/sink/kafka/TestKafkaSink.java | 89 ++--- .../flume/sink/kafka/util/KafkaLocal.java | 57 ++- .../flume/sink/kafka/util/ZooKeeperLocal.java | 75 ++-- .../solr/morphline/TestBlobDeserializer.java | 10 +- .../morphline/TestMorphlineInterceptor.java | 57 +-- .../solr/morphline/TestMorphlineSolrSink.java | 19 +- .../source/jms/JMSMessageConsumerTestBase.java | 13 +- .../jms/TestDefaultJMSMessageConverter.java | 2 +- .../source/jms/TestIntegrationActiveMQ.java | 23 +- .../source/jms/TestJMSMessageConsumer.java | 3 +- .../apache/flume/source/jms/TestJMSSource.java | 53 ++- .../source/kafka/KafkaSourceEmbeddedKafka.java | 11 +- 
.../kafka/KafkaSourceEmbeddedZookeeper.java | 1 - .../flume/source/kafka/TestKafkaSource.java | 152 ++++---- .../source/taildir/TestTaildirEventReader.java | 36 +- .../source/taildir/TestTaildirMatcher.java | 57 ++- .../flume/source/taildir/TestTaildirSource.java | 38 +- .../flume/test/agent/TestFileChannel.java | 178 ++++----- .../apache/flume/test/util/StagedInstall.java | 39 +- .../org/apache/flume/test/util/SyslogAgent.java | 9 +- .../tools/TestFileChannelIntegrityTool.java | 54 ++- pom.xml | 6 +- 141 files changed, 3523 insertions(+), 3423 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-checkstyle/pom.xml ---------------------------------------------------------------------- diff --git a/flume-checkstyle/pom.xml b/flume-checkstyle/pom.xml index 31db3c0..74ebf6b 100644 --- a/flume-checkstyle/pom.xml +++ b/flume-checkstyle/pom.xml @@ -21,14 +21,6 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> - <!-- - <parent> - <artifactId>flume-parent</artifactId> - <groupId>org.apache.flume</groupId> - <version>1.7.0-SNAPSHOT</version> - </parent> - --> - <groupId>org.apache.flume</groupId> <artifactId>flume-checkstyle</artifactId> <name>Flume checkstyle project</name> http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-checkstyle/src/main/resources/flume/checkstyle-suppressions.xml ---------------------------------------------------------------------- diff --git a/flume-checkstyle/src/main/resources/flume/checkstyle-suppressions.xml b/flume-checkstyle/src/main/resources/flume/checkstyle-suppressions.xml index 49c8834..2642baa 100644 --- a/flume-checkstyle/src/main/resources/flume/checkstyle-suppressions.xml +++ b/flume-checkstyle/src/main/resources/flume/checkstyle-suppressions.xml @@ -12,21 +12,29 @@ <suppress checks="PackageName" 
files="org/apache/flume/source/avroLegacy|org/apache/flume/source/thriftLegacy"/> + <!-- Allow unicode escapes in tests --> + <suppress checks="AvoidEscapedUnicodeCharacters" + files="Test.*\.java"/> + <!-- TODO: Rearrange methods in below classes to keep overloaded methods adjacent --> <suppress checks="OverloadMethodsDeclarationOrder" - files="channel/file|RpcClientFactory\.java|BucketPath\.java|SinkGroup\.java|DefaultSinkProcessor\.java|RegexExtractorInterceptorMillisSerializer\.java|SimpleAsyncHbaseEventSerializer\.java|hdfs/BucketWriter\.java"/> + files="channel/file|RpcClientFactory\.java|BucketPath\.java|SinkGroup\.java|DefaultSinkProcessor\.java|RegexExtractorInterceptorMillisSerializer\.java|SimpleAsyncHbaseEventSerializer\.java|hdfs/BucketWriter\.java|AbstractBasicChannelSemanticsTest\.java"/> <!-- TODO: Fix inner class names to follow standard convention --> <suppress checks="TypeName" files="SyslogUDPSource\.java|SyslogTcpSource\.java|TaildirSource\.java"/> + <!-- TODO: Method names must follow standard Java naming conventions --> + <suppress checks="MethodNameCheck" + files="TestBucketWriter\.java|TestSyslogUtils\.java"/> + <!-- TODO: Add default cases to switch statements --> <suppress checks="MissingSwitchDefault" - files="SyslogUtils\.java|ReliableTaildirEventReader\.java"/> + files="SyslogUtils\.java|ReliableTaildirEventReader\.java|AbstractBasicChannelSemanticsTest\.java"/> <!-- TODO: Avoid empty catch blocks --> <suppress checks="EmptyCatchBlock" - files="channel/file/LogFile\.java"/> + files="channel/file/LogFile\.java|TestDatasetSink\.java|CountingSourceRunner\.java|CountingSinkRunner\.java|TestKafkaChannel\.java|TestTaildirSource\.java|TestChannelProcessor\.java|TestHiveSink\.java|AbstractBasicChannelSemanticsTest\.java|TestJMSSource\.java|TestEmbeddedAgent\.java|TestAsyncHBaseSink\.java"/> <!-- TODO: Avoid empty if blocks --> <suppress checks="EmptyBlockCheck" @@ -34,10 +42,10 @@ <!-- TODO: Fix line length issues --> <suppress 
checks="LineLengthCheck" - files="channel/MemoryChannel\.java|ReliableSpoolingFileEventReader\.java"/> + files="channel/MemoryChannel\.java|ReliableSpoolingFileEventReader\.java|TestAvroSink\.java"/> <!-- TODO: Move helper classes to their own files --> <suppress checks="OneTopLevelClass" - files="KafkaSource\.java|KafkaChannel\.java|KafkaSink\.java"/> + files="KafkaSource\.java|KafkaChannel\.java|KafkaSink\.java|TestElasticSearchSink\.java"/> </suppressions> http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-checkstyle/src/main/resources/flume/checkstyle.xml ---------------------------------------------------------------------- diff --git a/flume-checkstyle/src/main/resources/flume/checkstyle.xml b/flume-checkstyle/src/main/resources/flume/checkstyle.xml index e8913f0..fdbcb5d 100644 --- a/flume-checkstyle/src/main/resources/flume/checkstyle.xml +++ b/flume-checkstyle/src/main/resources/flume/checkstyle.xml @@ -18,7 +18,7 @@ <module name = "Checker"> <property name="charset" value="UTF-8"/> - <property name="severity" value="warning"/> + <property name="severity" value="error"/> <property name="fileExtensions" value="java, properties, xml"/> <!-- Checks for whitespace --> http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java ---------------------------------------------------------------------- diff --git a/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java b/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java index 5a8860d..0dc8872 100644 --- a/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java +++ b/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java @@ -17,15 +17,19 @@ */ package org.apache.flume.auth; -import java.io.File; -import java.io.IOException; -import java.util.Properties; - import org.apache.hadoop.minikdc.MiniKdc; import org.junit.AfterClass; import 
org.junit.BeforeClass; import org.junit.Test; -import static org.junit.Assert.*; + +import java.io.File; +import java.io.IOException; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class TestFlumeAuthenticator { @@ -132,7 +136,7 @@ public class TestFlumeAuthenticator { String principal = "flume"; File keytab = new File(workDir, "flume2.keytab"); kdc.createPrincipal(keytab, principal); - String expResult = principal+"@" + kdc.getRealm(); + String expResult = principal + "@" + kdc.getRealm(); // Clear the previous statically stored logged in credentials FlumeAuthenticationUtil.clearCredentials(); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSinkRunner.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSinkRunner.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSinkRunner.java index 0733dc4..a303994 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSinkRunner.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSinkRunner.java @@ -18,11 +18,10 @@ */ package org.apache.flume.channel.file; -import java.util.List; - +import com.google.common.collect.Lists; import org.apache.flume.Sink; -import com.google.common.collect.Lists; +import java.util.List; public class CountingSinkRunner extends Thread { private int count; @@ -30,39 +29,46 @@ public class CountingSinkRunner extends Thread { private final Sink sink; private volatile boolean run; private final List<Exception> errors = Lists.newArrayList(); + public CountingSinkRunner(Sink sink) { 
this(sink, Integer.MAX_VALUE); } + public CountingSinkRunner(Sink sink, int until) { this.sink = sink; this.until = until; } + @Override public void run() { run = true; - while(run && count < until) { + while (run && count < until) { boolean error = true; try { - if(Sink.Status.READY.equals(sink.process())) { + if (Sink.Status.READY.equals(sink.process())) { count++; error = false; } - } catch(Exception ex) { + } catch (Exception ex) { errors.add(ex); } - if(error) { + if (error) { try { Thread.sleep(1000L); - } catch (InterruptedException e) {} + } catch (InterruptedException e) { + } } } } + public void shutdown() { run = false; } + public int getCount() { return count; } + public List<Exception> getErrors() { return errors; } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSourceRunner.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSourceRunner.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSourceRunner.java index b6abc35..1119990 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSourceRunner.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/CountingSourceRunner.java @@ -33,19 +33,23 @@ public class CountingSourceRunner extends Thread { private final PollableSource source; private volatile boolean run; private final List<Exception> errors = Lists.newArrayList(); + public CountingSourceRunner(PollableSource source) { this(source, Integer.MAX_VALUE); } + public CountingSourceRunner(PollableSource source, int until) { this(source, until, null); } + public CountingSourceRunner(PollableSource source, Channel channel) { this(source, Integer.MAX_VALUE, channel); } + public 
CountingSourceRunner(PollableSource source, int until, Channel channel) { this.source = source; this.until = until; - if(channel != null) { + if (channel != null) { ReplicatingChannelSelector selector = new ReplicatingChannelSelector(); List<Channel> channels = Lists.newArrayList(); channels.add(channel); @@ -53,32 +57,37 @@ public class CountingSourceRunner extends Thread { this.source.setChannelProcessor(new ChannelProcessor(selector)); } } + @Override public void run() { run = true; - while(run && count < until) { + while (run && count < until) { boolean error = true; try { - if(PollableSource.Status.READY.equals(source.process())) { + if (PollableSource.Status.READY.equals(source.process())) { count++; error = false; } - } catch(Exception ex) { + } catch (Exception ex) { errors.add(ex); } - if(error) { + if (error) { try { Thread.sleep(1000L); - } catch (InterruptedException e) {} + } catch (InterruptedException e) { + } } } } + public void shutdown() { run = false; } + public int getCount() { return count; } + public List<Exception> getErrors() { return errors; } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java index c1de12e..cd1dcd9 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java @@ -28,11 +28,11 @@ import org.junit.Before; import org.junit.Test; public class TestCheckpoint { - File file; File inflightPuts; File inflightTakes; File queueSet; + @Before public void setup() throws IOException { file = 
File.createTempFile("Checkpoint", ""); @@ -42,10 +42,12 @@ public class TestCheckpoint { Assert.assertTrue(file.isFile()); Assert.assertTrue(file.canWrite()); } + @After public void cleanup() { file.delete(); } + @Test public void testSerialization() throws Exception { EventQueueBackingStore backingStore = http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java index 52c706d..0939454 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java @@ -18,31 +18,29 @@ */ package org.apache.flume.channel.file; -import java.io.DataInputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - +import com.google.common.collect.Lists; +import com.google.common.io.Files; +import com.google.protobuf.InvalidProtocolBufferException; import junit.framework.Assert; - import org.apache.commons.io.FileUtils; +import org.apache.flume.channel.file.proto.ProtosFactory; import org.junit.After; import org.junit.Before; import org.junit.Test; -import com.google.common.collect.Lists; -import com.google.common.io.Files; -import com.google.protobuf.InvalidProtocolBufferException; +import java.io.DataInputStream; +import java.io.File; +import java.io.FileInputStream; import java.io.FileOutputStream; +import java.io.IOException; import 
java.io.RandomAccessFile; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Random; -import org.apache.flume.channel.file.proto.ProtosFactory; public class TestEventQueueBackingStoreFactory { - static final List<Long> pointersInTestCheckpoint = Arrays.asList(new Long[] { + static final List<Long> pointersInTestCheckpoint = Arrays.asList(new Long[]{ 8589936804L, 4294969563L, 12884904153L, @@ -59,6 +57,7 @@ public class TestEventQueueBackingStoreFactory { File inflightTakes; File inflightPuts; File queueSetDir; + @Before public void setup() throws IOException { baseDir = Files.createTempDir(); @@ -67,42 +66,46 @@ public class TestEventQueueBackingStoreFactory { inflightPuts = new File(baseDir, "puts"); queueSetDir = new File(baseDir, "queueset"); TestUtils.copyDecompressed("fileformat-v2-checkpoint.gz", checkpoint); - } + @After public void teardown() { FileUtils.deleteQuietly(baseDir); } + @Test public void testWithNoFlag() throws Exception { verify(EventQueueBackingStoreFactory.get(checkpoint, 10, "test"), - Serialization.VERSION_3, pointersInTestCheckpoint); + Serialization.VERSION_3, pointersInTestCheckpoint); } + @Test public void testWithFlag() throws Exception { verify(EventQueueBackingStoreFactory.get(checkpoint, 10, "test", true), - Serialization.VERSION_3, pointersInTestCheckpoint); + Serialization.VERSION_3, pointersInTestCheckpoint); } + @Test public void testNoUprade() throws Exception { verify(EventQueueBackingStoreFactory.get(checkpoint, 10, "test", false), - Serialization.VERSION_2, pointersInTestCheckpoint); + Serialization.VERSION_2, pointersInTestCheckpoint); } - @Test (expected = BadCheckpointException.class) + + @Test(expected = BadCheckpointException.class) public void testDecreaseCapacity() throws Exception { Assert.assertTrue(checkpoint.delete()); - EventQueueBackingStore backingStore = EventQueueBackingStoreFactory. 
- get(checkpoint, 10, "test"); + EventQueueBackingStore backingStore = + EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); backingStore.close(); EventQueueBackingStoreFactory.get(checkpoint, 9, "test"); Assert.fail(); } - @Test (expected = BadCheckpointException.class) + @Test(expected = BadCheckpointException.class) public void testIncreaseCapacity() throws Exception { Assert.assertTrue(checkpoint.delete()); - EventQueueBackingStore backingStore = EventQueueBackingStoreFactory. - get(checkpoint, 10, "test"); + EventQueueBackingStore backingStore = + EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); backingStore.close(); EventQueueBackingStoreFactory.get(checkpoint, 11, "test"); Assert.fail(); @@ -112,22 +115,21 @@ public class TestEventQueueBackingStoreFactory { public void testNewCheckpoint() throws Exception { Assert.assertTrue(checkpoint.delete()); verify(EventQueueBackingStoreFactory.get(checkpoint, 10, "test", false), - Serialization.VERSION_3, Collections.<Long>emptyList()); + Serialization.VERSION_3, Collections.<Long>emptyList()); } - @Test (expected = BadCheckpointException.class) + @Test(expected = BadCheckpointException.class) public void testCheckpointBadVersion() throws Exception { - RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw"); + RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw"); try { - EventQueueBackingStore backingStore = EventQueueBackingStoreFactory. 
- get(checkpoint, 10, "test"); - backingStore.close(); - writer.seek( - EventQueueBackingStoreFile.INDEX_VERSION * Serialization.SIZE_OF_LONG); - writer.writeLong(94L); - writer.getFD().sync(); + EventQueueBackingStore backingStore = + EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); + backingStore.close(); + writer.seek(EventQueueBackingStoreFile.INDEX_VERSION * Serialization.SIZE_OF_LONG); + writer.writeLong(94L); + writer.getFD().sync(); - backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); + backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); } finally { writer.close(); } @@ -138,15 +140,13 @@ public class TestEventQueueBackingStoreFactory { RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw"); try { - EventQueueBackingStore backingStore = EventQueueBackingStoreFactory. - get(checkpoint, 10, "test"); - backingStore.close(); - writer.seek( - EventQueueBackingStoreFile.INDEX_CHECKPOINT_MARKER * - Serialization.SIZE_OF_LONG); - writer.writeLong(EventQueueBackingStoreFile.CHECKPOINT_INCOMPLETE); - writer.getFD().sync(); - backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); + EventQueueBackingStore backingStore = + EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); + backingStore.close(); + writer.seek(EventQueueBackingStoreFile.INDEX_CHECKPOINT_MARKER * Serialization.SIZE_OF_LONG); + writer.writeLong(EventQueueBackingStoreFile.CHECKPOINT_INCOMPLETE); + writer.getFD().sync(); + backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); } finally { writer.close(); } @@ -156,12 +156,10 @@ public class TestEventQueueBackingStoreFactory { public void testCheckpointVersionNotEqualToMeta() throws Exception { RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw"); try { - EventQueueBackingStore backingStore = EventQueueBackingStoreFactory. 
- get(checkpoint, 10, "test"); + EventQueueBackingStore backingStore = + EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); backingStore.close(); - writer.seek( - EventQueueBackingStoreFile.INDEX_VERSION - * Serialization.SIZE_OF_LONG); + writer.seek(EventQueueBackingStoreFile.INDEX_VERSION * Serialization.SIZE_OF_LONG); writer.writeLong(2L); writer.getFD().sync(); backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); @@ -174,8 +172,8 @@ public class TestEventQueueBackingStoreFactory { public void testCheckpointVersionNotEqualToMeta2() throws Exception { FileOutputStream os = null; try { - EventQueueBackingStore backingStore = EventQueueBackingStoreFactory. - get(checkpoint, 10, "test"); + EventQueueBackingStore backingStore = + EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); backingStore.close(); Assert.assertTrue(checkpoint.exists()); Assert.assertTrue(Serialization.getMetaDataFile(checkpoint).length() != 0); @@ -183,8 +181,7 @@ public class TestEventQueueBackingStoreFactory { ProtosFactory.Checkpoint meta = ProtosFactory.Checkpoint.parseDelimitedFrom(is); Assert.assertNotNull(meta); is.close(); - os = new FileOutputStream( - Serialization.getMetaDataFile(checkpoint)); + os = new FileOutputStream(Serialization.getMetaDataFile(checkpoint)); meta.toBuilder().setVersion(2).build().writeDelimitedTo(os); os.flush(); backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); @@ -197,12 +194,10 @@ public class TestEventQueueBackingStoreFactory { public void testCheckpointOrderIdNotEqualToMeta() throws Exception { RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw"); try { - EventQueueBackingStore backingStore = EventQueueBackingStoreFactory. 
- get(checkpoint, 10, "test"); + EventQueueBackingStore backingStore = + EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); backingStore.close(); - writer.seek( - EventQueueBackingStoreFile.INDEX_WRITE_ORDER_ID - * Serialization.SIZE_OF_LONG); + writer.seek(EventQueueBackingStoreFile.INDEX_WRITE_ORDER_ID * Serialization.SIZE_OF_LONG); writer.writeLong(2L); writer.getFD().sync(); backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); @@ -215,8 +210,8 @@ public class TestEventQueueBackingStoreFactory { public void testCheckpointOrderIdNotEqualToMeta2() throws Exception { FileOutputStream os = null; try { - EventQueueBackingStore backingStore = EventQueueBackingStoreFactory. - get(checkpoint, 10, "test"); + EventQueueBackingStore backingStore = + EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); backingStore.close(); Assert.assertTrue(checkpoint.exists()); Assert.assertTrue(Serialization.getMetaDataFile(checkpoint).length() != 0); @@ -225,7 +220,7 @@ public class TestEventQueueBackingStoreFactory { Assert.assertNotNull(meta); is.close(); os = new FileOutputStream( - Serialization.getMetaDataFile(checkpoint)); + Serialization.getMetaDataFile(checkpoint)); meta.toBuilder().setWriteOrderID(1).build().writeDelimitedTo(os); os.flush(); backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); @@ -234,11 +229,10 @@ public class TestEventQueueBackingStoreFactory { } } - @Test(expected = BadCheckpointException.class) public void testTruncateMeta() throws Exception { - EventQueueBackingStore backingStore = EventQueueBackingStoreFactory. 
- get(checkpoint, 10, "test"); + EventQueueBackingStore backingStore = + EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); backingStore.close(); Assert.assertTrue(checkpoint.exists()); File metaFile = Serialization.getMetaDataFile(checkpoint); @@ -250,10 +244,10 @@ public class TestEventQueueBackingStoreFactory { backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); } - @Test (expected = InvalidProtocolBufferException.class) + @Test(expected = InvalidProtocolBufferException.class) public void testCorruptMeta() throws Throwable { - EventQueueBackingStore backingStore = EventQueueBackingStoreFactory. - get(checkpoint, 10, "test"); + EventQueueBackingStore backingStore = + EventQueueBackingStoreFactory.get(checkpoint, 10, "test"); backingStore.close(); Assert.assertTrue(checkpoint.exists()); File metaFile = Serialization.getMetaDataFile(checkpoint); @@ -270,17 +264,13 @@ public class TestEventQueueBackingStoreFactory { } } - - - private void verify(EventQueueBackingStore backingStore, long expectedVersion, - List<Long> expectedPointers) - throws Exception { - FlumeEventQueue queue = new FlumeEventQueue(backingStore, inflightTakes, - inflightPuts, queueSetDir); + List<Long> expectedPointers) throws Exception { + FlumeEventQueue queue = + new FlumeEventQueue(backingStore, inflightTakes, inflightPuts, queueSetDir); List<Long> actualPointers = Lists.newArrayList(); FlumeEventPointer ptr; - while((ptr = queue.removeHead(0L)) != null) { + while ((ptr = queue.removeHead(0L)) != null) { actualPointers.add(ptr.toLong()); } Assert.assertEquals(expectedPointers, actualPointers); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventUtils.java 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventUtils.java index c72e3f2..26f9cae 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventUtils.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventUtils.java @@ -28,7 +28,7 @@ public class TestEventUtils { @Test public void testPutEvent() { FlumeEvent event = new FlumeEvent(null, new byte[5]); - Put put = new Put(1l, 1l, event); + Put put = new Put(1L, 1L, event); Event returnEvent = EventUtils.getEventFromTransactionEvent(put); Assert.assertNotNull(returnEvent); Assert.assertEquals(5, returnEvent.getBody().length); @@ -36,7 +36,7 @@ public class TestEventUtils { @Test public void testInvalidEvent() { - Take take = new Take(1l, 1l); + Take take = new Take(1L, 1L); Event returnEvent = EventUtils.getEventFromTransactionEvent(take); Assert.assertNull(returnEvent); } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java index bb22e26..bfc2d0d 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java @@ -18,8 +18,22 @@ */ package org.apache.flume.channel.file; -import static org.apache.flume.channel.file.TestUtils.*; -import static org.fest.reflect.core.Reflection.*; +import com.google.common.base.Throwables; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.flume.ChannelException; 
+import org.apache.flume.Event; +import org.apache.flume.Transaction; +import org.apache.flume.channel.file.FileChannel.FileBackedTransaction; +import org.apache.flume.channel.file.FlumeEventQueue.InflightEventWrapper; +import org.apache.flume.conf.Configurables; +import org.apache.flume.event.EventBuilder; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.FilenameFilter; @@ -32,7 +46,6 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -41,23 +54,15 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import org.apache.flume.ChannelException; -import org.apache.flume.Event; -import org.apache.flume.Transaction; -import org.apache.flume.conf.Configurables; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Throwables; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.flume.channel.file.FileChannel.FileBackedTransaction; -import org.apache.flume.channel.file.FlumeEventQueue.InflightEventWrapper; -import org.apache.flume.event.EventBuilder; +import static org.apache.flume.channel.file.TestUtils.compareInputAndOut; +import static org.apache.flume.channel.file.TestUtils.consumeChannel; +import static org.apache.flume.channel.file.TestUtils.fillChannel; +import static org.apache.flume.channel.file.TestUtils.forceCheckpoint; +import static org.apache.flume.channel.file.TestUtils.putEvents; +import static org.apache.flume.channel.file.TestUtils.putWithoutCommit; +import static 
org.apache.flume.channel.file.TestUtils.takeEvents; +import static org.apache.flume.channel.file.TestUtils.takeWithoutCommit; +import static org.fest.reflect.core.Reflection.field; public class TestFileChannel extends TestFileChannelBase { @@ -68,6 +73,7 @@ public class TestFileChannel extends TestFileChannelBase { public void setup() throws Exception { super.setup(); } + @After public void teardown() { super.teardown(); @@ -146,23 +152,22 @@ public class TestFileChannel extends TestFileChannelBase { //Simulate multiple sources, so separate thread - txns are thread local, //so a new txn wont be created here unless it is in a different thread. final CountDownLatch latch = new CountDownLatch(1); - Executors.newSingleThreadExecutor().submit( - new Runnable() { - @Override - public void run() { - Transaction tx = channel.getTransaction(); - input.addAll(putWithoutCommit(channel, tx, "failAfterPut", 3)); - try { - latch.await(); - tx.commit(); - } catch (InterruptedException e) { - tx.rollback(); - Throwables.propagate(e); - } finally { - tx.close(); - } - } - }); + Executors.newSingleThreadExecutor().submit(new Runnable() { + @Override + public void run() { + Transaction tx = channel.getTransaction(); + input.addAll(putWithoutCommit(channel, tx, "failAfterPut", 3)); + try { + latch.await(); + tx.commit(); + } catch (InterruptedException e) { + tx.rollback(); + Throwables.propagate(e); + } finally { + tx.close(); + } + } + }); forceCheckpoint(channel); tx.commit(); tx.close(); @@ -198,7 +203,7 @@ public class TestFileChannel extends TestFileChannelBase { Assert.assertTrue(channel.isOpen()); Set<String> in = Sets.newHashSet(); try { - while(true) { + while (true) { in.addAll(putEvents(channel, "reconfig", 1, 1)); } } catch (ChannelException e) { @@ -206,12 +211,13 @@ public class TestFileChannel extends TestFileChannelBase { + "This might be the result of a sink on the channel having too " + "low of batch size, a downstream system running slower than " + "normal, or that 
the channel capacity is just too low. [channel=" - + channel.getName()+"]", e.getMessage()); + + channel.getName() + "]", e.getMessage()); } Configurables.configure(channel, createContext()); Set<String> out = takeEvents(channel, 1, Integer.MAX_VALUE); compareInputAndOut(in, out); } + @Test public void testPut() throws Exception { channel.start(); @@ -225,6 +231,7 @@ public class TestFileChannel extends TestFileChannelBase { Set<String> actual = takeEvents(channel, 1); compareInputAndOut(expected, actual); } + @Test public void testCommitAfterNoPutTake() throws Exception { channel.start(); @@ -246,6 +253,7 @@ public class TestFileChannel extends TestFileChannelBase { transaction.commit(); transaction.close(); } + @Test public void testCapacity() throws Exception { Map<String, String> overrides = Maps.newHashMap(); @@ -270,6 +278,7 @@ public class TestFileChannel extends TestFileChannelBase { // ensure we the events back Assert.assertEquals(5, takeEvents(channel, 1, 5).size()); } + /** * This test is here to make sure we can replay a full queue * when we have a PUT with a lower txid than the take which @@ -287,16 +296,14 @@ public class TestFileChannel extends TestFileChannelBase { // the idea here is we will fill up the channel Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.KEEP_ALIVE, String.valueOf(10L)); - overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10)); - overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY, - String.valueOf(10)); + overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10L)); + overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY, String.valueOf(10L)); channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); fillChannel(channel, "fillup"); // then do a put which will block but it will be assigned a tx id - Future<String> put = Executors.newSingleThreadExecutor() - .submit(new Callable<String>() { + Future<String> put = 
Executors.newSingleThreadExecutor().submit(new Callable<String>() { @Override public String call() throws Exception { Set<String> result = putEvents(channel, "blocked-put", 1, 1); @@ -321,6 +328,7 @@ public class TestFileChannel extends TestFileChannelBase { channel.start(); Assert.assertTrue(channel.isOpen()); } + @Test public void testThreaded() throws IOException, InterruptedException { channel.start(); @@ -328,12 +336,9 @@ public class TestFileChannel extends TestFileChannelBase { int numThreads = 10; final CountDownLatch producerStopLatch = new CountDownLatch(numThreads); final CountDownLatch consumerStopLatch = new CountDownLatch(numThreads); - final List<Exception> errors = Collections - .synchronizedList(new ArrayList<Exception>()); - final Set<String> expected = Collections.synchronizedSet( - new HashSet<String>()); - final Set<String> actual = Collections.synchronizedSet( - new HashSet<String>()); + final List<Exception> errors = Collections.synchronizedList(new ArrayList<Exception>()); + final Set<String> expected = Collections.synchronizedSet(new HashSet<String>()); + final Set<String> actual = Collections.synchronizedSet(new HashSet<String>()); for (int i = 0; i < numThreads; i++) { final int id = i; Thread t = new Thread() { @@ -363,15 +368,15 @@ public class TestFileChannel extends TestFileChannelBase { @Override public void run() { try { - while(!producerStopLatch.await(1, TimeUnit.SECONDS) || - expected.size() > actual.size()) { + while (!producerStopLatch.await(1, TimeUnit.SECONDS) || + expected.size() > actual.size()) { if (id % 2 == 0) { actual.addAll(takeEvents(channel, 1, Integer.MAX_VALUE)); } else { actual.addAll(takeEvents(channel, 5, Integer.MAX_VALUE)); } } - if(actual.isEmpty()) { + if (actual.isEmpty()) { LOG.error("Found nothing!"); } else { LOG.info("Completed some takes " + actual.size()); @@ -388,12 +393,13 @@ public class TestFileChannel extends TestFileChannelBase { t.start(); } Assert.assertTrue("Timed out waiting for producers", 
- producerStopLatch.await(30, TimeUnit.SECONDS)); + producerStopLatch.await(30, TimeUnit.SECONDS)); Assert.assertTrue("Timed out waiting for consumer", - consumerStopLatch.await(30, TimeUnit.SECONDS)); + consumerStopLatch.await(30, TimeUnit.SECONDS)); Assert.assertEquals(Collections.EMPTY_LIST, errors); compareInputAndOut(expected, actual); } + @Test public void testLocking() throws IOException { channel.start(); @@ -403,7 +409,6 @@ public class TestFileChannel extends TestFileChannelBase { Assert.assertTrue(!fileChannel.isOpen()); } - /** * Test contributed by Brock Noland during code review. * @throws Exception @@ -437,11 +442,11 @@ public class TestFileChannel extends TestFileChannelBase { } @Test - public void testPutForceCheckpointCommitReplay() throws Exception{ + public void testPutForceCheckpointCommitReplay() throws Exception { Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(2)); overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY, - String.valueOf(2)); + String.valueOf(2)); overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "10000"); FileChannel channel = createFileChannel(overrides); channel.start(); @@ -578,28 +583,22 @@ public class TestFileChannel extends TestFileChannelBase { testChannelDiesOnCorruptEvent(true); } - @Test - public void testChannelDiesOnCorruptEventNoFsync() throws - Exception { + public void testChannelDiesOnCorruptEventNoFsync() throws Exception { testChannelDiesOnCorruptEvent(false); } - - - private void testChannelDiesOnCorruptEvent(boolean fsyncPerTxn) - throws Exception { + private void testChannelDiesOnCorruptEvent(boolean fsyncPerTxn) throws Exception { Map<String, String> overrides = new HashMap<String, String>(); - overrides.put(FileChannelConfiguration.FSYNC_PER_TXN, - String.valueOf(fsyncPerTxn)); + overrides.put(FileChannelConfiguration.FSYNC_PER_TXN, String.valueOf(fsyncPerTxn)); final FileChannel channel = createFileChannel(overrides); 
channel.start(); putEvents(channel,"test-corrupt-event",100,100); - for(File dataDir : dataDirs) { + for (File dataDir : dataDirs) { File[] files = dataDir.listFiles(new FilenameFilter() { @Override public boolean accept(File dir, String name) { - if(!name.endsWith("meta") && !name.contains("lock")){ + if (!name.endsWith("meta") && !name.contains("lock")) { return true; } return false; @@ -624,7 +623,7 @@ public class TestFileChannel extends TestFileChannelBase { Assert.assertTrue(ex.getMessage().contains("Log is closed")); throw ex; } - if(fsyncPerTxn) { + if (fsyncPerTxn) { Assert.fail(); } else { // The corrupt event must be missing, the rest should be http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelFormatRegression.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelFormatRegression.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelFormatRegression.java index c95122b..f0638f9 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelFormatRegression.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelFormatRegression.java @@ -18,14 +18,7 @@ */ package org.apache.flume.channel.file; -import static org.apache.flume.channel.file.TestUtils.*; - -import java.io.File; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - +import com.google.common.collect.Maps; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -33,8 +26,14 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Maps; +import java.io.File; +import java.util.Arrays; +import 
java.util.HashSet; +import java.util.Map; +import java.util.Set; +import static org.apache.flume.channel.file.TestUtils.compareInputAndOut; +import static org.apache.flume.channel.file.TestUtils.takeEvents; public class TestFileChannelFormatRegression extends TestFileChannelBase { protected static final Logger LOG = LoggerFactory @@ -60,8 +59,8 @@ public class TestFileChannelFormatRegression extends TestFileChannelBase { new File(checkpointDir, "checkpoint")); for (int i = 0; i < dataDirs.length; i++) { int fileIndex = i + 1; - TestUtils.copyDecompressed("fileformat-v2-log-"+fileIndex+".gz", - new File(dataDirs[i], "log-" + fileIndex)); + TestUtils.copyDecompressed("fileformat-v2-log-" + fileIndex + ".gz", + new File(dataDirs[i], "log-" + fileIndex)); } Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10)); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java index d5fe6fb..d21f140 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java @@ -55,8 +55,7 @@ import static org.apache.flume.channel.file.TestUtils.takeWithoutCommit; import static org.fest.reflect.core.Reflection.*; public class TestFileChannelRestart extends TestFileChannelBase { - protected static final Logger LOG = LoggerFactory - .getLogger(TestFileChannelRestart.class); + protected static final Logger LOG = 
LoggerFactory.getLogger(TestFileChannelRestart.class); @Before public void setup() throws Exception { @@ -72,8 +71,8 @@ public class TestFileChannelRestart extends TestFileChannelBase { protected FileChannel createFileChannel(Map<String, String> overrides) { // FLUME-2482, making sure scheduled checkpoint never gets called overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "6000000"); - return TestUtils.createFileChannel(checkpointDir.getAbsolutePath(), - dataDir, backupDir.getAbsolutePath(), overrides); + return TestUtils.createFileChannel(checkpointDir.getAbsolutePath(), dataDir, + backupDir.getAbsolutePath(), overrides); } @Test @@ -116,14 +115,14 @@ public class TestFileChannelRestart extends TestFileChannelBase { } public void doTestRestart(boolean useLogReplayV1, - boolean forceCheckpoint, boolean deleteCheckpoint, - boolean useFastReplay) throws Exception { + boolean forceCheckpoint, boolean deleteCheckpoint, + boolean useFastReplay) throws Exception { Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.USE_LOG_REPLAY_V1, - String.valueOf(useLogReplayV1)); + String.valueOf(useLogReplayV1)); overrides.put( - FileChannelConfiguration.USE_FAST_REPLAY, - String.valueOf(useFastReplay)); + FileChannelConfiguration.USE_FAST_REPLAY, + String.valueOf(useFastReplay)); channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); @@ -132,7 +131,7 @@ public class TestFileChannelRestart extends TestFileChannelBase { forceCheckpoint(channel); } channel.stop(); - if(deleteCheckpoint) { + if (deleteCheckpoint) { File checkpoint = new File(checkpointDir, "checkpoint"); Assert.assertTrue(checkpoint.delete()); File checkpointMetaData = Serialization.getMetaDataFile(checkpoint); @@ -146,19 +145,17 @@ public class TestFileChannelRestart extends TestFileChannelBase { } @Test - public void testRestartWhenMetaDataExistsButCheckpointDoesNot() throws - Exception { + public void 
testRestartWhenMetaDataExistsButCheckpointDoesNot() throws Exception { doTestRestartWhenMetaDataExistsButCheckpointDoesNot(false); } @Test - public void testRestartWhenMetaDataExistsButCheckpointDoesNotWithBackup() - throws Exception { + public void testRestartWhenMetaDataExistsButCheckpointDoesNotWithBackup() throws Exception { doTestRestartWhenMetaDataExistsButCheckpointDoesNot(true); } - private void doTestRestartWhenMetaDataExistsButCheckpointDoesNot( - boolean backup) throws Exception { + private void doTestRestartWhenMetaDataExistsButCheckpointDoesNot(boolean backup) + throws Exception { Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup)); channel = createFileChannel(overrides); @@ -167,7 +164,7 @@ public class TestFileChannelRestart extends TestFileChannelBase { Set<String> in = putEvents(channel, "restart", 10, 100); Assert.assertEquals(100, in.size()); forceCheckpoint(channel); - if(backup) { + if (backup) { Thread.sleep(2000); } channel.stop(); @@ -186,19 +183,16 @@ public class TestFileChannelRestart extends TestFileChannelBase { } @Test - public void testRestartWhenCheckpointExistsButMetaDoesNot() throws Exception{ + public void testRestartWhenCheckpointExistsButMetaDoesNot() throws Exception { doTestRestartWhenCheckpointExistsButMetaDoesNot(false); } @Test - public void testRestartWhenCheckpointExistsButMetaDoesNotWithBackup() throws - Exception{ + public void testRestartWhenCheckpointExistsButMetaDoesNotWithBackup() throws Exception { doTestRestartWhenCheckpointExistsButMetaDoesNot(true); } - - private void doTestRestartWhenCheckpointExistsButMetaDoesNot(boolean backup) - throws Exception { + private void doTestRestartWhenCheckpointExistsButMetaDoesNot(boolean backup) throws Exception { Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup)); channel = createFileChannel(overrides); @@ 
-207,7 +201,7 @@ public class TestFileChannelRestart extends TestFileChannelBase { Set<String> in = putEvents(channel, "restart", 10, 100); Assert.assertEquals(100, in.size()); forceCheckpoint(channel); - if(backup) { + if (backup) { Thread.sleep(2000); } channel.stop(); @@ -235,8 +229,7 @@ public class TestFileChannelRestart extends TestFileChannelBase { doTestRestartWhenNoCheckpointExists(true); } - private void doTestRestartWhenNoCheckpointExists(boolean backup) throws - Exception { + private void doTestRestartWhenNoCheckpointExists(boolean backup) throws Exception { Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup)); channel = createFileChannel(overrides); @@ -245,7 +238,7 @@ public class TestFileChannelRestart extends TestFileChannelBase { Set<String> in = putEvents(channel, "restart", 10, 100); Assert.assertEquals(100, in.size()); forceCheckpoint(channel); - if(backup) { + if (backup) { Thread.sleep(2000); } channel.stop(); @@ -273,7 +266,7 @@ public class TestFileChannelRestart extends TestFileChannelBase { doTestBadCheckpointVersion(true); } - private void doTestBadCheckpointVersion(boolean backup) throws Exception{ + private void doTestBadCheckpointVersion(boolean backup) throws Exception { Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup)); channel = createFileChannel(overrides); @@ -282,14 +275,14 @@ public class TestFileChannelRestart extends TestFileChannelBase { Set<String> in = putEvents(channel, "restart", 10, 100); Assert.assertEquals(100, in.size()); forceCheckpoint(channel); - if(backup) { + if (backup) { Thread.sleep(2000); } channel.stop(); File checkpoint = new File(checkpointDir, "checkpoint"); RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw"); writer.seek(EventQueueBackingStoreFile.INDEX_VERSION * - Serialization.SIZE_OF_LONG); + Serialization.SIZE_OF_LONG); 
writer.writeLong(2L); writer.getFD().sync(); writer.close(); @@ -311,8 +304,7 @@ public class TestFileChannelRestart extends TestFileChannelBase { doTestBadCheckpointMetaVersion(true); } - private void doTestBadCheckpointMetaVersion(boolean backup) throws - Exception { + private void doTestBadCheckpointMetaVersion(boolean backup) throws Exception { Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup)); channel = createFileChannel(overrides); @@ -321,7 +313,7 @@ public class TestFileChannelRestart extends TestFileChannelBase { Set<String> in = putEvents(channel, "restart", 10, 100); Assert.assertEquals(100, in.size()); forceCheckpoint(channel); - if(backup) { + if (backup) { Thread.sleep(2000); } channel.stop(); @@ -331,7 +323,7 @@ public class TestFileChannelRestart extends TestFileChannelBase { Assert.assertNotNull(meta); is.close(); FileOutputStream os = new FileOutputStream( - Serialization.getMetaDataFile(checkpoint)); + Serialization.getMetaDataFile(checkpoint)); meta.toBuilder().setVersion(2).build().writeDelimitedTo(os); os.flush(); channel = createFileChannel(overrides); @@ -348,13 +340,11 @@ public class TestFileChannelRestart extends TestFileChannelBase { } @Test - public void testDifferingOrderIDCheckpointAndMetaVersionWithBackup() throws - Exception { + public void testDifferingOrderIDCheckpointAndMetaVersionWithBackup() throws Exception { doTestDifferingOrderIDCheckpointAndMetaVersion(true); } - private void doTestDifferingOrderIDCheckpointAndMetaVersion(boolean backup) - throws Exception { + private void doTestDifferingOrderIDCheckpointAndMetaVersion(boolean backup) throws Exception { Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup)); channel = createFileChannel(overrides); @@ -363,7 +353,7 @@ public class TestFileChannelRestart extends TestFileChannelBase { Set<String> in = 
putEvents(channel, "restart", 10, 100); Assert.assertEquals(100, in.size()); forceCheckpoint(channel); - if(backup) { + if (backup) { Thread.sleep(2000); } channel.stop(); @@ -373,7 +363,7 @@ public class TestFileChannelRestart extends TestFileChannelBase { Assert.assertNotNull(meta); is.close(); FileOutputStream os = new FileOutputStream( - Serialization.getMetaDataFile(checkpoint)); + Serialization.getMetaDataFile(checkpoint)); meta.toBuilder().setWriteOrderID(12).build().writeDelimitedTo(os); os.flush(); channel = createFileChannel(overrides); @@ -385,12 +375,12 @@ public class TestFileChannelRestart extends TestFileChannelBase { } @Test - public void testIncompleteCheckpoint() throws Exception{ + public void testIncompleteCheckpoint() throws Exception { doTestIncompleteCheckpoint(false); } @Test - public void testIncompleteCheckpointWithCheckpoint() throws Exception{ + public void testIncompleteCheckpointWithCheckpoint() throws Exception { doTestIncompleteCheckpoint(true); } @@ -403,14 +393,14 @@ public class TestFileChannelRestart extends TestFileChannelBase { Set<String> in = putEvents(channel, "restart", 10, 100); Assert.assertEquals(100, in.size()); forceCheckpoint(channel); - if(backup) { + if (backup) { Thread.sleep(2000); } channel.stop(); File checkpoint = new File(checkpointDir, "checkpoint"); RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw"); writer.seek(EventQueueBackingStoreFile.INDEX_CHECKPOINT_MARKER - * Serialization.SIZE_OF_LONG); + * Serialization.SIZE_OF_LONG); writer.writeLong(EventQueueBackingStoreFile.CHECKPOINT_INCOMPLETE); writer.getFD().sync(); writer.close(); @@ -443,30 +433,30 @@ public class TestFileChannelRestart extends TestFileChannelBase { } @Test - public void testFastReplayWithCheckpoint() throws Exception{ + public void testFastReplayWithCheckpoint() throws Exception { testFastReplay(false, true); } @Test - public void testFastReplayWithBadCheckpoint() throws Exception{ + public void 
testFastReplayWithBadCheckpoint() throws Exception { testFastReplay(true, true); } @Test - public void testNoFastReplayWithCheckpoint() throws Exception{ + public void testNoFastReplayWithCheckpoint() throws Exception { testFastReplay(false, false); } @Test - public void testNoFastReplayWithBadCheckpoint() throws Exception{ + public void testNoFastReplayWithBadCheckpoint() throws Exception { testFastReplay(true, false); } - private void testFastReplay(boolean shouldCorruptCheckpoint, - boolean useFastReplay) throws Exception{ + private void testFastReplay(boolean shouldCorruptCheckpoint, boolean useFastReplay) + throws Exception { Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.USE_FAST_REPLAY, - String.valueOf(useFastReplay)); + String.valueOf(useFastReplay)); channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); @@ -477,7 +467,8 @@ public class TestFileChannelRestart extends TestFileChannelBase { if (shouldCorruptCheckpoint) { File checkpoint = new File(checkpointDir, "checkpoint"); RandomAccessFile writer = new RandomAccessFile( - Serialization.getMetaDataFile(checkpoint), "rw"); + Serialization.getMetaDataFile(checkpoint), + "rw"); writer.seek(10); writer.writeLong(new Random().nextLong()); writer.getFD().sync(); @@ -495,14 +486,13 @@ public class TestFileChannelRestart extends TestFileChannelBase { compareInputAndOut(in, out); } - private void doTestCorruptInflights(String name, - boolean backup) throws Exception { + private void doTestCorruptInflights(String name, boolean backup) throws Exception { Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup)); channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); - final Set<String> in1 = putEvents(channel, "restart-",10, 100); + final Set<String> in1 = putEvents(channel, "restart-", 10, 100); 
Assert.assertEquals(100, in1.size()); Executors.newSingleThreadScheduledExecutor().submit(new Runnable() { @Override @@ -516,7 +506,7 @@ public class TestFileChannelRestart extends TestFileChannelBase { Set<String> in2 = putWithoutCommit(channel, tx, "restart", 100); Assert.assertEquals(100, in2.size()); forceCheckpoint(channel); - if(backup) { + if (backup) { Thread.sleep(2000); } tx.commit(); @@ -554,13 +544,12 @@ public class TestFileChannelRestart extends TestFileChannelBase { Set<String> in = putEvents(channel, "restart", 10, 100); Assert.assertEquals(100, in.size()); forceCheckpoint(channel); - if(backup) { + if (backup) { Thread.sleep(2000); } channel.stop(); File checkpoint = new File(checkpointDir, "checkpoint"); - RandomAccessFile writer = new RandomAccessFile( - Serialization.getMetaDataFile(checkpoint), "rw"); + RandomAccessFile writer = new RandomAccessFile(Serialization.getMetaDataFile(checkpoint), "rw"); writer.setLength(0); writer.getFD().sync(); writer.close(); @@ -591,13 +580,12 @@ public class TestFileChannelRestart extends TestFileChannelBase { Set<String> in = putEvents(channel, "restart", 10, 100); Assert.assertEquals(100, in.size()); forceCheckpoint(channel); - if(backup) { + if (backup) { Thread.sleep(2000); } channel.stop(); File checkpoint = new File(checkpointDir, "checkpoint"); - RandomAccessFile writer = new RandomAccessFile( - Serialization.getMetaDataFile(checkpoint), "rw"); + RandomAccessFile writer = new RandomAccessFile(Serialization.getMetaDataFile(checkpoint), "rw"); writer.seek(10); writer.writeLong(new Random().nextLong()); writer.getFD().sync(); @@ -618,11 +606,10 @@ public class TestFileChannelRestart extends TestFileChannelBase { Assert.assertFalse(backupRestored); } } - + //This test will fail without FLUME-1893 @Test - public void testCorruptCheckpointVersionMostSignificant4Bytes() - throws Exception { + public void testCorruptCheckpointVersionMostSignificant4Bytes() throws Exception { Map<String, String> overrides = 
Maps.newHashMap(); channel = createFileChannel(overrides); channel.start(); @@ -634,8 +621,8 @@ public class TestFileChannelRestart extends TestFileChannelBase { File checkpoint = new File(checkpointDir, "checkpoint"); RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw"); writer.seek(EventQueueBackingStoreFile.INDEX_VERSION * - Serialization.SIZE_OF_LONG); - writer.write(new byte[]{(byte)1, (byte)5}); + Serialization.SIZE_OF_LONG); + writer.write(new byte[] { (byte) 1, (byte) 5 }); writer.getFD().sync(); writer.close(); channel = createFileChannel(overrides); @@ -648,8 +635,7 @@ public class TestFileChannelRestart extends TestFileChannelBase { //This test will fail without FLUME-1893 @Test - public void testCorruptCheckpointCompleteMarkerMostSignificant4Bytes() - throws Exception { + public void testCorruptCheckpointCompleteMarkerMostSignificant4Bytes() throws Exception { Map<String, String> overrides = Maps.newHashMap(); channel = createFileChannel(overrides); channel.start(); @@ -661,8 +647,8 @@ public class TestFileChannelRestart extends TestFileChannelBase { File checkpoint = new File(checkpointDir, "checkpoint"); RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw"); writer.seek(EventQueueBackingStoreFile.INDEX_CHECKPOINT_MARKER * - Serialization.SIZE_OF_LONG); - writer.write(new byte[]{(byte) 1, (byte) 5}); + Serialization.SIZE_OF_LONG); + writer.write(new byte[] { (byte) 1, (byte) 5 }); writer.getFD().sync(); writer.close(); channel = createFileChannel(overrides); @@ -674,8 +660,7 @@ public class TestFileChannelRestart extends TestFileChannelBase { } @Test - public void testWithExtraLogs() - throws Exception { + public void testWithExtraLogs() throws Exception { Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.CAPACITY, "10"); overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY, "10"); @@ -702,27 +687,24 @@ public class TestFileChannelRestart extends TestFileChannelBase { // Make sure 
the entire channel was not replayed, only the events from the // backup. @Test - public void testBackupUsedEnsureNoFullReplayWithoutCompression() throws - Exception { + public void testBackupUsedEnsureNoFullReplayWithoutCompression() throws Exception { testBackupUsedEnsureNoFullReplay(false); } + @Test - public void testBackupUsedEnsureNoFullReplayWithCompression() throws - Exception { + public void testBackupUsedEnsureNoFullReplayWithCompression() throws Exception { testBackupUsedEnsureNoFullReplay(true); } private void testBackupUsedEnsureNoFullReplay(boolean compressedBackup) - throws Exception { + throws Exception { File dataDir = Files.createTempDir(); File tempBackup = Files.createTempDir(); Map<String, String> overrides = Maps.newHashMap(); - overrides.put(FileChannelConfiguration.DATA_DIRS, - dataDir.getAbsolutePath()); - overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, - "true"); + overrides.put(FileChannelConfiguration.DATA_DIRS, dataDir.getAbsolutePath()); + overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, "true"); overrides.put(FileChannelConfiguration.COMPRESS_BACKUP_CHECKPOINT, - String.valueOf(compressedBackup)); + String.valueOf(compressedBackup)); channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); @@ -734,8 +716,8 @@ public class TestFileChannelRestart extends TestFileChannelBase { in = putEvents(channel, "restart", 10, 100); takeEvents(channel, 10, 100); Assert.assertEquals(100, in.size()); - for(File file : backupDir.listFiles()) { - if(file.getName().equals(Log.FILE_LOCK)) { + for (File file : backupDir.listFiles()) { + if (file.getName().equals(Log.FILE_LOCK)) { continue; } Files.copy(file, new File(tempBackup, file.getName())); @@ -749,8 +731,8 @@ public class TestFileChannelRestart extends TestFileChannelBase { // tests), so throw away the backup and force the use of an older backup by // bringing in the copy of the last backup before the checkpoint. 
Serialization.deleteAllFiles(backupDir, Log.EXCLUDES); - for(File file : tempBackup.listFiles()) { - if(file.getName().equals(Log.FILE_LOCK)) { + for (File file : tempBackup.listFiles()) { + if (file.getName().equals(Log.FILE_LOCK)) { continue; } Files.copy(file, new File(backupDir, file.getName())); @@ -782,7 +764,7 @@ public class TestFileChannelRestart extends TestFileChannelBase { Assert.assertTrue(channel.isOpen()); putEvents(channel, prefix, 10, 100); Set<String> origFiles = Sets.newHashSet(); - for(File dir : dataDirs) { + for (File dir : dataDirs) { origFiles.addAll(Lists.newArrayList(dir.list())); } forceCheckpoint(channel); @@ -792,7 +774,7 @@ public class TestFileChannelRestart extends TestFileChannelBase { Set<String> newFiles = Sets.newHashSet(); int olderThanCheckpoint = 0; int totalMetaFiles = 0; - for(File dir : dataDirs) { + for (File dir : dataDirs) { File[] metadataFiles = dir.listFiles(new FilenameFilter() { @Override public boolean accept(File dir, String name) { @@ -803,8 +785,8 @@ public class TestFileChannelRestart extends TestFileChannelBase { } }); totalMetaFiles = metadataFiles.length; - for(File metadataFile : metadataFiles) { - if(metadataFile.lastModified() < beforeSecondCheckpoint) { + for (File metadataFile : metadataFiles) { + if (metadataFile.lastModified() < beforeSecondCheckpoint) { olderThanCheckpoint++; } } @@ -824,13 +806,13 @@ public class TestFileChannelRestart extends TestFileChannelBase { takeEvents(channel, 10, 50); forceCheckpoint(channel); newFiles = Sets.newHashSet(); - for(File dir : dataDirs) { + for (File dir : dataDirs) { newFiles.addAll(Lists.newArrayList(dir.list())); } Assert.assertTrue(!newFiles.containsAll(origFiles)); } - @Test (expected = IOException.class) + @Test(expected = IOException.class) public void testSlowBackup() throws Throwable { Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, "true"); @@ -858,10 +840,10 @@ public class 
TestFileChannelRestart extends TestFileChannelBase { public void testCompressBackup() throws Throwable { Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, - "true"); + "true"); overrides.put(FileChannelConfiguration.MAX_FILE_SIZE, "1000"); overrides.put(FileChannelConfiguration.COMPRESS_BACKUP_CHECKPOINT, - "true"); + "true"); channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); @@ -873,36 +855,34 @@ public class TestFileChannelRestart extends TestFileChannelBase { Assert.assertTrue(compressedBackupCheckpoint.exists()); - Serialization.decompressFile(compressedBackupCheckpoint, - uncompressedBackupCheckpoint); + Serialization.decompressFile(compressedBackupCheckpoint, uncompressedBackupCheckpoint); File checkpoint = new File(checkpointDir, "checkpoint"); - Assert.assertTrue(FileUtils.contentEquals(checkpoint, - uncompressedBackupCheckpoint)); + Assert.assertTrue(FileUtils.contentEquals(checkpoint, uncompressedBackupCheckpoint)); channel.stop(); } @Test public void testToggleCheckpointCompressionFromTrueToFalse() - throws Exception { + throws Exception { restartToggleCompression(true); } @Test public void testToggleCheckpointCompressionFromFalseToTrue() - throws Exception { + throws Exception { restartToggleCompression(false); } public void restartToggleCompression(boolean originalCheckpointCompressed) - throws Exception { + throws Exception { Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, - "true"); + "true"); overrides.put(FileChannelConfiguration.MAX_FILE_SIZE, "1000"); overrides.put(FileChannelConfiguration.COMPRESS_BACKUP_CHECKPOINT, - String.valueOf(originalCheckpointCompressed)); + String.valueOf(originalCheckpointCompressed)); channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); @@ -910,17 +890,17 @@ public class TestFileChannelRestart extends 
TestFileChannelBase { forceCheckpoint(channel); Thread.sleep(2000); Assert.assertEquals(compressedBackupCheckpoint.exists(), - originalCheckpointCompressed); + originalCheckpointCompressed); Assert.assertEquals(uncompressedBackupCheckpoint.exists(), - !originalCheckpointCompressed); + !originalCheckpointCompressed); channel.stop(); File checkpoint = new File(checkpointDir, "checkpoint"); Assert.assertTrue(checkpoint.delete()); File checkpointMetaData = Serialization.getMetaDataFile( - checkpoint); + checkpoint); Assert.assertTrue(checkpointMetaData.delete()); overrides.put(FileChannelConfiguration.COMPRESS_BACKUP_CHECKPOINT, - String.valueOf(!originalCheckpointCompressed)); + String.valueOf(!originalCheckpointCompressed)); channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); @@ -929,21 +909,21 @@ public class TestFileChannelRestart extends TestFileChannelBase { forceCheckpoint(channel); Thread.sleep(2000); Assert.assertEquals(compressedBackupCheckpoint.exists(), - !originalCheckpointCompressed); + !originalCheckpointCompressed); Assert.assertEquals(uncompressedBackupCheckpoint.exists(), - originalCheckpointCompressed); + originalCheckpointCompressed); } private static void slowdownBackup(FileChannel channel) { Log log = field("log").ofType(Log.class).in(channel).get(); FlumeEventQueue queue = field("queue") - .ofType(FlumeEventQueue.class) - .in(log).get(); + .ofType(FlumeEventQueue.class) + .in(log).get(); EventQueueBackingStore backingStore = field("backingStore") - .ofType(EventQueueBackingStore.class) - .in(queue).get(); + .ofType(EventQueueBackingStore.class) + .in(queue).get(); field("slowdownBackup").ofType(Boolean.class).in(backingStore).set(true); } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRollback.java ---------------------------------------------------------------------- diff --git 
a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRollback.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRollback.java index 23fc64b..c06d498 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRollback.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRollback.java @@ -18,11 +18,7 @@ */ package org.apache.flume.channel.file; -import static org.apache.flume.channel.file.TestUtils.*; - -import java.util.Collections; -import java.util.Set; - +import com.google.common.base.Charsets; import org.apache.flume.Transaction; import org.apache.flume.event.EventBuilder; import org.apache.flume.sink.LoggerSink; @@ -33,8 +29,12 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Charsets; +import java.util.Collections; +import java.util.Set; +import static org.apache.flume.channel.file.TestUtils.compareInputAndOut; +import static org.apache.flume.channel.file.TestUtils.putEvents; +import static org.apache.flume.channel.file.TestUtils.takeEvents; public class TestFileChannelRollback extends TestFileChannelBase { protected static final Logger LOG = LoggerFactory @@ -117,11 +117,11 @@ public class TestFileChannelRollback extends TestFileChannelBase { transaction.rollback(); transaction.close(); - while(runner.isAlive()) { + while (runner.isAlive()) { Thread.sleep(10L); } Assert.assertEquals(numEvents - 1, runner.getCount()); - for(Exception ex : runner.getErrors()) { + for (Exception ex : runner.getErrors()) { LOG.warn("Sink had error", ex); } Assert.assertEquals(Collections.EMPTY_LIST, runner.getErrors());
