http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java index 7851536..8921a19 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java +++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java @@ -19,10 +19,6 @@ package org.apache.flume.channel; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.LinkedBlockingDeque; - import org.apache.flume.Channel; import org.apache.flume.ChannelException; import org.apache.flume.Context; @@ -34,8 +30,12 @@ import org.apache.flume.event.EventBuilder; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import static org.fest.reflect.core.Reflection.*; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.LinkedBlockingDeque; + +import static org.fest.reflect.core.Reflection.field; public class TestMemoryChannel { @@ -81,7 +81,7 @@ public class TestMemoryChannel { Transaction transaction = channel.getTransaction(); transaction.begin(); - for(int i=0; i < 5; i++) { + for (int i = 0; i < 5; i++) { channel.put(EventBuilder.withBody(String.format("test event %d", i).getBytes())); } transaction.commit(); @@ -124,7 +124,7 @@ public class TestMemoryChannel { parms.put("transactionCapacity", "2"); context.putAll(parms); Configurables.configure(channel, context); - for(int i=0; i < 6; i++) { + for (int i = 0; i < 6; i++) { transaction = channel.getTransaction(); transaction.begin(); Assert.assertNotNull(channel.take()); @@ -133,7 +133,7 @@ public class TestMemoryChannel { } } - @Test(expected=ChannelException.class) + @Test(expected = ChannelException.class) public void testTransactionPutCapacityOverload() { Context context = new Context(); Map<String, String> parms = new HashMap<String, String>(); @@ -151,7 +151,7 @@ public class TestMemoryChannel { Assert.fail(); } - @Test(expected=ChannelException.class) + @Test(expected = ChannelException.class) public void testCapacityOverload() { Context context = new Context(); Map<String, String> parms = new HashMap<String, String>(); @@ -236,7 +236,7 @@ public class TestMemoryChannel { tx.close(); } - @Test(expected=ChannelException.class) + @Test(expected = ChannelException.class) public void testByteCapacityOverload() { Context context = new Context(); Map<String, String> parms = new HashMap<String, String>(); @@ -284,8 +284,7 @@ public class TestMemoryChannel { try { channel.put(EventBuilder.withBody(eventBody)); throw new RuntimeException("Put was able to overflow byte capacity."); - } catch (ChannelException ce) - { + } catch (ChannelException ce) { //Do nothing } @@ -306,8 +305,7 @@ public class TestMemoryChannel { try { channel.put(EventBuilder.withBody(eventBody)); throw new RuntimeException("Put was able to overflow byte capacity."); - } catch (ChannelException ce) - { + } catch (ChannelException ce) { //Do nothing } tx.commit(); @@ -370,7 +368,7 @@ public class TestMemoryChannel { channel.put(EventBuilder.withBody(eventBody)); tx.commit(); Assert.fail(); - } catch ( ChannelException e ) { + } catch (ChannelException e) { //success tx.rollback(); } finally { @@ -397,12 +395,12 @@ public class TestMemoryChannel { tx = channel.getTransaction(); tx.begin(); try { - for(int i = 0; i < 2; i++) { + for (int i = 0; i < 2; i++) { channel.put(EventBuilder.withBody(eventBody)); } tx.commit(); Assert.fail(); - } catch ( ChannelException e ) { + } catch (ChannelException e) { //success tx.rollback(); } finally { @@ -418,12 +416,12 @@ public class TestMemoryChannel { tx.begin(); try { - for(int i = 0; i < 15; i++) { + for (int i = 0; i < 15; i++) { channel.put(EventBuilder.withBody(eventBody)); } tx.commit(); Assert.fail(); - } catch ( ChannelException e ) { + } catch (ChannelException e) { //success tx.rollback(); } finally { @@ -438,12 +436,12 @@ public class TestMemoryChannel { tx.begin(); try { - for(int i = 0; i < 25; i++) { + for (int i = 0; i < 25; i++) { channel.put(EventBuilder.withBody(eventBody)); } tx.commit(); Assert.fail(); - } catch ( ChannelException e ) { + } catch (ChannelException e) { //success tx.rollback(); } finally {
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java index d4ba705..68aa117 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java +++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java @@ -17,15 +17,6 @@ */ package org.apache.flume.channel; -import java.util.Map.Entry; -import java.util.Random; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - import org.apache.flume.Channel; import org.apache.flume.ChannelException; import org.apache.flume.Context; @@ -37,6 +28,15 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.util.Map.Entry; +import java.util.Random; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + public class TestMemoryChannelConcurrency { private CyclicBarrier barrier; @@ -120,10 +120,10 @@ public class TestMemoryChannelConcurrency { } /** - * Works with a startgate/endgate latches to make sure all threads run at the same time. Threads randomly - * choose to commit or rollback random numbers of actions, tagging them with the thread no. - * The correctness check is made by recording committed entries into a map, and verifying the count - * after the endgate + * Works with a startgate/endgate latches to make sure all threads run at the same time. + * Threads randomly choose to commit or rollback random numbers of actions, tagging them with the + * thread no. The correctness check is made by recording committed entries into a map, and + * verifying the count after the endgate. * Since nothing is taking the puts out, allow for a big capacity * * @throws InterruptedException @@ -135,7 +135,8 @@ public class TestMemoryChannelConcurrency { context.put("keep-alive", "1"); context.put("capacity", "5000"); // theoretical maximum of 100 threads * 10 * 5 // because we're just grabbing the whole lot in one commit - // normally a transactionCapacity significantly lower than the channel capacity would be recommended + // normally a transactionCapacity significantly lower than the channel capacity would be + // recommended context.put("transactionCapacity", "5000"); Configurables.configure(channel, context); final ConcurrentHashMap<String, AtomicInteger> committedPuts = @@ -158,17 +159,17 @@ public class TestMemoryChannelConcurrency { } catch (InterruptedException e1) { Thread.currentThread().interrupt(); } - for(int j = 0; j < 10; j++) { + for (int j = 0; j < 10; j++) { int events = rng.nextInt(5) + 1; Transaction tx = channel.getTransaction(); tx.begin(); - for(int k = 0; k < events; k++) { + for (int k = 0; k < events; k++) { channel.put(EventBuilder.withBody(strtid.getBytes())); } - if(rng.nextBoolean()) { + if (rng.nextBoolean()) { tx.commit(); AtomicInteger tcount = committedPuts.get(strtid); - if(tcount == null) { + if (tcount == null) { committedPuts.put(strtid, new AtomicInteger(events)); } else { tcount.addAndGet(events); @@ -186,7 +187,7 @@ public class TestMemoryChannelConcurrency { startGate.countDown(); endGate.await(); - if(committedPuts.isEmpty()) { + if (committedPuts.isEmpty()) { Assert.fail(); } @@ -194,17 +195,17 @@ public class TestMemoryChannelConcurrency { Transaction tx = channel.getTransaction(); tx.begin(); Event e; - while((e = channel.take()) != null) { + while ((e = channel.take()) != null) { String index = new String(e.getBody()); AtomicInteger remain = committedPuts.get(index); int post = remain.decrementAndGet(); - if(post == 0) { + if (post == 0) { committedPuts.remove(index); } } tx.commit(); tx.close(); - if(!committedPuts.isEmpty()) { + if (!committedPuts.isEmpty()) { Assert.fail(); } } @@ -216,10 +217,12 @@ public class TestMemoryChannelConcurrency { context.put("keep-alive", "1"); context.put("capacity", "100"); // theoretical maximum of 100 threads * 10 * 5 // because we're just grabbing the whole lot in one commit - // normally a transactionCapacity significantly lower than the channel capacity would be recommended + // normally a transactionCapacity significantly lower than the channel capacity would be + // recommended context.put("transactionCapacity", "100"); Configurables.configure(channel, context); - final ConcurrentHashMap<String, AtomicInteger> committedPuts = new ConcurrentHashMap<String, AtomicInteger>(); + final ConcurrentHashMap<String, AtomicInteger> committedPuts = + new ConcurrentHashMap<String, AtomicInteger>(); final ConcurrentHashMap<String, AtomicInteger> committedTakes = new ConcurrentHashMap<String, AtomicInteger>(); @@ -228,7 +231,7 @@ public class TestMemoryChannelConcurrency { final CountDownLatch endGate = new CountDownLatch(threadCount); // start a sink and source for each - for (int i = 0; i < threadCount/2; i++) { + for (int i = 0; i < threadCount / 2; i++) { Thread t = new Thread() { @Override public void run() { @@ -241,23 +244,23 @@ public class TestMemoryChannelConcurrency { } catch (InterruptedException e1) { Thread.currentThread().interrupt(); } - for(int j = 0; j < 10; j++) { + for (int j = 0; j < 10; j++) { int events = rng.nextInt(5) + 1; Transaction tx = channel.getTransaction(); tx.begin(); - for(int k = 0; k < events; k++) { + for (int k = 0; k < events; k++) { channel.put(EventBuilder.withBody(strtid.getBytes())); } - if(rng.nextBoolean()) { + if (rng.nextBoolean()) { try { tx.commit(); AtomicInteger tcount = committedPuts.get(strtid); - if(tcount == null) { + if (tcount == null) { committedPuts.put(strtid, new AtomicInteger(events)); } else { tcount.addAndGet(events); } - } catch(ChannelException e) { + } catch (ChannelException e) { System.out.print("puts commit failed"); tx.rollback(); } @@ -282,25 +285,25 @@ public class TestMemoryChannelConcurrency { } catch (InterruptedException e1) { Thread.currentThread().interrupt(); } - for(int j = 0; j < 10; j++) { + for (int j = 0; j < 10; j++) { int events = rng.nextInt(5) + 1; Transaction tx = channel.getTransaction(); tx.begin(); Event[] taken = new Event[events]; int k; - for(k = 0; k < events; k++) { + for (k = 0; k < events; k++) { taken[k] = channel.take(); - if(taken[k] == null) break; + if (taken[k] == null) break; } - if(rng.nextBoolean()) { + if (rng.nextBoolean()) { try { tx.commit(); - for(Event e : taken) { - if(e == null) break; + for (Event e : taken) { + if (e == null) break; String index = new String(e.getBody()); - synchronized(takeMapLock) { + synchronized (takeMapLock) { AtomicInteger remain = committedTakes.get(index); - if(remain == null) { + if (remain == null) { committedTakes.put(index, new AtomicInteger(1)); } else { remain.incrementAndGet(); @@ -323,7 +326,7 @@ public class TestMemoryChannelConcurrency { t.start(); } startGate.countDown(); - if(!endGate.await(20, TimeUnit.SECONDS)) { + if (!endGate.await(20, TimeUnit.SECONDS)) { Assert.fail("Not all threads ended succesfully"); } @@ -333,11 +336,11 @@ public class TestMemoryChannelConcurrency { Event e; // first pull out what's left in the channel and remove it from the // committed map - while((e = channel.take()) != null) { + while ((e = channel.take()) != null) { String index = new String(e.getBody()); AtomicInteger remain = committedPuts.get(index); int post = remain.decrementAndGet(); - if(post == 0) { + if (post == 0) { committedPuts.remove(index); } } @@ -345,14 +348,19 @@ public class TestMemoryChannelConcurrency { tx.close(); // now just check the committed puts match the committed takes - for(Entry<String, AtomicInteger> takes : committedTakes.entrySet()) { + for (Entry<String, AtomicInteger> takes : committedTakes.entrySet()) { AtomicInteger count = committedPuts.get(takes.getKey()); - if(count == null) + if (count == null) { Assert.fail("Putted data doesn't exist"); - if(count.get() != takes.getValue().get()) - Assert.fail(String.format("Mismatched put and take counts expected %d had %d", count.get(), takes.getValue().get())); + } + if (count.get() != takes.getValue().get()) { + Assert.fail(String.format("Mismatched put and take counts expected %d had %d", + count.get(), takes.getValue().get())); + } committedPuts.remove(takes.getKey()); } - if(!committedPuts.isEmpty()) Assert.fail("Puts still has entries remaining"); + if (!committedPuts.isEmpty()) { + Assert.fail("Puts still has entries remaining"); + } } } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java index b8e00d8..55b81ee 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java +++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java @@ -43,7 +43,8 @@ public class TestMemoryChannelTransaction { @Test public void testCommit() throws InterruptedException, EventDeliveryException { - Event event, event2; + Event event; + Event event2; Context context = new Context(); int putCounter = 0; @@ -85,7 +86,8 @@ public class TestMemoryChannelTransaction { public void testRollBack() throws InterruptedException, EventDeliveryException { - Event event, event2; + Event event; + Event event2; Context context = new Context(); int putCounter = 0; @@ -158,7 +160,8 @@ public class TestMemoryChannelTransaction { public void testReEntTxn() throws InterruptedException, EventDeliveryException { - Event event, event2; + Event event; + Event event2; Context context = new Context(); int putCounter = 0; @@ -199,7 +202,8 @@ public class TestMemoryChannelTransaction { @Test public void testReEntTxnRollBack() throws InterruptedException, EventDeliveryException { - Event event, event2; + Event event; + Event event2; Context context = new Context(); int putCounter = 0; http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java index 4e90054..fdc3ce9 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java +++ b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java @@ -37,14 +37,25 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.FileFilter; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.*; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; public class TestReliableSpoolingFileEventReader { - private static final Logger logger = LoggerFactory.getLogger - (TestReliableSpoolingFileEventReader.class); + private static final Logger logger = + LoggerFactory.getLogger(TestReliableSpoolingFileEventReader.class); private static final File WORK_DIR = new File("target/test/work/" + TestReliableSpoolingFileEventReader.class.getSimpleName()); @@ -57,7 +68,7 @@ public class TestReliableSpoolingFileEventReader { // write out a few files for (int i = 0; i < 4; i++) { - File fileName = new File(WORK_DIR, "file"+i); + File fileName = new File(WORK_DIR, "file" + i); StringBuilder sb = new StringBuilder(); // write as many lines as the index of the file @@ -102,11 +113,12 @@ public class TestReliableSpoolingFileEventReader { @Test public void testIgnorePattern() throws IOException { - ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder() - .spoolDirectory(WORK_DIR) - .ignorePattern("^file2$") - .deletePolicy(DeletePolicy.IMMEDIATE.toString()) - .build(); + ReliableEventReader reader = + new ReliableSpoolingFileEventReader.Builder() + .spoolDirectory(WORK_DIR) + .ignorePattern("^file2$") + .deletePolicy(DeletePolicy.IMMEDIATE.toString()) + .build(); List<File> before = listFiles(WORK_DIR); Assert.assertEquals("Expected 5, not: " + before, 5, before.size()); @@ -128,8 +140,9 @@ public class TestReliableSpoolingFileEventReader { @Test public void testRepeatedCallsWithCommitAlways() throws IOException { - ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder() - .spoolDirectory(WORK_DIR).build(); + ReliableEventReader reader = + new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR) + .build(); final int expectedLines = 0 + 1 + 2 + 3 + 1; int seenLines = 0; @@ -148,8 +161,10 @@ public class TestReliableSpoolingFileEventReader { SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKER_DIR; File trackerDir = new File(WORK_DIR, trackerDirPath); - ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder() - .spoolDirectory(WORK_DIR).trackerDirPath(trackerDirPath).build(); + ReliableEventReader reader = + new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR) + .trackerDirPath(trackerDirPath) + .build(); final int expectedLines = 0 + 1 + 2 + 3 + 1; int seenLines = 0; @@ -173,10 +188,10 @@ public class TestReliableSpoolingFileEventReader { @Test public void testFileDeletion() throws IOException { - ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder() - .spoolDirectory(WORK_DIR) - .deletePolicy(DeletePolicy.IMMEDIATE.name()) - .build(); + ReliableEventReader reader = + new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR) + .deletePolicy(DeletePolicy.IMMEDIATE.name()) + .build(); List<File> before = listFiles(WORK_DIR); Assert.assertEquals("Expected 5, not: " + before, 5, before.size()); @@ -197,29 +212,25 @@ public class TestReliableSpoolingFileEventReader { @Test(expected = NullPointerException.class) public void testNullConsumeOrder() throws IOException { - new ReliableSpoolingFileEventReader.Builder() - .spoolDirectory(WORK_DIR) - .consumeOrder(null) - .build(); + new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR) + .consumeOrder(null) + .build(); } @Test public void testConsumeFileRandomly() throws IOException { - ReliableEventReader reader - = new ReliableSpoolingFileEventReader.Builder() - .spoolDirectory(WORK_DIR) - .consumeOrder(ConsumeOrder.RANDOM) - .build(); + ReliableEventReader reader = + new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR) + .consumeOrder(ConsumeOrder.RANDOM) + .build(); File fileName = new File(WORK_DIR, "new-file"); - FileUtils.write(fileName, - "New file created in the end. Shoud be read randomly.\n"); + FileUtils.write(fileName, "New file created in the end. Shoud be read randomly.\n"); Set<String> actual = Sets.newHashSet(); readEventsForFilesInDir(WORK_DIR, reader, actual); Set<String> expected = Sets.newHashSet(); createExpectedFromFilesInSetup(expected); expected.add(""); - expected.add( - "New file created in the end. Shoud be read randomly."); + expected.add("New file created in the end. Shoud be read randomly."); Assert.assertEquals(expected, actual); } @@ -229,54 +240,46 @@ public class TestReliableSpoolingFileEventReader { if (SystemUtils.IS_OS_WINDOWS) { return; } - final ReliableEventReader reader - = new ReliableSpoolingFileEventReader.Builder() - .spoolDirectory(WORK_DIR) - .consumeOrder(ConsumeOrder.RANDOM) - .build(); + final ReliableEventReader reader = + new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR) + .consumeOrder(ConsumeOrder.RANDOM) + .build(); File fileName = new File(WORK_DIR, "new-file"); - FileUtils.write(fileName, - "New file created in the end. Shoud be read randomly.\n"); + FileUtils.write(fileName, "New file created in the end. Shoud be read randomly.\n"); Set<String> expected = Sets.newHashSet(); int totalFiles = WORK_DIR.listFiles().length; final Set<String> actual = Sets.newHashSet(); ExecutorService executor = Executors.newSingleThreadExecutor(); final Semaphore semaphore1 = new Semaphore(0); final Semaphore semaphore2 = new Semaphore(0); - Future<Void> wait = executor.submit( - new Callable<Void>() { - @Override - public Void call() throws Exception { - readEventsForFilesInDir(WORK_DIR, reader, actual, semaphore1, semaphore2); - return null; - } + Future<Void> wait = executor.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + readEventsForFilesInDir(WORK_DIR, reader, actual, semaphore1, semaphore2); + return null; } - ); + }); semaphore1.acquire(); File finalFile = new File(WORK_DIR, "t-file"); FileUtils.write(finalFile, "Last file"); semaphore2.release(); wait.get(); - int listFilesCount = ((ReliableSpoolingFileEventReader)reader) - .getListFilesCount(); + int listFilesCount = ((ReliableSpoolingFileEventReader)reader).getListFilesCount(); finalFile.delete(); createExpectedFromFilesInSetup(expected); expected.add(""); - expected.add( - "New file created in the end. Shoud be read randomly."); + expected.add("New file created in the end. Shoud be read randomly."); expected.add("Last file"); Assert.assertTrue(listFilesCount < (totalFiles + 2)); Assert.assertEquals(expected, actual); } - @Test public void testConsumeFileOldest() throws IOException, InterruptedException { - ReliableEventReader reader - = new ReliableSpoolingFileEventReader.Builder() - .spoolDirectory(WORK_DIR) - .consumeOrder(ConsumeOrder.OLDEST) - .build(); + ReliableEventReader reader = + new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR) + .consumeOrder(ConsumeOrder.OLDEST) + .build(); File file1 = new File(WORK_DIR, "new-file1"); File file2 = new File(WORK_DIR, "new-file2"); File file3 = new File(WORK_DIR, "new-file3"); @@ -299,13 +302,11 @@ public class TestReliableSpoolingFileEventReader { } @Test - public void testConsumeFileYoungest() - throws IOException, InterruptedException { - ReliableEventReader reader - = new ReliableSpoolingFileEventReader.Builder() - .spoolDirectory(WORK_DIR) - .consumeOrder(ConsumeOrder.YOUNGEST) - .build(); + public void testConsumeFileYoungest() throws IOException, InterruptedException { + ReliableEventReader reader = + new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR) + .consumeOrder(ConsumeOrder.YOUNGEST) + .build(); File file1 = new File(WORK_DIR, "new-file1"); File file2 = new File(WORK_DIR, "new-file2"); File file3 = new File(WORK_DIR, "new-file3"); @@ -332,12 +333,11 @@ public class TestReliableSpoolingFileEventReader { @Test public void testConsumeFileOldestWithLexicographicalComparision() - throws IOException, InterruptedException { - ReliableEventReader reader - = new ReliableSpoolingFileEventReader.Builder() - .spoolDirectory(WORK_DIR) - .consumeOrder(ConsumeOrder.OLDEST) - .build(); + throws IOException, InterruptedException { + ReliableEventReader reader = + new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR) + .consumeOrder(ConsumeOrder.OLDEST) + .build(); File file1 = new File(WORK_DIR, "new-file1"); File file2 = new File(WORK_DIR, "new-file2"); File file3 = new File(WORK_DIR, "new-file3"); @@ -362,12 +362,11 @@ public class TestReliableSpoolingFileEventReader { @Test public void testConsumeFileYoungestWithLexicographicalComparision() - throws IOException, InterruptedException { - ReliableEventReader reader - = new ReliableSpoolingFileEventReader.Builder() - .spoolDirectory(WORK_DIR) - .consumeOrder(ConsumeOrder.YOUNGEST) - .build(); + throws IOException, InterruptedException { + ReliableEventReader reader = + new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR) + .consumeOrder(ConsumeOrder.YOUNGEST) + .build(); File file1 = new File(WORK_DIR, "new-file1"); File file2 = new File(WORK_DIR, "new-file2"); File file3 = new File(WORK_DIR, "new-file3"); @@ -393,6 +392,7 @@ public class TestReliableSpoolingFileEventReader { @Test public void testLargeNumberOfFilesOLDEST() throws IOException { templateTestForLargeNumberOfFiles(ConsumeOrder.OLDEST, null, 1000); } + @Test public void testLargeNumberOfFilesYOUNGEST() throws IOException { templateTestForLargeNumberOfFiles(ConsumeOrder.YOUNGEST, new Comparator<Long>() { @@ -402,6 +402,7 @@ public class TestReliableSpoolingFileEventReader { } }, 1000); } + @Test public void testLargeNumberOfFilesRANDOM() throws IOException { templateTestForLargeNumberOfFiles(ConsumeOrder.RANDOM, null, 1000); } @@ -409,19 +410,21 @@ public class TestReliableSpoolingFileEventReader { @Test public void testZeroByteTrackerFile() throws IOException { String trackerDirPath = - SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKER_DIR; + SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKER_DIR; File trackerDir = new File(WORK_DIR, trackerDirPath); - if(!trackerDir.exists()) { + if (!trackerDir.exists()) { trackerDir.mkdir(); } File trackerFile = new File(trackerDir, ReliableSpoolingFileEventReader.metaFileName); - if(trackerFile.exists()) { + if (trackerFile.exists()) { trackerFile.delete(); } trackerFile.createNewFile(); - ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder() - .spoolDirectory(WORK_DIR).trackerDirPath(trackerDirPath).build(); + ReliableEventReader reader = + new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR) + .trackerDirPath(trackerDirPath) + .build(); final int expectedLines = 1; int seenLines = 0; List<Event> events = reader.readEvents(10); @@ -434,18 +437,16 @@ public class TestReliableSpoolingFileEventReader { Assert.assertEquals(expectedLines, seenLines); } - private void templateTestForLargeNumberOfFiles(ConsumeOrder order, - Comparator<Long> comparator, - int N) throws IOException { + private void templateTestForLargeNumberOfFiles(ConsumeOrder order, Comparator<Long> comparator, + int N) throws IOException { File dir = null; try { - dir = new File( - "target/test/work/" + this.getClass().getSimpleName() + - "_large"); + dir = new File("target/test/work/" + this.getClass().getSimpleName() + "_large"); Files.createParentDirs(new File(dir, "dummy")); - ReliableEventReader reader - = new ReliableSpoolingFileEventReader.Builder() - .spoolDirectory(dir).consumeOrder(order).build(); + ReliableEventReader reader = + new ReliableSpoolingFileEventReader.Builder().spoolDirectory(dir) + .consumeOrder(order) + .build(); Map<Long, List<String>> expected; if (comparator == null) { expected = new TreeMap<Long, List<String>>(); @@ -476,16 +477,14 @@ public class TestReliableSpoolingFileEventReader { List<Event> events; events = reader.readEvents(10); for (Event e : events) { - if (order == ConsumeOrder.RANDOM) { + if (order == ConsumeOrder.RANDOM) { Assert.assertTrue(expectedList.remove(new String(e.getBody()))); } else { - Assert.assertEquals( - ((ArrayList<String>) expectedList).get(0), - new String(e.getBody())); + Assert.assertEquals(((ArrayList<String>) expectedList).get(0), new String(e.getBody())); ((ArrayList<String>) expectedList).remove(0); } } - reader.commit(); + reader.commit(); } } finally { deleteDir(dir); @@ -493,23 +492,24 @@ public class TestReliableSpoolingFileEventReader { } private void readEventsForFilesInDir(File dir, ReliableEventReader reader, - Collection<String> actual) throws IOException { + Collection<String> actual) throws IOException { readEventsForFilesInDir(dir, reader, actual, null, null); } /* Read events, one for each file in the given directory. */ - private void readEventsForFilesInDir(File dir, ReliableEventReader reader, - Collection<String> actual, Semaphore semaphore1, Semaphore semaphore2) throws IOException { + private void readEventsForFilesInDir(File dir, ReliableEventReader reader, + Collection<String> actual, Semaphore semaphore1, + Semaphore semaphore2) throws IOException { List<Event> events; boolean executed = false; - for (int i=0; i < listFiles(dir).size(); i++) { + for (int i = 0; i < listFiles(dir).size(); i++) { events = reader.readEvents(10); for (Event e : events) { actual.add(new String(e.getBody())); } reader.commit(); try { - if(!executed) { + if (!executed) { executed = true; if (semaphore1 != null) { semaphore1.release(); @@ -533,8 +533,7 @@ public class TestReliableSpoolingFileEventReader { } private static List<File> listFiles(File dir) { - List<File> files = Lists.newArrayList(dir.listFiles(new FileFilter - () { + List<File> files = Lists.newArrayList(dir.listFiles(new FileFilter() { @Override public boolean accept(File pathname) { return !pathname.isDirectory(); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java b/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java index 21b972b..b1b828a 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java +++ b/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java @@ -18,6 +18,12 @@ package org.apache.flume.formatter.output; +import org.apache.flume.Clock; +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.ISODateTimeFormat; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; import java.text.SimpleDateFormat; import java.util.Calendar; @@ -26,21 +32,15 @@ import java.util.HashMap; import java.util.Map; import java.util.TimeZone; -import org.apache.flume.Clock; -import org.joda.time.format.DateTimeFormatter; -import org.joda.time.format.ISODateTimeFormat; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class TestBucketPath { Calendar cal; Map<String, String> headers; + @Before - public void setUp(){ + public void setUp() { cal = Calendar.getInstance(); cal.set(2012, 5, 23, 13, 46, 33); cal.set(Calendar.MILLISECOND, 234); @@ -49,7 +49,7 @@ public class TestBucketPath { } @Test - public void testDateFormatCache(){ + public void testDateFormatCache() { TimeZone utcTimeZone = TimeZone.getTimeZone("UTC"); String test = "%c"; BucketPath.escapeString( @@ -60,7 +60,7 @@ public class TestBucketPath { SimpleDateFormat format = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy"); Date d = new Date(cal.getTimeInMillis()); String expectedString = format.format(d); - System.out.println("Expected String: "+ expectedString); + System.out.println("Expected String: " + expectedString); Assert.assertEquals(expectedString, escapedString); } @@ -76,7 +76,7 @@ public class TestBucketPath { SimpleDateFormat format = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy"); Date d = new Date(cal2.getTimeInMillis()); String expectedString = format.format(d); - System.out.println("Expected String: "+ expectedString); + System.out.println("Expected String: " + expectedString); Assert.assertEquals(expectedString, escapedString); } @@ -89,8 +89,8 @@ public class TestBucketPath { Calendar cal2 = Calendar.getInstance(); cal2.set(2012, 5, 23, 13, 45, 0); cal2.set(Calendar.MILLISECOND, 0); - String expectedString = String.valueOf(cal2.getTimeInMillis()/1000); - System.out.println("Expected String: "+ expectedString); + String expectedString = String.valueOf(cal2.getTimeInMillis() / 1000); + System.out.println("Expected String: " + expectedString); Assert.assertEquals(expectedString, escapedString); } @@ -103,13 +103,13 @@ public class TestBucketPath { Calendar cal2 = Calendar.getInstance(); cal2.set(2012, 5, 23, 13, 46, 30); cal2.set(Calendar.MILLISECOND, 0); - String expectedString = String.valueOf(cal2.getTimeInMillis()/1000); - System.out.println("Expected String: "+ expectedString); + String expectedString = String.valueOf(cal2.getTimeInMillis() / 1000); + System.out.println("Expected String: " + expectedString); Assert.assertEquals(expectedString, escapedString); } @Test - public void testNoRounding(){ + public void testNoRounding() { String test = "%c"; String escapedString = BucketPath.escapeString( test, headers, false, Calendar.HOUR_OF_DAY, 12); @@ -117,19 +117,19 @@ public class TestBucketPath { SimpleDateFormat format = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy"); Date d = new Date(cal.getTimeInMillis()); String expectedString = format.format(d); - System.out.println("Expected String: "+ expectedString); + System.out.println("Expected String: " + expectedString); Assert.assertEquals(expectedString, escapedString); } @Test - public void testNoPadding(){ + public void testNoPadding() { Calendar calender; Map<String, String> calender_timestamp; calender = Calendar.getInstance(); - + //Check single digit dates - calender.set(2014, (5-1), 3, 13, 46, 33); + calender.set(2014, (5 - 1), 3, 13, 46, 33); calender_timestamp = new HashMap<String, String>(); calender_timestamp.put("timestamp", String.valueOf(calender.getTimeInMillis())); SimpleDateFormat format = new SimpleDateFormat("M-d"); @@ -141,19 +141,19 @@ public class TestBucketPath { String expectedString = format.format(d); //Check two digit dates - calender.set(2014, (11-1), 13, 13, 46, 33); + calender.set(2014, (11 - 1), 13, 13, 46, 33); calender_timestamp.put("timestamp", String.valueOf(calender.getTimeInMillis())); - escapedString += " " + BucketPath.escapeString( + escapedString += " " + BucketPath.escapeString( test, calender_timestamp, false, Calendar.HOUR_OF_DAY, 12); System.out.println("Escaped String: " + escapedString); d = new Date(calender.getTimeInMillis()); - expectedString += " " + format.format(d); - System.out.println("Expected String: "+ expectedString); + expectedString += " " + format.format(d); + System.out.println("Expected String: " + expectedString); Assert.assertEquals(expectedString, escapedString); } @Test - public void testDateFormatTimeZone(){ + public void testDateFormatTimeZone() { TimeZone utcTimeZone = TimeZone.getTimeZone("UTC"); String test = "%c"; String escapedString = BucketPath.escapeString( @@ -163,7 +163,7 @@ public class TestBucketPath { format.setTimeZone(utcTimeZone); Date d = new Date(cal.getTimeInMillis()); String expectedString = format.format(d); - System.out.println("Expected String: "+ expectedString); + System.out.println("Expected String: " + expectedString); Assert.assertEquals(expectedString, escapedString); } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/instrumentation/TestMonitoredCounterGroup.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/TestMonitoredCounterGroup.java b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/TestMonitoredCounterGroup.java index b1f637f..7db535e 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/TestMonitoredCounterGroup.java +++ b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/TestMonitoredCounterGroup.java @@ -18,22 +18,15 @@ */ package org.apache.flume.instrumentation; -import java.lang.management.ManagementFactory; -import java.util.Random; - -import javax.management.AttributeNotFoundException; -import javax.management.InstanceNotFoundException; -import javax.management.MBeanException; -import javax.management.MBeanInfo; -import javax.management.MBeanServer; -import javax.management.ObjectName; -import javax.management.ReflectionException; - import junit.framework.Assert; - import org.junit.Before; import org.junit.Test; +import javax.management.MBeanServer; +import javax.management.ObjectName; +import java.lang.management.ManagementFactory; +import java.util.Random; + public class TestMonitoredCounterGroup { private static final int MAX_BOUNDS = 1000; @@ -61,7 +54,6 @@ public class TestMonitoredCounterGroup { private static final String SRC_ATTR_APPEND_BATCH_ACCEPTED_COUNT = "AppendBatchAcceptedCount"; - private static final String CH_ATTR_CHANNEL_SIZE = "ChannelSize"; private static final String CH_ATTR_EVENT_PUT_ATTEMPT = "EventPutAttemptCount"; @@ -122,28 +114,28 @@ public class TestMonitoredCounterGroup { int eventDrainAttempt = random.nextInt(MAX_BOUNDS); int eventDrainSuccess = random.nextInt(MAX_BOUNDS); - for (int i = 0; i<connCreated; i++) { + for (int i = 0; i < connCreated; i++) { skc.incrementConnectionCreatedCount(); } - for (int i = 0; i<connClosed; i++) { + for (int i = 0; i < connClosed; i++) { skc.incrementConnectionClosedCount(); } - for (int i = 0; i<connFailed; i++) { + for (int i = 0; i < connFailed; i++) { skc.incrementConnectionFailedCount(); } - for (int i = 0; i<batchEmpty; i++) { + for (int i = 0; i < batchEmpty; i++) { skc.incrementBatchEmptyCount(); } - for (int i = 0; i<batchUnderflow; i++) { + for (int i = 0; i < batchUnderflow; i++) { skc.incrementBatchUnderflowCount(); } - for (int i = 0; i<batchComplete; i++) { + for (int i = 0; i < batchComplete; i++) { skc.incrementBatchCompleteCount(); } - for (int i = 0; i<eventDrainAttempt; i++) { + for (int i = 0; i < eventDrainAttempt; i++) { skc.incrementEventDrainAttemptCount(); } - for (int i = 0; i<eventDrainSuccess; i++) { + for (int i = 0; i < eventDrainSuccess; i++) { skc.incrementEventDrainSuccessCount(); } @@ -204,10 +196,10 @@ public class TestMonitoredCounterGroup { int numEventTakeSuccess = random.nextInt(MAX_BOUNDS); chc.setChannelSize(numChannelSize); - for (int i = 0; i<numEventPutAttempt; i++) { + for (int i = 0; i < numEventPutAttempt; i++) { chc.incrementEventPutAttemptCount(); } - for (int i = 0; i<numEventTakeAttempt; i++) { + for (int i = 0; i < numEventTakeAttempt; i++) { chc.incrementEventTakeAttemptCount(); } chc.addToEventPutSuccessCount(numEventPutSuccess); @@ -264,16 +256,16 @@ public class TestMonitoredCounterGroup { srcc.addToEventReceivedCount(numEventReceived); srcc.addToEventAcceptedCount(numEventAccepted); - for (int i = 0; i<numAppendReceived; i++) { + for (int i = 0; i < numAppendReceived; i++) { srcc.incrementAppendReceivedCount(); } - for (int i = 0; i<numAppendAccepted; i++) { + for (int i = 0; i < numAppendAccepted; i++) { srcc.incrementAppendAcceptedCount(); } - for (int i = 0; i<numAppendBatchReceived; i++) { + for (int i = 0; i < numAppendBatchReceived; i++) { srcc.incrementAppendBatchReceivedCount(); } - for (int i = 0; i<numAppendBatchAccepted; i++) { + for (int i = 0; i < numAppendBatchAccepted; i++) { srcc.incrementAppendBatchAcceptedCount(); } @@ -302,11 +294,11 @@ public class TestMonitoredCounterGroup { int numEventReceived2 = random.nextInt(MAX_BOUNDS); int numEventAccepted2 = random.nextInt(MAX_BOUNDS); - for (int i = 0; i<numEventReceived2; i++) { + for (int i = 0; i < numEventReceived2; i++) { srcc.incrementEventReceivedCount(); } - for (int i = 0; i<numEventAccepted2; i++) { + for (int i = 0; i < numEventAccepted2; i++) { srcc.incrementEventAcceptedCount(); } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java index eb2d02d..09d419f 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java +++ b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java @@ -20,12 +20,6 @@ package org.apache.flume.instrumentation.http; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.lang.reflect.Type; -import java.net.HttpURLConnection; -import java.net.URL; -import java.util.Map; import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Transaction; @@ -39,6 +33,12 @@ import org.junit.Assert; import org.junit.Test; import javax.servlet.http.HttpServletResponse; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.lang.reflect.Type; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.Map; /** * @@ -47,9 +47,7 @@ public class TestHTTPMetricsServer { Channel memChannel = new MemoryChannel(); Channel pmemChannel = new PseudoTxnMemoryChannel(); - Type mapType = - new TypeToken<Map<String, Map<String, String>>>() { - }.getType(); + Type mapType = new TypeToken<Map<String, Map<String, String>>>() {}.getType(); Gson gson = new Gson(); @Test @@ -99,7 +97,7 @@ public class TestHTTPMetricsServer { private void testWithPort(int port) throws Exception { MonitorService srv = new HTTPMetricsServer(); Context context = new Context(); - if(port > 1024){ + if (port > 1024) { context.put(HTTPMetricsServer.CONFIG_PORT, String.valueOf(port)); } else { port = HTTPMetricsServer.DEFAULT_PORT; @@ -139,8 +137,7 @@ public class TestHTTPMetricsServer { doTestForbiddenMethods(4432,"OPTIONS"); } - public void doTestForbiddenMethods(int port, String method) - throws Exception { + public void doTestForbiddenMethods(int port, String method) throws Exception { MonitorService srv = new HTTPMetricsServer(); Context context = new Context(); if (port > 1024) { @@ -154,8 +151,7 @@ public class TestHTTPMetricsServer { URL url = new URL("http://0.0.0.0:" + String.valueOf(port) + "/metrics"); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setRequestMethod(method); - Assert.assertEquals(HttpServletResponse.SC_FORBIDDEN, - conn.getResponseCode()); + Assert.assertEquals(HttpServletResponse.SC_FORBIDDEN, conn.getResponseCode()); srv.stop(); } } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounterTest.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounterTest.java b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounterTest.java index 4a71265..6d64c53 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounterTest.java +++ b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounterTest.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -23,41 +23,41 @@ import org.junit.Test; public class KafkaSourceCounterTest { - KafkaSourceCounter counter; - - @Before - public void setUp() throws Exception { - counter = new KafkaSourceCounter("test"); - } - - @Test - public void testAddToKafkaEventGetTimer() throws Exception { - Assert.assertEquals(1L, counter.addToKafkaEventGetTimer(1L)); - } - - @Test - public void testAddToKafkaCommitTimer() throws Exception { - Assert.assertEquals(1L, counter.addToKafkaCommitTimer(1L)); - } - - @Test - public void testIncrementKafkaEmptyCount() throws Exception { - Assert.assertEquals(1L, counter.incrementKafkaEmptyCount()); - } - - @Test - public void testGetKafkaCommitTimer() throws Exception { - Assert.assertEquals(0, counter.getKafkaCommitTimer()); - } - - @Test - public void testGetKafkaEventGetTimer() throws Exception { - Assert.assertEquals(0, counter.getKafkaEventGetTimer()); - } - - @Test - public void testGetKafkaEmptyCount() throws Exception { - Assert.assertEquals(0, counter.getKafkaEmptyCount()); - } + KafkaSourceCounter counter; + + @Before + public void setUp() throws Exception { + counter = new KafkaSourceCounter("test"); + } + + @Test + public void testAddToKafkaEventGetTimer() throws Exception { + Assert.assertEquals(1L, counter.addToKafkaEventGetTimer(1L)); + } + + @Test + public void testAddToKafkaCommitTimer() throws Exception { + Assert.assertEquals(1L, counter.addToKafkaCommitTimer(1L)); + } + + @Test + public void testIncrementKafkaEmptyCount() throws Exception { + Assert.assertEquals(1L, counter.incrementKafkaEmptyCount()); + } + + @Test + public void testGetKafkaCommitTimer() throws Exception { + Assert.assertEquals(0, counter.getKafkaCommitTimer()); + } + + @Test + public void testGetKafkaEventGetTimer() throws Exception { + Assert.assertEquals(0, counter.getKafkaEventGetTimer()); + } + + @Test + public void testGetKafkaEmptyCount() throws Exception { + Assert.assertEquals(0, counter.getKafkaEmptyCount()); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorMillisSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorMillisSerializer.java b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorMillisSerializer.java index ac46131..dd42079 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorMillisSerializer.java +++ b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorMillisSerializer.java @@ -18,7 +18,6 @@ package org.apache.flume.interceptor; import junit.framework.Assert; - import org.apache.flume.Context; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; @@ -29,7 +28,8 @@ public class TestRegexExtractorInterceptorMillisSerializer { @Test public void shouldRequirePatternInConfiguration() { try { - RegexExtractorInterceptorMillisSerializer fixture = new RegexExtractorInterceptorMillisSerializer(); + RegexExtractorInterceptorMillisSerializer fixture = + new RegexExtractorInterceptorMillisSerializer(); fixture.configure(new Context()); Assert.fail(); } catch (IllegalArgumentException ex) { @@ -40,7 +40,8 @@ public class TestRegexExtractorInterceptorMillisSerializer { @Test public void shouldRequireValidPatternInConfiguration() { try { - RegexExtractorInterceptorMillisSerializer fixture = new RegexExtractorInterceptorMillisSerializer(); + RegexExtractorInterceptorMillisSerializer fixture = + new RegexExtractorInterceptorMillisSerializer(); Context context = new Context(); context.put("pattern", "ABCDEFG"); fixture.configure(context); @@ -52,7 +53,8 @@ public class TestRegexExtractorInterceptorMillisSerializer { @Test public void shouldReturnMillisFromPattern() { - RegexExtractorInterceptorMillisSerializer fixture = new RegexExtractorInterceptorMillisSerializer(); + RegexExtractorInterceptorMillisSerializer fixture = + new RegexExtractorInterceptorMillisSerializer(); Context context = new Context(); String pattern = "yyyy-MM-dd HH:mm:ss"; context.put("pattern", pattern); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorPassThroughSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorPassThroughSerializer.java b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorPassThroughSerializer.java index 569c274..33003e6 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorPassThroughSerializer.java +++ b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorPassThroughSerializer.java @@ -26,7 +26,8 @@ public class TestRegexExtractorInterceptorPassThroughSerializer { @Test public void shouldReturnSameValue() { - RegexExtractorInterceptorPassThroughSerializer fixture = new RegexExtractorInterceptorPassThroughSerializer(); + RegexExtractorInterceptorPassThroughSerializer fixture = + new RegexExtractorInterceptorPassThroughSerializer(); fixture.configure(new Context()); String input = "testing (1,2,3,4)"; Assert.assertEquals(input, fixture.serialize(input)); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestSearchAndReplaceInterceptor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestSearchAndReplaceInterceptor.java b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestSearchAndReplaceInterceptor.java index 2ab15f5..616b86b 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestSearchAndReplaceInterceptor.java +++ b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestSearchAndReplaceInterceptor.java @@ -35,7 +35,7 @@ public class TestSearchAndReplaceInterceptor { private void testSearchReplace(Context context, String input, String output) throws Exception { - Interceptor.Builder builder = InterceptorBuilderFactory.newInstance( + Interceptor.Builder builder = InterceptorBuilderFactory.newInstance( InterceptorType.SEARCH_REPLACE.toString()); builder.configure(context); Interceptor interceptor = builder.build(); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/serialization/SyslogAvroEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/serialization/SyslogAvroEventSerializer.java b/flume-ng-core/src/test/java/org/apache/flume/serialization/SyslogAvroEventSerializer.java index 896eced..05af3b1 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/serialization/SyslogAvroEventSerializer.java +++ b/flume-ng-core/src/test/java/org/apache/flume/serialization/SyslogAvroEventSerializer.java @@ -231,20 +231,45 @@ public class SyslogAvroEventSerializer private String hostname = ""; private String message = ""; - public void setFacility(int f) { facility = f; } - public int getFacility() { return facility; } + public void setFacility(int f) { + facility = f; + } + + public int getFacility() { + return facility; + } + + public void setSeverity(int s) { + severity = s; + } - public void setSeverity(int s) { severity = s; } - public int getSeverity() { return severity; } + public int getSeverity() { + return severity; + } - public void setTimestamp(long t) { timestamp = t; } - public long getTimestamp() { return timestamp; } + public void setTimestamp(long t) { + timestamp = t; + } - public void setHostname(String h) { hostname = h; } - public String getHostname() { return hostname; } + public long getTimestamp() { + return timestamp; + } - public void setMessage(String m) { message = m; } - public String getMessage() { return message; } + public void setHostname(String h) { + hostname = h; + } + + public String getHostname() { + return hostname; + } + + public void setMessage(String m) { + message = m; + } + + public String getMessage() { + return message; + } @Override public String toString() { http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/serialization/TestAvroEventDeserializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestAvroEventDeserializer.java b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestAvroEventDeserializer.java index 6f9ddc2..b95433f 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestAvroEventDeserializer.java +++ b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestAvroEventDeserializer.java @@ -49,6 +49,7 @@ public class TestAvroEventDeserializer { LoggerFactory.getLogger(TestAvroEventDeserializer.class); private static final Schema schema; + static { schema = Schema.createRecord("MyRecord", "", "org.apache.flume", false); Schema.Field field = new Schema.Field("foo", http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/serialization/TestDurablePositionTracker.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestDurablePositionTracker.java b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestDurablePositionTracker.java index e52affb..0c76cc9 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestDurablePositionTracker.java +++ b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestDurablePositionTracker.java @@ -31,8 +31,7 @@ import java.net.URL; public class TestDurablePositionTracker { - private static final Logger logger = LoggerFactory.getLogger - (TestDurablePositionTracker.class); + private static final Logger logger = LoggerFactory.getLogger(TestDurablePositionTracker.class); @Test public void testBasicTracker() throws IOException { http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/serialization/TestFlumeEventAvroEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestFlumeEventAvroEventSerializer.java b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestFlumeEventAvroEventSerializer.java index 3860b5e..ded3b13 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestFlumeEventAvroEventSerializer.java +++ b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestFlumeEventAvroEventSerializer.java @@ -18,14 +18,7 @@ */ package org.apache.flume.serialization; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.charset.CharsetDecoder; - +import com.google.common.base.Charsets; import org.apache.avro.file.DataFileReader; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; @@ -38,7 +31,13 @@ import org.junit.Assert; import org.junit.Assume; import org.junit.Test; -import com.google.common.base.Charsets; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.charset.CharsetDecoder; public class TestFlumeEventAvroEventSerializer { @@ -77,17 +76,16 @@ public class TestFlumeEventAvroEventSerializer { throws FileNotFoundException, IOException { // Snappy currently broken on Mac in OpenJDK 7 per FLUME-2012 Assume.assumeTrue(!"Mac OS X".equals(System.getProperty("os.name")) || - !System.getProperty("java.version").startsWith("1.7.")); + !System.getProperty("java.version").startsWith("1.7.")); createAvroFile(TESTFILE, "snappy"); validateAvroFile(TESTFILE); FileUtils.forceDelete(TESTFILE); } - public void createAvroFile(File file, String codec) - throws FileNotFoundException, IOException { + public void createAvroFile(File file, String codec) throws FileNotFoundException, IOException { - if(file.exists()){ + if (file.exists()) { FileUtils.forceDelete(file); } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java index 631bdfe..9f336eb 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java +++ b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java @@ -27,8 +27,6 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.junit.Assert.*; - import java.io.BufferedOutputStream; import java.io.ByteArrayOutputStream; import java.io.File; @@ -39,15 +37,21 @@ import java.nio.charset.Charset; import java.nio.charset.MalformedInputException; import java.util.List; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + public class TestResettableFileInputStream { private static final boolean CLEANUP = true; private static final File WORK_DIR = new File("target/test/work").getAbsoluteFile(); - private static final Logger logger = LoggerFactory.getLogger - (TestResettableFileInputStream.class); + private static final Logger logger = + LoggerFactory.getLogger(TestResettableFileInputStream.class); - private File file, meta; + private File file; + private File meta; @Before public void setup() throws Exception { @@ -156,7 +160,8 @@ public class TestResettableFileInputStream { public void testUtf16BOMAndSurrogatePairRead() throws IOException { ByteArrayOutputStream out = new ByteArrayOutputStream(); generateUtf16SurrogatePairSequence(out); - // buffer now contains 1 BOM and 2 chars (1 surrogate pair) and 6 bytes total (including 2-byte BOM) + // buffer now contains 1 BOM and 2 chars (1 surrogate pair) and 6 bytes total + // (including 2-byte BOM) Files.write(out.toByteArray(), file); ResettableInputStream in = initInputStream(8, Charsets.UTF_16, DecodeErrorPolicy.FAIL); String result = readLine(in, 2); @@ -176,7 +181,8 @@ public class TestResettableFileInputStream { generateShiftJis2ByteSequence(out); // buffer now contains 8 chars and 10 bytes total Files.write(out.toByteArray(), file); - ResettableInputStream in = initInputStream(8, Charset.forName("Shift_JIS"), DecodeErrorPolicy.FAIL); + ResettableInputStream in = initInputStream(8, Charset.forName("Shift_JIS"), + DecodeErrorPolicy.FAIL); String result = readLine(in, 8); assertEquals("1234567\u4E9C\n", result); } @@ -215,7 +221,7 @@ public class TestResettableFileInputStream { String javaVersionStr = System.getProperty("java.version"); double javaVersion = Double.parseDouble(javaVersionStr.substring(0, 3)); - if(javaVersion < 1.8) { + if (javaVersion < 1.8) { assertTrue(preJdk8ExpectedStr.replaceAll("X", "\ufffd").equals(sb.toString())); } else { assertTrue(expectedStr.replaceAll("X", "\ufffd").equals(sb.toString())); @@ -508,8 +514,8 @@ public class TestResettableFileInputStream { return initInputStream(2048, Charsets.UTF_8, policy); } - private ResettableInputStream initInputStream(int bufferSize, Charset charset, DecodeErrorPolicy policy) - throws IOException { + private ResettableInputStream initInputStream(int bufferSize, Charset charset, + DecodeErrorPolicy policy) throws IOException { PositionTracker tracker = new DurablePositionTracker(meta, file.getPath()); ResettableInputStream in = new ResettableFileInputStream(file, tracker, bufferSize, charset, policy); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/serialization/TestSyslogAvroEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestSyslogAvroEventSerializer.java b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestSyslogAvroEventSerializer.java index 7bd342a..ba9f4ab 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestSyslogAvroEventSerializer.java +++ b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestSyslogAvroEventSerializer.java @@ -75,7 +75,7 @@ public class TestSyslogAvroEventSerializer { public void test() throws FileNotFoundException, IOException { // Snappy currently broken on Mac in OpenJDK 7 per FLUME-2012 Assume.assumeTrue(!"Mac OS X".equals(System.getProperty("os.name")) || - !System.getProperty("java.version").startsWith("1.7.")); + !System.getProperty("java.version").startsWith("1.7.")); //Schema schema = new Schema.Parser().parse(schemaFile); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java index 757a536..0f3f9ec 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java +++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java @@ -19,28 +19,10 @@ package org.apache.flume.sink; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; - import com.google.common.base.Charsets; -import java.io.FileInputStream; -import java.security.KeyStore; -import java.security.Security; -import java.util.concurrent.Executors; -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; import org.apache.avro.AvroRemoteException; import org.apache.avro.ipc.NettyServer; -import org.apache.avro.ipc.NettyTransceiver; import org.apache.avro.ipc.Server; -import org.apache.avro.ipc.specific.SpecificRequestor; import org.apache.avro.ipc.specific.SpecificResponder; import org.apache.flume.Channel; import org.apache.flume.ChannelSelector; @@ -49,8 +31,8 @@ import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.Sink; import org.apache.flume.Transaction; -import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.api.RpcClient; +import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.channel.MemoryChannel; import org.apache.flume.channel.ReplicatingChannelSelector; import org.apache.flume.conf.Configurables; @@ -61,19 +43,30 @@ import org.apache.flume.source.AvroSource; import org.apache.flume.source.avro.AvroFlumeEvent; import org.apache.flume.source.avro.AvroSourceProtocol; import org.apache.flume.source.avro.Status; -import org.jboss.netty.channel.ChannelException; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.jboss.netty.handler.ssl.SslHandler; import org.junit.Assert; -import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import java.io.FileInputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.charset.Charset; +import java.security.KeyStore; +import java.security.Security; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; + public class TestAvroSink { private static final Logger logger = LoggerFactory @@ -90,7 +83,9 @@ public class TestAvroSink { } public void setUp(String compressionType, int compressionLevel) { - if (sink != null) { throw new RuntimeException("double setup");} + if (sink != null) { + throw new RuntimeException("double setup"); + } sink = new AvroSink(); channel = new MemoryChannel(); @@ -607,8 +602,9 @@ public class TestAvroSink { } @Test - public void testSslSinkWithNonTrustedCert() throws InterruptedException, - EventDeliveryException, InstantiationException, IllegalAccessException { + public void testSslSinkWithNonTrustedCert() + throws InterruptedException, EventDeliveryException, InstantiationException, + IllegalAccessException { setUp(); Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8); Server server = createSslServer(new MockAvroServer()); @@ -662,37 +658,38 @@ public class TestAvroSink { } @Test - public void testRequestWithNoCompression() throws InterruptedException, IOException, EventDeliveryException { - + public void testRequestWithNoCompression() + throws InterruptedException, IOException, EventDeliveryException { doRequest(false, false, 6); } @Test - public void testRequestWithCompressionOnClientAndServerOnLevel0() throws InterruptedException, IOException, EventDeliveryException { - + public void testRequestWithCompressionOnClientAndServerOnLevel0() + throws InterruptedException, IOException, EventDeliveryException { doRequest(true, true, 0); } @Test - public void testRequestWithCompressionOnClientAndServerOnLevel1() throws InterruptedException, IOException, EventDeliveryException { - + public void testRequestWithCompressionOnClientAndServerOnLevel1() + throws InterruptedException, IOException, EventDeliveryException { doRequest(true, true, 1); } @Test - public void testRequestWithCompressionOnClientAndServerOnLevel6() throws InterruptedException, IOException, EventDeliveryException { - + public void testRequestWithCompressionOnClientAndServerOnLevel6() + throws InterruptedException, IOException, EventDeliveryException { doRequest(true, true, 6); } @Test - public void testRequestWithCompressionOnClientAndServerOnLevel9() throws InterruptedException, IOException, EventDeliveryException { - + public void testRequestWithCompressionOnClientAndServerOnLevel9() + throws InterruptedException, IOException, EventDeliveryException { doRequest(true, true, 9); } - private void doRequest(boolean serverEnableCompression, boolean clientEnableCompression, int compressionLevel) throws InterruptedException, IOException, EventDeliveryException { - + private void doRequest(boolean serverEnableCompression, boolean clientEnableCompression, + int compressionLevel) + throws InterruptedException, IOException, EventDeliveryException { if (clientEnableCompression) { setUp("deflate", compressionLevel); } else { @@ -732,15 +729,12 @@ public class TestAvroSink { source.start(); - Assert - .assertTrue("Reached start or error", LifecycleController.waitForOneOf( - source, LifecycleState.START_OR_ERROR)); - Assert.assertEquals("Server is started", LifecycleState.START, - source.getLifecycleState()); - + Assert.assertTrue("Reached start or error", + LifecycleController.waitForOneOf(source, LifecycleState.START_OR_ERROR)); + Assert.assertEquals("Server is started", + LifecycleState.START, source.getLifecycleState()); - Event event = EventBuilder.withBody("Hello avro", - Charset.forName("UTF8")); + Event event = EventBuilder.withBody("Hello avro", Charset.forName("UTF8")); sink.start(); @@ -858,7 +852,8 @@ public class TestAvroSink { public SSLChannelPipelineFactory() { } - public SSLChannelPipelineFactory(String keystore, String keystorePassword, String keystoreType) { + public SSLChannelPipelineFactory(String keystore, String keystorePassword, + String keystoreType) { this.keystore = keystore; this.keystorePassword = keystorePassword; this.keystoreType = keystoreType; http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/sink/TestDefaultSinkFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestDefaultSinkFactory.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestDefaultSinkFactory.java index 835f541..cf6cbbc 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestDefaultSinkFactory.java +++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestDefaultSinkFactory.java @@ -37,7 +37,6 @@ public class TestDefaultSinkFactory { @Test public void testDuplicateCreate() { - Sink avroSink1 = sinkFactory.create("avroSink1", "avro"); Sink avroSink2 = sinkFactory.create("avroSink2", "avro"); @@ -55,7 +54,7 @@ public class TestDefaultSinkFactory { } private void verifySinkCreation(String name, String type, Class<?> typeClass) - throws Exception { + throws Exception { Sink sink = sinkFactory.create(name, type); Assert.assertNotNull(sink); Assert.assertTrue(typeClass.isInstance(sink)); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java index 3358cf4..8882056 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java +++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java @@ -94,7 +94,7 @@ public class TestFailoverSinkProcessor { @Override public Status process() throws EventDeliveryException { - synchronized(this) { + synchronized (this) { if (remaining <= 0) { throw new EventDeliveryException("can't consume more"); } @@ -107,7 +107,7 @@ public class TestFailoverSinkProcessor { tx.close(); if (e != null) { - synchronized(this) { + synchronized (this) { remaining--; } written++; @@ -167,7 +167,7 @@ public class TestFailoverSinkProcessor { Assert.assertEquals(LifecycleState.START, s1.getLifecycleState()); Assert.assertEquals(LifecycleState.START, s2.getLifecycleState()); Assert.assertEquals(LifecycleState.START, s3.getLifecycleState()); - for(int i = 0; i < 15; i++) { + for (int i = 0; i < 15; i++) { Transaction tx = ch.getTransaction(); tx.begin(); ch.put(EventBuilder.withBody("test".getBytes())); @@ -178,7 +178,7 @@ public class TestFailoverSinkProcessor { Assert.assertEquals(new Integer(10), s1.getWritten()); Assert.assertEquals(new Integer(5), s2.getWritten()); - for(int i = 0; i < 50; i++) { + for (int i = 0; i < 50; i++) { Transaction tx = ch.getTransaction(); tx.begin(); ch.put(EventBuilder.withBody("test".getBytes())); @@ -195,7 +195,7 @@ public class TestFailoverSinkProcessor { // get us past the retry time for the failed sink Thread.sleep(5000); - for(int i = 0; i < 100; i++) { + for (int i = 0; i < 100; i++) { Transaction tx = ch.getTransaction(); tx.begin(); ch.put(EventBuilder.withBody("test".getBytes())); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java index 7d95655..011d2d1 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java +++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java @@ -17,16 +17,7 @@ */ package org.apache.flume.sink; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - import junit.framework.Assert; - import org.apache.flume.Channel; import org.apache.flume.ChannelException; import org.apache.flume.Context; @@ -38,6 +29,14 @@ import org.apache.flume.Transaction; import org.apache.flume.channel.AbstractChannel; import org.junit.Test; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + public class TestLoadBalancingSinkProcessor { private Context getContext(String selectorType, boolean backoff) { @@ -62,8 +61,7 @@ public class TestLoadBalancingSinkProcessor { return getProcessor(sinks, getContext(selectorType, backoff)); } - private LoadBalancingSinkProcessor getProcessor(List<Sink> sinks, Context ctx) - { + private LoadBalancingSinkProcessor getProcessor(List<Sink> sinks, Context ctx) { LoadBalancingSinkProcessor lbsp = new LoadBalancingSinkProcessor(); lbsp.setSinks(sinks); lbsp.configure(ctx); @@ -77,7 +75,7 @@ public class TestLoadBalancingSinkProcessor { // If no selector is specified, the round-robin selector should be used Channel ch = new MockChannel(); int n = 100; - int numEvents = 3*n; + int numEvents = 3 * n; for (int i = 0; i < numEvents; i++) { ch.put(new MockEvent("test" + i)); } @@ -185,7 +183,7 @@ public class TestLoadBalancingSinkProcessor { // TODO: there is a remote possibility that s0 or s2 // never get hit by the random assignment // and thus not backoffed, causing the test to fail - for(int i=0; i < 50; i++) { + for (int i = 0; i < 50; i++) { // a well behaved runner would always check the return. lbsp.process(); } @@ -214,7 +212,7 @@ public class TestLoadBalancingSinkProcessor { public void testRandomPersistentFailure() throws Exception { Channel ch = new MockChannel(); int n = 100; - int numEvents = 3*n; + int numEvents = 3 * n; for (int i = 0; i < numEvents; i++) { ch.put(new MockEvent("test" + i)); } @@ -244,7 +242,7 @@ public class TestLoadBalancingSinkProcessor { } Assert.assertTrue(s2.getEvents().size() == 0); - Assert.assertTrue(s1.getEvents().size() + s3.getEvents().size() == 3*n); + Assert.assertTrue(s1.getEvents().size() + s3.getEvents().size() == 3 * n); } @Test @@ -325,8 +323,6 @@ public class TestLoadBalancingSinkProcessor { Assert.assertTrue("Miraculous distribution", sizeSet.size() > 1); } - - @Test public void testRoundRobinOneActiveSink() throws Exception { Channel ch = new MockChannel(); @@ -373,7 +369,7 @@ public class TestLoadBalancingSinkProcessor { public void testRoundRobinPersistentFailure() throws Exception { Channel ch = new MockChannel(); int n = 100; - int numEvents = 3*n; + int numEvents = 3 * n; for (int i = 0; i < numEvents; i++) { ch.put(new MockEvent("test" + i)); } @@ -404,7 +400,7 @@ public class TestLoadBalancingSinkProcessor { Assert.assertTrue(s1.getEvents().size() == n); Assert.assertTrue(s2.getEvents().size() == 0); - Assert.assertTrue(s3.getEvents().size() == 2*n); + Assert.assertTrue(s3.getEvents().size() == 2 * n); } // test that even if the sink recovers immediately that it is kept out of commission briefly @@ -413,7 +409,7 @@ public class TestLoadBalancingSinkProcessor { public void testRoundRobinBackoffInitialFailure() throws EventDeliveryException { Channel ch = new MockChannel(); int n = 100; - int numEvents = 3*n; + int numEvents = 3 * n; for (int i = 0; i < numEvents; i++) { ch.put(new MockEvent("test" + i)); } @@ -424,7 +420,7 @@ public class TestLoadBalancingSinkProcessor { MockSink s2 = new MockSink(2); s2.setChannel(ch); - MockSink s3 = new MockSink(3); + MockSink s3 = new MockSink(3); s3.setChannel(ch); List<Sink> sinks = new ArrayList<Sink>(); @@ -449,14 +445,15 @@ public class TestLoadBalancingSinkProcessor { Assert.assertEquals((3 * n) / 2, s1.getEvents().size()); Assert.assertEquals(1, s2.getEvents().size()); - Assert.assertEquals((3 * n) /2 - 1, s3.getEvents().size()); + Assert.assertEquals((3 * n) / 2 - 1, s3.getEvents().size()); } @Test - public void testRoundRobinBackoffIncreasingBackoffs() throws EventDeliveryException, InterruptedException { + public void testRoundRobinBackoffIncreasingBackoffs() + throws EventDeliveryException, InterruptedException { Channel ch = new MockChannel(); int n = 100; - int numEvents = 3*n; + int numEvents = 3 * n; for (int i = 0; i < numEvents; i++) { ch.put(new MockEvent("test" + i)); } @@ -468,7 +465,7 @@ public class TestLoadBalancingSinkProcessor { s2.setChannel(ch); s2.setFail(true); - MockSink s3 = new MockSink(3); + MockSink s3 = new MockSink(3); s3.setChannel(ch); List<Sink> sinks = new ArrayList<Sink>(); @@ -508,10 +505,11 @@ public class TestLoadBalancingSinkProcessor { } @Test - public void testRoundRobinBackoffFailureRecovery() throws EventDeliveryException, InterruptedException { + public void testRoundRobinBackoffFailureRecovery() + throws EventDeliveryException, InterruptedException { Channel ch = new MockChannel(); int n = 100; - int numEvents = 3*n; + int numEvents = 3 * n; for (int i = 0; i < numEvents; i++) { ch.put(new MockEvent("test" + i)); } @@ -523,7 +521,7 @@ public class TestLoadBalancingSinkProcessor { s2.setChannel(ch); s2.setFail(true); - MockSink s3 = new MockSink(3); + MockSink s3 = new MockSink(3); s3.setChannel(ch); List<Sink> sinks = new ArrayList<Sink>(); @@ -548,13 +546,12 @@ public class TestLoadBalancingSinkProcessor { Assert.assertEquals(n, s3.getEvents().size()); } - @Test public void testRoundRobinNoFailure() throws Exception { Channel ch = new MockChannel(); int n = 100; - int numEvents = 3*n; + int numEvents = 3 * n; for (int i = 0; i < numEvents; i++) { ch.put(new MockEvent("test" + i)); } @@ -656,8 +653,9 @@ public class TestLoadBalancingSinkProcessor { throw new EventDeliveryException("failed"); } Event e = this.getChannel().take(); - if (e == null) + if (e == null) { return Status.BACKOFF; + } events.add(e); return Status.READY;
