http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java index 1adb21a..f1700f9 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java @@ -55,6 +55,7 @@ public class TestFlumeEventQueue { File inflightTakes; File inflightPuts; File queueSetDir; + EventQueueBackingStoreSupplier() { baseDir = Files.createTempDir(); checkpoint = new File(baseDir, "checkpoint"); @@ -62,62 +63,73 @@ public class TestFlumeEventQueue { inflightPuts = new File(baseDir, "inflighttakes"); queueSetDir = new File(baseDir, "queueset"); } + File getCheckpoint() { return checkpoint; } + File getInflightPuts() { return inflightPuts; } + File getInflightTakes() { return inflightTakes; } + File getQueueSetDir() { return queueSetDir; } + void delete() { FileUtils.deleteQuietly(baseDir); } - abstract EventQueueBackingStore get() throws Exception ; + + abstract EventQueueBackingStore get() throws Exception; } @Parameters public static Collection<Object[]> data() throws Exception { - Object[][] data = new Object[][] { { - new EventQueueBackingStoreSupplier() { - @Override - public EventQueueBackingStore get() throws Exception { - Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs()); - return new EventQueueBackingStoreFileV2(getCheckpoint(), 1000, - "test"); + Object[][] data = new Object[][] { + { + new EventQueueBackingStoreSupplier() { + @Override + public EventQueueBackingStore get() throws Exception { + Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs()); + return new EventQueueBackingStoreFileV2(getCheckpoint(), 1000, + "test"); + } } - } - }, { - new EventQueueBackingStoreSupplier() { - @Override - public EventQueueBackingStore get() throws Exception { - Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs()); - return new EventQueueBackingStoreFileV3(getCheckpoint(), 1000, - "test"); + }, + { + new EventQueueBackingStoreSupplier() { + @Override + public EventQueueBackingStore get() throws Exception { + Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs()); + return new EventQueueBackingStoreFileV3(getCheckpoint(), 1000, "test"); + } } } - } }; + }; return Arrays.asList(data); } public TestFlumeEventQueue(EventQueueBackingStoreSupplier backingStoreSupplier) { this.backingStoreSupplier = backingStoreSupplier; } + @Before public void setup() throws Exception { backingStore = backingStoreSupplier.get(); } + @After public void cleanup() throws IOException { - if(backingStore != null) { + if (backingStore != null) { backingStore.close(); } backingStoreSupplier.delete(); } + @Test public void testCapacity() throws Exception { backingStore.close(); @@ -125,70 +137,76 @@ public class TestFlumeEventQueue { Assert.assertTrue(checkpoint.delete()); backingStore = new EventQueueBackingStoreFileV2(checkpoint, 1, "test"); queue = new FlumeEventQueue(backingStore, - backingStoreSupplier.getInflightTakes(), - backingStoreSupplier.getInflightPuts(), - backingStoreSupplier.getQueueSetDir()); + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts(), + backingStoreSupplier.getQueueSetDir()); Assert.assertTrue(queue.addTail(pointer1)); Assert.assertFalse(queue.addTail(pointer2)); } - @Test(expected=IllegalArgumentException.class) + + @Test(expected = IllegalArgumentException.class) public void testInvalidCapacityZero() throws Exception { backingStore.close(); File checkpoint = backingStoreSupplier.getCheckpoint(); Assert.assertTrue(checkpoint.delete()); backingStore = new EventQueueBackingStoreFileV2(checkpoint, 0, "test"); queue = new FlumeEventQueue(backingStore, - backingStoreSupplier.getInflightTakes(), - backingStoreSupplier.getInflightPuts(), - backingStoreSupplier.getQueueSetDir()); + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts(), + backingStoreSupplier.getQueueSetDir()); } - @Test(expected=IllegalArgumentException.class) + + @Test(expected = IllegalArgumentException.class) public void testInvalidCapacityNegative() throws Exception { backingStore.close(); File checkpoint = backingStoreSupplier.getCheckpoint(); Assert.assertTrue(checkpoint.delete()); backingStore = new EventQueueBackingStoreFileV2(checkpoint, -1, "test"); queue = new FlumeEventQueue(backingStore, - backingStoreSupplier.getInflightTakes(), - backingStoreSupplier.getInflightPuts(), - backingStoreSupplier.getQueueSetDir()); + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts(), + backingStoreSupplier.getQueueSetDir()); } + @Test public void testQueueIsEmptyAfterCreation() throws Exception { queue = new FlumeEventQueue(backingStore, - backingStoreSupplier.getInflightTakes(), - backingStoreSupplier.getInflightPuts(), - backingStoreSupplier.getQueueSetDir()); + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts(), + backingStoreSupplier.getQueueSetDir()); Assert.assertNull(queue.removeHead(0L)); } + @Test public void addTail1() throws Exception { queue = new FlumeEventQueue(backingStore, - backingStoreSupplier.getInflightTakes(), - backingStoreSupplier.getInflightPuts(), - backingStoreSupplier.getQueueSetDir()); + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts(), + backingStoreSupplier.getQueueSetDir()); Assert.assertTrue(queue.addTail(pointer1)); Assert.assertEquals(pointer1, queue.removeHead(0)); Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs()); } + @Test public void addTail2() throws Exception { queue = new FlumeEventQueue(backingStore, - backingStoreSupplier.getInflightTakes(), - backingStoreSupplier.getInflightPuts(), - backingStoreSupplier.getQueueSetDir()); + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts(), + backingStoreSupplier.getQueueSetDir()); Assert.assertTrue(queue.addTail(pointer1)); Assert.assertTrue(queue.addTail(pointer2)); Assert.assertEquals(Sets.newHashSet(1, 2), queue.getFileIDs()); Assert.assertEquals(pointer1, queue.removeHead(0)); Assert.assertEquals(Sets.newHashSet(2), queue.getFileIDs()); } + @Test public void addTailLarge() throws Exception { queue = new FlumeEventQueue(backingStore, - backingStoreSupplier.getInflightTakes(), - backingStoreSupplier.getInflightPuts(), - backingStoreSupplier.getQueueSetDir()); + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts(), + backingStoreSupplier.getQueueSetDir()); int size = 500; Set<Integer> fileIDs = Sets.newHashSet(); for (int i = 1; i <= size; i++) { @@ -203,23 +221,25 @@ public class TestFlumeEventQueue { } Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs()); } + @Test public void addHead1() throws Exception { queue = new FlumeEventQueue(backingStore, - backingStoreSupplier.getInflightTakes(), - backingStoreSupplier.getInflightPuts(), - backingStoreSupplier.getQueueSetDir()); + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts(), + backingStoreSupplier.getQueueSetDir()); Assert.assertTrue(queue.addHead(pointer1)); Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs()); Assert.assertEquals(pointer1, queue.removeHead(0)); Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs()); } + @Test public void addHead2() throws Exception { queue = new FlumeEventQueue(backingStore, - backingStoreSupplier.getInflightTakes(), - backingStoreSupplier.getInflightPuts(), - backingStoreSupplier.getQueueSetDir()); + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts(), + backingStoreSupplier.getQueueSetDir()); queue.replayComplete(); Assert.assertTrue(queue.addHead(pointer1)); Assert.assertTrue(queue.addHead(pointer2)); @@ -227,12 +247,13 @@ public class TestFlumeEventQueue { Assert.assertEquals(pointer2, queue.removeHead(0)); Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs()); } + @Test public void addHeadLarge() throws Exception { queue = new FlumeEventQueue(backingStore, - backingStoreSupplier.getInflightTakes(), - backingStoreSupplier.getInflightPuts(), - backingStoreSupplier.getQueueSetDir()); + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts(), + backingStoreSupplier.getQueueSetDir()); queue.replayComplete(); int size = 500; Set<Integer> fileIDs = Sets.newHashSet(); @@ -248,12 +269,13 @@ public class TestFlumeEventQueue { } Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs()); } + @Test public void addTailRemove1() throws Exception { queue = new FlumeEventQueue(backingStore, - backingStoreSupplier.getInflightTakes(), - backingStoreSupplier.getInflightPuts(), - backingStoreSupplier.getQueueSetDir()); + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts(), + backingStoreSupplier.getQueueSetDir()); Assert.assertTrue(queue.addTail(pointer1)); Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs()); Assert.assertTrue(queue.remove(pointer1)); @@ -266,9 +288,9 @@ public class TestFlumeEventQueue { @Test public void addTailRemove2() throws Exception { queue = new FlumeEventQueue(backingStore, - backingStoreSupplier.getInflightTakes(), - backingStoreSupplier.getInflightPuts(), - backingStoreSupplier.getQueueSetDir()); + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts(), + backingStoreSupplier.getQueueSetDir()); Assert.assertTrue(queue.addTail(pointer1)); Assert.assertTrue(queue.addTail(pointer2)); Assert.assertTrue(queue.remove(pointer1)); @@ -279,31 +301,33 @@ public class TestFlumeEventQueue { @Test public void addHeadRemove1() throws Exception { queue = new FlumeEventQueue(backingStore, - backingStoreSupplier.getInflightTakes(), - backingStoreSupplier.getInflightPuts(), - backingStoreSupplier.getQueueSetDir()); + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts(), + backingStoreSupplier.getQueueSetDir()); queue.addHead(pointer1); Assert.assertTrue(queue.remove(pointer1)); Assert.assertNull(queue.removeHead(0)); } + @Test public void addHeadRemove2() throws Exception { queue = new FlumeEventQueue(backingStore, - backingStoreSupplier.getInflightTakes(), - backingStoreSupplier.getInflightPuts(), - backingStoreSupplier.getQueueSetDir()); + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts(), + backingStoreSupplier.getQueueSetDir()); Assert.assertTrue(queue.addHead(pointer1)); Assert.assertTrue(queue.addHead(pointer2)); Assert.assertTrue(queue.remove(pointer1)); queue.replayComplete(); Assert.assertEquals(pointer2, queue.removeHead(0)); } + @Test public void testUnknownPointerDoesNotCauseSearch() throws Exception { queue = new FlumeEventQueue(backingStore, - backingStoreSupplier.getInflightTakes(), - backingStoreSupplier.getInflightPuts(), - backingStoreSupplier.getQueueSetDir()); + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts(), + backingStoreSupplier.getQueueSetDir()); Assert.assertTrue(queue.addHead(pointer1)); Assert.assertTrue(queue.addHead(pointer2)); Assert.assertFalse(queue.remove(pointer3)); // does search @@ -312,44 +336,47 @@ public class TestFlumeEventQueue { queue.replayComplete(); Assert.assertEquals(2, queue.getSearchCount()); } - @Test(expected=IllegalStateException.class) + + @Test(expected = IllegalStateException.class) public void testRemoveAfterReplayComplete() throws Exception { queue = new FlumeEventQueue(backingStore, - backingStoreSupplier.getInflightTakes(), - backingStoreSupplier.getInflightPuts(), - backingStoreSupplier.getQueueSetDir()); + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts(), + backingStoreSupplier.getQueueSetDir()); queue.replayComplete(); queue.remove(pointer1); } + @Test public void testWrappingCorrectly() throws Exception { queue = new FlumeEventQueue(backingStore, - backingStoreSupplier.getInflightTakes(), - backingStoreSupplier.getInflightPuts(), - backingStoreSupplier.getQueueSetDir()); + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts(), + backingStoreSupplier.getQueueSetDir()); int size = Integer.MAX_VALUE; for (int i = 1; i <= size; i++) { - if(!queue.addHead(new FlumeEventPointer(i, i))) { + if (!queue.addHead(new FlumeEventPointer(i, i))) { break; } } - for (int i = queue.getSize()/2; i > 0; i--) { + for (int i = queue.getSize() / 2; i > 0; i--) { Assert.assertNotNull(queue.removeHead(0)); } // addHead below would throw an IndexOOBounds with // bad version of FlumeEventQueue.convert for (int i = 1; i <= size; i++) { - if(!queue.addHead(new FlumeEventPointer(i, i))) { + if (!queue.addHead(new FlumeEventPointer(i, i))) { break; } } } + @Test - public void testInflightPuts() throws Exception{ + public void testInflightPuts() throws Exception { queue = new FlumeEventQueue(backingStore, - backingStoreSupplier.getInflightTakes(), - backingStoreSupplier.getInflightPuts(), - backingStoreSupplier.getQueueSetDir()); + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts(), + backingStoreSupplier.getQueueSetDir()); long txnID1 = new Random().nextInt(Integer.MAX_VALUE - 1); long txnID2 = txnID1 + 1; queue.addWithoutCommit(new FlumeEventPointer(1, 1), txnID1); @@ -358,16 +385,13 @@ public class TestFlumeEventQueue { queue.checkpoint(true); TimeUnit.SECONDS.sleep(3L); queue = new FlumeEventQueue(backingStore, - backingStoreSupplier.getInflightTakes(), - backingStoreSupplier.getInflightPuts(), - backingStoreSupplier.getQueueSetDir()); + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts(), + backingStoreSupplier.getQueueSetDir()); SetMultimap<Long, Long> deserializedMap = queue.deserializeInflightPuts(); - Assert.assertTrue(deserializedMap.get( - txnID1).contains(new FlumeEventPointer(1, 1).toLong())); - Assert.assertTrue(deserializedMap.get( - txnID1).contains(new FlumeEventPointer(2, 1).toLong())); - Assert.assertTrue(deserializedMap.get( - txnID2).contains(new FlumeEventPointer(2, 2).toLong())); + Assert.assertTrue(deserializedMap.get(txnID1).contains(new FlumeEventPointer(1, 1).toLong())); + Assert.assertTrue(deserializedMap.get(txnID1).contains(new FlumeEventPointer(2, 1).toLong())); + Assert.assertTrue(deserializedMap.get(txnID2).contains(new FlumeEventPointer(2, 2).toLong())); } @Test
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestIntegration.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestIntegration.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestIntegration.java index 2fbe116..a138ed4 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestIntegration.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestIntegration.java @@ -18,13 +18,8 @@ */ package org.apache.flume.channel.file; -import java.io.File; -import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - +import com.google.common.collect.Lists; +import com.google.common.io.Files; import org.apache.commons.io.FileUtils; import org.apache.flume.Context; import org.apache.flume.conf.Configurables; @@ -37,8 +32,12 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; -import com.google.common.io.Files; +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; public class TestIntegration { @@ -58,19 +57,21 @@ public class TestIntegration { dataDirs = new File[3]; dataDir = ""; for (int i = 0; i < dataDirs.length; i++) { - dataDirs[i] = new File(baseDir, "data" + (i+1)); + dataDirs[i] = new File(baseDir, "data" + (i + 1)); Assert.assertTrue(dataDirs[i].mkdirs() || dataDirs[i].isDirectory()); dataDir += dataDirs[i].getAbsolutePath() + ","; } dataDir = dataDir.substring(0, dataDir.length() - 1); } + @After public void teardown() { - if(channel != null && channel.isOpen()) { + if (channel != null && channel.isOpen()) { channel.stop(); } FileUtils.deleteQuietly(baseDir); } + @Test public void testIntegration() throws IOException, InterruptedException { // set shorter checkpoint and filesize to ensure @@ -106,11 +107,11 @@ public class TestIntegration { TimeUnit.SECONDS.sleep(30); // shutdown source sourceRunner.shutdown(); - while(sourceRunner.isAlive()) { + while (sourceRunner.isAlive()) { Thread.sleep(10L); } // wait for queue to clear - while(channel.getDepth() > 0) { + while (channel.getDepth() > 0) { Thread.sleep(10L); } // shutdown size @@ -122,15 +123,15 @@ public class TestIntegration { logs.addAll(LogUtils.getLogs(dataDirs[i])); } LOG.info("Total Number of Logs = " + logs.size()); - for(File logFile : logs) { + for (File logFile : logs) { LOG.info("LogFile = " + logFile); } LOG.info("Source processed " + sinkRunner.getCount()); LOG.info("Sink processed " + sourceRunner.getCount()); - for(Exception ex : sourceRunner.getErrors()) { + for (Exception ex : sourceRunner.getErrors()) { LOG.warn("Source had error", ex); } - for(Exception ex : sinkRunner.getErrors()) { + for (Exception ex : sinkRunner.getErrors()) { LOG.warn("Sink had error", ex); } Assert.assertEquals(sinkRunner.getCount(), sinkRunner.getCount()); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java index b1f59cd..f7f0950 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java @@ -18,14 +18,8 @@ */ package org.apache.flume.channel.file; -import static org.mockito.Mockito.*; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.nio.channels.*; -import java.util.Collection; -import java.util.List; - +import com.google.common.collect.Lists; +import com.google.common.io.Files; import org.apache.commons.io.FileUtils; import org.junit.After; import org.junit.Assert; @@ -34,8 +28,13 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; -import com.google.common.io.Files; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.List; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TestLog { private static final Logger LOGGER = LoggerFactory.getLogger(TestLog.class); @@ -45,6 +44,7 @@ public class TestLog { private File checkpointDir; private File[] dataDirs; private long transactionID; + @Before public void setup() throws IOException { transactionID = 0; @@ -56,15 +56,20 @@ public class TestLog { dataDirs[i] = Files.createTempDir(); Assert.assertTrue(dataDirs[i].isDirectory()); } - log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize( - MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir( - checkpointDir).setLogDirs(dataDirs).setCheckpointOnClose(false) - .setChannelName("testlog").build(); + log = new Log.Builder().setCheckpointInterval(1L) + .setMaxFileSize(MAX_FILE_SIZE) + .setQueueSize(CAPACITY) + .setCheckpointDir(checkpointDir) + .setLogDirs(dataDirs) + .setCheckpointOnClose(false) + .setChannelName("testlog") + .build(); log.replay(); } + @After - public void cleanup() throws Exception{ - if(log != null) { + public void cleanup() throws Exception { + if (log != null) { log.close(); } FileUtils.deleteQuietly(checkpointDir); @@ -72,13 +77,14 @@ public class TestLog { FileUtils.deleteQuietly(dataDirs[i]); } } + /** * Test that we can put, commit and then get. Note that get is * not transactional so the commit is not required. */ @Test public void testPutGet() - throws IOException, InterruptedException, NoopRecordException, CorruptEventException { + throws IOException, InterruptedException, NoopRecordException, CorruptEventException { FlumeEvent eventIn = TestUtils.newPersistableEvent(); long transactionID = ++this.transactionID; FlumeEventPointer eventPointer = log.put(transactionID, eventIn); @@ -89,9 +95,10 @@ public class TestLog { Assert.assertEquals(eventIn.getHeaders(), eventOut.getHeaders()); Assert.assertArrayEquals(eventIn.getBody(), eventOut.getBody()); } + @Test public void testRoll() - throws IOException, InterruptedException, NoopRecordException, CorruptEventException { + throws IOException, InterruptedException, NoopRecordException, CorruptEventException { log.shutdownWorker(); Thread.sleep(1000); for (int i = 0; i < 1000; i++) { @@ -105,9 +112,9 @@ public class TestLog { Assert.assertArrayEquals(eventIn.getBody(), eventOut.getBody()); } int logCount = 0; - for(File dataDir : dataDirs) { - for(File logFile : dataDir.listFiles()) { - if(logFile.getName().startsWith("log-")) { + for (File dataDir : dataDirs) { + for (File logFile : dataDir.listFiles()) { + if (logFile.getName().startsWith("log-")) { logCount++; } } @@ -115,26 +122,30 @@ public class TestLog { // 93 (*2 for meta) files with TestLog.MAX_FILE_SIZE=1000 Assert.assertEquals(186, logCount); } + /** * After replay of the log, we should find the event because the put * was committed */ @Test public void testPutCommit() - throws IOException, InterruptedException, NoopRecordException, CorruptEventException { + throws IOException, InterruptedException, NoopRecordException, CorruptEventException { FlumeEvent eventIn = TestUtils.newPersistableEvent(); long transactionID = ++this.transactionID; FlumeEventPointer eventPointerIn = log.put(transactionID, eventIn); log.commitPut(transactionID); log.close(); - log = new Log.Builder().setCheckpointInterval( - Long.MAX_VALUE).setMaxFileSize( - FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize( - CAPACITY).setCheckpointDir(checkpointDir).setLogDirs( - dataDirs).setChannelName("testlog").build(); + log = new Log.Builder().setCheckpointInterval(Long.MAX_VALUE) + .setMaxFileSize(FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE) + .setQueueSize(CAPACITY) + .setCheckpointDir(checkpointDir) + .setLogDirs(dataDirs) + .setChannelName("testlog") + .build(); log.replay(); takeAndVerify(eventPointerIn, eventIn); } + /** * After replay of the log, we should not find the event because the * put was rolled back @@ -146,39 +157,44 @@ public class TestLog { log.put(transactionID, eventIn); log.rollback(transactionID); // rolled back so it should not be replayed log.close(); - log = new Log.Builder().setCheckpointInterval( - Long.MAX_VALUE).setMaxFileSize( - FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize( - CAPACITY).setCheckpointDir(checkpointDir).setLogDirs( - dataDirs).setChannelName("testlog").build(); + log = new Log.Builder().setCheckpointInterval(Long.MAX_VALUE) + .setMaxFileSize(FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE) + .setQueueSize(CAPACITY) + .setCheckpointDir(checkpointDir) + .setLogDirs(dataDirs) + .setChannelName("testlog") + .build(); log.replay(); FlumeEventQueue queue = log.getFlumeEventQueue(); Assert.assertNull(queue.removeHead(transactionID)); } + @Test public void testMinimumRequiredSpaceTooSmallOnStartup() throws IOException, - InterruptedException { + InterruptedException { log.close(); - log = new Log.Builder().setCheckpointInterval( - Long.MAX_VALUE).setMaxFileSize( - FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize( - CAPACITY).setCheckpointDir(checkpointDir).setLogDirs( - dataDirs).setChannelName("testlog"). - setMinimumRequiredSpace(Long.MAX_VALUE).build(); + log = new Log.Builder().setCheckpointInterval(Long.MAX_VALUE) + .setMaxFileSize(FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE) + .setQueueSize(CAPACITY) + .setCheckpointDir(checkpointDir) + .setLogDirs(dataDirs) + .setChannelName("testlog") + .setMinimumRequiredSpace(Long.MAX_VALUE) + .build(); try { log.replay(); Assert.fail(); } catch (IOException e) { - Assert.assertTrue(e.getMessage(), e.getMessage() - .startsWith("Usable space exhausted")); + Assert.assertTrue(e.getMessage(), + e.getMessage().startsWith("Usable space exhausted")); } } + /** * There is a race here in that someone could take up some space */ @Test - public void testMinimumRequiredSpaceTooSmallForPut() throws IOException, - InterruptedException { + public void testMinimumRequiredSpaceTooSmallForPut() throws IOException, InterruptedException { try { doTestMinimumRequiredSpaceTooSmallForPut(); } catch (IOException e) { @@ -189,23 +205,26 @@ public class TestLog { doTestMinimumRequiredSpaceTooSmallForPut(); } } + public void doTestMinimumRequiredSpaceTooSmallForPut() throws IOException, - InterruptedException { + InterruptedException { long minimumRequiredSpace = checkpointDir.getUsableSpace() - - (10L* 1024L * 1024L); + (10L * 1024L * 1024L); log.close(); - log = new Log.Builder().setCheckpointInterval( - Long.MAX_VALUE).setMaxFileSize( - FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize( - CAPACITY).setCheckpointDir(checkpointDir).setLogDirs( - dataDirs).setChannelName("testlog"). - setMinimumRequiredSpace(minimumRequiredSpace) - .setUsableSpaceRefreshInterval(1L).build(); + log = new Log.Builder().setCheckpointInterval(Long.MAX_VALUE) + .setMaxFileSize(FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE) + .setQueueSize(CAPACITY) + .setCheckpointDir(checkpointDir) + .setLogDirs(dataDirs) + .setChannelName("testlog") + .setMinimumRequiredSpace(minimumRequiredSpace) + .setUsableSpaceRefreshInterval(1L) + .build(); log.replay(); File filler = new File(checkpointDir, "filler"); byte[] buffer = new byte[64 * 1024]; FileOutputStream out = new FileOutputStream(filler); - while(checkpointDir.getUsableSpace() > minimumRequiredSpace) { + while (checkpointDir.getUsableSpace() > minimumRequiredSpace) { out.write(buffer); } out.close(); @@ -215,10 +234,11 @@ public class TestLog { log.put(transactionID, eventIn); Assert.fail(); } catch (IOException e) { - Assert.assertTrue(e.getMessage(), e.getMessage() - .startsWith("Usable space exhausted")); + Assert.assertTrue(e.getMessage(), + e.getMessage().startsWith("Usable space exhausted")); } } + /** * After replay of the log, we should not find the event because the take * was committed @@ -233,11 +253,13 @@ public class TestLog { log.take(takeTransactionID, eventPointer); log.commitTake(takeTransactionID); log.close(); - new Log.Builder().setCheckpointInterval( - Long.MAX_VALUE).setMaxFileSize( - FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize( - 1).setCheckpointDir(checkpointDir).setLogDirs(dataDirs) - .setChannelName("testlog").build(); + new Log.Builder().setCheckpointInterval(Long.MAX_VALUE) + .setMaxFileSize(FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE) + .setQueueSize(1) + .setCheckpointDir(checkpointDir) + .setLogDirs(dataDirs) + .setChannelName("testlog") + .build(); log.replay(); FlumeEventQueue queue = log.getFlumeEventQueue(); Assert.assertNull(queue.removeHead(0)); @@ -249,16 +271,18 @@ public class TestLog { */ @Test public void testPutTakeRollbackLogReplayV1() - throws IOException, InterruptedException, NoopRecordException, CorruptEventException { + throws IOException, InterruptedException, NoopRecordException, CorruptEventException { doPutTakeRollback(true); } + @Test public void testPutTakeRollbackLogReplayV2() - throws IOException, InterruptedException, NoopRecordException, CorruptEventException { + throws IOException, InterruptedException, NoopRecordException, CorruptEventException { doPutTakeRollback(false); } + public void doPutTakeRollback(boolean useLogReplayV1) - throws IOException, InterruptedException, NoopRecordException, CorruptEventException { + throws IOException, InterruptedException, NoopRecordException, CorruptEventException { FlumeEvent eventIn = TestUtils.newPersistableEvent(); long putTransactionID = ++transactionID; FlumeEventPointer eventPointerIn = log.put(putTransactionID, eventIn); @@ -267,11 +291,14 @@ public class TestLog { log.take(takeTransactionID, eventPointerIn); log.rollback(takeTransactionID); log.close(); - new Log.Builder().setCheckpointInterval( - Long.MAX_VALUE).setMaxFileSize( - FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize( - 1).setCheckpointDir(checkpointDir).setLogDirs(dataDirs) - .setChannelName("testlog").setUseLogReplayV1(useLogReplayV1).build(); + new Log.Builder().setCheckpointInterval(Long.MAX_VALUE) + .setMaxFileSize(FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE) + .setQueueSize(1) + .setCheckpointDir(checkpointDir) + .setLogDirs(dataDirs) + .setChannelName("testlog") + .setUseLogReplayV1(useLogReplayV1) + .build(); log.replay(); takeAndVerify(eventPointerIn, eventIn); } @@ -281,11 +308,13 @@ public class TestLog { long putTransactionID = ++transactionID; log.commitPut(putTransactionID); log.close(); - new Log.Builder().setCheckpointInterval( - Long.MAX_VALUE).setMaxFileSize( - FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize( - 1).setCheckpointDir(checkpointDir).setLogDirs(dataDirs) - .setChannelName("testlog").build(); + new Log.Builder().setCheckpointInterval(Long.MAX_VALUE) + .setMaxFileSize(FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE) + .setQueueSize(1) + .setCheckpointDir(checkpointDir) + .setLogDirs(dataDirs) + .setChannelName("testlog") + .build(); log.replay(); FlumeEventQueue queue = log.getFlumeEventQueue(); FlumeEventPointer eventPointerOut = queue.removeHead(0); @@ -297,11 +326,13 @@ public class TestLog { long putTransactionID = ++transactionID; log.commitTake(putTransactionID); log.close(); - new Log.Builder().setCheckpointInterval( - Long.MAX_VALUE).setMaxFileSize( - FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize( - 1).setCheckpointDir(checkpointDir).setLogDirs(dataDirs) - .setChannelName("testlog").build(); + new Log.Builder().setCheckpointInterval(Long.MAX_VALUE) + .setMaxFileSize(FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE) + .setQueueSize(1) + .setCheckpointDir(checkpointDir) + .setLogDirs(dataDirs) + .setChannelName("testlog") + .build(); log.replay(); FlumeEventQueue queue = log.getFlumeEventQueue(); FlumeEventPointer eventPointerOut = queue.removeHead(0); @@ -313,11 +344,13 @@ public class TestLog { long putTransactionID = ++transactionID; log.rollback(putTransactionID); log.close(); - new Log.Builder().setCheckpointInterval( - Long.MAX_VALUE).setMaxFileSize( - FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize( - 1).setCheckpointDir(checkpointDir).setLogDirs(dataDirs) - .setChannelName("testlog").build(); + new Log.Builder().setCheckpointInterval(Long.MAX_VALUE) + .setMaxFileSize(FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE) + .setQueueSize(1) + .setCheckpointDir(checkpointDir) + .setLogDirs(dataDirs) + .setChannelName("testlog") + .build(); log.replay(); FlumeEventQueue queue = log.getFlumeEventQueue(); FlumeEventPointer eventPointerOut = queue.removeHead(0); @@ -337,7 +370,7 @@ public class TestLog { File logGzip = new File(logDir, Log.PREFIX + i + ".gz"); Assert.assertTrue(metaDataFile.isFile() || metaDataFile.createNewFile()); Assert.assertTrue(metaDataTempFile.isFile() || - metaDataTempFile.createNewFile()); + metaDataTempFile.createNewFile()); Assert.assertTrue(log.isFile() || logGzip.createNewFile()); } List<File> actual = LogUtils.getLogs(logDir); @@ -345,31 +378,38 @@ public class TestLog { LogUtils.sort(expected); Assert.assertEquals(expected, actual); } + @Test public void testReplayFailsWithAllEmptyLogMetaDataNormalReplay() throws IOException, InterruptedException { doTestReplayFailsWithAllEmptyLogMetaData(false); } + @Test public void testReplayFailsWithAllEmptyLogMetaDataFastReplay() throws IOException, InterruptedException { doTestReplayFailsWithAllEmptyLogMetaData(true); } + public void doTestReplayFailsWithAllEmptyLogMetaData(boolean useFastReplay) throws IOException, InterruptedException { // setup log with correct fast replay parameter log.close(); - log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize( - MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir( - checkpointDir).setLogDirs(dataDirs) - .setChannelName("testlog").setUseFastReplay(useFastReplay).build(); + log = new Log.Builder().setCheckpointInterval(1L) + .setMaxFileSize(MAX_FILE_SIZE) + .setQueueSize(CAPACITY) + .setCheckpointDir(checkpointDir) + .setLogDirs(dataDirs) + .setChannelName("testlog") + .setUseFastReplay(useFastReplay) + .build(); log.replay(); FlumeEvent eventIn = TestUtils.newPersistableEvent(); long transactionID = ++this.transactionID; log.put(transactionID, eventIn); log.commitPut(transactionID); log.close(); - if(useFastReplay) { + if (useFastReplay) { FileUtils.deleteQuietly(checkpointDir); Assert.assertTrue(checkpointDir.mkdir()); } @@ -378,41 +418,50 @@ public class TestLog { logFiles.addAll(LogUtils.getLogs(dataDirs[i])); } Assert.assertTrue(logFiles.size() > 0); - for(File logFile : logFiles) { + for (File logFile : logFiles) { File logFileMeta = Serialization.getMetaDataFile(logFile); Assert.assertTrue(logFileMeta.delete()); Assert.assertTrue(logFileMeta.createNewFile()); } - log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize( - MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir( - checkpointDir).setLogDirs(dataDirs) - .setChannelName("testlog").setUseFastReplay(useFastReplay).build(); + log = new Log.Builder().setCheckpointInterval(1L) + .setMaxFileSize(MAX_FILE_SIZE) + .setQueueSize(CAPACITY) + .setCheckpointDir(checkpointDir) + .setLogDirs(dataDirs) + .setChannelName("testlog") + .setUseFastReplay(useFastReplay) + .build(); try { log.replay(); Assert.fail(); - } catch(IllegalStateException expected) { + } catch (IllegalStateException expected) { String msg = expected.getMessage(); Assert.assertNotNull(msg); Assert.assertTrue(msg, msg.contains(".meta is empty, but log")); } } + @Test public void testReplaySucceedsWithUnusedEmptyLogMetaDataNormalReplay() - throws IOException, InterruptedException, NoopRecordException, CorruptEventException { + throws IOException, InterruptedException, NoopRecordException, CorruptEventException { FlumeEvent eventIn = TestUtils.newPersistableEvent(); long transactionID = ++this.transactionID; FlumeEventPointer eventPointer = log.put(transactionID, eventIn); log.commitPut(transactionID); // this is not required since log.close(); - log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize( - MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir( - checkpointDir).setLogDirs(dataDirs) - .setChannelName("testlog").build(); + log = new Log.Builder().setCheckpointInterval(1L) + .setMaxFileSize(MAX_FILE_SIZE) + .setQueueSize(CAPACITY) + .setCheckpointDir(checkpointDir) + .setLogDirs(dataDirs) + .setChannelName("testlog") + .build(); doTestReplaySucceedsWithUnusedEmptyLogMetaData(eventIn, eventPointer); } + @Test public void testReplaySucceedsWithUnusedEmptyLogMetaDataFastReplay() - throws IOException, InterruptedException, NoopRecordException, CorruptEventException { + throws IOException, InterruptedException, NoopRecordException, CorruptEventException { FlumeEvent eventIn = TestUtils.newPersistableEvent(); long transactionID = ++this.transactionID; FlumeEventPointer eventPointer = log.put(transactionID, eventIn); @@ -421,18 +470,23 @@ public class TestLog { checkpointDir = Files.createTempDir(); FileUtils.forceDeleteOnExit(checkpointDir); Assert.assertTrue(checkpointDir.isDirectory()); - log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize( - MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir( - checkpointDir).setLogDirs(dataDirs) - .setChannelName("testlog").setUseFastReplay(true).build(); + log = new Log.Builder().setCheckpointInterval(1L) + .setMaxFileSize(MAX_FILE_SIZE) + .setQueueSize(CAPACITY) + .setCheckpointDir(checkpointDir) + .setLogDirs(dataDirs) + .setChannelName("testlog") + .setUseFastReplay(true) + .build(); doTestReplaySucceedsWithUnusedEmptyLogMetaData(eventIn, eventPointer); } + public void doTestReplaySucceedsWithUnusedEmptyLogMetaData(FlumeEvent eventIn, - FlumeEventPointer eventPointer) throws IOException, - InterruptedException, NoopRecordException, CorruptEventException { + FlumeEventPointer eventPointer) + throws IOException, InterruptedException, NoopRecordException, CorruptEventException { for (int i = 0; i < dataDirs.length; i++) { - for(File logFile : LogUtils.getLogs(dataDirs[i])) { - if(logFile.length() == 0L) { + for (File logFile : LogUtils.getLogs(dataDirs[i])) { + if (logFile.length() == 0L) { File logFileMeta = Serialization.getMetaDataFile(logFile); Assert.assertTrue(logFileMeta.delete()); Assert.assertTrue(logFileMeta.createNewFile()); @@ -445,16 +499,15 @@ public class TestLog { Assert.assertEquals(eventIn.getHeaders(), eventOut.getHeaders()); Assert.assertArrayEquals(eventIn.getBody(), eventOut.getBody()); } + @Test public void testCachedFSUsableSpace() throws Exception { File fs = mock(File.class); when(fs.getUsableSpace()).thenReturn(Long.MAX_VALUE); - LogFile.CachedFSUsableSpace cachedFS = - new LogFile.CachedFSUsableSpace(fs, 1000L); + LogFile.CachedFSUsableSpace cachedFS = new LogFile.CachedFSUsableSpace(fs, 1000L); Assert.assertEquals(cachedFS.getUsableSpace(), Long.MAX_VALUE); cachedFS.decrement(Integer.MAX_VALUE); - Assert.assertEquals(cachedFS.getUsableSpace(), - Long.MAX_VALUE - Integer.MAX_VALUE); + Assert.assertEquals(cachedFS.getUsableSpace(), Long.MAX_VALUE - Integer.MAX_VALUE); try { cachedFS.decrement(-1); Assert.fail(); @@ -463,20 +516,22 @@ public class TestLog { } when(fs.getUsableSpace()).thenReturn(Long.MAX_VALUE - 1L); Thread.sleep(1100); - Assert.assertEquals(cachedFS.getUsableSpace(), - Long.MAX_VALUE - 1L); + Assert.assertEquals(cachedFS.getUsableSpace(), Long.MAX_VALUE - 1L); } @Test public void testCheckpointOnClose() throws Exception { log.close(); - log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize( - MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir( - checkpointDir).setLogDirs(dataDirs).setCheckpointOnClose(true) - .setChannelName("testLog").build(); + log = new Log.Builder().setCheckpointInterval(1L) + .setMaxFileSize(MAX_FILE_SIZE) + .setQueueSize(CAPACITY) + .setCheckpointDir(checkpointDir) + .setLogDirs(dataDirs) + .setCheckpointOnClose(true) + .setChannelName("testLog") + .build(); log.replay(); - // 1 Write One Event FlumeEvent eventIn = TestUtils.newPersistableEvent(); log.put(transactionID, eventIn); @@ -484,20 +539,19 @@ public class TestLog { // 2 Check state of checkpoint before close File checkPointMetaFile = - FileUtils.listFiles(checkpointDir,new String[]{"meta"},false).iterator().next(); - long before = FileUtils.checksumCRC32( checkPointMetaFile ); + FileUtils.listFiles(checkpointDir, new String[] { "meta" }, false).iterator().next(); + long before = FileUtils.checksumCRC32(checkPointMetaFile); // 3 Close Log log.close(); // 4 Verify that checkpoint was modified on close - long after = FileUtils.checksumCRC32( checkPointMetaFile ); - Assert.assertFalse( before == after ); + long after = FileUtils.checksumCRC32(checkPointMetaFile); + Assert.assertFalse(before == after); } - private void takeAndVerify(FlumeEventPointer eventPointerIn, - FlumeEvent eventIn) - throws IOException, InterruptedException, NoopRecordException, CorruptEventException { + private void takeAndVerify(FlumeEventPointer eventPointerIn, FlumeEvent eventIn) + throws IOException, InterruptedException, NoopRecordException, CorruptEventException { FlumeEventQueue queue = log.getFlumeEventQueue(); FlumeEventPointer eventPointerOut = queue.removeHead(0); Assert.assertNotNull(eventPointerOut); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java index 976a112..d945c7f 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java @@ -18,6 +18,16 @@ */ package org.apache.flume.channel.file; +import com.google.common.base.Throwables; +import com.google.common.collect.Maps; +import com.google.common.io.Files; +import org.apache.commons.io.FileUtils; +import org.apache.flume.channel.file.proto.ProtosFactory; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -28,33 +38,21 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; -import org.apache.commons.io.FileUtils; -import org.apache.flume.channel.file.proto.ProtosFactory; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import com.google.common.base.Throwables; -import com.google.common.collect.Maps; -import com.google.common.io.Files; - public class TestLogFile { private int fileID; private long transactionID; private LogFile.Writer logFileWriter; private File dataDir; private File dataFile; + @Before public void setup() throws IOException { fileID = 1; @@ -65,28 +63,30 @@ public class TestLogFile { logFileWriter = LogFileFactory.getWriter(dataFile, fileID, Integer.MAX_VALUE, null, null, null, Long.MAX_VALUE, true, 0); } + @After public void cleanup() throws IOException { try { - if(logFileWriter != null) { + if (logFileWriter != null) { logFileWriter.close(); } } finally { FileUtils.deleteQuietly(dataDir); } } + @Test public void testWriterRefusesToOverwriteFile() throws IOException { Assert.assertTrue(dataFile.isFile() || dataFile.createNewFile()); try { LogFileFactory.getWriter(dataFile, fileID, Integer.MAX_VALUE, null, null, - null, Long.MAX_VALUE, true, 0); + null, Long.MAX_VALUE, true, 0); Assert.fail(); } catch (IllegalStateException e) { - Assert.assertEquals("File already exists " + dataFile.getAbsolutePath(), - e.getMessage()); + Assert.assertEquals("File already exists " + dataFile.getAbsolutePath(), e.getMessage()); } } + @Test public void testWriterFailsWithDirectory() throws IOException { FileUtils.deleteQuietly(dataFile); @@ -94,30 +94,29 @@ public class TestLogFile { Assert.assertTrue(dataFile.mkdirs()); try { LogFileFactory.getWriter(dataFile, fileID, Integer.MAX_VALUE, null, null, - null, Long.MAX_VALUE, true, 0); + null, Long.MAX_VALUE, true, 0); Assert.fail(); } catch (IllegalStateException e) { - Assert.assertEquals("File already exists " + dataFile.getAbsolutePath(), - e.getMessage()); + Assert.assertEquals("File already exists " + dataFile.getAbsolutePath(), e.getMessage()); } } + @Test public void testPutGet() throws InterruptedException, IOException { final List<Throwable> errors = Collections.synchronizedList(new ArrayList<Throwable>()); ExecutorService executorService = Executors.newFixedThreadPool(10); CompletionService<Void> completionService = new ExecutorCompletionService - <Void>(executorService); - final LogFile.RandomReader logFileReader = - LogFileFactory.getRandomReader(dataFile, null, true); + <Void>(executorService); + final LogFile.RandomReader logFileReader = LogFileFactory.getRandomReader(dataFile, null, true); for (int i = 0; i < 1000; i++) { // first try and throw failures synchronized (errors) { - for(Throwable throwable : errors) { + for (Throwable throwable : errors) { Throwables.propagateIfInstanceOf(throwable, AssertionError.class); } // then throw errors - for(Throwable throwable : errors) { + for (Throwable throwable : errors) { Throwables.propagate(throwable); } } @@ -134,7 +133,7 @@ public class TestLogFile { FlumeEvent eventOut = logFileReader.get(offset); Assert.assertEquals(eventIn.getHeaders(), eventOut.getHeaders()); Assert.assertTrue(Arrays.equals(eventIn.getBody(), eventOut.getBody())); - } catch(Throwable throwable) { + } catch (Throwable throwable) { synchronized (errors) { errors.add(throwable); } @@ -143,26 +142,26 @@ public class TestLogFile { }, null); } - for(int i = 0; i < 1000; i++) { + for (int i = 0; i < 1000; i++) { completionService.take(); } // first try and throw failures - for(Throwable throwable : errors) { + for (Throwable throwable : errors) { Throwables.propagateIfInstanceOf(throwable, AssertionError.class); } // then throw errors - for(Throwable throwable : errors) { + for (Throwable throwable : errors) { Throwables.propagate(throwable); } } + @Test public void testReader() throws InterruptedException, IOException, - CorruptEventException { + CorruptEventException { Map<Integer, Put> puts = Maps.newHashMap(); for (int i = 0; i < 1000; i++) { FlumeEvent eventIn = TestUtils.newPersistableEvent(); - Put put = new Put(++transactionID, WriteOrderOracle.next(), - eventIn); + Put put = new Put(++transactionID, WriteOrderOracle.next(), eventIn); ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put); FlumeEventPointer ptr = logFileWriter.put(bytes); puts.put(ptr.getOffset(), put); @@ -170,14 +169,14 @@ public class TestLogFile { LogFile.SequentialReader reader = LogFileFactory.getSequentialReader(dataFile, null, true); LogRecord entry; - while((entry = reader.next()) != null) { + while ((entry = reader.next()) != null) { Integer offset = entry.getOffset(); TransactionEventRecord record = entry.getEvent(); Put put = puts.get(offset); FlumeEvent eventIn = put.getEvent(); Assert.assertEquals(put.getTransactionID(), record.getTransactionID()); Assert.assertTrue(record instanceof Put); - FlumeEvent eventOut = ((Put)record).getEvent(); + FlumeEvent eventOut = ((Put) record).getEvent(); Assert.assertEquals(eventIn.getHeaders(), eventOut.getHeaders()); Assert.assertTrue(Arrays.equals(eventIn.getBody(), eventOut.getBody())); } @@ -185,12 +184,12 @@ public class TestLogFile { @Test public void testReaderOldMetaFile() throws InterruptedException, - IOException, CorruptEventException { + IOException, CorruptEventException { Map<Integer, Put> puts = Maps.newHashMap(); for (int i = 0; i < 1000; i++) { FlumeEvent eventIn = TestUtils.newPersistableEvent(); Put put = new Put(++transactionID, WriteOrderOracle.next(), - eventIn); + eventIn); ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put); FlumeEventPointer ptr = logFileWriter.put(bytes); puts.put(ptr.getOffset(), put); @@ -202,7 +201,7 @@ public class TestLogFile { Assert.fail("Renaming to meta.old failed"); } LogFile.SequentialReader reader = - LogFileFactory.getSequentialReader(dataFile, null, true); + LogFileFactory.getSequentialReader(dataFile, null, true); Assert.assertTrue(metadataFile.exists()); Assert.assertFalse(oldMetadataFile.exists()); LogRecord entry; @@ -219,14 +218,14 @@ public class TestLogFile { } } - @Test - public void testReaderTempMetaFile() throws InterruptedException, - IOException, CorruptEventException { + @Test + public void testReaderTempMetaFile() + throws InterruptedException, IOException, CorruptEventException { Map<Integer, Put> puts = Maps.newHashMap(); for (int i = 0; i < 1000; i++) { FlumeEvent eventIn = TestUtils.newPersistableEvent(); Put put = new Put(++transactionID, WriteOrderOracle.next(), - eventIn); + eventIn); ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put); FlumeEventPointer ptr = logFileWriter.put(bytes); puts.put(ptr.getOffset(), put); @@ -240,7 +239,7 @@ public class TestLogFile { Assert.fail("Renaming to meta.temp failed"); } LogFile.SequentialReader reader = - LogFileFactory.getSequentialReader(dataFile, null, true); + LogFileFactory.getSequentialReader(dataFile, null, true); Assert.assertTrue(metadataFile.exists()); Assert.assertFalse(tempMetadataFile.exists()); Assert.assertFalse(oldMetadataFile.exists()); @@ -257,9 +256,10 @@ public class TestLogFile { Assert.assertTrue(Arrays.equals(eventIn.getBody(), eventOut.getBody())); } } + @Test public void testWriteDelimitedTo() throws IOException { - if(dataFile.isFile()) { + if (dataFile.isFile()) { Assert.assertTrue(dataFile.delete()); } Assert.assertTrue(dataFile.createNewFile()); @@ -270,25 +270,24 @@ public class TestLogFile { metaDataBuilder.setCheckpointPosition(3); metaDataBuilder.setCheckpointWriteOrderID(4); LogFileV3.writeDelimitedTo(metaDataBuilder.build(), dataFile); - ProtosFactory.LogFileMetaData metaData = ProtosFactory.LogFileMetaData. - parseDelimitedFrom(new FileInputStream(dataFile)); + ProtosFactory.LogFileMetaData metaData = + ProtosFactory.LogFileMetaData.parseDelimitedFrom(new FileInputStream(dataFile)); Assert.assertEquals(1, metaData.getVersion()); Assert.assertEquals(2, metaData.getLogFileID()); Assert.assertEquals(3, metaData.getCheckpointPosition()); Assert.assertEquals(4, metaData.getCheckpointWriteOrderID()); } - @Test (expected = CorruptEventException.class) + @Test(expected = CorruptEventException.class) public void testPutGetCorruptEvent() throws Exception { final LogFile.RandomReader logFileReader = - LogFileFactory.getRandomReader(dataFile, null, true); + LogFileFactory.getRandomReader(dataFile, null, true); final FlumeEvent eventIn = TestUtils.newPersistableEvent(2500); - final Put put = new Put(++transactionID, WriteOrderOracle.next(), - eventIn); + final Put put = new Put(++transactionID, WriteOrderOracle.next(), eventIn); ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put); FlumeEventPointer ptr = logFileWriter.put(bytes); - logFileWriter.commit(TransactionEventRecord.toByteBuffer(new Commit - (transactionID, WriteOrderOracle.next()))); + logFileWriter.commit(TransactionEventRecord.toByteBuffer( + new Commit(transactionID, WriteOrderOracle.next()))); logFileWriter.sync(); final int offset = ptr.getOffset(); RandomAccessFile writer = new RandomAccessFile(dataFile, "rw"); @@ -300,24 +299,22 @@ public class TestLogFile { // Should have thrown an exception by now. Assert.fail(); - } - @Test (expected = NoopRecordException.class) + @Test(expected = NoopRecordException.class) public void testPutGetNoopEvent() throws Exception { final LogFile.RandomReader logFileReader = - LogFileFactory.getRandomReader(dataFile, null, true); + LogFileFactory.getRandomReader(dataFile, null, true); final FlumeEvent eventIn = TestUtils.newPersistableEvent(2500); - final Put put = new Put(++transactionID, WriteOrderOracle.next(), - eventIn); + final Put put = new Put(++transactionID, WriteOrderOracle.next(), eventIn); ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put); FlumeEventPointer ptr = logFileWriter.put(bytes); - logFileWriter.commit(TransactionEventRecord.toByteBuffer(new Commit - (transactionID, WriteOrderOracle.next()))); + logFileWriter.commit(TransactionEventRecord.toByteBuffer( + new Commit(transactionID, WriteOrderOracle.next()))); logFileWriter.sync(); final int offset = ptr.getOffset(); - LogFile.OperationRecordUpdater updater = new LogFile - .OperationRecordUpdater(dataFile); + LogFile.OperationRecordUpdater updater = + new LogFile.OperationRecordUpdater(dataFile); updater.markRecordAsNoop(offset); logFileReader.get(offset); @@ -330,40 +327,38 @@ public class TestLogFile { File tempDir = Files.createTempDir(); File temp = new File(tempDir, "temp"); final RandomAccessFile tempFile = new RandomAccessFile(temp, "rw"); - for(int i = 0; i < 5000; i++) { + for (int i = 0; i < 5000; i++) { tempFile.write(LogFile.OP_RECORD); } tempFile.seek(0); LogFile.OperationRecordUpdater recordUpdater = new LogFile - .OperationRecordUpdater(temp); + .OperationRecordUpdater(temp); //Convert every 10th byte into a noop byte - for(int i = 0; i < 5000; i+=10) { + for (int i = 0; i < 5000; i += 10) { recordUpdater.markRecordAsNoop(i); } recordUpdater.close(); tempFile.seek(0); // Verify every 10th byte is actually a NOOP - for(int i = 0; i < 5000; i+=10) { + for (int i = 0; i < 5000; i += 10) { tempFile.seek(i); Assert.assertEquals(LogFile.OP_NOOP, tempFile.readByte()); } - } @Test - public void testOpRecordUpdaterWithFlumeEvents() throws Exception{ + public void testOpRecordUpdaterWithFlumeEvents() throws Exception { final FlumeEvent eventIn = TestUtils.newPersistableEvent(2500); - final Put put = new Put(++transactionID, WriteOrderOracle.next(), - eventIn); + final Put put = new Put(++transactionID, WriteOrderOracle.next(), eventIn); ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put); FlumeEventPointer ptr = logFileWriter.put(bytes); - logFileWriter.commit(TransactionEventRecord.toByteBuffer(new Commit - (transactionID, WriteOrderOracle.next()))); + logFileWriter.commit(TransactionEventRecord.toByteBuffer( + new Commit(transactionID, WriteOrderOracle.next()))); logFileWriter.sync(); final int offset = ptr.getOffset(); - LogFile.OperationRecordUpdater updater = new LogFile - .OperationRecordUpdater(dataFile); + LogFile.OperationRecordUpdater updater = + new LogFile.OperationRecordUpdater(dataFile); updater.markRecordAsNoop(offset); RandomAccessFile fileReader = new RandomAccessFile(dataFile, "rw"); Assert.assertEquals(LogFile.OP_NOOP, fileReader.readByte()); @@ -375,7 +370,7 @@ public class TestLogFile { final CyclicBarrier barrier = new CyclicBarrier(20); ExecutorService executorService = Executors.newFixedThreadPool(20); ExecutorCompletionService<Void> completionService = new - ExecutorCompletionService<Void>(executorService); + ExecutorCompletionService<Void>(executorService); final LogFile.Writer writer = logFileWriter; final AtomicLong txnId = new AtomicLong(++transactionID); for (int i = 0; i < 20; i++) { @@ -384,11 +379,11 @@ public class TestLogFile { public Void call() { try { Put put = new Put(txnId.incrementAndGet(), - WriteOrderOracle.next(), eventIn); + WriteOrderOracle.next(), eventIn); ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put); writer.put(bytes); writer.commit(TransactionEventRecord.toByteBuffer( - new Commit(txnId.get(), WriteOrderOracle.next()))); + new Commit(txnId.get(), WriteOrderOracle.next()))); barrier.await(); writer.sync(); } catch (Exception ex) { @@ -399,17 +394,15 @@ public class TestLogFile { }); } - for(int i = 0; i < 20; i++) { + for (int i = 0; i < 20; i++) { completionService.take().get(); } - //At least 250*20, but can be higher due to serialization overhead + // At least 250*20, but can be higher due to serialization overhead Assert.assertTrue(logFileWriter.position() >= 5000); Assert.assertEquals(1, writer.getSyncCount()); - Assert.assertTrue(logFileWriter.getLastCommitPosition() == - logFileWriter.getLastSyncPosition()); + Assert.assertTrue(logFileWriter.getLastCommitPosition() == logFileWriter.getLastSyncPosition()); executorService.shutdown(); - } } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV2.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV2.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV2.java index 2356d90..1f07e1f 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV2.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV2.java @@ -18,7 +18,8 @@ */ package org.apache.flume.channel.file; -import static org.mockito.Mockito.*; +import junit.framework.Assert; +import org.junit.Test; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -30,9 +31,8 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; -import junit.framework.Assert; - -import org.junit.Test; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; @SuppressWarnings("deprecation") public class TestTransactionEventRecordV2 { @@ -127,7 +127,7 @@ public class TestTransactionEventRecordV2 { try { TransactionEventRecord.fromDataInputV2(toDataInput(in)); Assert.fail(); - } catch(NullPointerException e) { + } catch (NullPointerException e) { Assert.assertEquals("Unknown action ffff8000", e.getMessage()); } } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java index eb0ce04..512d290 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java @@ -18,7 +18,8 @@ */ package org.apache.flume.channel.file; -import static org.mockito.Mockito.*; +import junit.framework.Assert; +import org.junit.Test; import java.io.IOException; import java.nio.ByteBuffer; @@ -26,9 +27,8 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; -import junit.framework.Assert; - -import org.junit.Test; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TestTransactionEventRecordV3 { @@ -52,6 +52,7 @@ public class TestTransactionEventRecordV3 { Assert.assertEquals(TransactionEventRecord.Type.COMMIT.get(), commit.getRecordType()); } + @Test public void testPutSerialization() throws IOException, CorruptEventException { Map<String, String> headers = new HashMap<String, String>(); @@ -69,9 +70,9 @@ public class TestTransactionEventRecordV3 { Assert.assertEquals(headers, out.getEvent().getHeaders()); Assert.assertTrue(Arrays.equals(in.getEvent().getBody(), out.getEvent().getBody())); } + @Test - public void testPutSerializationNullHeader() throws IOException, - CorruptEventException { + public void testPutSerializationNullHeader() throws IOException, CorruptEventException { Put in = new Put(System.currentTimeMillis(), WriteOrderOracle.next(), new FlumeEvent(null, new byte[0])); @@ -84,11 +85,10 @@ public class TestTransactionEventRecordV3 { Assert.assertNotNull(out.getEvent().getHeaders()); Assert.assertTrue(Arrays.equals(in.getEvent().getBody(), out.getEvent().getBody())); } + @Test - public void testTakeSerialization() throws IOException, - CorruptEventException { - Take in = new Take(System.currentTimeMillis(), - WriteOrderOracle.next(), 10, 20); + public void testTakeSerialization() throws IOException, CorruptEventException { + Take in = new Take(System.currentTimeMillis(), WriteOrderOracle.next(), 10, 20); Take out = (Take)TransactionEventRecord.fromByteArray(toByteArray(in)); Assert.assertEquals(in.getClass(), out.getClass()); Assert.assertEquals(in.getRecordType(), out.getRecordType()); @@ -99,10 +99,8 @@ public class TestTransactionEventRecordV3 { } @Test - public void testRollbackSerialization() throws IOException, - CorruptEventException { - Rollback in = new Rollback(System.currentTimeMillis(), - WriteOrderOracle.next()); + public void testRollbackSerialization() throws IOException, CorruptEventException { + Rollback in = new Rollback(System.currentTimeMillis(), WriteOrderOracle.next()); Rollback out = (Rollback)TransactionEventRecord.fromByteArray(toByteArray(in)); Assert.assertEquals(in.getClass(), out.getClass()); Assert.assertEquals(in.getRecordType(), out.getRecordType()); @@ -111,10 +109,8 @@ public class TestTransactionEventRecordV3 { } @Test - public void testCommitSerialization() throws IOException, - CorruptEventException { - Commit in = new Commit(System.currentTimeMillis(), - WriteOrderOracle.next()); + public void testCommitSerialization() throws IOException, CorruptEventException { + Commit in = new Commit(System.currentTimeMillis(), WriteOrderOracle.next()); Commit out = (Commit)TransactionEventRecord.fromByteArray(toByteArray(in)); Assert.assertEquals(in.getClass(), out.getClass()); Assert.assertEquals(in.getRecordType(), out.getRecordType()); @@ -129,7 +125,7 @@ public class TestTransactionEventRecordV3 { try { TransactionEventRecord.fromByteArray(toByteArray(in)); Assert.fail(); - } catch(NullPointerException e) { + } catch (NullPointerException e) { Assert.assertEquals("Unknown action ffff8000", e.getMessage()); } } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java index 61f38d2..0ec1831 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java @@ -18,7 +18,21 @@ */ package org.apache.flume.channel.file; -import static org.fest.reflect.core.Reflection.*; +import com.google.common.base.Charsets; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.io.ByteStreams; +import com.google.common.io.Files; +import com.google.common.io.Resources; +import org.apache.flume.Channel; +import org.apache.flume.ChannelException; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.Transaction; +import org.apache.flume.conf.Configurables; +import org.apache.flume.event.EventBuilder; +import org.junit.Assert; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -36,22 +50,8 @@ import java.util.Set; import java.util.UUID; import java.util.zip.GZIPInputStream; -import org.apache.flume.Channel; -import org.apache.flume.ChannelException; -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.flume.Transaction; -import org.apache.flume.conf.Configurables; -import org.apache.flume.event.EventBuilder; -import org.junit.Assert; - -import com.google.common.base.Charsets; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.common.io.ByteStreams; -import com.google.common.io.Files; -import com.google.common.io.Resources; +import static org.fest.reflect.core.Reflection.field; +import static org.fest.reflect.core.Reflection.method; public class TestUtils { @@ -119,7 +119,7 @@ public class TestUtils { public static List<File> getAllLogs(File[] dataDirs) { List<File> result = Lists.newArrayList(); - for(File dataDir : dataDirs) { + for (File dataDir : dataDirs) { result.addAll(LogUtils.getLogs(dataDir)); } return result; @@ -139,24 +139,22 @@ public class TestUtils { .invoke(true)); } - public static Set<String> takeEvents(Channel channel, int batchSize) - throws Exception { + public static Set<String> takeEvents(Channel channel, int batchSize) throws Exception { return takeEvents(channel, batchSize, false); } - public static Set<String> takeEvents(Channel channel, - int batchSize, boolean checkForCorruption) throws Exception { + public static Set<String> takeEvents(Channel channel, int batchSize, boolean checkForCorruption) + throws Exception { return takeEvents(channel, batchSize, Integer.MAX_VALUE, checkForCorruption); } - public static Set<String> takeEvents(Channel channel, - int batchSize, int numEvents) throws Exception { + public static Set<String> takeEvents(Channel channel, int batchSize, int numEvents) + throws Exception { return takeEvents(channel, batchSize, numEvents, false); } - public static Set<String> takeEvents(Channel channel, - int batchSize, int numEvents, boolean checkForCorruption) throws - Exception { + public static Set<String> takeEvents(Channel channel, int batchSize, int numEvents, + boolean checkForCorruption) throws Exception { Set<String> result = Sets.newHashSet(); for (int i = 0; i < numEvents; i += batchSize) { Transaction transaction = channel.getTransaction(); @@ -169,16 +167,15 @@ public class TestUtils { } catch (ChannelException ex) { Throwable th = ex; String msg; - if(checkForCorruption) { + if (checkForCorruption) { msg = "Corrupt event found. Please run File Channel"; th = ex.getCause(); } else { msg = "Take list for FileBackedTransaction, capacity"; } - Assert.assertTrue(th.getMessage().startsWith( - msg)); - if(checkForCorruption) { - throw (Exception) th; + Assert.assertTrue(th.getMessage().startsWith(msg)); + if (checkForCorruption) { + throw (Exception)th; } transaction.commit(); return result; @@ -204,16 +201,16 @@ public class TestUtils { public static Set<String> consumeChannel(Channel channel) throws Exception { return consumeChannel(channel, false); } - public static Set<String> consumeChannel(Channel channel, - boolean checkForCorruption) throws Exception { + public static Set<String> consumeChannel(Channel channel, boolean checkForCorruption) + throws Exception { Set<String> result = Sets.newHashSet(); int[] batchSizes = new int[] { 1000, 100, 10, 1 }; for (int i = 0; i < batchSizes.length; i++) { - while(true) { + while (true) { Set<String> batch = takeEvents(channel, batchSizes[i], checkForCorruption); - if(batch.isEmpty()) { + if (batch.isEmpty()) { break; } result.addAll(batch); @@ -221,18 +218,16 @@ public class TestUtils { } return result; } - public static Set<String> fillChannel(Channel channel, String prefix) - throws Exception { + public static Set<String> fillChannel(Channel channel, String prefix) throws Exception { Set<String> result = Sets.newHashSet(); int[] batchSizes = new int[] { 1000, 100, 10, 1 }; for (int i = 0; i < batchSizes.length; i++) { try { - while(true) { - Set<String> batch = putEvents(channel, prefix, batchSizes[i], - Integer.MAX_VALUE, true); - if(batch.isEmpty()) { + while (true) { + Set<String> batch = putEvents(channel, prefix, batchSizes[i], Integer.MAX_VALUE, true); + if (batch.isEmpty()) { break; } result.addAll(batch); @@ -243,19 +238,17 @@ public class TestUtils { + "size, a downstream system running slower than normal, or that " + "the channel capacity is just too low. [channel=" + channel.getName() + "]").equals(e.getMessage()) - || e.getMessage().startsWith("Put queue for FileBackedTransaction " + - "of capacity ")); + || e.getMessage().startsWith("Put queue for FileBackedTransaction of capacity ")); } } return result; } - public static Set<String> putEvents(Channel channel, String prefix, - int batchSize, int numEvents) throws Exception { + public static Set<String> putEvents(Channel channel, String prefix, int batchSize, int numEvents) + throws Exception { return putEvents(channel, prefix, batchSize, numEvents, false); } - public static Set<String> putEvents(Channel channel, String prefix, - int batchSize, int numEvents, boolean untilCapacityIsReached) - throws Exception { + public static Set<String> putEvents(Channel channel, String prefix, int batchSize, int numEvents, + boolean untilCapacityIsReached) throws Exception { Set<String> result = Sets.newHashSet(); for (int i = 0; i < numEvents; i += batchSize) { Transaction transaction = channel.getTransaction(); @@ -272,13 +265,12 @@ public class TestUtils { result.addAll(batch); } catch (Exception ex) { transaction.rollback(); - if(untilCapacityIsReached && ex instanceof ChannelException && + if (untilCapacityIsReached && ex instanceof ChannelException && ("The channel has reached it's capacity. " + "This might be the result of a sink on the channel having too " + "low of batch size, a downstream system running slower than " + "normal, or that the channel capacity is just too low. " - + "[channel=" +channel.getName() + "]"). - equals(ex.getMessage())) { + + "[channel=" + channel.getName() + "]").equals(ex.getMessage())) { break; } throw ex; @@ -288,6 +280,7 @@ public class TestUtils { } return result; } + public static void copyDecompressed(String resource, File output) throws IOException { URL input = Resources.getResource(resource); @@ -298,12 +291,11 @@ public class TestUtils { gzis.close(); } - public static Context createFileChannelContext(String checkpointDir, - String dataDir, String backupDir, Map<String, String> overrides) { + public static Context createFileChannelContext(String checkpointDir, String dataDir, + String backupDir, Map<String, String> overrides) { Context context = new Context(); - context.put(FileChannelConfiguration.CHECKPOINT_DIR, - checkpointDir); - if(backupDir != null) { + context.put(FileChannelConfiguration.CHECKPOINT_DIR, checkpointDir); + if (backupDir != null) { context.put(FileChannelConfiguration.BACKUP_CHECKPOINT_DIR, backupDir); } context.put(FileChannelConfiguration.DATA_DIRS, dataDir); @@ -312,22 +304,22 @@ public class TestUtils { context.putAll(overrides); return context; } - public static FileChannel createFileChannel(String checkpointDir, - String dataDir, Map<String, String> overrides) { + + public static FileChannel createFileChannel(String checkpointDir, String dataDir, + Map<String, String> overrides) { return createFileChannel(checkpointDir, dataDir, null, overrides); } - public static FileChannel createFileChannel(String checkpointDir, - String dataDir, String backupDir, Map<String, String> overrides) { + public static FileChannel createFileChannel(String checkpointDir, String dataDir, + String backupDir, Map<String, String> overrides) { FileChannel channel = new FileChannel(); channel.setName("FileChannel-" + UUID.randomUUID()); - Context context = createFileChannelContext(checkpointDir, dataDir, - backupDir, overrides); + Context context = createFileChannelContext(checkpointDir, dataDir, backupDir, overrides); Configurables.configure(channel, context); return channel; } - public static File writeStringToFile(File baseDir, String name, - String text) throws IOException { + + public static File writeStringToFile(File baseDir, String name, String text) throws IOException { File passwordFile = new File(baseDir, name); Files.write(text, passwordFile, Charsets.UTF_8); return passwordFile; http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/CipherProviderTestSuite.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/CipherProviderTestSuite.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/CipherProviderTestSuite.java index 530ccf6..22848d2 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/CipherProviderTestSuite.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/CipherProviderTestSuite.java @@ -32,24 +32,28 @@ public class CipherProviderTestSuite { this.encryptor = encryptor; this.decryptor = decryptor; } + public void test() throws Exception { testBasic(); testEmpty(); testNullPlainText(); testNullCipherText(); } + public void testBasic() throws Exception { String expected = "mn state fair is the place to be"; byte[] cipherText = encryptor.encrypt(expected.getBytes(Charsets.UTF_8)); byte[] clearText = decryptor.decrypt(cipherText); Assert.assertEquals(expected, new String(clearText, Charsets.UTF_8)); } + public void testEmpty() throws Exception { String expected = ""; byte[] cipherText = encryptor.encrypt(new byte[]{}); byte[] clearText = decryptor.decrypt(cipherText); Assert.assertEquals(expected, new String(clearText)); } + public void testNullPlainText() throws Exception { try { encryptor.encrypt(null); @@ -58,6 +62,7 @@ public class CipherProviderTestSuite { // expected } } + public void testNullCipherText() throws Exception { try { decryptor.decrypt(null);
